Scala Spark Structured Streaming receiving duplicate messages

Asked by ekqde3dh on 2021-05-27, in Spark
I am using Kafka with Spark 2.4.5 Structured Streaming and computing an average, but I am getting wrong results because duplicate records arrive from the Kafka topic within the current batch.

For example, these Kafka topic messages are received in the first batch (the query runs in update output mode):

car,Brand=Honda,speed=110,1588569015000000000
car,Brand=Honda,speed=90,1588569015000000000
car,Brand=ford,speed=80,1588569015000000000

The result is the average speed per car brand per timestamp,
i.e. grouping by 1588569015000000000 and Brand=Honda, we get
(110 + 90) / 2 = 100

Now late data arrives in a second batch, containing a duplicated message with the same timestamp:
car,Brand=Honda,speed=50,1588569015000000000
car,Brand=Honda,speed=50,1588569015000000000

I expect the average to update to (110 + 90 + 50) / 3 = 83.33,
but instead it updates to (110 + 90 + 50 + 50) / 4 = 75, which is wrong.

import org.apache.spark.sql.DataFrame

val rawDataStream: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as data") // raw payload as a single string column

Then I group by timestamp and brand,
and write the result to Kafka with a checkpoint.
How can I do this with Spark Structured Streaming, or is there something wrong in my code?
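
For reference, a minimal sketch of the parsing and aggregation described above, assuming the comma-separated payload format shown in the example messages. The parsing expressions, the console sink, and the checkpoint path are illustrative only, not from the original post:

import org.apache.spark.sql.functions._

// Split "car,Brand=Honda,speed=110,1588569015000000000" into typed columns.
// The payload layout is assumed from the example messages above.
val parsed = rawDataStream
  .select(split(col("data"), ",").as("parts"))
  .select(
    regexp_replace(col("parts").getItem(1), "Brand=", "").as("Brand"),
    regexp_replace(col("parts").getItem(2), "speed=", "").cast("double").as("speed"),
    col("parts").getItem(3).cast("long").as("timestamp"))

// Average speed per brand per timestamp, emitted in update mode.
val query = parsed
  .groupBy(col("timestamp"), col("Brand"))
  .agg(avg(col("speed")).as("avg_speed"))
  .writeStream
  .outputMode("update")
  .format("console")                                   // the question writes to Kafka instead
  .option("checkpointLocation", "/tmp/checkpoint-avg") // hypothetical path
  .start()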

Answer 1, by t1qtbnec:

Spark Structured Streaming lets you deduplicate records with dropDuplicates. You specify the fields that identify a duplicate, and across batches Spark keeps only the first record seen for each combination; records with repeated values are discarded.
The snippet below deduplicates your streaming DataFrame on the combination of brand, speed, and timestamp.

rawDataStream.dropDuplicates("Brand", "speed", "timestamp")
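
Note that rawDataStream as defined in the question exposes only a single data column, so the payload must be parsed into Brand, speed, and timestamp columns before dropDuplicates can reference them. Also, without a watermark, a streaming dropDuplicates keeps every key it has seen in state indefinitely. A minimal sketch, assuming the parsed DataFrame from the earlier sketch; the eventTime column name and the 10-minute watermark are illustrative:

import org.apache.spark.sql.functions._

// Assumes `parsed` already has Brand, speed and timestamp columns.
// Converting the nanosecond epoch to a real timestamp allows a watermark,
// which bounds the deduplication state Spark must keep.
val deduped = parsed
  .withColumn("eventTime", (col("timestamp") / 1000000000L).cast("timestamp"))
  .withWatermark("eventTime", "10 minutes")      // late-arrival bound is illustrative
  .dropDuplicates("Brand", "speed", "eventTime") // include the watermarked column

Running the aggregation on deduped instead of parsed then produces the expected (110 + 90 + 50) / 3 = 83.33.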

See the Spark documentation on streaming deduplication for details.
