阅读几个主题

pepwfjgg  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(274)

我正在尝试开发一个应用程序,它从kafka服务器上获取四个不同的主题,并对每个主题采取特定的操作。
我创建了一个类,它接收数据流,并有一个方法可以转换数据流。
例如,handler类:

class StreamHandler(stream:DStream[String]) {
  val stream:DStream[String] = stream

  def doActions():DStream[String] =  {
    //Do smth. to DStream
  }
}

现在,假设我从主类中为我想要的每个处理程序类调用doactions(),它会在每个到达的数据流中重复还是只重复一次?

val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)

topicHandler1.doActions()
topicHandler2 .doActions()

ssc.start()

有更好的方法吗?

lnvxswe2

lnvxswe21#

上声明的转换 StreamHandler 将应用于每批数据流。目前的代码是相当不完整的给你一个肯定的答案。在dstream转换管道中,您将需要一个具体化dstream的操作,否则什么都不会发生。
关于该方法,一个接受数据流并对其应用转换的函数就足够了,并且易于测试:

val pipeline:DStream[Data] => () = dstream => 
    dstream.map(...).filter(...).print()

从目前的情况来看,班级建设似乎买的不多。

相关问题