print()的意外行为

fzwojiic  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(328)

我正试图从一个 kafka 溪流。下面是我目前正在做的一些事情。

Import /* … */

Object MyObject {
    Def main (args: Array[String]){
        /*spark streaming context set up*/

        val kafkaStream = KafkaUtils.createStream(streamingContext,zkQuorum,groupID,[per-topic number of Kafka partitions to consume])
        kafkaStream.persist(/*Storage Level*/)

        val field_1_Retrieved = kafkaStream.parsingFunctionToRetrieveField1().print
        val field_2_Retrieved = kafkaStream.parsingFunctionToRetrieveField2().print
        val field_3_Retrieved = kafkaStream.parsingFunctionToRetrieveField3().print

        ssc.start()
        ssc.awaitTermination()
    }
}

但是,这里是我的输出:

-----------------------
Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_1_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_2_Retrieved
field_2_Retrieved
-----------------------
Time xxxxxxxxxz ms
-----------------------
field_3_Retrieved
field_3_Retrieved

这是随机的,肯定不是我期望从我的代码。它将是如下所示:

Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved

我在工作中缺少什么 spark 或者 kafka 会导致这种行为吗?还是我做错了?

m4pnthwp

m4pnthwp1#

这是预期的行为。 print 的dstream实现以时间戳横幅开始。打电话 print 在多个数据流上会产生多个横幅。
要实现版本2,需要将原始数据流中的数据转换为同一数据流中的3个不同版本(可能使用flatmap)。鉴于此 parsingFunctionToRetrieveFieldx 没有提供,不可能提供更多的细节。

相关问题