spark流窗口输出

mpgws1up  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(339)

我正在开发一个spark流应用程序,我需要打印json属性的min,max值,该属性应该每隔20秒在窗口上打印min,max,滑动窗口为2秒。基本上(对于poc)我想在工作组sparkcontext的spark ui上打印min,max。

SetJobGroup ("count-min-max", "count-min-max value of quality attribute").

这应该每20秒在spark ui显示屏上显示一次。
下面是我的代码,我可以得到最小值,最大值,计数,但打印是每2秒执行,这是流批量间隔不是在20秒的窗口。

val ssc = new StreamingContext(sparkContext, Seconds(2))

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)

      //val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print

      val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
                            .window(Seconds(20),Seconds(2))

      valueDtsream.foreachRDD
      {
         rdd => 
           if (!rdd.partitions.isEmpty)
           {
             val stats = rdd.flatMap(x => x)
             println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString)
           }
      }

      ssc.start()
      ssc.awaitTermination()
pcrecxhr

pcrecxhr1#

我想你把两者混淆了 slideInterval 以及 windowLength . 在 window(windowLength, slideInterval) : windowLength 是窗口的长度,表示窗口在计算时应考虑的数据间隔数。 slideInterval 窗口计算完成后,窗口移动的间隔数。
如果我对你的问题理解正确,你应该将其编辑为: .window(Seconds(x),Seconds(20)) .

相关问题