我想读一个Kafka主题,并写入一个Parquet或三角洲文件,并能够从该Parquet文件读取之前,所有的消息在Kafka主题已经阅读。我有这个工作,但后来我做了一个改变,现在我必须等到所有的消息都被消费之前,有任何在Parquet文件。我的代码在下面。
import org.apache.spark.sql.SparkSession
object MinimalTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("MinimalTest")
.getOrCreate()
val kafkaBrokers = "localhost:9092"
val topic = "FakeTopic"
val startingOffsets = "earliest"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("startingOffsets", startingOffsets)
.option("subscribe", topic)
.load()
val path = "<dir>/MinimalTest"
val checkpointLocation = "<dir>/CheckpointMinimalTest"
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.option("path", path)
.start()
spark.streams.awaitAnyTermination()
}
}
我没有发现任何人有同样的问题,我也没有找到一个解决办法,通过阅读相关文件。我想是有人叫我去做的。我尝试将“enable.auto.commit”设置为true,但收到一条错误消息,指出“enable.auto.commit”不受支持。
我正在使用spark.2.4.4
1条答案
按热度按时间uttx8gqw1#
您可以通过设置
maxOffsetsPerTrigger
在kafka源选项(结构化流媒体+kafka集成指南)中:如果
maxOffsetsPerTrigger
未定义,将使用spark 2.4.4代码中显示的最新偏移量。