Sorting a data stream in Flink

1l5u6lss · posted 2021-06-24 · in Flink

I am learning Flink and started with a simple word count using the DataStream API. To extend the processing, I filtered the output so that only words found 3 or more times are shown.

DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9000)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .apply(new MyWindowFunction())
            .sum(1)
            .filter(word -> word.f1 >= 3);

I want to create a WindowFunction that sorts the output by the count of each word found. The WindowFunction I am trying to implement does not compile at all; I am struggling to define the apply method and the type parameters of the WindowFunction interface.

public static class MyWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        Tuple2<String, Integer>, // key type
        TimeWindow> {

    void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {

        String word = ((Tuple2<String, Integer>)key).f0;
        Integer count = ((Tuple2<String, Integer>)key).f1;

        .........
        out.collect(new Tuple2<>(word, count));
    }
}
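For reference, the compile error is most likely the third type parameter: with keyBy(0) Flink passes the key as a generic Tuple, not as Tuple2&lt;String, Integer&gt;, and apply must be public and annotated with @Override. Below is a minimal sketch of a signature that compiles under that assumption (it only sums the counts inside the window; it is a guess at the intent, not the poster's final sorting logic):

// Sketch only: assumes keyBy(0) is kept, so the key arrives as a generic Tuple.
public static class MyWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        Tuple,                   // key type produced by keyBy(0)
        TimeWindow> {

    @Override
    public void apply(Tuple key,
                      TimeWindow window,
                      Iterable<Tuple2<String, Integer>> input,
                      Collector<Tuple2<String, Integer>> out) {
        String word = key.getField(0);
        int count = 0;
        // sum all counts seen for this word in the current window
        for (Tuple2<String, Integer> record : input) {
            count += record.f1;
        }
        out.collect(new Tuple2<>(word, count));
    }
}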

34gzjxbg1#

Here is a working example. It did not take me a year to solve it; I just found the solution on my machine, which I had never posted before =)

/**
 * on the terminal execute "nc -lk 9000", run this class and type words back on the terminal
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//        DataStream<Tuple2<String, Integer>> dataStream = env
//                .socketTextStream("localhost", 9000)
//                .map(new UpperCaserMap())
//                .flatMap(new SplitterFlatMap())
//                .keyBy(0) // select the first value as a key
//                .timeWindow(Time.seconds(5)) // use this if Apache Flink server is up
//                .sum(1) // reduce to sum all values with same key
//                .filter(word -> word.f1 >= 3) // use simple filter
//                .filter(new FilterWordCount()) // use more elaborated filter
//                ;

        /*
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9000)
                .map(new UpperCaserMap())
                .flatMap(new SplitterFlatMap())
                .keyBy(new SumWordSelect()) // select the first value as a key using the KeySelector class
                .reduce(new SumWordsReduce()) // using ReduceFunction
        ;*/

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9000)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                .map(new UpperCaserMap())
                .flatMap(new SplitterFlatMap())
                .keyBy(new SumWordSelect()) // select the first value as a key using the KeySelector class
                .timeWindow(Time.seconds(5)) // use this if Apache Flink server is up
                .reduce(new SumWordsReduce(), new FilterWindowFunction());

//        DataStream<Tuple2<String, Integer>> dataStream = env
//                .socketTextStream("localhost", 9000)
//                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
//                .map(new UpperCaserMap())
//                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
//                .apply(new SplitterAllWindowFunction());

        dataStream.print();
        // .setParallelism(1);

        env.execute("Window WordCount");
    }

    public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<String> {
        private final long maxOutOfOrderness = 3500; // 3.5 seconds
        private long currentMaxTimestamp;
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            long timestamp = System.currentTimeMillis();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

    public static class SumWordSelect implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

    public static class SumWordsReduce implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
            Integer sum = v1.f1 + v2.f1;
            return new Tuple2<String, Integer>(v1.f0, sum);
        }
    }

    public static class UpperCaserMap implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    }

    public static class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class SplitterAllWindowFunction implements AllWindowFunction<String, Tuple2<String, Integer>, TimeWindow> {

        @Override
        public void apply(TimeWindow window, Iterable<String> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : values) {
                String[] tokens = word.split(" ");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }

    public static class FilterWordCount implements FilterFunction<Tuple2<String, Integer>> {
        @Override
        public boolean filter(Tuple2<String, Integer> value) throws Exception {
            return value.f1 > 3;
        }
    }

    public static class ReduceWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            Integer sum = 0;
            for (Tuple2<String, Integer> input : inputs) {
                sum = sum + input.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }
    }

    public static class FilterWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            // Integer value = 0;
            for (Tuple2<String, Integer> input : inputs) {
                // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
                out.collect(new Tuple2<>(key, input.f1));
            }
        }
    }
}

3mpgtkmj2#

The .sum(1) call will do everything you need (no apply() required), as long as the Splitter class (which should be a FlatMapFunction) emits Tuple2<String, Integer> records, where the String is the word and the Integer is always 1.
Then .sum(1) performs the aggregation for you. If you need something other than sum(), you would typically use .reduce(new MyCustomReduceFunction()), as that is the most efficient and scalable approach, since it does not need to buffer much in memory.
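As a rough illustration of that last point, a reduce-based variant of the original pipeline might look like the sketch below. MyCustomReduceFunction is just the hypothetical name used in this answer, and Splitter is assumed to emit Tuple2<word, 1> as described:

// Sketch only: incremental aggregation per key and window, no buffering of the whole window.
DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9000)
        .flatMap(new Splitter())               // assumed to emit Tuple2<word, 1>
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .reduce(new MyCustomReduceFunction())
        .filter(word -> word.f1 >= 3);

public static class MyCustomReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
        // keep the word, add the counts
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    }
}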
