使用spark streamingcontext从kafka主题消费

eivgtgni  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(319)

我是spark&kafka的新手,我正在尝试获取一些scala代码(作为spark作业运行)作为一个长时间运行的进程(而不仅仅是一个短期/计划任务)并不断轮询kafka代理以获取消息。当它收到消息时,我只想把它们打印到控制台/标准输出。同样,这需要一个长期运行的过程,并且基本上(尝试)永生。
在做了一些挖掘之后,它看起来像一个 StreamingContext 是我想用的。以下是我最好的尝试:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    val topicsSet = kafkaTopics.split(",").toSet
    val props = Map(
        "bootstrap.servers" -> "my-kafka.example.com:9092",
        "metadata.broker.list" -> "my-kafka.example.com:9092",
        "serializer.class" -> "kafka.serializer.StringEncoder",
        "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}

def processEngine(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(1))

    val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()

当我运行这个程序时,没有异常/错误,但似乎什么都没有发生。我可以确认有关于这个主题的消息。你知道我要去哪里吗?

qnzebej0

qnzebej01#

当你 foreachRDD ,输出在工作节点中打印,而不是在主节点中打印。我假设您正在查看主机的控制台输出。你可以用 DStream.print 取而代之的是:

val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()

另外,别忘了打电话 ssc.awaitTermination() 之后 ssc.start() :

ssc.start()
ssc.awaitTermination()

作为旁注,我假设您复制粘贴了这个示例,但是没有必要使用 transformDStream 如果你真的不打算和 OffsetRange .

dy2hfwbg

dy2hfwbg2#

这是你的完整密码吗?你在哪里创建了sc?您必须在流式处理上下文之前创建spark上下文。您可以这样创建sc:

SparkConf sc = new SparkConf().setAppName("SparkConsumer");

而且,没有 awaitTermination ,很难捕获和打印后台数据处理过程中出现的异常。你能补充一下吗 ssc1.awaitTermination(); 最后,看看你是否有任何错误。

相关问题