spark kafka流式处理同一主题的多个消费者组不起作用

kupeojn6  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(197)

我对Kafka和斯帕克很陌生。我有一个用例,其中一个kafka主题需要从多个spark流窗口中使用。
主题。。。

kafka-topics.sh --create --topic feed --partitions 10 --zookeeper xxx.xxx.xxx.xxx:xxxx --replication-factor 2

代码。。。

package tech.webstar.speed

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author sasajib
  */
class FeedStream2(ssc: StreamingContext, group: String = "default") {

    def start(): Unit = {

        val kafkaStream: ReceiverInputDStream[(String, String)] = {
            KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:xxxx", group, Map("feed" -> 10))
        }

        val window: DStream[(String, String)] = kafkaStream.window(Duration(5000))
        window.foreachRDD(_.foreach(result => {
            println("<===============Window===============================>")
            println(result)
            println(">===============Window===============================<")
        }))
    }
}

object FeedStream2 extends App {
    val sparkConf: SparkConf = {
        new SparkConf()
                .setAppName("speed-layer")
                .setMaster("local[*]")
    }

    val context: SparkContext = new SparkContext(sparkConf)
    val ssc: StreamingContext = new StreamingContext(context, Duration(1000))
    ssc.checkpoint("checkpoint")
    //    context.setLogLevel("ERROR")

    //if I comment out one of these, the code works
    new FeedStream2(ssc, "group1").start()
    new FeedStream2(ssc, "group2").start()   

    sys.addShutdownHook(() => {
        ssc.stop()
    })

    ssc.start()
    ssc.awaitTermination()
}

据我所知,给定2个窗口应该工作并使用相同的消息,因为Kafka组id不同。但Windows都不开。如果我在其中一个窗口中添加注解,代码就会工作并打印主题消息。
如何在不同的spark流媒体窗口中使用相同的主题消息?
谢谢。
Kafka·维西翁:1.6.2

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题