有没有办法在deadletterpublishingrecoverer中找到消费者组?

drkbr07n  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我有一个使用dlq方法的项目,其中 @KafkaListener ,错误将被发送到具有该结构的主题 error-<topic>-<consumergroup> . 当我们想要手动重试来自这个dlq的kafka消息时,我们将生成一个具有类似结构的主题 retry-<topic>-<consumergroup> . 例如,如果我们有一个主题 foo ,由消费群体消费 bar ,我们将讨论以下主题: foo , error-foo-bar 以及 retry-foo-bar .
这样,我们可以为特定的使用者组重试消息。消息处理程序同时读取主主题和重试主题。
由于项目中有多个监听器都使用同一个容器,因此我们将主题(main、error和retry)放在应用程序属性中,并配置了 DeadLetterPublishingRecoverer 要查找相应的dlq(注意,这是kotlin):

val recoverer = DeadLetterPublishingRecoverer(template) { record, _ ->
    val dlq = kafkaProperties.topics.values.firstOrNull { it.main == record.topic() || it.retry == record.topic() }!!.dlq

    return@DeadLetterPublishingRecoverer TopicPartition(dlq, -1)
}

一切正常。不过,现在我想补充一点 @KafkaListener 和另一个听众在同一个主题上交谈。因此,我需要给这个听众一个单独的消费群体。
但是,如果我使用与上面相同的逻辑来查找相应的dlq主题,那么两个侦听器中的一个将使用另一个的dlq,因为它不在这里查看消费者组。
所以我的问题是:在 Spring Kafka,有没有办法找到哪个消费群体(又名 groupId )发生错误了吗?我查看了文档和源代码,但自己找不到。感谢您的帮助。

vlju58qv

vlju58qv1#

这个 group.id 消费者可拨打 KafkaUtils.getGroupId() (存储在 ThreadLocal 容器线程启动时)。

相关问题