Kafka:承诺不会发生

tquggr8v  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(283)

我是Kafka流的新手,我正在尝试Kafka流在超时情况下的行为。
下面是我正在使用处理器api测试的场景:
我的Kafka流媒体应用程序从Kafka主题(字符串键,字符串消息)消费并写入Kafka主题(字符串键,字符串消息)
我已将consumer config参数max.poll.interval.ms设置为60000 ms。
我的处理方法如下所示:

public void process(String key, String value) {
    System.out.println("the key is : " +key);
    LocalDateTime start= LocalDateTime.now();
    System.out.println("startTime:" + dtf.format(start));
    if ( key.startsWith("12345678"){
        Thread.sleep(80000);
    }
    System.out.println("done sleeping");
    LocalDateTime end=LocalDateTime.now();
    System.out.println("endTime:" + dtf.format(end));
    System.out.println("Offset*****"+context.offset()+" 
        partitionId****"+context.partition()+"taskId*****"+context.taskId()+ 
        "javaThreadId*******"+ Thread.currentThread().getId()+ " 
        value****"+value);
}

所有其他配置都设置为默认值。
如果处理时间超过max.poll.interval.ms,我将尝试查看应用程序的行为。
事情是这样的:在第一次尝试时,它开始使用来自kafka主题的消息,并在调用process()时开始休眠。在60000毫秒之后,它再次调用process方法,没有抛出任何异常,但此时它仅在20000毫秒内退出sleep打印,“done sleeping”并将消息发布到output topic。在此之后,它再次开始使用来自同一偏移量的同一消息而不提交。这是一个循环。
样本输出:
钥匙是:12345678
starttime:2018/07/09 07:34:25
钥匙是:12345678
starttime:2018/07/09 07:35:27
睡着了吗
endtime:2018/07/09 07:35:45
偏移量224分区ID0taskid**0\u 0javathreadid12值**
钥匙是:12345678
starttime:2018/07/09 07:36:27
睡着了吗
endtime:2018/07/09 07:36:47
偏移量
224分区ID0taskid0\u 0javathreadid
14值***
钥匙是:12345678
starttime:2018/07/09 07:37:27
睡着了吗
endtime:2018/07/09 07:37:47
偏移量224分区ID0taskid**0\u 0javathreadid****12值**
我曾尝试显式调用context#commit(),但也不起作用。我错过了什么?Kafka流还记得以前的处理状态吗?如果不是,为什么在第一次尝试后20000毫秒(mall.poll.interval.ms-60000,处理时间(sleep)设置为80000毫秒)之后,它会显示“done sleeping”?
其他信息:
我的输入和输出主题各有2个分区,我已将streamsconfig num.streams.threads配置为2。
我有一个3节点Kafka集群-Kafka和Kafka流版本1.1.0
我不使用标点法,也没有任何复杂的处理任何地方。
提前谢谢。

qltillow

qltillow1#

不是100%确定,但是注意:如果你打电话 context#commit() 您只“请求”一个提交,而kafka streams试图尽快提交--但是在之后 context#commit() 返回时,提交尚未发生。。。
另请注意,如果您的超时是 60.000 你睡了好几天 80.000 您的应用程序应该从使用者组中退出,因此不允许再提交。对于这种情况,日志中应该有一条警告日志消息。
希望这有帮助。

相关问题