kafka系列之消费者与分区(7)

x33g5p2x  于2021-12-19 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(339)

消费者与分区

消费者组

我们知道 kafka 支持两种消息模型 队列模型和发布订阅模型(publish-subscribe)

  • 队列的处理方式是一组消费者从服务器读取消息,一条消息只由其中的一个消费者来处理
  • 发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。
消费者组

Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布订阅模型

一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。

当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

也就是说一个消费者可以消费多个分区,但是一个分区只能被一个消费者消费,所以一个组内的消费者数量多余分区的数量是没意义的。所以可以看出分区不论是从数据消费还是数据生产上都是kafka 负载均衡的基本单位

  • 消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。同一个组内包含若干个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,即 Group ID。
  • 组内的所有消费者协调在一起来消费订阅主题的所有分区。每个分区只能由同一个消费组内的一个消费者实例来消费。
分区分配策略

前面我们知道,kafak 的一个分区只能被一个消费者组内的一个消费者消费,一个消费者是可以消费多个分区的,但是一个分区不能被多个消费者消费,这里强调的实同时,当然消费者组重平衡之后,一个分区可能会被其他消费者消费。这里就涉及到一个问题,分区和消费者之间的分配关系。

RangeAssignor分配策略

RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度(步长),然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

为了更加通俗的讲解RangeAssignor策略,我们不妨再举一些示例。假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3

这样分配的很均匀,那么此种分配策略能够一直保持这种良好的特性呢?我们再来看下另外一种情况。假设上面例子中2个主题都只有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2

可以明显的看到这样的分配并不均匀,如果将类似的情形扩大,有可能会出现部分消费者过载的情况。对此我们再来看下另一种RoundRobinAssignor策略的分配效果如何。

为什么会这样呢,这是因为对于第一个Topic 来说,C0 要消费2 分分区,对于第二个Topic 来说,C0 还是要消费两个分区。

RoundRobinAssignor 分配策略

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者,RoundRobinAssignor策略对应的partition.assignment.strategy参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2
StickyAssignor分配策略

Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  1. 分区的分配要尽可能的均匀 这个看起来句废话,所有的分配策略都希望均匀分配
  2. 分区的分配尽可能的与上次分配的保持相同(这个主要是为了在重平衡的时候提高重平衡的效率),当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

消费者组重平衡

在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

其实Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

Rebalance发生的时机
  1. 组成员数量发生变化(添加或者挂掉了)
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

为什么发生了上面的变化就需要Rebalance呢,因为有新的分区(可能是老的需要再分配)需要被分配

Rebalance 的影响
  1. Rebalance 影响 Consumer 端 TPS。这个有点JVM 的 STOP THE WORLD ,总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
  2. Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点,一次可能需要几十分钟甚至个把小时
  3. Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的
Rebalance 的实现

比如一个 Group 下有 10 个成员,每个成员平均消费 5 个分区。假设现在有一个成员退出了,此时就需要开启新一轮的 Rebalance,把这个成员之前负责的 5 个分区“转移”给其他成员。显然,比较好的做法是维持当前 9 个成员消费分区的方案不变,然后将 5 个分区随机分配给这 9 个成员,这样能最大限度地减少 Rebalance 对剩余 Consumer 成员的冲击。

遗憾的是,目前 Kafka 并不是这样设计的。在默认情况下,每次 Rebalance 时,之前的分配方案都不会被保留。就拿刚刚这个例子来说,当 Rebalance 开始时,Group 会打散这 50 个分区(10 个成员 * 5 个分区),由当前存活的 9 个成员重新分配它们。显然这不是效率很高的做法。

基于这个原因,社区于 0.10 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。不过有些遗憾的是,这个策略目前还有一些 bug,而且需要升级到 0.11 才能使用,因此在实际生产环境中用得还不是很多

Rebalance 的避免

要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有三个:

  1. 组成员数量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。

Consumer 实例增加的情况很好理解,当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Group 添加了一个新的 Consumer 实例。此时,Coordinator 会接纳这个新实例,将其加入到组中,并重新分配分区。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要 Rebalance”。

我们更在意的是 Group 下实例数减少这件事。如果你就是要停掉某些 Consumer 实例,那自不必说,关键是在某些情况下,Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,我们就不能不管了。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。

Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timeout.ms 决定了 Consumer 存活性的时间间隔。

Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。

消费kafka 数据

Kafka 为我们提供了Java 的消费者客户端和生产者一样,我们需要一些基本的参数配置

public class MockConsumer {
    private static KafkaConsumer<String,String> consumer;
    /** * 初始化消费者 */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("flink_json_source_4"));

    }
    /** * 初始化配置 */
    private static Properties initConfig(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {

        while (true) {
            // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record)->{
                System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
            });
        }
    }
}

