在使用kafka python时,定期轮询kafka消费者的最佳方式是什么?

oipij1gg  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(551)

我有多个制作人正在把数据输入Kafka。我希望每小时运行一个consumer,一次获取所有累积的数据并进一步处理。
我想到的选择是:
使用python线程并使用setinterval的等效值来调用使用者
设置max\u poll\u interval\u ms变量:(如其他几个答案中所述)。然而,官方文件指出
这为消费者在获取更多记录之前可以空闲的时间量设置了一个上限。如果poll()在此超时过期之前未被调用,则认为使用者失败,组将重新平衡这听起来不像是它负责将使用者置于睡眠状态,然后再次启动它。
我不是每小时轮询一次,而是跟踪消费者补偿,并在10000条记录附加到kafka之后进行轮询
然而,我想在消费者内部管理同样的问题。最好的方法是什么?

z9zf31ra

z9zf31ra1#

如果您阅读了官方文档中的max\u poll\u interval\u ms,则这是消费者可以空闲的最大间隔。之后,消费者被视为死亡,消费者群体再平衡发生。
这就是为什么我建议你不要在10公里后关闭消费者。尽管使用偏移量进行轮询是一种很好的策略,但也存在一个问题。每一个新的消费者补偿并不意味着它是一个新的信息。根据您的auto.offset.reset配置,您可能有重复的消息。
为了节省运行pod的成本,我建议您创建一个分区较少的主题。这样可以节省数据传输和存储成本。尽管示例应该保持运行。

d7v8vwbk

d7v8vwbk2#

使用cron或os调度器每小时调用一个脚本。
如果你需要等到10k唱片出现在这个主题上才能做任何有用的事情,那么我并不完全确定Kafka是否适合这个架构。此外,消费者的落后实际上会不断地落后

相关问题