Java: Consume the messages of the past 5 minutes and periodically dump them to another topic

Posted by ozxc1zmp on 2021-06-06 in Kafka

What I'm trying to do:
I want to consume messages within a 5-minute time window and count how many arrive. After that, I want to take the last message that arrived in the group, copy it, set its "count" field to the number of messages in the interval, and send it to a topic.
However, I'm having trouble with the windows and need some help. I have created my own JSON serde (MessageSerde), and the Message class represents the JSON message, but I'm not sure how to handle the windowing step now.
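
(A serde like MessageSerde can be assembled from a serializer/deserializer pair; the sketch below shows one common way to do that with Jackson. It is an illustration only: my actual MessageSerde is not shown here and may differ, and the lambdas assume Kafka 2.1+, where Serializer and Deserializer have default configure/close methods.)

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    // Hypothetical factory for a Jackson-based Serde<Message>; the real MessageSerde may differ.
    public final class MessageSerdeFactory {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static Serde<Message> messageSerde() {
            return Serdes.serdeFrom(
                // serializer: Message POJO -> JSON bytes
                (topic, message) -> {
                    try {
                        return message == null ? null : MAPPER.writeValueAsBytes(message);
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to serialize Message", e);
                    }
                },
                // deserializer: JSON bytes -> Message POJO
                (topic, bytes) -> {
                    try {
                        return bytes == null ? null : MAPPER.readValue(bytes, Message.class);
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to deserialize Message", e);
                    }
                });
        }
    }

The topology I have so far: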

    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, Message> inputStream = builder
        .stream("users", Consumed.with(Serdes.String(), MessageSerde));

    inputStream
        .filter(new Predicate<String, Message>() {
            @Override
            public boolean test(String s, Message s2) {
                return s2.getPriority().equals("Low");
            }
        });

    KTable<Windowed<String>, Message> fiveMinuteWindowed = inputStream
        .groupByKey(Serialized.with(Serdes.String(), MessageSerde))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
        .count()
        .to("test-filter", Produced.with(Serdes.String(), MessageSerde));

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
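
If all I needed were the raw counts, I believe the windowed part could look roughly like the sketch below: count() produces a KTable<Windowed<String>, Long>, which then has to be turned back into a stream and have its Windowed key unwrapped before it can be written out. The output topic name is just a placeholder, and this only emits the count, not a full Message with its "count" field set, which is what the aggregate() attempt further down tries to do.

    // Sketch: windowed count of "Low" priority messages ("low-priority-counts" is a placeholder topic).
    KTable<Windowed<String>, Long> lowPriorityCounts = inputStream
        .filter((key, msg) -> msg.getPriority().equals("Low"))
        .groupByKey(Serialized.with(Serdes.String(), MessageSerde))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
        .count();

    lowPriorityCounts
        // count() keys the table by Windowed<String>, so map the key back to a plain String
        .toStream((windowedKey, count) -> windowedKey.key())
        .to("low-priority-counts", Produced.with(Serdes.String(), Serdes.Long()));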

Update

Following Matthias's suggestion, I came up with the code below, but I'm running into a compilation error in the .to method:

    inputStream
        .filter(new Predicate<String, Message>() {
            @Override
            public boolean test(String s, Message s2) {
                return s2.getPriority().equals("Low");
            }
        })
        .groupByKey(Serialized.with(Serdes.String(), MessageSerde))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
        .aggregate(
            new Initializer<Message>() { /* initializer: start each window with an empty Message */
                @Override
                public Message apply() {
                    Message tuple = new Message();
                    return tuple;
                }
            },
            new Aggregator<String, Message, Message>() { /* adder: increment the count and carry over fields from the current message */
                @Override
                public Message apply(String aggKey, Message curMsg, Message tuple) {

                    tuple.setCount(tuple.getCount() + 1);

                    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");

                    try {
                        Date parsedDate_cur = dateFormat.parse(curMsg.getTimestamp());
                        Timestamp timestamp_cur = new java.sql.Timestamp(parsedDate_cur.getTime());
                        Date parsedDate_agg = dateFormat.parse(tuple.getTimestamp());
                        Timestamp timestamp_agg = new java.sql.Timestamp(parsedDate_agg.getTime());

                        // keep the later of the two timestamps in the aggregate
                        if (timestamp_cur.after(timestamp_agg)) {
                            tuple.setTimestamp(timestamp_cur.toString());
                        }

                        tuple.setId(curMsg.getId());
                        tuple.setSourceip(curMsg.getSourceip());
                        tuple.setType(curMsg.getType());
                        tuple.setPriority(curMsg.getPriority());
                        tuple.setName(curMsg.getName());
                        tuple.setMetadata(curMsg.getMetadata());
                    } catch (Exception e) {
                        System.out.println("Error parsing dates");
                    }
                    return tuple;
                }
            },
            Materialized.with(Serdes.String(), MessageSerde))
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        .to("test-filter", Produced.with(Serdes.String(), MessageSerde));

The error is:

Compilation failure
[ERROR] /home/x/Documents/project/src/main/java/com/streams/app/App.java:[194,9] no suitable method found for to(java.lang.String,org.apache.kafka.streams.kstream.Produced<java.lang.String,Pojo.Message>)
[ERROR]   method org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR]     (argument mismatch; inference variable K has incompatible equality constraints org.apache.kafka.streams.kstream.Windowed<java.lang.String>,java.lang.String)
[ERROR]   method org.apache.kafka.streams.kstream.KStream.to(org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR]     (argument mismatch; java.lang.String cannot be converted to org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>)
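
Reading the error, the stream coming out of toStream() is still keyed by Windowed<String>, while Produced.with(Serdes.String(), MessageSerde) expects a plain String key. So presumably (this is just my reading of the error, not verified against the full project) the tail of the pipeline needs to unwrap the window key, for example:

        .suppress(untilWindowCloses(unbounded()))
        // unwrap the Windowed<String> key back to its plain String key so the key type
        // matches Produced.with(Serdes.String(), MessageSerde)
        .toStream((windowedKey, message) -> windowedKey.key())
        .to("test-filter", Produced.with(Serdes.String(), MessageSerde));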
