spark结构化流式处理—仅一次—未实现—重复事件

nle07wnf  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(385)

我正在使用spark结构化流媒体来消费来自kafka的事件并将它们上传到s3。
在s3上提交检查点:

DataFrameWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

补偿是通过Kafka承诺的 StreamingQueryListener :

kafkaConsumer.commitSync(topicPartitionMap);

启动应用程序后,它将从kafka检索偏移贴图并启动流:

reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)

我存储 topic/partition/offset orc文件里的数据。
该数据包含事件的多个副本,具有精确的 topic/partition/offset .
如何配置流以实现一次处理?

ep6jt1vc

ep6jt1vc1#

发现那些参数应该设置为
true spark.streaming.driver.writeAheadLog.closeFileAfterWrite 以及 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite 如果要将s3用于元数据,请将其设置为“true”
https://spark.apache.org/docs/latest/configuration.html
更多详情请参见:https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=iwar17x1aftlh1pjq1qpkdsqt6du4hgi7wneiyunw25hvquoj-4yqu10r0gem

相关问题