SpringKafka2.6.5通过有状态重试和seekTocurInterrorHandler的无限重试策略

yshpjwxd  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(381)

正如标题所示,我使用的是SpringKafka版本2.6.5。在我的体系结构中,我有一个主主题,它有一个简单的策略和一个指数备份策略。如果重试尝试用尽,我有一个recoverycallback,它会将消息发送到错误主题。错误主题是我的问题所在。
在这个错误主题中,我需要能够执行无限次重试,并且不让任何消息被丢弃。我所说的“dropped”是指,如果spring崩溃或其他同样糟糕的事情发生,我需要确保,当恢复时,任何处于处理过程中的消息都可以被重新轮询(顺序无关紧要)。基本上,我认为我需要配置ack,以便在处理完成后确认它们。至于无限重试,我四处搜索,从加里·拉塞尔(gary russell)这样的用户那里找到了一些有用的建议。不幸的是,不同的SpringKafka版本和反对意见使得为我的需求和版本拼凑一个清晰的解决方案有点困难。
目前,我的设置如下所示:

@KafkaListener(topics = "my_topic", 
               groupId = "my_group_id", 
               containerFactory = "kafkaErrorListenerContainerFactory")
public void listenErrorTopic(String message) throws Exception {
    processingLogic(message);
    // Do I need to manually ACK afterwards (and thus also include additional params to access needed 
    // message components)?
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap();
    ...
    // Basing the need for the below 2 props off of previously found posts
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // Unsure if the below prop is needed
    // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
        ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // A previous post said that infinite retries could only be achieved via state retry and STCEH,
    // but there is an alternative in 2.6?
    factory.setStatefulRetry(true);
    // A previous post had '-1' passed to SeekToCurrentErrorHandler, but that is no longer possible.
    // It was suggested instead to pass Long.MAX_VALUE to the backoff period for later versions, but the 
    // policy shown was a FixedBackOffPolicy.
    factory.setErrorHandler(new SeekToCurrentErrorHandler());

    RetryTemplate retryTemplate = new retryTemplate();
    retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
    // Do I need a recovery callback set in my RetryTemplate if I want it to be infinite?
    ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(<props file value insertion here>)
    backOffPolicy.setMultiplier(<props file value insertion here>)
    backOffPolicy.setMaxInterval(<props file value insertion here>)
    retryTemplate.setBackOffPolicy(backoffPolicy);
    factory.setRetryTemplate(retryTemplate);

    return factory;
}

理想情况下,我更喜欢指数型而不是固定型,但我主要关心的是在不使用max.interval.ms触发重新平衡的情况下无限地完成它的能力。我在有不确定性的代码块中留下了注解。如果有人能澄清一些事情,我们将不胜感激!

ep6jt1vc

ep6jt1vc1#

在stceh支持后退之前,使用有状态重试是专门为stceh设计的,以避免重新平衡。
但是,既然stceh支持后退,那么最好在重试模板上使用后退。
如果两者都使用,则实际重试次数是stceh和retry模板重试次数的倍数。
既然 SeekToCurrentErrorHandler 可以配置为 BackOff 并且能够仅重试某些异常(自版本2.3以来),不再需要通过侦听器适配器重试配置使用有状态重试。您可以通过错误处理程序的适当配置提供相同的功能,并从侦听器适配器中删除所有重试配置。有关详细信息,请参阅查找当前容器错误处理程序。
配置要简单得多。
您不需要使用手动确认;容器将基于
AckMode BATCH (默认)或 RECORD . 后者成本更高,但重新交付的机会更小。
对于无限次的重试,请使用 FixedBackOff 无限次的尝试( Long.MAX_VALUE )在 maxAttempts 财产。
这个 ExponentialBackOff 默认情况下将无限重试。你只需要确定 maxInterval 小于 max.poll.interval.ms 以避免重新平衡。

相关问题