cep-添加窗口后未执行模式

ejk8hzay  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(275)

//创建一个包含十个项目的窗口

WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);

//应用一个窗口函数,添加一些自定义值来计算窗口中的所有值

DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() { 

                        @Override 
                        public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out) 
                                //custom evaluation logic 
                                out.collect(new ObservationEvent(1,"temperature", "stable")); 
                        } 
                });

//定义简单cep模式

Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first") 
                                .subtype(ObservationEvent.class) 
                                .where(new FilterFunction<ObservationEvent>() { 

                                        @Override 
                                        public boolean filter(ObservationEvent arg0) throws Exception { 
                                                System.out.println( arg0 );  //This function is not at all called
                                                return false; 
                                        } 
                });

PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);

当我运行这段代码时,where子句中的filter函数根本没有被调用。我已经打印了inactivitystream.print(),并且可以看到匹配的值。
现在,当我直接插入inputstream而不应用窗口时。模式匹配
我打印了inputstream和windowedstream,可以看到它们都发送类似的数据。
我错过了什么

9jyewag0

9jyewag01#

filterfunction最终应该会被调用,但是在第一次看到filterfunction被调用之前,您必须为同一个键等待10个事件。是不是因为你在窗口测试中等待的时间不够长?
请记住,如果您有许多唯一的键,这意味着您必须在窗口测试中等待10倍以上的时间,然后才能看到您的过滤器函数被调用。

相关问题