用flink匹配没有重复的最大模式

tpgth1q7  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(237)

我正在尝试使用flink完成一个小型poc,第一步是评估一个事件流,并不断输出不包含重复项的最大活动组。我正试图用cep模式匹配来实现这一点。
例如,对于以下输入:

>a
>a
>b
>c
>a
>c

我希望输出:

a
a
a:b
a:b:c
b:c:a
a:c

我能得到的最接近的结果是:

a
a
a:b
a:b:c
b:c:a
b:c
b
c:a
a:c
a
c

当初始模式继续增长时,它看起来很好,但是一旦看到重复,我就会收到比我想要的更多的结果。本例使用skiptofirst策略;我原以为其他人会更有帮助,但结果却不太理想。
这感觉应该很容易解决,但我还没有找到正确的组合选项,让它工作。
以下是我最初的方法的细节:

public static void cep() throws Exception {
  log.info("Initializing cep processor");

  String inputTopic = "inputTopic";
  String outputTopic = "outputTopic";
  String consumerGroup = "testGroup";
  String address = "localhost:9092";

  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

  log.info("Creating consumer");
  FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
  flinkKafkaConsumer.setStartFromLatest();

  log.info("Creating producer");
  FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

  log.info("Configuring sources");
  DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

  log.info("Processing kafka messages");
  AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("start");
  Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
      .oneOrMore()
      .until(new IterativeCondition<>() {
        @Override
        public boolean filter(String s, Context<String> context) throws Exception {
          return StreamSupport.stream(context.getEventsForPattern("start").spliterator(), false)
              .anyMatch(state -> state.equals(s));
        }
      });

  PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
  DataStream<String> result = patternStream.select(
      (PatternSelectFunction<String, String>) map ->
          String.format("Evaluated these states %s", String.join(":", map.get("start")))
  );
  result.addSink(flinkKafkaProducer);

  environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  environment.execute("Flink cep Example");

从那以后,我尝试了很多不同的模式,比如这样的

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("middle");
    Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
        .next("middle").oneOrMore().until(new DuplicateFound())
        .followedBy("end");

在这里,我将输出“start”和“middle”中的事件,但我的所有尝试都没有达到我所希望的效果。
任何协助都将不胜感激。
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题