spark流挂起,kafka最早开始偏移(kafka 2,spark 2.4.3)

hwazgwia  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(341)

我对spark流媒体和Kafka有意见。当运行一个示例程序来使用kafka主题并将微批处理结果输出到终端时,当我设置以下选项时,我的工作似乎挂起了: df.option("startingOffsets", "earliest") 从最新的胶印开始工作很好,当每一个微批流通过时,结果被打印到终端。
我在想也许这是一个资源问题——我试着从一个有大量数据的主题中阅读。但是,我似乎没有内存/cpu问题(使用本地[*]集群运行此作业)。这项工作似乎从来没有真正开始过,但只是悬在一线: 19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A ```
val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()

val topic = "topic.with.alotta.data"

//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()

//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()

我希望看到结果打印到控制台…但是,应用程序似乎就像我提到的那样挂起了。有什么想法吗?这感觉像是一个spark资源问题(因为我正在针对一个有大量数据的主题运行一个本地“集群”)。是否有一些关于流式Dataframe的性质我遗漏了?
vlju58qv

vlju58qv1#

写入控制台会导致在每个触发器的驱动程序内存中收集所有数据。由于当前没有限制批处理的大小,这意味着整个主题内容都在驱动程序中累积。看到了吗https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-Flume
设置批处理大小的限制应该可以解决您的问题。尝试添加 maxOffsetsPerTrigger 读Kafka的时候。。。

val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .load()

看到了吗https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html 详情。

相关问题