我最近注意到我对spark流有一个困惑(我目前正在学习spark)。
我从Kafka的一个主题中读到如下数据:
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
从而产生一个数据流。
为了处理事件时间(而不是处理时间),我做了以下工作:
outputStream
.foreachRDD(rdd => {
rdd.toDF().withWatermark("timestamp", "60 seconds")
.groupBy(
window($"timestamp", "60 seconds", "10 seconds")
)
.sum("meterIncrement")
.toJSON
.toDF("value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "taxi-dollar-accurate")
.start()
)
})
我得到了错误
“writestream”只能在流式数据集/Dataframe上调用
这让我很惊讶,因为df的来源是一个dstream。无论如何,我通过将.writestream改为.write并将.start()改为.save()来解决这个问题。
但我觉得我不知怎么失去了那台foreach的流式电源。显然这就是我写这个问题的原因。这是正确的方法吗?我见过其他使用
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
但我不知道这与在数据流上调用foreach,然后将每个rdd转换为df有什么不同。
1条答案
按热度按时间eagi6jfj1#
但我不知道这与在数据流上调用foreach,然后将每个rdd转换为df有什么不同。
当您打电话时:
你的变量
rdd
(或者Dataframe)变成了一个不再是流的rdd。因此rdd.toDF.[...].writeStream
不会再工作了。继续rdd
如果您选择使用dsream方法,那么可以发送那些调用kafkaproducerapi的rdd。
举个例子:
但是,这不是推荐的方法,因为您在每个执行器的每个批处理间隔中创建和关闭kafkaproducer。但这应该让您对如何使用directstreamapi将数据写入kafka有一个基本的了解。
为了进一步优化发送数据到Kafka,您可以按照这里给出的指导。
继续Dataframe
不过,您也可以将rdd转换为Dataframe,但要确保调用面向批处理的api将数据写入kafka:
有关如何将批处理Dataframe写入kafka的所有详细信息,请参阅spark structured streaming+kafka integration guide
注意
不过,最重要的是,我强烈建议在这种情况下不要混淆rdd和结构化api,而要坚持两者。