kafka系列之offset管理那些事(8)

x33g5p2x  于2021-12-19 转载在 其他  
字(17.9k)|赞(0)|评价(0)|浏览(379)

位移 offset

  • 每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset),它是消费者消费进度的指示器
  • 看上去Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移 TopicPartition->long
  • 不过切记的是消费者位移是下一条消息的位移,而不是目前最新消费消息的位移。
  • 提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。

offset 的分类

LogStartOffset
  • 表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
ConsumerOffset
  • 消费位移,表示某个消费者消费某个Partition消费到的位移位置。
HighWatermark
  • Kafka 的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表征的
  • 简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
定义

  • 在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。
  • 消费者只能消费已提交消息,即图中位移小于 8 的所有消息
  • 位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。
  • 事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。
主要作用
  • 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  • 帮助 Kafka 完成副本同步
HW更新机制
  • 每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值

  • 我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)
  • leader副本上保存这些远程副本的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。

LogEndOffset

  • LEO (Log End Offset)是所有的副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向Leader副本追加消息的时候,Leader副本的LEO标记会递增;当Follower副本成功从Leader副本拉取消息并更新到本地的时候,Follower副本的LEO就会增加。
  • 注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。显然,介于高水位和 LEO 之间的消息就属于未提交消息
  • 简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。
主要作用
  • Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。
  • 只不过 Leader 副本比较特殊,Kafka **使用 Leader 副本的高水位来定义所在分区的高水位 **。换句话说,分区的高水位就是其 Leader 副本的高水位。
  • 比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己
  • HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。
  • 由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。

Last Stable Offset

Last Stable Offset 简称LSO,它与kafka的事物有关。

当消费端的参数isolation.level 设置为“read_committed"的时候,那么消费者就会忽略事务未提交的消息,既只能消费到LSO(LastStableOffset)的位置,默认情况下,”read_uncommitted",可以消费到HW(High Watermak)的位置。

也就是说开启kafka事务的同时,生产者发送了若干消息,(msg1,msg2)到broker中,如果生产者没有提交事务(执行CommitTransaction)那么对于isolation.level=read_committed的消费者而言是看不多这些消息的,而isolation.level=read_uncommitted则可以看到。

位移的保存

其实Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

老版本的位移保存
  • 老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。
  • 比较流行的做法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
  • 不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能
新版本的位移保存
  • 新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka内部主题的方法,也就是__consumer_offsets,俗称位移主题

位移主题(Offsets Topic)

  • 将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。
  • 它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情
  • 虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。
  • 事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。你千万不要自己写个 Producer 随意向该主题发送消息。
分区数
  • 但如果是 Kafka 自动创建的,分区数是怎么设置的呢?这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。
消息格式
  • 虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。
key
  • 位移主题的 Key 中应该保存 3 部分内容:Group ID,主题名,分区号
value
  • 主要保存的是offset 的信息,当然还有时间戳等信息,你还记得你可以根据时间重置一个消费者开始消费的地方吗

位移的提交

  • Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
  • 位移提交的方式,用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
  • 大数据组件都关闭了自动提交,采取了手动提交。
自动提交位移
  • Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms来控制
  • 它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长,因为你的处理逻辑可能需要一定的时间
  • Kafka 任务poll 方法返回之后就已经完成了消费。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms", 1000);
public static void main(String[] args) {
        while (true) {
            // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record)->{
                System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
            });
        }
    }
提交的时机
  • Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
  • 从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
  • 但自动提交位移的一个问题在于,它可能会出现重复消费,如果处理失败了下次开始的时候就会从上一次提交的offset 开始处理
存在的问题
  • 假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的
手动提交位移
  • 很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false
  • Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。
同步手动提价
  • 设置 enable.auto.commit 为 false
  • 代码中手动提交
public static void main(String[] args) {
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            // 模拟消息的处理逻辑
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        try {
            //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
            consumer.commitSync();
        } catch (CommitFailedException e) {
            e.printStackTrace();
        }
    }
}

存在的问题

  • 从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
  • commitSync()的问题在于,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,需要注意的是同步提交会在提交失败之后进行重试
  • 在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS
异步手动提交

下面都是三个测试用例都是异步提交,不同之处在于有没有去实现回调函数。建议生产环境中一定要实现,至少记录下日志。

@Test
public void asynCommit1(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        consumer.commitAsync();
    }
}

@Test
public void asynCommit2(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        // 异步回调机制
        consumer.commitAsync(new OffsetCommitCallback(){
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception!=null){
                    System.out.println(String.format("提交失败:%s", offsets.toString()));
                }
            }
        });
    }
}

@Test
public void asynCommit3(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        consumer.commitAsync((offsets, exception) ->{
            if (exception!=null){
                System.out.println(String.format("提交失败:%s", offsets.toString()));
            }
        });
    }
}
  • 从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等

存在的问题

  • commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的,所以只要在程序停止前最后一次提交成功即可。
  • 这里提供一个解决方案,那就是不论成功还是失败我们都将offsets信息记录下来,如果最后一次提交成功那就忽略,如果最后一次没有提交成功,我们可以在下次重启的时候手动指定offset
综合异步和同步来提交
try {
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        consumer.commitAsync();
    }
} catch (CommitFailedException e) {
    System.out.println(String.format("提交失败:%s", e.toString()));
} finally {
    consumer.commitSync();
}
  • 同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
