我对scala和spark还很陌生。我正在将字符串消息从kafka(real)发送到spark(local),但是如何访问它们?例如,我想要一个包含所有消息的字符串列表。最后我只打印了它们:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, Set[String]("testTopic"))
directKafkaStream.print() //I can see it in console, but how to get my message string?
1条答案
按热度按时间yzxexxkh1#
正如您在api中看到的,inputdstream是dstream的一个子类。这意味着您可以访问inputdstream,就像它只是一个数据流一样。
您只需在rdd上迭代即可“获取”消息字符串,如下所示:
一般来说,它取决于您所期望的类型(例如,一个定制的avro记录),但是在您处理字符串的情况下,将rdd视为字符串的集合就足够了。
理论上,如果您想应用一个转换(例如。,
filter
, ...). 例如,如果要筛选包含特定单词的所有字符串,可以使用:请注意
infoLines
仍将返回一个DStream[String]
,所以您仍然会遇到相同的初始问题:如何访问单个字符串?您需要了解dstream和rdd是spark和spark streaming使用的高级数据抽象-通常您将首先对具有转换的数据进行操作,然后应用操作(例如。,saveAsTextFile
)-很少println
声明。然而,从你的问题来看,似乎你需要阅读一些关于spark流媒体的文档:官方文档是一个很好的资源(尽管有些书,比如learning spark,也可以帮助你理解)。