我有一个管道,我想计算一个累计总和。这是密码
input
.apply(WithTimestamps.<Row>of(row -> Instant.ofEpochMilli(row.getInt64("my_timestamp"))))
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
.via((Row row) -> KV.of(row.getString("key"), row.getInt64("val"))))
.apply(Window.<KV<String, Long>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withTimestampCombiner(TimestampCombiner.LATEST))
.apply(Combine.<String, Long, Long>perKey(Sum.longsGlobally().getFn()));
kv元素各代表一个 Long
数一下 String
为特定的一天分组。对于每一组和每一天,只有一个kv条目。每个条目都有一个正确日期的时间戳。当我在 Combine
第一步,时间戳没问题。但是在那之后 Combine
步骤中,每个项目的时间戳 9223371950454775
.
我知道这与在窗口处理之后分配时间戳有关。我发现了这个问题,似乎也有同样的问题。但解决方案不起作用。每 TimestampCombiner
( EARLIEST
, LATEST
以及 END_OF_WINDOW
)我得到相同的时间戳 9223371950454775
.
暂无答案!
目前还没有任何答案,快来回答吧!