【04】Flink 之 DataStream API(二):Transformations 操作

x33g5p2x  于2021-12-25 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(328)

1、DataStream API Transformations 操作

Transformations 常见API:

  • map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
  • flatmap:输入一个元素,可以返回零个,一个或者多个元素
  • filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
  • keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区
  • Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。
  • Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
  • Split:根据规则把一个数据流切分为多个流,
  • Select:和 Split 配合使用,选择切分后的流
  • reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值- 进行聚合操作,然后返回一个新的值
  • aggregations:sum(),min(),max()等

2、map()、flatmap()

同Spark中的操作

3、filter()

3.1、Java代码实现filter()

package com.Streaming.StreamAPI;

import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Author: Henry
 * @Description: Filter演示
 * @Date: Create in 2019/5/12 13:57
 **/
public class StreamingDemoFilter {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("原始接收到数据:" + value);
                return value;
            }
        });

        //执行filter过滤,满足条件的数据会被留下
        DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
            //把所有的奇数过滤掉
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        });

        DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("过滤之后的数据:" + value);
                return value;
            }
        });

        //每2秒钟处理一次数据
        DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);

        //打印结果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoFilter.class.getSimpleName();
        env.execute(jobName);
    }
}

3.2、运行结果

将满足条件的数据进行保留

3.3、Scala代码实现filter()

package cn.Streaming.StreamAPI

import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Author: Henry
  * @Description: 过滤偶数并求和
  * @Date: Create in 2019/5/14 22:03 
  **/
object StreamingDemoFilter {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //隐式转换
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    val mapData = text.map(line=>{
      println("原始接收到的数据:"+line)
      line
    }).filter(_ % 2 == 0)

    val sum = mapData.map(line=>{
      println("过滤之后的数据:"+line)
      line
    }).timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}

3.4、运行结果

4、keyBy()

4.1、keyBy()典型用法

keyBy()中的数据类型要不是tuple,要不是自定义类型,如Long就不可以进行
keyBy操作
两种典型用法:

  1. dataStream.keyBy(“someKey”) // 指定对象中的 "someKey"字段作为分组key
  2. dataStream.keyBy(0) // 指定Tuple中的第一个元素作为分组key

注意:以下类型是无法作为key的

  1. 一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法
  2. 一个任意形式的数组类型
  3. 基本数据类型,int,long

4.2、keyBy()实现代码

同SocketWordCount代码中使用部分:

// 3.2、将输入的文本分为不相交的分区,每个分区包含的都是具有相同key的元素。
        // 也就是说,相同的单词被分在了同一个区域,下一步的reduce就是统计分区中的个数
        .keyBy("word")

4.3、运行结果

同SocketWordCount

5、Union

5.1、Union使用

合并多个数据流(可以大于2个流),新的流会包含所有流中的数据,但是union是一个限制,
就是所有合并的流类型必须是一致的。

DATAStreamSource<Long> text1 = env.addsource(...)
DATAStreamSource<Long> text2 = env.addsource(...)
// 把text1和text2组装到一起
DATAStreamSource<Long> text = text1.union(text2)

5.2、Java实现代码Union

package com.Streaming.StreamAPI;

import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * @Author: Henry
 * @Description: 合并多个流,新的流会包含所有流中的数据,
 *               但是union是一个限制,就是所有合并的流类型必须是一致的
 * @Date: Create in 2019/5/12 13:58
 **/
public class StreamingDemoUnion {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        //把text1和text2组装到一起
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("原始接收到数据:" + value);
                return value;
            }
        });

        //每2秒钟处理一次数据
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2))
                .sum(0);

        //打印结果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoUnion.class.getSimpleName();
        env.execute(jobName);
    }
}

5.3、运行结果

5.4、Scala实现代码Union

package cn.Streaming.StreamAPI

import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Author: Henry
  * @Description:
  * @Date: Create in 2019/5/14 22:09 
  **/
object StreamingDemoUnionScala {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //隐式转换
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    val unionall = text1.union(text2)

    val sum = unionall.map(line=>{
      println("接收到的数据:"+line)
      line
    }).timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}

5.5、运行结果

6、Connect

6.1、Connect使用

Connect和union类似,但是只能连接两个流,两个流的数据类型可以不同,
会对两个流中的数据应用不同的处理方法。
CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap。

6.2、Java代码实现Connect

package com.Streaming.StreamAPI;

import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Author: Henry
 * @Description:  connect和union类似,但是只能连接两个流,
 *                两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
 *
 * @Date: Create in 2019/5/12 13:58
 **/
public class StreamingDemoConnect {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            // CoMapFunction 需要实现两个 map 方法
            // map1 Long 类型,map2 是 String 类型,即分别处理不同的两种类型数据
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        //打印结果
        result.print().setParallelism(1);

        String jobName = StreamingDemoConnect.class.getSimpleName();
        env.execute(jobName);
    }
}

6.3、运行结果

分别处理Long和String类型的两种数据:

6.3、Scala代码实现Connect

package cn.Streaming.StreamAPI

import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * @Author: Henry
  * @Description:
  * @Date: Create in 2019/5/14 22:14 
  **/
object StreamingDemoConnectScala {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //隐式转换
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    val text2_str = text2.map("str" + _)
    val connectedStreams = text1.connect(text2_str)
    val result = connectedStreams.map(
      line1=>{line1},
      line2=>{line2} )

    result.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}

6.4、运行结果

7、Split

7.1、Split使用

根据规则把一个数据流切分为多个流

  • 应用场景:
    可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
    把一个数据流切分成多个数据流,这样每个数据流就可以使用不同的处理逻辑了

7.2、Java代码实现

package com.Streaming.StreamAPI;

import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;

/**
 * @Author: Henry
 * @Description: 根据规则把一个数据流切分为多个流
 * @Date: Create in 2019/5/12 13:57
 **/
public class StreamingDemoSplit {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1

        //对流进行切分,按照数据的奇偶性进行区分
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");//偶数
                } else {
                    outPut.add("odd");//奇数
                }
                return outPut;
            }
        });

        //选择一个或者多个切分后的流
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");

        DataStream<Long> moreStream = splitStream.select("odd","even");
        
        //打印结果
        moreStream.print().setParallelism(1);

        String jobName = StreamingDemoSplit.class.getSimpleName();
        env.execute(jobName);
    }
}

7.3、运行结果

打印偶数数据流:

7.4、Scala代码实现

//		关键代码处理部分
    val splitStream = text.split(new OutputSelector[Long] {
      override def select(value: Long) = {
        // 这里的list需要使用Java中的ArrayList类型
        val list = new util.ArrayList[String]()
        if(value%2 == 0){
          list.add("even")// 偶数
        }else{
          list.add("odd")// 奇数
        }
        list
      }
    })

7.5、运行结果

打印偶数数据流:

相关文章