flink reduce键控非窗口流

i2loujxw  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(215)

我正在使用键控,非窗口流。尝试将上述流化对象缩减为一个,方法是将该对象中的变量求和并返回该对象,从而将两个值缩减为一。下面是一个例子
例如,有两个对象,object1和object2都有count变量。

object1 -> 
    name
    count
object2 ->
    name
    count

flink键控的非窗口reduce函数实际上并没有还原和返回一个对象。它同时返回两个对象:object1原样和object2更新计数

deDupedStream.keyBy(msg -> keyConstruct())
.reduce((ReduceFunction<Object>) (value1, value2) -> {
    value2.setCount(value1.getCount() + value2.getCount());
    return value2;
})

上述输出:

actual:
value1 -> object with no change
value2 -> object with updated count

expectation:
value2 -> object with updated count

我所期望的只是返回更新的对象,而不是两者。这就是我假设的减少。有谁能建议我怎样才能只输出更新的那个吗?
更新:上述聚合步骤之前的逻辑:重复数据消除。

SingleOutputStreamOperator<Object> deDupedStream = inputStream.keyBy(new DeDupKeyConstructor())
                .timeWindow(Time.milliseconds(timeWindowInMillis))
                .reduce(new DedupReduceFunction(), new DedupProcessFunction());

重复处理函数.java

public class DedupProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
        Class1 obj = in.iterator().next();
        obj.setWindowStartTime(context.window().getStart());
        out.collect(obj);
    }
}

重复数据减少功能.java

public class DedupReduceFunction implements ReduceFunction<Class1> {
    @Override
    public Class1 reduce(Class1 value1, Class1 value2) throws Exception {
        return value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
    }
}

上面的流实际上是收集1个小时的数据并删除其中的重复项
下面,我尝试执行聚合。实际上我在尝试的是,我是否能够将这两种逻辑结合到一个窗口中。我们还假设两个键构造函数是相同的。如果我尝试将两者结合到一个窗口逻辑中,我可以减少作业中运算符的数量。这就是我如何开始玩,删除一个窗口,看看它的行为。如果不可能,我如何将两个逻辑组合成一个逻辑(假设密钥分组逻辑几乎相同(aggregatekeyconstructor中只有一个变量从dedupkeyconstructor中删除))。

SingleOutputStreamOperator<Object> aggregatedStream = deDupedStream
.keyBy(aggregateKeyConstructor())
.timeWindow(Time.milliseconds(timeWindowInMillis))
.reduce(new AggregateReduceFunction(), new AggregateProcessFunction());

聚合进程函数.java

public class AggregateProcessFunction extends ProcessWindowFunction<Class1, Class1, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Class1> in, Collector<Class1> out) throws Exception {
        out.collect(in.iterator().next());
    }
}

aggregatereducefunction.java文件

public class AggregateReduceFunction implements ReduceFunction<Class1> {
    @Override
    public Class1 reduce(Class1 value1, Class1 value2) {
        Class1 value =  value1.getEventTimeStamp() > value2.getEventTimeStamp() ? value1 : value2;
        value.setCount(value1.getCount() + value2.getCount());
        return value;
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题