scala—在spark结构化流媒体中在消息级而不是Dataframe级应用模式

rvpgvaaj  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(265)

所以,问题是因为我的模式可能依赖于kafka头/键,所以我想在消息级别而不是Dataframe级别应用模式。如何做到这一点?谢谢
要为Dataframe级别应用架构的代码段是:

val ParsedDataFrame = kafkaStreamData.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)")
  .select(from_json(col("value"), Schema), col("key"))
  .select("value.*","key")

我想要这样的东西,

if(key == 'a'){
   use Schema1
}
else{
   use Schema2
}

p、 s:我试过使用foreach和map函数,但都不起作用,可能用得不对

jexiocij

jexiocij1#

在同一行中应用不同的模式是不可能的,因为最终会得到一个 AnalysisException 由于数据类型不匹配。
为了测试这一点,你可以做下面的实验。
在表格中有Kafka主题中的以下数据 key:::value :

a:::{"a":"foo","b":"bar"}
b:::{"a":"foo","b":"bar"}

在流式处理查询中,您可以定义:

val schemaA = new StructType().add("a", StringType)
val schemaB = new StructType().add("b", StringType)

val df = spark.readStream
  .format("kafka")
  .[...]
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .withColumn("parsedJson", 
    when(col("key") === "a", from_json(col("value"), schemaA))
    .otherwise(from_json(col("value"), schemaB)))

这将导致 AnalysisException :

org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`key` = 'a') THEN jsontostructs(`value`) ELSE jsontostructs(`value`) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;
'Project [key#21, value#22, CASE WHEN (key#21 = a) THEN jsontostructs(StructField(a,StringType,true), value#22) ELSE jsontostructs(StructField(b,StringType,true), value#22) END AS parsedJson#27]

正如@onecricketeer在注解中提到的那样,您需要首先基于一个过滤器将kafka输入流分离成几个Dataframe,然后应用不同的模式来解析带有json字符串的列。

相关问题