Kafka连接器中的动态主题

h7wcgrx3  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(322)

kafka添加了一个新特性,在连接器中使用regex,但是在连接器启动之后,新添加的主题中的主题数据似乎在连接器重新启动之前不会被使用。我们需要动态添加新主题,并让连接器根据连接器属性中定义的regex使用该主题。如何实现?例如:regex:topic-.*topic:topic-1,topic-2如果我引入了新的topic-3,那么如何使连接器在不重新启动的情况下使用topic数据?

at0kjp5o

at0kjp5o1#

遵循其他人在评论中已经给出的想法,基本上你需要做什么构建一个机制,确定一个新的主题已经被引入,connecter需要干净地重新启动。
我会做这样的事,
1> 在已连接的主题(例如主题1)中发送特定类型的消息,如果收到此类消息,则代码应保留所有新消息轮询,并等待所有偏移提交完成。
2> 然后从轮询循环中断并从使用者(consumer.unsubscribe())中删除订阅。
3> 在从regex主题订阅的常规流之后,需要遵循在开始时完成的流程,因为新主题现在将成为regex的一部分。
请记住提交很重要,如果您匆忙重新启动connecter,可能会得到重复的提交。还有一点很明显,就是不要更改group.id并保持auto.offset.reset为“最新”。

jm2pwxwz

jm2pwxwz2#

Kafka消费者有一个选择 metadata.max.age.ms -使用者刷新主题元数据的时间间隔。如果你不需要实时的话,它会有帮助的。另请参见:kafka consumer to dynamic detect topics added
/etc/kafka-connect/kafka-connect.properties 您应该指定 consumer.metadata.max.age.ms=1000 一秒钟。

相关问题