我正试图从一个 kafka
溪流。下面是我目前正在做的一些事情。
Import /* … */
Object MyObject {
Def main (args: Array[String]){
/*spark streaming context set up*/
val kafkaStream = KafkaUtils.createStream(streamingContext,zkQuorum,groupID,[per-topic number of Kafka partitions to consume])
kafkaStream.persist(/*Storage Level*/)
val field_1_Retrieved = kafkaStream.parsingFunctionToRetrieveField1().print
val field_2_Retrieved = kafkaStream.parsingFunctionToRetrieveField2().print
val field_3_Retrieved = kafkaStream.parsingFunctionToRetrieveField3().print
ssc.start()
ssc.awaitTermination()
}
}
但是,这里是我的输出:
-----------------------
Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_1_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_2_Retrieved
field_2_Retrieved
-----------------------
Time xxxxxxxxxz ms
-----------------------
field_3_Retrieved
field_3_Retrieved
这是随机的,肯定不是我期望从我的代码。它将是如下所示:
Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved
我在工作中缺少什么 spark
或者 kafka
会导致这种行为吗?还是我做错了?
1条答案
按热度按时间m4pnthwp1#
这是预期的行为。
print
的dstream实现以时间戳横幅开始。打电话print
在多个数据流上会产生多个横幅。要实现版本2,需要将原始数据流中的数据转换为同一数据流中的3个不同版本(可能使用flatmap)。鉴于此
parsingFunctionToRetrieveFieldx
没有提供,不可能提供更多的细节。