kafka消息在达到最新偏移量之前不会写入parquet文件

htzpubme  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(245)

我想读一个Kafka主题,并写入一个Parquet或三角洲文件,并能够从该Parquet文件读取之前,所有的消息在Kafka主题已经阅读。我有这个工作,但后来我做了一个改变,现在我必须等到所有的消息都被消费之前,有任何在Parquet文件。我的代码在下面。

import org.apache.spark.sql.SparkSession

object MinimalTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("MinimalTest")
      .getOrCreate()

    val kafkaBrokers = "localhost:9092"
    val topic = "FakeTopic"

    val startingOffsets = "earliest"

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("startingOffsets", startingOffsets)
      .option("subscribe", topic)
      .load()

    val path = "<dir>/MinimalTest"
    val checkpointLocation = "<dir>/CheckpointMinimalTest"

    df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", checkpointLocation)
      .option("path", path)
      .start()

    spark.streams.awaitAnyTermination()
  }
}

我没有发现任何人有同样的问题,我也没有找到一个解决办法,通过阅读相关文件。我想是有人叫我去做的。我尝试将“enable.auto.commit”设置为true,但收到一条错误消息,指出“enable.auto.commit”不受支持。
我正在使用spark.2.4.4

uttx8gqw

uttx8gqw1#

您可以通过设置 maxOffsetsPerTrigger 在kafka源选项(结构化流媒体+kafka集成指南)中:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("startingOffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", 10)
      .option("subscribe", topic)
      .load()

如果 maxOffsetsPerTrigger 未定义,将使用spark 2.4.4代码中显示的最新偏移量。

相关问题