这里有一点需要注意一下:这里我们提供了一个参数叫做"group.id",因为我们知道我们的消费者是以组的形式进行消费的,如果不提供的话,就会有下面的异常

Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

offset 的管理

这里我们简单提一下offset 的管理,后面我们有专门的一节来说offset 的管理问题,前面我们提到过offset 是标识kafka 里面一条记录的ID,也就是说我们如何确定一条记录就是通过offset 确定的。

所以我们的消费者需要维护这个信息,从而确定自己消费到哪里了,从而在发生故障重启之后接着上次的位置进行消费,我们将这个维护信息的过程叫做提交偏移量(offset),其实从前面的消费组重平衡的角度来看,平衡后的消费者依然是需要这个信息,进行数据消费的。

提交偏移量带来的问题

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

提交偏移量的方式

关于偏移量的提交,我们下面主要介绍这两种,后面我们可以再后面看到更好的设计,如何去兼容性能和数据准确性。

自动提交 Automatic Commit

enable.auto.commit设置成true(默认为true),那么每过5s,消费者自动把从poll()方法接收到的最大的偏移量提交。

提交的时间间隔由auto.commit.interval.ms控制,默认是5s

可以在创建消费者客户端的时候,添加这两个参数加以控制

props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms", 1000);
手动提交 Manual Commit

enable.auto.commit设置成false,让应用程序决定何时提交偏移量。

commitSync()提交由poll()方法返回的最新偏移量,所以在处理完所有消息后要确保调用commitSync,否则会有消息重复消费的风险。

commitSync在提交成功或碰到无法恢复的错误之前,会一直重试。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都会被重复处理。

不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量

public class ManualConsumer {
    private static KafkaConsumer<String, String> consumer;
    /** * 初始化消费者 */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("flink_json_source_4"));

    }
    /** * 初始化配置 */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {

        while (true) {
            // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
            });
            try {
                //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
                consumer.commitSync();
            } catch (CommitFailedException e) {
                e.printStackTrace();
            }
        }
    }
}
消费者的配置

1:fetch.min.bytes,指定消费者从broker获取消息的最小字节数,即等到有足够的数据时才把它返回给消费者

2:fetch.max.wait.ms,等待broker返回数据的最大时间,默认是500ms。fetch.min.bytes和fetch.max.wait.ms哪个条件先得到满足,就按照哪种方式返回数据

3:max.partition.fetch.bytes,指定broker从每个partition中返回给消费者的最大字节数,默认1MB

4:session.timeout.ms,指定消费者被认定死亡之前可以与服务器断开连接的时间,默认是3s

5:auto.offset.reset,消费者在读取一个没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认是latest(消费者从最新的记录开始读取数据)。另一个值是 earliest(消费者从起始位置读取partition的记录)

6:enable.auto.commit,指定消费者是否自动提交偏移量,默认为true

7:partition.assignment.strategy,指定partition如何分配给消费者,默认是Range。Range:把Topic的若干个连续的partition分配给消费者。RoundRobin:把Topic的所有partition逐个分配给消费者

8:max.poll.records,单次调用poll方法能够返回的消息数量

再均衡监听器

前面我们提到了消费者组的重平衡,kafka 也为我们提供了这样的组件,让我们可以去发生Rebalance的时候做一些操作,实现ConsumerRebalanceListener 接口,然后在订阅kafka topic 的时候传入,一个常见的常见就是对offset 的管理。因为Rebalance 可能导致数据重复消费。

对于Kafka而言,从poll方法返回消息的那一刻开始这条消息已经算是“消费”完成了,这个时候如果发生了重平衡,你的offset 没有提交的话,重平衡之后会重复消费。所以我们希望在重平衡之前进行offset 的提交

public class ConsumerRebalance {
    private static KafkaConsumer<String, String> consumer;
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /** * 初始化消费者 */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("flink_json_source_4"), new RebalanceListener(consumer));

    }

    /** * 初始化配置 */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("max.poll.records", 1000);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {

        while (true) {
            // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no matadata"));
            });
            consumer.commitAsync();
        }
    }

    static class RebalanceListener implements ConsumerRebalanceListener {
        KafkaConsumer<String, String> consumer;

        public RebalanceListener(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        // 在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //用于跟踪偏移量的map
            consumer.commitSync(currentOffsets);
        }

        // 在重新分配partition之后和消费者开始读取消息之前被调用。
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.commitSync(currentOffsets);
        }
    }
}

总结

  1. 分区是kafka 高吞吐高扩展的重要基石,也是kafka 负载均衡的基本单位,所以我们要从三个方面去理解分区

  2. 是从Topic 的角度

  3. 从生产者的角度

  4. 从消费者的角度

  5. consumer group 一方面通过优秀的设计达到了两种消息模型的实现,也实现了可扩展的消费机制

  6. 重平衡会严重影响消费者的性能,但是没有重平衡的话就不能做到消费者的高扩展。

相关文章