我如何在Flink的两个不同的Kafka流上应用相同的模式?

2fjabf4q  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(168)

我的flink计划如下:

object WindowedWordCount {
  val configFactory = ConfigFactory.load()

  def main(args: Array[String]) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaStream1 = env.addSource(new FlinkKafkaConsumer010[String](topic1, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    val kafkaStream2 = env.addSource(new FlinkKafkaConsumer010[String](topic2, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    val partitionedStream1 = kafkaStream1.keyBy(jsonString => {
      extractUserId(jsonString)
    })

    val partitionedStream2 = kafkaStream2.keyBy(jsonString => {
      extractUserId(jsonString)
    })

    //Is there a way to match the userId from partitionedStream1 and partitionedStream2 in this same pattern?
    val patternForMatchingUserId = Pattern.begin[String]("start")
        .where(stream1.getUserId() == stream2.getUserId()) //I want to do something like this

    //Is there a way to pass in partitionedStream1 and partitionedStream2 to this CEP.pattern function?
    val patternStream = CEP.pattern(partitionedStream1, patternForMatchingUserId)

    env.execute()
  }
}

在上面的flink程序中,我有两个流 partitionedStream1 以及 partitionedStream2 哪个是 keyedBy 用户ID。
我想以某种方式比较 patternForMatchingUserId 模式(类似于我上面展示的)。有没有办法分成两条溪流到河边 CEP.Pattern 功能?
像这样: val patternStream = CEP.pattern(partitionedStream1, partitionedStream2, patternForMatchingUserId)

7vux5j2d

7vux5j2d1#

你不可能把两条溪流都给我 CEP ,但您可以通过一个组合流。
如果两个流具有相同的类型/架构。你可以把他们结合起来。我相信这和你的情况相符。

partitionedStream1.union(partitionedStream2).keyBy(...)

如果他们有不同的模式。您可以使用内部的一些自定义逻辑将它们转换为一个流。 coFlatMap .

相关问题