事件时间窗口未触发

fquxozlt  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(365)

我正在尝试使用 EventTime . 我将从Kafka消费。我的数据可能会延迟一分钟(因此我的水印会延迟一分钟)并且出现故障。我有30秒的窗口。我有以下设置:
Kafka消费者.scala

object KafkaConsumer {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val properties = getServerProperties
    val consumer = new FlinkKafkaConsumer010[ObjectNode](getKafkaTopic, new JsonNodeDeserializationSchema, properties)
    consumer.setStartFromGroupOffsets()

    val stream = env.addSource(consumer)
      .assignTimestampsAndWatermarks(new WMAssigner)

    stream
      .keyBy { jsonEvent =>
        val key = jsonEvent.findValue("key").toString replaceAll("\"","")
        key.toString
      }
      .window(TumblingEventTimeWindows.of(Time.seconds(30)))
      .process { new SessionWindowProcessor }
      .print
    env.execute("EventTime Test")
  }
}

wmassigner.scala文件

class WMAssigner extends AssignerWithPeriodicWatermarks[ObjectNode] {

  var currentMaxTimestamp: Long = 0
  var currentWaterMark: Long = 0

  override def extractTimestamp(element: ObjectNode, previousElementTimestamp: Long): Long = {
    val lStr = element.findValue("ts").toString replaceAll("\"", "")
    currentMaxTimestamp = if(currentMaxTimestamp > lStr.toLong) currentMaxTimestamp else lStr.toLong
    currentMaxTimestamp
  }

  override def getCurrentWatermark: Watermark = {
      currentWaterMark = if(currentMaxTimestamp - 60000 < 0) 0 else currentMaxTimestamp - 60000
      new Watermark(currentWaterMark)

  }
}

会话窗口处理器.scala

class SessionWindowProcessor extends ProcessWindowFunction[ObjectNode,Long,String,TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[Long]): Unit = {
    println("Processing!")
    var maxVal: Long = 0
    elements.foreach( value => {
      val valStr = value.findValue("value").toString replaceAll("\"","")
      maxVal = if(valStr.toLong > maxVal) valStr.toLong else maxVal
    })
    out.collect(maxVal)
  }
}

样本数据:

"{\"ts\": 0,     \"key\": \"a\", \"value\": 0}",
"{\"ts\": 15000, \"key\": \"a\", \"value\": 1}",
"{\"ts\": 30000, \"key\": \"a\", \"value\": 2}",
"{\"ts\": 90001, \"key\": \"a\", \"value\": 3}"

我希望在第四条消息传入后,我的第一个窗口将触发,值为 1 或者 2 (我现在不确定包容性是如何发挥作用的)。不幸的是,我甚至没有看到 printlnSessionWindowProcessor.scala 开火。是不是我做错了什么导致了我的开窗失败?

pprl5pva

pprl5pva1#

为了它的价值,你的 extractTimestamp() 方法应返回lstr.tolong,而不是currentmaxtimestamp。这个方法跟踪currentmaxtimestamp以便值可以用于水印生成,这是有意义的,但是extracttimestamp的作用是为每个流元素提供事件时间戳。
然而,我不相信这解释了为什么您没有看到任何输出——当然,如果您的数据实际上是按时间戳排序的,就像在示例中一样。
另外请注意,您可以使用BoundedAutoFordernessTimestampExtractor,它的使用稍微简单一些。

k0pti3hp

k0pti3hp2#

你的Kafka主题有多少个分区?如果它有多个分区,问题是flink必须为每个分区看到一个超过 90000 以发布带有值的水印 30000 . 因此,您要么添加更多的数据,使每个分区都有一个时间戳大于的元素 90000 或者将kafka主题的分区数设置为 1 . 下面是有关水印和Kafka连接器的更多信息。

相关问题