Flink addSink自定义Sink

x33g5p2x  于2021-03-14 发布在 Flink  
字(1.5k)|赞(0)|评价(0)|浏览(1495)
package com.gosuncn;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class WordCountStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6", "Apollo 5", "Apollo 4", "Apollo 3");
        SingleOutputStreamOperator<Tuple2> stream = dataStreamSource.map((MapFunction<String, Tuple2>) item -> Tuple2.of(item, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        stream.addSink(new CustomSinkFunction());
        env.execute("WordCountStreamingJob");
    }
}

/**
 * 自定义SinkFunction 
 */
class CustomSinkFunction implements SinkFunction {
    @Override
    public void invoke(Object value, Context context) throws Exception {
        System.out.println(value);
    }
}

/**
 * 自定义RichSinkFunction 
 */
class CustomSinkFunction extends RichSinkFunction {
    @Override
    public void invoke(Object value, Context context) throws Exception {
        // TODO 业务逻辑
    }
}

相关文章