如何在kafkajs中使autocommit错误工作

qyyhg6bp  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(247)

脚本:
im设置自动提交为假,
生成12条消息
吃掉他们(从偏移量100开始)
关闭耗电元件
创建新的消费者
在那个阶段,我希望第二个消费者从偏移量100开始再次读取所有消息(因为没有提交)
但是当生成新消息时,我看到第二个消费者从新的偏移量(113)开始,即提交仍然以某种方式发生。。
我做错什么了?
这是我的消费代码

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
  // admin
  await admin.connect();
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-test2'});
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString()
      });
      }
    }
  });
};
run().catch(console.error);

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题