我有一个场景,我需要使用一个Kafka主题的消息,并将其发布到另一个Kafka主题。
from(kafka:topic1)
.process(<business-logic>)
.to(kafka:topic2)
假设在发布到topic2时发生了异常(例如,代理未启动),我需要回滚已使用的消息,再次需要重新处理该消息(当代理再次启动时),而无需重新启动我的应用程序。
这里topic1和topic2经纪人是不同的。
我有一个场景,我需要使用一个Kafka主题的消息,并将其发布到另一个Kafka主题。
from(kafka:topic1)
.process(<business-logic>)
.to(kafka:topic2)
假设在发布到topic2时发生了异常(例如,代理未启动),我需要回滚已使用的消息,再次需要重新处理该消息(当代理再次启动时),而无需重新启动我的应用程序。
这里topic1和topic2经纪人是不同的。
1条答案
按热度按时间x0fgdtte1#
使用Kafka consumer API,默认情况下提交偏移量是自动完成的(
enable.auto.commit=true
),并且在(auto.commit.interval.ms=5000
)控制的每个时间段内发生。所以要达到你的目标,你需要设置'enable.auto.commit =false',并从你的应用程序管理它。