kafka producer:向 kafka broker 发送消息(sync, async)

x33g5p2x  于2020-12-20 发布在 Kafka  
字(6.4k)|赞(0)|评价(0)|浏览(762)

###producer

在卡夫卡,所有应用程序、服务器等都称为 producer,这些应用程序、服务器等都用于生成消息并发送到卡夫卡主题。生产者的主要功能是将每个消息映射到主题分区,并向分区的读取器发送请求。如果为要发送的消息指定键值,则这些消息将发送到所需的分区。但是,如果未指定键值,则分区将按 round-robin 方法将消息均匀地分布到每个分区。

###kafka / zookeeper install

在这次报道中,我们特别使用 docker-compose 来轻松移动 Kafka 经纪人,然后从制作人向经纪人发送消息。虽然您不必使用 docker 安装 kafka 和 zookeeper,但为了方便安装 Kafka,请使用下面的 docker-compose 模板将卡夫卡放在容器中,然后进行动手练习。

version: '2'

networks:
  test:

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - test

  kafka:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "kafka-my-topic:1:1"   # Topic명:Partition개수:Replica개수
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - test

###producer试用

这一次,我们创建一个制作人项目,以向卡夫卡发送消息。无论以哪种语言实现制作人,它都无关紧要,但本轮撰写时,我使用 java 语言。卡夫卡是基于斯卡拉开发的。主客户端库是使用 Zaa 创建的应用程序,提供的功能最大。创建 java 项目后,我们可以使用 maven 或 gradle 添加客户端库。

由于卡夫卡与客户端库的所有版本不兼容,因此我们强烈建议在验证代理和客户端库是否兼容后继续。

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

添加 dependecy 后,让我们自己实现制作人代码。令人惊讶的是,与概念相比,卡夫卡制作人的代码本身很简单。

####전송传输而不检查结果

您现在编写的代码是生产者的代码,它不会验证制作人在向服务器发送消息后是否成功到达。如果卡夫卡还活着,制作人将自动重新发送,即使消息传输失败,在大多数情况下,它会自动发送,但可能会丢失一些消息,如前面的消息。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test"));
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

首先,创建属性对象。(properties 对象是将 Java 的 hashtable 作为继承对象,作为 key-value 存储的对象。)现在,在属性对象 props 中,将指定与创建者相关的设置。

在 props 中定义代理列表。目前只有一个经纪商列表,以"127.0.0.1:9092"的形式输入值,但如果有多个,则输入形式为"127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092"。
(建议尽可能从该选项添加所有卡夫卡群集。

指定 stringserializer,因为您将使用发送到卡夫卡的消息的键和值字符串。

现在,创建实际 producer 对象,该对象接收指定选项作为参数。

创建 producuerrecord 对象,使用 producer 的 send() 方法将消息发送到名为 kafka-my-topic 的主题。

关闭 producer。

※ 分区并发送消息

如果指定所需的分区以发送消息,而不是以循环方式向分区发送消息,则 producer.send() 方法将 key 值作为第二个参数。

####sync传输

事实上,生产者将发送消息,并接收 recordmetadata 作为 send() 方法中的 java future 对象。这一次,让我们使用 .get() 方法等待 future,然后检查 send() 方法是否成功执行。这种方法的优点是,您可以验证消息是否已成功转发给代理,从而可以执行更可靠的消息传输。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test")).get();
            System.out.printf("Partition: %d, Offset %d", metadata.partition(), metadata.offset());
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

####async传输

如果以上述同步方式发送消息,则当线程停止等待对生产者发送的所有消息的响应时,效率会下降,并且需要很长时间。但是,如果异步传输,则可以加快传输速度,因为您可以立即执行下一步操作,而无需等待响应。在下面的代码中,生产者调用 send() 方法(如回调),并在卡夫卡代理收到响应时进行回调。

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerCallback implements Callback {

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            System.out.printf("Partition: %d, Offset: %d", metadata.partition(), metadata.offset());
        } else {
            exception.printStackTrace();
        }
    }
}
import com.kafka.producer.async.ProducerCallback;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test"), new ProducerCallback());
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

通过以回调形式将 producercallback 对象放在回调形式中,而不是等待从以前同步方法接收到 .get 方法的 future 对象,可以异步执行它。在上面的示例中,我们编写了一个示例代码,该代码在卡夫卡错误时输出 exception,但在实际操作中需要额外的异常处理。

###producer选项

在到目前为止的示例中,我们简要地了解了向卡夫卡主题发送消息所需的一些选项。还有其他与制作人操作相关的选项,我们将一起来了解它们。

bootstrap.servers

由于卡夫卡群集没有主群集的概念,因此任何群集服务器都可能收到来自客户端的请求。虽然并非所有卡夫卡主机都指定,但卡夫卡群集是活的,但如果输入的主机出现故障,则生产者可能会找不到其他卡夫卡主机,因此无法发送消息。但是,如果输入所有主机,则即使给定列表中的某个服务器出现故障,生产者也会自动尝试重新连接到另一台服务器,因此对故障具有耐受性。

acks

在制作人向 Kafka 主题的领导者发送消息后,在完成请求之前,选择 ack(批准)数。该选项的整数较低,虽然性能良好,但消息丢失的可能性越大,数量越高,性能越高,但消息丢失率越低。

ack=0
制作人不会等待卡夫卡的任何错误。这意味着重新请求设置也不适用,因为即使生产者发送的消息失败,您也不会知道结果。但是,由于您不会等待来自 Kafka 的 ack 响应**,因此您可以非常快速地发送消息,从而以高吞吐量执行功能。

ack=1
只有卡夫卡领导收到来自正常数据的答复。但是,对于消息是否正常到达所有关注者,我们没有得到回复。

ack=all / ack=-1
如果设置为 all 或 -1,则只要存在一个或多个关注者,数据不会丢失,因为您会等待来自所有关注者的数据的 ack。因此,它最有力地保证数据丢失,但同时需要等待所有关注者的 ack 响应,从而导致性能下降。

buffer.memory

完整的内存字节,生产者可以等待一段时间,将数据发送到卡夫卡服务器。迪莱,如批次传输发生这种情况时可用。

compression.type

制作人可以压缩和发送数据,以决定要挤出什么类型。作为选项,您可以选择各种格式,如 none、gzip、snappy 和 lz4

batch.size

生产者将收集一定容量的多个数据,并批量发送到同一分区。此时,使用这些设置可以调整批处理大小(以字节为单位)。大于定义的批大小的数据将不会尝试放置。此外,如果客户端在发送批处理之前出现故障,则批处理中的消息将不会转发。因此,如果消息需要高可用性,则不使用批处理功能也是一种方法。

linger.ms

当放置大小仍然不足时,将调整等待其他消息的时间。当达到指定的放置大小时,linger.ms,无论选择哪个选项,制作人都会立即发送消息,但如果尚未达到批处理大小,则当达到该设置的超时时,他们都会发送消息。default 值为 0(无延迟),如果设置的值大于零,则会出现一些延迟,但吞吐量会提高。

max.request.size

生产者可以发送的最大消息字节大小**。default 值为 1 mb。

相关文章

微信公众号

最新文章

更多