我对Kafka和斯帕克很陌生。我有一个用例,其中一个kafka主题需要从多个spark流窗口中使用。
主题。。。
kafka-topics.sh --create --topic feed --partitions 10 --zookeeper xxx.xxx.xxx.xxx:xxxx --replication-factor 2
代码。。。
package tech.webstar.speed
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author sasajib
*/
class FeedStream2(ssc: StreamingContext, group: String = "default") {
def start(): Unit = {
val kafkaStream: ReceiverInputDStream[(String, String)] = {
KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:xxxx", group, Map("feed" -> 10))
}
val window: DStream[(String, String)] = kafkaStream.window(Duration(5000))
window.foreachRDD(_.foreach(result => {
println("<===============Window===============================>")
println(result)
println(">===============Window===============================<")
}))
}
}
object FeedStream2 extends App {
val sparkConf: SparkConf = {
new SparkConf()
.setAppName("speed-layer")
.setMaster("local[*]")
}
val context: SparkContext = new SparkContext(sparkConf)
val ssc: StreamingContext = new StreamingContext(context, Duration(1000))
ssc.checkpoint("checkpoint")
// context.setLogLevel("ERROR")
//if I comment out one of these, the code works
new FeedStream2(ssc, "group1").start()
new FeedStream2(ssc, "group2").start()
sys.addShutdownHook(() => {
ssc.stop()
})
ssc.start()
ssc.awaitTermination()
}
据我所知,给定2个窗口应该工作并使用相同的消息,因为Kafka组id不同。但Windows都不开。如果我在其中一个窗口中添加注解,代码就会工作并打印主题消息。
如何在不同的spark流媒体窗口中使用相同的主题消息?
谢谢。
Kafka·维西翁:1.6.2
暂无答案!
目前还没有任何答案,快来回答吧!