I am using Kafka with Spark 2.4.5 Structured Streaming to compute an average, but I am running into issues because the current batch receives duplicate records from the Kafka topic.
For example, these Kafka topic messages are received in the 1st batch (output mode: update):
car,Brand=Honda,speed=110,1588569015000000000
car,Brand=ford,speed=90,1588569015000000000
car,Brand=Honda,speed=80,1588569015000000000
The result is the average speed per car brand per timestamp, i.e. grouping by 1588569015000000000 and Brand=Honda, and the result we got is
(110 + 90) / 2 = 100
Then a second batch arrives with late data that duplicates a message with the same timestamp:
car,Brand=Honda,speed=50,1588569015000000000
car,Brand=Honda,speed=50,1588569015000000000
I expect the average to update to (110 + 90 + 50) / 3 = 83.33,
but it updates to (110 + 90 + 50 + 50) / 4 = 75, which is wrong.
val rawDataStream: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as data")
Group by timestamp and brand
Write to Kafka with checkpointing
How can I do this with Spark Structured Streaming, or is something wrong in my code?
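For reference, the steps described above (parse the CSV payload, group by timestamp and brand, write the averages back to Kafka with a checkpoint) could be sketched roughly as follows. This is a sketch under assumptions: the column names (`brand`, `speed`, `ts`), the output topic `topic2`, and the checkpoint path are placeholders, not taken from the original code.

```scala
import org.apache.spark.sql.functions._

// Parse the comma-separated payload into typed columns.
// Field positions follow the sample messages above; names are assumed.
val parsed = rawDataStream
  .select(split(col("data"), ",").as("fields"))
  .select(
    col("fields").getItem(1).as("brand"),   // e.g. "Brand=Honda"
    col("fields").getItem(2).as("speed"),   // e.g. "speed=110"
    col("fields").getItem(3).cast("long").as("ts")
  )
  .withColumn("brand", regexp_replace(col("brand"), "Brand=", ""))
  .withColumn("speed", regexp_replace(col("speed"), "speed=", "").cast("double"))

// Average speed per (timestamp, brand).
val averaged = parsed
  .groupBy(col("ts"), col("brand"))
  .agg(avg(col("speed")).as("avg_speed"))

// Write back to Kafka in update mode with a checkpoint.
// "topic2" and the checkpoint path are placeholders.
val query = averaged
  .selectExpr("CAST(brand AS STRING) AS key",
              "CAST(avg_speed AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("topic", "topic2")
  .option("checkpointLocation", "/tmp/checkpoints/avg-speed")
  .outputMode("update")
  .start()
```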
1 Answer
Spark Structured Streaming lets you remove duplicates with dropDuplicates. You specify the fields that identify a duplicate record; within and across batches, Spark keeps only the first record for each combination of those fields and discards any later records with the same values. The code snippet below deduplicates your streaming DataFrame on the combination of brand, speed, and timestamp.
See the Spark documentation for details.
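A minimal sketch of the deduplication step the answer describes; the column names (`brand`, `speed`, `ts`) are assumptions about how the parsed stream is named:

```scala
// Drop records whose (brand, speed, ts) combination was already seen;
// only the first occurrence is kept, so the late duplicate no longer
// skews the average.
val dedupedDf = parsedDf.dropDuplicates("brand", "speed", "ts")
```

Note that without a watermark, Spark must keep every seen combination in state indefinitely; if your event-time column can be expressed as a timestamp, combining `withWatermark` with `dropDuplicates` bounds that state.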