我正在使用ApacheSpark2.4,在对流式查询应用多个转换之后,我正在从kafka读取json数据,最终输出仍然是二进制的。
val streamingDF = sparkSession.readStream
.format("kafka")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", value = false)
.option("maxOffsetsPerTrigger", 50000L)
.option("kafka.bootstrap.servers", "kafka_server")
.option("enable.auto.commit" , "false")
.load()
val dataSet = streamingDF.selectExpr("CAST(value AS STRING)").as[String]
val stream = dataSet.map{value => convertJSONToCaseClass(value)}
.map{data => futherconvertions(data)}.writeStream.format("console")
.outputMode(OutputMode.Update()).start()
在这之后,我在控制台上得到这样的输出。
Batch: 8
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
预期的输出假设是具有多列的Dataframe
我做错什么了吗。任何帮助都将不胜感激。
谢谢
2条答案
按热度按时间n6lpvg4x1#
spark 2.4不支持多聚合链。
https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#unsupported-操作
流式数据集尚不支持多个流式聚合(即流式df上的聚合链)。
gv8xihay2#
不建议按照文档设置“enable.auto.commit”,请参考Kafka的具体配置https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html 您也可以尝试以下方法: