【Flink】Flink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream

x33g5p2x  于2022-03-28 转载在 Flink  
字(8.2k)|赞(0)|评价(0)|浏览(555)

1.概述

1.1背景

这个实验特性应该是在Flink 1.5版本已经引进,但是直到现在(1.11)仍然是实验特性。官网对于它的描述 :这个特性仍然在不断的优化,目前是可能是不稳定、不兼容的,并且在以后的版本甚至发生大的改变。

1.2 作用

将DataStream重新解释为KeyedStream,这种方式可以避免shuffle

那么自然它的使用也会受到相应的约束,这个只能去重新解释那些已经预分区的DataStream。

1.3 官网例子

在源码中找到了这样一个测试代码,结果是:Tests passed

public class ReinterpretAsKeyedStreamDemo {

    public void reinterpretAsKeyedStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3);

        KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        });

        SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                        return value1 + value2;
                    }
                });

        reducer.addSink(new PrintSinkFunction<>());

        env.execute("xx");

    }

}

上面结果我们可以看到 输出了2 4 6 其实就是

但是我们其实有9条数据,1,2,3分别是3组数据,为什么少输出呢?

因为前面两组1,2,3已经结束了一个窗口,满足同一个key下有两个数据,然后最后一组的1,2,3,并不满足有两个数据,无法触发窗口。

为了方便理解我们再次修改如下代码:

public void reinterpretAsKeyedStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3,2);

        KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        });

        SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                        return value1 + value2;
                    }
                });

        reducer.addSink(new PrintSinkFunction<>());

        env.execute("xx");

    }

运行结果如下

2
4
6
4

我们数据源数据里面最后加入了一个2,然后最后输出多了一个4。当然这个是countwindow的使用,因为官网例子给的不明确,这里只是简单给大家补充一下,便于理解,避免初次使用产生太多疑问。

1.3.1 实战Demo分析

代码功能:

  1. 从文件中读取数据然后构建ds1:DataStream[Event]流,然后输出文件数据;
  2. 接着ds1流会根据Event的字段 key 进行keyby操作,使用一个窗口大小为2的CountWindow,然后保留这两条数据中 partition 字段值最大的一条数据,构建数据流ds2:DataStreamp[Event]
  • 最后我们继续使用一个窗口大小为2的CountWindow,然后对窗口内两条数据处理:

  • 如果两条数据的event_type字段值不等,那么我们使用第一条数据的值去创建一个Event对象,然后新数据Event对象的event_type字段设置为3,并且把字段 v 设置为两个数据的字段 v 的字符串拼接;

  • 如果event_type字段值相等,那么我们保留time_字段值大的一条数据。

第三步中,正常情况我们会对ds2进行keyby然后继续按照key 字段值hash,这样会产生相应的Shuffle,但是通过使用本文的实验特性reinterpretAsKeyedStream,可以避免Shuffle。

// scala
object SessionwindowingOriginal {
// 主函数
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.createLocalEnvironment(2)

    env.getConfig.setGlobalJobParameters(params)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(2)
    env.setMaxParallelism(2)

    // 从文件读取数据 ds1:DataStream[Event]类型
    val ds1 = env.readTextFile("/Users/hehuiyuan/gitwarehouse/flinksql/src/main/resources/f1").map(e => {
      val l = e.split(",")
      val (key, time_, event_type, v, partition) = (l(0).trim, l(1).trim.toLong, l(2).trim.toInt, l(3).trim, l(4).trim)
      Event(key, time_, event_type, v, partition)
    }).name("f1_source")
    //输出原始数据
    ds1.addSink(new SinkFunction[Event] {
      override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("原始数据:"+value.toString)
    }).name("origin_data_sink")

    //按照event对象的key字段分组
    // 相同key下 窗口大小数据量是2,然后取partition字段取最大的数据
    val ds2 = ds1.keyBy(_.key).countWindow(2).max("partition")
    // 输出ds2:DataStream[Event]
    ds2.addSink(new SinkFunction[Event] {
      override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("ds2:"+value.toString)
    }).name("ds2")

    //ds2是DataSteam,ds1按照字段key分区处理后得到的流
    //此时还想继续使用KededStream的一些操作,需要把ds2进行keyby
    // 但是会存在shuffle,key不变情况下,我们可以直接把DataStream变为KeyedStream
    val aggregated = new DataStreamUtils(ds2)
      .reinterpretAsKeyedStream((event) => event.key)
      .countWindow(2)
      .reduce((e1, e2) =>
        if(e1.event_type != e2.event_type)
          Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
        else if(e2.time_ > e1.time_) e2
        else e1
      )
      .addSink(new SinkFunction[Event] {
        override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
          System.out.println(value.toString)
      }).name("result")

    env.execute()
  }

}

我们看一下读取的文件中的数据样式:

每一行都会被封装到一个Event对象中,然后构成DataStream。

//创建一个Pojo类,4个字段
case class Event(
                  key: String,
                  time_ : Long,
                  event_type: Int,
                  v: String,
                  partition: String
                )

Event对象有五个字段,会使用逗号分割

输出结果:

原始数据:Event(a,1,1,banana,0)
原始数据:Event(b,21,2,tomato,0)
原始数据:Event(c,12,1,apple,0)
原始数据:Event(d,10,2,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
原始数据:Event(a,3,1,ba,1)
原始数据:Event(b,11,2,to,0)
原始数据:Event(c,42,1,ap,0)
原始数据:Event(d,20,2,or,0)
原始数据:Event(e,111,2,wa,0)
ds2:Event(a,1,1,banana,1)
ds2:Event(b,21,2,tomato,0)
ds2:Event(c,12,1,apple,0)
ds2:Event(d,10,2,orange,0)
ds2:Event(e,101,1,watermeleon,0)
原始数据:Event(a,2,1,ba,0)
原始数据:Event(b,2,2,to,0)
原始数据:Event(c,88,1,ap,0)
原始数据:Event(d,44,2,or,0)
原始数据:Event(e,11,2,wa,0)
原始数据:Event(a,33,1,banana,0)
原始数据:Event(b,21,2,tomato,1)
原始数据:Event(c,55,2,apple,0)
原始数据:Event(d,66,1,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
ds2:Event(a,2,1,ba,0)
ds2:Event(b,2,2,to,1)
Event(a,2,1,ba,0)
Event(b,21,2,tomato,0)
ds2:Event(c,88,1,ap,0)
Event(c,88,1,ap,0)
ds2:Event(d,44,2,or,0)
Event(d,44,2,or,0)
ds2:Event(e,11,2,wa,0)
Event(e,101,3,wa_watermeleon,0)

我们拿其中一个输出结果的数据简单分析一下:(上面最后一行)

Event(e,101,3,wa_watermeleon,0)

那么这个数据是如何输出的呢?

我们会发现ds1经过keyby以及counwindow后的max处理以后,留下了两条数据:

ds2:Event(e,101,1,watermeleon,0)
ds2:Event(e,11,2,wa,0)

紧接着,把数据流ds2转为keyedStream,然后又做了一次CountWindow操作,窗口大小是2,具体实现的代码我们下面分析,这里先把结果分析完:

因为上面对于key = e 下,满足了两条数据,也就是满足了countwindow的触发计算,这个时候会对这两个数据处理,根据我们第三步功能描述可知处理如下:

event_type = 1 

event_type = 2

这两条数据的该字段不等,根据(3.1)可知,会创建一个新的Event对象,该对象的 event_type = 3, v = wa_watermeleon(两条数据的该字段的字符串拼接构成)

最终得到如下结果:

Event(e,101,3,wa_watermeleon,0)

最后,我们对ds2使用了本文的主要介绍的特性reinterpretAsKeyedStream进行分析,这个方法在DataStreamUtils中。

使用reinterpretAsKeyedStream的代码:

val aggregated = new DataStreamUtils(ds2)
  .reinterpretAsKeyedStream((event) => event.key)
  .countWindow(2)
  .reduce((e1, e2) =>
    if(e1.event_type != e2.event_type)
      Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
    else if(e2.time_ > e1.time_) e2
    else e1
  )
  .addSink(new SinkFunction[Event] {
    override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
      System.out.println(value.toString)
  }).name("result")

不用reinterpretAsKeyedStream的代码:

val aggregated = ds2

  .keyBy(_.key)

  .countWindow(2)
  .reduce((e1, e2) =>
    if(e1.event_type != e2.event_type)
      Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
    else if(e2.time_ > e1.time_) e2
    else e1
  )
  .addSink(new SinkFunction[Event] {
    override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
      System.out.println(value.toString)
  }).name("result")

在这里就涉及到使用reinterpretAsKeyedStream的优势了,可能代码你无法更好的体会,我们通过StreamGraph来了解这两者的区别:

图片可能有点小,我们把关键地方放大查看:

在这里我们可以发现,同样是Window Operator,但是第一个Window Operator 的数据是通过上游HASH过来的,第二个是通过FORWARD方式过来

两个Operator之间的边展示的关键词,其实展示了两个算子之间数据是如何传输的,在之前的文章提到过关于partition的概念以及Flink已经提供的实现,此处阅读 。

2.重点源码分析

public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
    DataStream<T> stream,
    KeySelector<T, K> keySelector,
    TypeInformation<K> typeInfo) {

    PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
      stream.getTransformation(),
      new ForwardPartitioner<>());

    return new KeyedStream<>(
      stream,
      partitionTransformation,
      keySelector,
      typeInfo);
  }

上面代码是 方法reinterpretAsKeyedStream的具体实现,最后我们可以看到return了一个KeyedStream流,创建这个流的时候首先创建了PartitionTransformation对象,其中使用了ForwardPartitioner分区器,那么FORWARD其实也是来源于此。

我们看一下keyby操作如何生产KeyedStream的:

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    Preconditions.checkNotNull(key);
    Preconditions.checkNotNull(keyType);
    return new KeyedStream<>(this, clean(key), keyType);
  }
  
 public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    this(
      dataStream,
      new PartitionTransformation<>(
        dataStream.getTransformation(),
        new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
      keySelector,
      keyType);
  }

上面代码我们可以看出,keyby同样构建keyedStream流,但是使用的分区器是KeyGroupStreamPartitioner。

M.概述

转载:Flink实验特性–reinterpretAsKeyedStream

相关文章