apachespark2.4结构流从kafka读取,在应用多次转换之后总是二进制结构

gwo2fgha  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(324)

我正在使用ApacheSpark2.4,在对流式查询应用多个转换之后,我正在从kafka读取json数据,最终输出仍然是二进制的。

val streamingDF = sparkSession.readStream
      .format("kafka")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", value = false)
      .option("maxOffsetsPerTrigger", 50000L)
      .option("kafka.bootstrap.servers", "kafka_server")
      .option("enable.auto.commit" , "false")
      .load()

val dataSet = streamingDF.selectExpr("CAST(value AS STRING)").as[String] 
val stream = dataSet.map{value => convertJSONToCaseClass(value)}
.map{data => futherconvertions(data)}.writeStream.format("console")
.outputMode(OutputMode.Update()).start()

在这之后,我在控制台上得到这样的输出。

Batch: 8
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|

预期的输出假设是具有多列的Dataframe
我做错什么了吗。任何帮助都将不胜感激。
谢谢

n6lpvg4x

n6lpvg4x1#

spark 2.4不支持多聚合链。
https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#unsupported-操作
流式数据集尚不支持多个流式聚合(即流式df上的聚合链)。

gv8xihay

gv8xihay2#

不建议按照文档设置“enable.auto.commit”,请参考Kafka的具体配置https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html 您也可以尝试以下方法:

val streamingDF = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", value = false)
  .option("maxOffsetsPerTrigger", 50000L)
  .option("kafka.bootstrap.servers", "kafka_server")
  .load()
val df = streamingDF.selectExpr("CAST(value as STRING)")         

 val mySchema = StructType(Array(
  StructField("X", StringType, true),
  StructField("Y", StringType, true),
  StructField("Z", StringType, true))                            

val Resultdf = df.select(from_json($"value", mySchema).as("data")).select("data.*")

相关问题