kafka consumer:从 kafka broker 获取消息

x33g5p2x  于2020-12-20 发布在 Kafka  
字(4.9k)|赞(0)|评价(0)|浏览(1359)

在本章中,我们来了解一下kafka消费者(消费)。如果 kafka producer 充当生成消息并将其发送到主题,则 consumer 是指充当获取和使用消息的应用程序或服务器。消费者的主要功能是向管理特定分区的分区主管发出获取消息的请求。每个请求都指定日志的偏移量,并接收来自该位置的日志消息。因此,消费者可以调整要导入的消息的位置,并重新导入已导入的消息。

###consumer option

卡夫卡消费者有两种:旧消费者和新消费者。两个消费者的最大区别是,有没有使用主守护者。old consumer 支持将消费者关闭集存储在主守护者的守护节点中,但自 0.9 版本以来,它已更改为将 offset 存储在卡夫卡主题中,而不是主守护者。

  • 此处的 offset 是卡夫卡分区中标识消息的单位。

虽然 old consumer(在主守护者中存储 offset 的消费者)仍然支持,但该功能将在将来的版本中消失,因此,在启动新的卡夫卡项目或构建基础结构时,最好基于 new consumer。

bootstrap.servers

表示用于连接到卡夫卡群集的卡夫卡信息。如制作人位置中所述,即使只应用一台主机,也会有其行为,但建议输入主机列表完整 [,因为群集是活的,但当主机关闭时无法访问。

fetch.min.bytes

可一次导入的最小数据大小。如果聚集的数据小于选项中指定的大小,则立即响应请求,等待数据累积,或等待等待响应的最长时间。

group.id

标识消费者所属的消费者组的标识符。

enable.auto.commit

在后台定期提交 offset。

auto.offset.reset

如果卡夫卡没有初始 offset,或者当前 offset 不再存在(如果删除了数据),请重置为下面的选项。

earliest :设置为最早的偏移值。

latest :设置为最后一个偏移值。

none :如果找不到以前的偏移值,则表示错误。

fetch.max.bytes

一次可导入的最大数据大小

fetch.max.wait.ms

等待响应请求的最大时间(如果小于 fetch.min.bytes 中设置的数据)。

request.timeout.ms

等待响应请求的最大时间。如果最小数据大小未占用,则等待该时间并响应数据。

session.timeout.ms

指定消费者和经纪商之间的会话超时时间的选项。换句话说,这是经纪人认为消费者活着的时候了。(default 10 秒) 如果消费者未向组协调员发送心跳,而是 session.timeout.ms 后,代理将确定消费者已关闭或出现故障,并且消费者组将尝试重新平衡。

该选项通常与 heartbeat.interval.ms 一起使用。将 session.timeout.ms 设置为低于默认值可以更快地检测要失败,但完成垃圾回收或 poll 循环的时间越长,就会导致不需要的重平衡。另一方面,将 session.timeout.ms 设置为较高的值将减少发生不需要的重平衡的可能性,但检测实际错误可能需要很长时间。

heartbeat.interval.ms

指定向组协调员发送检测信号的频率。当然,它应该低于 session.timeout.ms,通常设置为 session.timeout.ms 的三分之一。(默认值为 3 秒)

max.poll.records

调整单个调用的最大记录数。

max.poll.interval.ms

之前,我们定期发送检测信号来检查消费者是否还活着。但是,有些用户可能只发送检测信号,而实际上不接收消息。在这些情况下,如果您定期请求 poll,以便该消费者无法无限期地占用分区,则它确定这是一个故障,并且允许其他消费者在除消费者之外从该分区中接收消息。

auto.commit.interval.ms

定期提交 offset 的时间。

※ 重新平衡意味着消费者的所有权将超越。

###consumer实现

那么,让我们亲自使用java实现consumer。实现代码如下所示:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerBasic {
    public void receiveFromKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "basic-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("basic-consumer"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

查看代码时,键入卡夫卡列表,就像 producer 一样,然后指定与 gropd id 和 auto commit 和 offset 重置相关的选项。由于前面的消息制作器使用字符串来表示消息和键值,因此指定内置 stringdeserializer。

使用 subscribe() 方法订阅要获取消息的主题。(也可以以列表的形式输入多个主题。)无限循环不断在主题中 poll()消息。重要的是,我们必须继续轮询卡夫卡。否则,消费者将判断为已关闭,因此分配给该消费者的分区将传递给其他消费者并消耗。

poll() 返回整个记录。因此,记录包含各种数据,如主题、分区、分区的 offset、key 和 value。此外,由于一次只调用一条消息,因此请使用重复语句来处理 n 条消息。在此示例中,我们仅添加代码以 system.out.printf() 输出数据,但在实际操作环境中,我们要求编写其他逻辑,例如分析存储在 hadoop 或数据库中的消息。

最后,在 consumer 退出之前,使用 close() 方法关闭网络连接和套接字。close() 方法会立即发生回发,因为消费者不会发送检测信号,因此协调器的速度比检测该消费者退出的速度快。

###partition消息顺序

进入消费者的消息按分区排列。例如,假设您发送了一条消息:0、1、2、3、4 和 5,因为主题具有四个分区,如下图所示。由于每个分区并行接收消息,因此 0、1、2 和 3 都装载到 offset1。4,5 被装载到 offset2。Kafka 仅按 offset 发送消息与 2 人 4 和 5 相比,offset 是一个人 0、1、2 和 3,因为 offset 确实可以保证首先到达,但不能保证具有相同 offset 的消息之间的顺序。因此,您发送到 0、1、2、3、4 和 5,如下图所示,但 consumer 会以与 0、2、1、3、5 和 4 相同的顺序接收消息。

为了确保消息的顺序,分区之间也只应指定一个分区。但是,如果指定 1 个分区数,则所有消息的顺序可以保证,但吞吐量并不高,因为只有一个分区,因此无法分发处理,并且只能由一个消费者处理。

###consumer group

这次,让我们来了解一下消费者组。消费者组是降低卡夫卡耦合的一大特点。消费者组允许多个消费者组同时访问一个主题,以保持消息的顺序和导入。这意味着,通过一致地从多个组调用一个数据,数据可以以各种形式进行转换和使用。

此外,消费者组可以扩展消费者。如果制作人发送到主题的消息速度激增,并且比消费者接收消息的速度快,会发生什么情况?消费者无法处理的信息将越来越多。最初,问题可能不会出现,但传入消息和传入消息之间的差异可能会拉大,从而导致服务问题。

在这种情况下,消费者组提供了有用的功能。默认情况下,在消费者组中,消费者共享用于获取消息的主题分区的西游权。因此,如果消费者组中的消费者数量不足,无法处理生产者发送的消息,则可以添加消费者数量,以并行处理一个消费者处理的消息,如上图所示。

如下图,当消费者组中消费者增加时,将经历一个重新平衡的过程,这意味着每个消费者对分区的所有权将移动。但是,请注意,在重新平衡期间,暂时无法获取消息。当重平衡发生时,主题中的每个分区都会连接一个消费者。当重新平衡结束时,消费者将从他们负责的分区获取消息。

消费者组的功能使添加消费者变得简单。但是,即使我们像图像一样添加了四个消费者,但消费者所传递的信息量却比制作人发送的信息少,该怎么办?您可以简单地认为增加消费者的数量,但您添加的新消费者将处于待机状态,因为只有一个消费者可以连接到主题中的分区。因此,如果创建者的数量与主题中的分区数相同,但您无法跟上制作人发送的消息的速度,则您不仅应添加消费者,还应同时增加主题中的分区和用户。

此外,如果相反,它的行为良好的 consumer4 突然关闭。在这种情况下,向 consumer4 发送消息的 partition3 将重新平衡为 consumer3,因此部分ition2 和 3 都将分配给 consumer3。在这种情况下,吞吐量与 consumer 1 和 2 相比不均,因此有必要通过持续监控进一步分配消费者。

当我第一次介绍卡夫卡时,我告诉过你,卡夫卡是一个集中化数据的系统。(为了重新使用,我将再次查看图像。与右侧的卡夫卡系统流程图相比,卡夫卡之前的系统流程图非常耦合和复杂。卡夫卡如何改进这些复杂的系统流程图?

(左)卡夫卡之前 / (右) 卡夫卡之后

例如,一个名为 a 的服务团队将日志消息发送到 topic-01 主题,并使用消费者组 01 来处理这些消息。但是,不久之后,b 服务团队需要来自服务团队的日志消息。如果以前的情况相同,则服务团队会直接转发日志消息(这种情况重复,从而创建复杂的系统流程图),但使用 Kafka 时,a 团队正在使用卡夫卡和主题信息告诉 b 团队,以便 b 团队可以以新的消费者组访问,并获取 a 团队从 Kafka 接收的消息。这些消费者组之所以能够从一个主题中传递消息,是因为每个消费者组分别管理各自的偏移量。因此,即使将多个消费者组与多个消费者组关联到一个主题中,您也能够在不影响其他消费者组的情况下传递消息。

相关文章

最新文章

更多