所以,问题是因为我的模式可能依赖于kafka头/键,所以我想在消息级别而不是Dataframe级别应用模式。如何做到这一点?谢谢
要为Dataframe级别应用架构的代码段是:
val ParsedDataFrame = kafkaStreamData.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)")
.select(from_json(col("value"), Schema), col("key"))
.select("value.*","key")
我想要这样的东西,
if(key == 'a'){
use Schema1
}
else{
use Schema2
}
p、 s:我试过使用foreach和map函数,但都不起作用,可能用得不对
1条答案
按热度按时间jexiocij1#
在同一行中应用不同的模式是不可能的,因为最终会得到一个
AnalysisException
由于数据类型不匹配。为了测试这一点,你可以做下面的实验。
在表格中有Kafka主题中的以下数据
key:::value
:在流式处理查询中,您可以定义:
这将导致
AnalysisException
:正如@onecricketeer在注解中提到的那样,您需要首先基于一个过滤器将kafka输入流分离成几个Dataframe,然后应用不同的模式来解析带有json字符串的列。