精细化提交(分批提交)
  • 设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。
  • 对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {
    // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
    for (ConsumerRecord<String, String> record : records) {
        // 数据的处理逻辑
        System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        // 记录下offset 信息
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
        if (count % 100 == 0) {
            // 回调处理逻辑是null
            consumer.commitAsync(offsets, null);
        }
        count++;
    }
    try {
        //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
        consumer.commitSync(offsets);
    } catch (CommitFailedException e) {
        e.printStackTrace();
    }
}
位移提交和rebalance
  • 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。
  • 在 Rebalance 之后,所有Consumer从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
  • 虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷(其实就是重复消费的问题)
  • 其实你要是仔细思考,手动提交也存在这个问题,因为rebalance会先让所以的消费者停止消费,因为在kafka 的角度来看,消息消费的那一刻,消费已经完成,所以停止消费的时候,你的逻辑很可能没有完成,那么你的offset 也很可能没有提交。
再均衡监听器提交

kafka 也为我们提供了这样的组件,让我们可以去在发生Rebalance的时候做一些操作,实现ConsumerRebalanceListener 接口,然后在订阅kafka topic 的时候传入,一个常见的常见就是对offset 的管理。因为Rebalance 可能导致数据重复消费。

对于Kafka而言,从poll方法返回消息的那一刻开始这条消息已经算是“消费”完成了,这个时候如果发生了重平衡,你的offset 没有提交的话,重平衡之后会重复消费。所以我们希望在重平衡之前进行offset 的提交。

public class ConsumerRebalance {
    private static KafkaConsumer<String, String> consumer;
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /** * 初始化消费者 */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("flink_json_source_4"), new RebalanceListener(consumer));

    }

    /** * 初始化配置 */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("max.poll.records", 1000);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {

        while (true) {
            // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no matadata"));
            });
            consumer.commitAsync();
        }
    }

    static class RebalanceListener implements ConsumerRebalanceListener {
        KafkaConsumer<String, String> consumer;

        public RebalanceListener(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        // 在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //用于跟踪偏移量的map
            consumer.commitSync(currentOffsets);
        }

        // 在重新分配partition之后和消费者开始读取消息之前被调用。
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.commitSync(currentOffsets);
        }
    }
}

再均衡监听器有其他很多的作用,这里只是其应用的一个场景。

位移提交的异常处理

CommitFailedException

从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。因为KafkaConsumer.commitSync()有重试机制,所以一般的网络原因可以排除,发生这个异常的原因主要就是超时了,但是这个超时不是说提交本身超时了,而是消息的处理时间超长,导致发生了Rebalance,已经将要提交位移的分区分配给了另一个消费者实例

我们首先模拟一下自动提交会不会发生这样的情况

private static Properties initConfig(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    props.put("enable.auto.commit",true);
    props.put("auto.commit.interval.ms", 3000);
    props.put("session.timeout.ms", 30000);
    props.put("max.poll.records", 1000);
    props.put("max.poll.interval.ms", 5000);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    return props;
}

public static void main(String[] args) throws InterruptedException {

    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record)->{
            System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
        });
        Thread.sleep(6000L);
    }
}

我们发现并不会,然后我们模拟一下手动提交

/** * 初始化配置 */
private static Properties initConfig(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    props.put("enable.auto.commit",false);
    props.put("session.timeout.ms", 30000);
    props.put("max.poll.records", 1000);
    props.put("max.poll.interval.ms", 5000);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    return props;
}

public static void main(String[] args) throws InterruptedException {

    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record)->{
            System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
        });
        Thread.sleep(6000L);
        consumer.commitSync();
    }
}

这下我们就看到这个熟悉的错误了

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
缩短单条消息处理的时间
  • 比如之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
增加 Consumer 端允许下游系统消费一批消息的最大时长。
  • 这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
  • 值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。
减少下游系统一次性消费的消息总数
  • 这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。
  • 可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。
  • 如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
下游系统使用多线程来加速消费。
  • 这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。
  • 之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
  • 如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。
  • 事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu

消费者位移重设

  • Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)
  • 像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。
  • 由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能——重放
  • 重设位移主要是为了实现消息的重演。目前 Kafka 支持 7 种重设策略和 2 种重设方法。在实际使用过程中,我推荐你使用第 2 种方法,即用命令行的方式来重设位移。

位移重设

位移维度
  • 这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
Earliest

这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略

Latest
  • Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略
Current
  • Current 策略表示将位移调整成消费者当前提交的最新位移。
  • 有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。
Specified-Offset
  • Specified-Offset策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。
  • 这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理
hift-By-N
  • Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。
时间维度
  • 我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。
DateTime
  • DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。
  • 常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。
Duration
  • Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。
  • 举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

位移重设的方式

  • 通过消费者 API 来实现。
  • 通过 kafka-consumer-groups 命令行脚本来实现
代码实现
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
命令行方式设置
Earliest 策略直接指定–to-earliest
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest 策略直接指定–to-latest
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current 策略直接指定–to-current
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略直接指定–to-offset
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset --execute
Shift-By-N 策略直接指定–shift-by N
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by --execute
DateTime 策略直接指定–to-datetime
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
Duration 策略直接指定–by-duration
  • bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --execute

相关文章