为什么在flink会话窗口中每个键都有多个窗口

x4shl7ld  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(355)

我的要求是检查会话中的开始事件和成功事件。当然,我使用会话窗口,但似乎每个键都有重叠的窗口。我在网上搜索过,不知道为什么。
数据格式: myForm(timestamp, roomId, role, sessionId, event) ,例如:

myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844  
myform(1559129977, 456, kid, 38239, begin) # timestamp equals to 2019-05-29 19:39:37  
...

会话可能只有一对开始和成功事件,也可能有几对开始和成功事件。
活动可能会迟到,最多允许迟到3分钟。
我的钥匙是 roomId + role + sessionId 就像“123\u kid\u 37890”, seesionGap 是60年代

// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = ... // from kafka, steam of myform
val sessionStream = stream
    .assignTimestampsAndWatermarks(new MyFormEventWatermarks(0L))
    .keyBy(mf => mf.roomId + "_" + mf.role + "_" + mf.sessionId)
 .window(EventTimeSessionWindows.withGap(Time.milliseconds(60 * 1000L))      
.allowedLateness(Time.minutes(3))
.apply(myFormWindowFunction)

//MyFormEventWatermarks is :
class MyFormEventWatermarks[T <: AbstractForm](dely: Long) extends AssignerWithPeriodicWatermarks[T] {

  private var currentMaxTimestamp = Long.MinValue
  val maxOutOfOrderness = dely

  @transient
  var waterMark : Watermark = null

  override def getCurrentWatermark: Watermark = {
    if (currentMaxTimestamp == Long.MinValue){
      waterMark = new Watermark(Long.MinValue)
      waterMark
    }
    else{
      waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      waterMark
    }
  }

  override def extractTimestamp(data: T, previousElementTimestamp: Long): Long = {
    val timestamp = data.timestamp
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
    timestamp
  }

}

//window func is 
class myFormWindowFunction extends RichWindowFunction ... {
    ...
    override def apply(key: String, window: TimeWindow, input: Iterable[myForm], out: Collector[List[myForm]]): Unit = {
        println("window is " + window.getStart() + "-" + window.getEnd() + "|" + data.tostring)

    }
    ...

}

in方法 applymyFormWindowFunction ,结果 println 比如:

// like this session data:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844

我有一扇Windows 2019-05-29 19:22:22.605- 2019-05-29 19:23:22.605 ,数据为 myform(1559128942, 123, kid, 37890, begin) ,然后我得到了第二个窗口 2019-05-29 19:22:22.605 - 2019-05-29 19:23:24.844 数据是 myform(1559128942, 123, kid, 37890, begin), myform(1559128944, 123, kid, 37890, success) . 它看起来像窗口初始化到(2019-05-29 19:22:22.605,2019-05-29 19:23:22.605)和(2019-05-29 19:22:24.844,2019-05-29 19:23:24.844),以及 onMerge 方法合并但不“删除”窗口(2019-05-29 19:22:22.605,2019-05-29 19:23:22.605)。我查过函数的源代码 EventTimeSessionWindows 还有flink会话窗口的例子,还不知道程序哪里出错了?

dsf9zpds

dsf9zpds1#

在允许的延迟时间间隔内分配给窗口的事件的默认行为是在每个延迟事件添加到窗口时触发该窗口,但是也可以实现一个自定义触发器,在允许的延迟时间到期时触发,而不是在其他触发事件的同时触发。
请注意,对于会话窗口,延迟到达的事件可能会导致延迟合并。
您可能需要考虑水印延迟和允许延迟之间的权衡。因为您的水印延迟为零,所以您可能会有相当多的延迟事件(每次事件流不是完全按时间戳排序时)。例如,如果您使用3分钟作为水印延迟,并将allowed lateness设置为零,那么您将生成相同的最终结果,但是没有延迟触发和延迟合并,但是在每个窗口的初始触发之前有3分钟的延迟。

uqjltbpv

uqjltbpv2#

我发现了问题,我误解了允许性。当它被使用时,窗口被保存,当窗口+允许延迟时间到达时,窗口将被再次触发。

相关问题