我正在使用键控,非窗口流。尝试将上述流化对象缩减为一个,方法是将该对象中的变量求和并返回该对象,从而将两个值缩减为一。下面是一个例子
例如,有两个对象,object1和object2都有count变量。
object1 ->
name
count
object2 ->
name
count
flink键控的非窗口reduce函数实际上并没有还原和返回一个对象。它同时返回两个对象:object1原样和object2更新计数
deDupedStream.keyBy(msg -> keyConstruct())
.reduce((ReduceFunction<Object>) (value1, value2) -> {
value2.setCount(value1.getCount() + value2.getCount());
return value2;
})
上述输出:
actual:
value1 -> object with no change
value2 -> object with updated count
expectation:
value2 -> object with updated count
我所期望的只是返回更新的对象,而不是两者。这就是我假设的减少。有谁能建议我怎样才能只输出更新的那个吗?
更新:上述聚合步骤之前的逻辑:重复数据消除。
SingleOutputStreamOperator<Object> deDupedStream = inputStream.keyBy(new DeDupKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new DedupReduceFunction(), new DedupProcessFunction());
重复处理函数.java
public class DedupProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
Class1 obj = in.iterator().next();
obj.setWindowStartTime(context.window().getStart());
out.collect(obj);
}
}
重复数据减少功能.java
public class DedupReduceFunction implements ReduceFunction<Class1> {
@Override
public Class1 reduce(Class1 value1, Class1 value2) throws Exception {
return value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
}
}
上面的流实际上是收集1个小时的数据并删除其中的重复项
下面,我尝试执行聚合。实际上我在尝试的是,我是否能够将这两种逻辑结合到一个窗口中。我们还假设两个键构造函数是相同的。如果我尝试将两者结合到一个窗口逻辑中,我可以减少作业中运算符的数量。这就是我如何开始玩,删除一个窗口,看看它的行为。如果不可能,我如何将两个逻辑组合成一个逻辑(假设密钥分组逻辑几乎相同(aggregatekeyconstructor中只有一个变量从dedupkeyconstructor中删除))。
SingleOutputStreamOperator<Object> aggregatedStream = deDupedStream
.keyBy(aggregateKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new AggregateReduceFunction(), new AggregateProcessFunction());
聚合进程函数.java
public class AggregateProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
out.collect(in.iterator().next());
}
}
aggregatereducefunction.java文件
public class AggregateReduceFunction implements ReduceFunction<Class1> {
@Override
public Class1 reduce(Class1 value1, Class1 value2) {
Class1 value = value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
value.setCount(value1.getCount() + value2.getCount());
return value;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!