我正在尝试使用 EventTime
. 我将从Kafka消费。我的数据可能会延迟一分钟(因此我的水印会延迟一分钟)并且出现故障。我有30秒的窗口。我有以下设置:
Kafka消费者.scala
object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode](getKafkaTopic, new JsonNodeDeserializationSchema, properties)
consumer.setStartFromGroupOffsets()
val stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new WMAssigner)
stream
.keyBy { jsonEvent =>
val key = jsonEvent.findValue("key").toString replaceAll("\"","")
key.toString
}
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.process { new SessionWindowProcessor }
.print
env.execute("EventTime Test")
}
}
wmassigner.scala文件
class WMAssigner extends AssignerWithPeriodicWatermarks[ObjectNode] {
var currentMaxTimestamp: Long = 0
var currentWaterMark: Long = 0
override def extractTimestamp(element: ObjectNode, previousElementTimestamp: Long): Long = {
val lStr = element.findValue("ts").toString replaceAll("\"", "")
currentMaxTimestamp = if(currentMaxTimestamp > lStr.toLong) currentMaxTimestamp else lStr.toLong
currentMaxTimestamp
}
override def getCurrentWatermark: Watermark = {
currentWaterMark = if(currentMaxTimestamp - 60000 < 0) 0 else currentMaxTimestamp - 60000
new Watermark(currentWaterMark)
}
}
会话窗口处理器.scala
class SessionWindowProcessor extends ProcessWindowFunction[ObjectNode,Long,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[Long]): Unit = {
println("Processing!")
var maxVal: Long = 0
elements.foreach( value => {
val valStr = value.findValue("value").toString replaceAll("\"","")
maxVal = if(valStr.toLong > maxVal) valStr.toLong else maxVal
})
out.collect(maxVal)
}
}
样本数据:
"{\"ts\": 0, \"key\": \"a\", \"value\": 0}",
"{\"ts\": 15000, \"key\": \"a\", \"value\": 1}",
"{\"ts\": 30000, \"key\": \"a\", \"value\": 2}",
"{\"ts\": 90001, \"key\": \"a\", \"value\": 3}"
我希望在第四条消息传入后,我的第一个窗口将触发,值为 1
或者 2
(我现在不确定包容性是如何发挥作用的)。不幸的是,我甚至没有看到 println
在 SessionWindowProcessor.scala
开火。是不是我做错了什么导致了我的开窗失败?
2条答案
按热度按时间pprl5pva1#
为了它的价值,你的
extractTimestamp()
方法应返回lstr.tolong,而不是currentmaxtimestamp。这个方法跟踪currentmaxtimestamp以便值可以用于水印生成,这是有意义的,但是extracttimestamp的作用是为每个流元素提供事件时间戳。然而,我不相信这解释了为什么您没有看到任何输出——当然,如果您的数据实际上是按时间戳排序的,就像在示例中一样。
另外请注意,您可以使用BoundedAutoFordernessTimestampExtractor,它的使用稍微简单一些。
k0pti3hp2#
你的Kafka主题有多少个分区?如果它有多个分区,问题是flink必须为每个分区看到一个超过
90000
以发布带有值的水印30000
. 因此,您要么添加更多的数据,使每个分区都有一个时间戳大于的元素90000
或者将kafka主题的分区数设置为1
. 下面是有关水印和Kafka连接器的更多信息。