我正在使用spark结构化流媒体来消费来自kafka的事件并将它们上传到s3。
在s3上提交检查点:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
补偿是通过Kafka承诺的 StreamingQueryListener
:
kafkaConsumer.commitSync(topicPartitionMap);
启动应用程序后,它将从kafka检索偏移贴图并启动流:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
我存储 topic/partition/offset
orc文件里的数据。
该数据包含事件的多个副本,具有精确的 topic/partition/offset
.
如何配置流以实现一次处理?
1条答案
按热度按时间ep6jt1vc1#
发现那些参数应该设置为
true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
以及spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
如果要将s3用于元数据,请将其设置为“true”https://spark.apache.org/docs/latest/configuration.html
更多详情请参见:https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=iwar17x1aftlh1pjq1qpkdsqt6du4hgi7wneiyunw25hvquoj-4yqu10r0gem