我正在尝试使用flink完成一个小型poc,第一步是评估一个事件流,并不断输出不包含重复项的最大活动组。我正试图用cep模式匹配来实现这一点。
例如,对于以下输入:
>a
>a
>b
>c
>a
>c
我希望输出:
a
a
a:b
a:b:c
b:c:a
a:c
我能得到的最接近的结果是:
a
a
a:b
a:b:c
b:c:a
b:c
b
c:a
a:c
a
c
当初始模式继续增长时,它看起来很好,但是一旦看到重复,我就会收到比我想要的更多的结果。本例使用skiptofirst策略;我原以为其他人会更有帮助,但结果却不太理想。
这感觉应该很容易解决,但我还没有找到正确的组合选项,让它工作。
以下是我最初的方法的细节:
public static void cep() throws Exception {
log.info("Initializing cep processor");
String inputTopic = "inputTopic";
String outputTopic = "outputTopic";
String consumerGroup = "testGroup";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
log.info("Creating consumer");
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromLatest();
log.info("Creating producer");
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
log.info("Configuring sources");
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
log.info("Processing kafka messages");
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("start");
Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
.oneOrMore()
.until(new IterativeCondition<>() {
@Override
public boolean filter(String s, Context<String> context) throws Exception {
return StreamSupport.stream(context.getEventsForPattern("start").spliterator(), false)
.anyMatch(state -> state.equals(s));
}
});
PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
DataStream<String> result = patternStream.select(
(PatternSelectFunction<String, String>) map ->
String.format("Evaluated these states %s", String.join(":", map.get("start")))
);
result.addSink(flinkKafkaProducer);
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
environment.execute("Flink cep Example");
从那以后,我尝试了很多不同的模式,比如这样的
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("middle");
Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
.next("middle").oneOrMore().until(new DuplicateFound())
.followedBy("end");
在这里,我将输出“start”和“middle”中的事件,但我的所有尝试都没有达到我所希望的效果。
任何协助都将不胜感激。
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!