流处理—flink是否可以生成聚合/滚动/累积数据的每小时快照?

flmtquvp  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(344)

流处理的文本书示例是一个带时间戳的字数计算程序。使用以下数据示例

mario 10:00
luigi 10:01
mario 11:00
mario 12:00

我看过在以下几年里制作的字数统计程序:
总数据集

mario 3
luigi 1

一组时间窗口分区

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1

但是,我还没有找到一个滚动时间窗口上的字数计算程序示例,即我希望从时间开始每小时为每个字生成一个字数:

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1

apache flink或任何其他流处理库都可以这样做吗?谢谢!
编辑:
到目前为止,我尝试了davidanderson方法的一个变体,只改变了事件时间的处理时间,因为数据是timestsamped。但它并没有像我预期的那样起作用。下面是代码、示例数据、它提供的结果以及我的后续问题:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);

    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp - 1);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                    }

                    long l = ((value.getTimestamp() / 10) + 1) * 10;
                    ctx.timerService().registerEventTimeTimer(l);

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static long fileCounter = 0;

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            out.collect(new TimestampedWord(value, fileCounter++));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
    }
}

一个包含以下单词的文件,每个单词都在一个新行中:
mario,luigi,mario,mario,vilma,fred,bob,bob,mario,dan,dylan,dylan,fred,mario,mario,carl,bambam,summer,anna,anna,edu,anna,anna,anna,anna,anna,anna
生成以下输出:

mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807

显然出了问题。我数到三了 anna 在20岁的时候,即使这个词的第三个示例 anna 直到位置22才出现。奇怪的是 edu 只出现在上一个快照中,即使它以前出现过 anna 第三个案子。即使没有消息到达(即应生成相同的数据),如何触发每10个“时间单位”生成一个快照?
如果有人能给我指出正确的方向,我会非常感激的!

ivqmmu1c

ivqmmu1c1#

是的,这不仅可以和Flink一起做,而且很容易。您可以使用keyedprocessfunction来实现这一点,该函数将计数器保持在keyed状态,表示到目前为止每个字/键在输入流中出现的次数。然后使用计时器触发报告。
下面是一个使用处理时间计时器的示例。它每10秒打印一份报告。

public class DSExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
            .keyBy(x -> x)
            .process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
                private transient ValueState<Integer> counter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
                }

                @Override
                public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
                    if (counter.value() == null) {
                        counter.update(0);
                        long now = context.timerService().currentProcessingTime();
                        context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    }
                    counter.update(counter.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
                }
            })
            .print();

        env.execute();
    }
}

更新时间:
使用事件时间总是更好,但这确实增加了复杂性。大多数增加的复杂性来自于这样一个事实,即在实际应用程序中,您很可能必须处理无序事件——在您的示例中,您已经避免了这种情况,因此在本例中,我们可以通过一个相当简单的实现来解决。
如果你改变两件事,你会得到你期望的结果。首先,将水印设置为 extractedTimestamp - 1 是结果错误的原因(例如,这就是为什么anna=3=20)。如果将水印设置为 extractedTimestamp 相反,这个问题会消失。
说明:正是第三个安娜的到来,创造了在时间20关闭窗口的水印。第三个anna的时间戳是21,因此在流中后跟一个20的水印,它关闭第二个窗口并生成anna=3的报告。是的,第一个edu来得更早,但它是第一个edu,时间戳是20。在edu到达的时候,没有为edu设置计时器,并且创建的计时器被正确地设置为30点启动,所以直到至少30的水印到达,我们才听说edu。
另一个问题是定时器逻辑。flink为每个键创建一个单独的计时器,并且每次启动计时器时都需要创建一个新的计时器。否则,您将只收到有关窗口中到达的单词的报告。您应该修改代码,使其更像这样:

@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    if (count.value() == null) {
        count.update(0L);
        setTimer(ctx.timerService(), value.getTimestamp());
    }

    count.update(count.value() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    long currentWatermark = ctx.timerService().currentWatermark();
    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
    if (currentWatermark < Long.MAX_VALUE) {
        setTimer(ctx.timerService(), currentWatermark);
    }
}

private void setTimer(TimerService service, long t) {
    service.registerEventTimeTimer(((t / 10) + 1) * 10);
}

通过这些更改,我得到了以下结果:

mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807

现在,如果你真的需要处理无序事件,这会变得更加复杂。有必要使水印滞后于时间戳一个实际的量,反映流中存在的无序的实际量,这就需要能够处理一次打开多个窗口的情况。任何给定的事件/字可能不属于下一个将关闭的窗口,因此不应递增其计数器。例如,您可以将这些“早期”事件缓冲在另一个状态(例如liststate)中,或者以某种方式维护多个计数器(可能在mapstate中)。此外,有些事件可能会延迟,从而使早期的报告无效,您需要定义一些策略来处理这些事件。

相关问题