如何访问inputdstream数据?

vmpqdwk3  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(205)

我对scala和spark还很陌生。我正在将字符串消息从kafka(real)发送到spark(local),但是如何访问它们?例如,我想要一个包含所有消息的字符串列表。最后我只打印了它们:

val sc = new SparkContext(conf)
val ssc =  new StreamingContext(sc, Seconds(1))
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, Set[String]("testTopic"))
directKafkaStream.print() //I can see it in console, but how to get my message string?
yzxexxkh

yzxexxkh1#

正如您在api中看到的,inputdstream是dstream的一个子类。这意味着您可以访问inputdstream,就像它只是一个数据流一样。
您只需在rdd上迭代即可“获取”消息字符串,如下所示:

directKafkaStream.foreachRDD { rdd => 
  rdd.foreach { content => 
    // code to handle the string here
  }
}

一般来说,它取决于您所期望的类型(例如,一个定制的avro记录),但是在您处理字符串的情况下,将rdd视为字符串的集合就足够了。
理论上,如果您想应用一个转换(例如。, filter , ...). 例如,如果要筛选包含特定单词的所有字符串,可以使用:

val infoLines = directKafkaStream.filter { line =>
  line.contains("INFO")
}

请注意 infoLines 仍将返回一个 DStream[String] ,所以您仍然会遇到相同的初始问题:如何访问单个字符串?您需要了解dstream和rdd是spark和spark streaming使用的高级数据抽象-通常您将首先对具有转换的数据进行操作,然后应用操作(例如。, saveAsTextFile )-很少 println 声明。
然而,从你的问题来看,似乎你需要阅读一些关于spark流媒体的文档:官方文档是一个很好的资源(尽管有些书,比如learning spark,也可以帮助你理解)。

相关问题