本节教程在window下演示,如果是在linux上学习的同学,可以将命令的前缀进行替换即可,比如 window 下的 命令前缀 bin\windows\kafka-topics.bat
,则linux下的命令前缀为 bin\kafka-topics.sh
;
kafka生产消息使用producer生产者,其核心组件服务器为broker, 消费消息使用comsumer消费者, 消息接收需要使用到 topic; topic中又有分区和副本;
创建一个名为test的topic,并且指定分区为1,副本为1;
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
使用如下查看topic描述
bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
其结果如下,分区为1,副本为1,名称为test
Topic: test TopicId: hkPExRf8T72y2FFNEOiFnQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
创建一个生产者,向topic test 发送消息
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
回车键后发送如下消息
welcome to my site that is zszxz.com
创建一个消费者,向topic test获取消息
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
获取内容如下
kafaka 的 producer 负责的职能就是向 kafka 写入数据;kafka的每个producer 都是独立工作,producer 实例之间没有任何关系;kafka 在向topic 发送消息的时候, 如果消息指定了 key , kafka会计算key的hash值将消息存入不同的分区提高吞吐量,如果消息没有指定Key, kafka会将消息进行轮询存储到分区!确认分区后,kafka 的 producer 会去寻找分区对应 的 leader 也只有leader 能够响应client发送过来的请求,而另一个副本follower和leader 保持同步;
引入 client 依赖, producer 和 cosumer 相对 kafka 都是 客户端,所以都是引入客户端依赖;
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
发送示例消息示例
public static void main(String[] args) {
Properties properties = new Properties();
// 指定server地址 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// 指定client ID
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"test-producer");
// key序列化配置类型为String key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// value序列化配置类型为String value.serializer
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String,String> producer=new KafkaProducer(properties);
// 消息
String msg = "welcome to zszxz.com";
// 主题
String topic = "test";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
producer.send(record);
producer.close();
}
bootstrap.servers
, key.serializer
, value.serializer
, 这三个属性必须指定;StringSerializer.class.getName()
表示的序列化器为org.apache.kafka.common.serialization.StringDeserializer
;在创建 KafkaProducer 也可以指定 key 和value的序列化器;
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}
异步发送
KafkaProducer 的 send 方法 实现了 Callback回调, 并且提供了Future对象用于获取消息发送的结果;
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return this.send(record, (Callback)null);
}
同步发送
同步发送会使用 Future.get() 方法 实现无限等待结果;如果发送消息失败可以进行异常捕获进行处理
producer.send(record).get();
消息异常捕获
try {
producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
也可以进行回调异常处理,实现重试机制等
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("发送消息成功");
}else {
// 重试或者其它处理
}
}
}).get();
除了 bootstrap.servers, key.serializer, value.serializer 三个必须指定的参数之外,还有一些重要的参数;
通常我们需要将 acks 设置为 1
// 设置acks 应答
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 设置缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
// 设置压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
retries = 2147483647
, 保险起见,我们需要自己设置一般情况下为 3~5次为佳;// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
// 设置 batch
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
// 设置延迟发送
properties.put(ProducerConfig.LINGER_MS_CONFIG,"200");
// 设置消息大小
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"1048576");
// 设置超时响应时间
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,"50000");
kafka 接收到消息后,默认会通过Key值进行计算出hash 值,然后将消息发送到分区,如果未指定分区,则会进行轮询发送以保证消息在分区上分布比较均衡;如果想自定义分区策略则需要实现 Partitioner 接口;
/**
* @author lsc
* <p> </p>
*/
public class ZszxzPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
主要的参数
自定义分区示例
/**
* @author lsc
* <p> </p>
*/
public class ZszxzPartitioner implements Partitioner {
private Random random;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取集群中的分区
List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
// 分区数量
int size = partitionInfos.size();
int partitionNum = 0;
if (key==null){
// key 没有设置, 随机分区
partitionNum = random.nextInt(size);
}else {
// 使用hash值计算分区
partitionNum = Math.abs((key.hashCode())) % size;
}
System.out.println("分区:"+partitionNum);
return partitionNum;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
random = new Random();
}
}
在发送消息的时候配置上分区属性即可
// 自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.zszxz.kafka.partition.ZszxzPartitioner");
kafka 序列化和反序列化,根据不同的数据类型进行配置即可
之前的示例代码使用了简化方式,回顾下,否则要替换为全类名
// key序列化配置类型为String
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// value序列化配置类型为String
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producer 中 KafkaProducer 是线程共享的一个变量,可以作为成员变量使用,并且线程安全;每个线程中都维护着 一个ProducerRecord 用于存储消息;
示例代码如下
/**
* @author lsc
* <p> </p>
*/
public class ProducerThread extends Thread{
private final KafkaProducer<String,String> producer;
private final String topic;
public ProducerThread(String topic) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"test-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producer=new KafkaProducer<String, String>(properties);
this.topic = topic;
}
@Override
public void run() {
int num=0;
while(num<30) {
String msg="hello this message from producer:"+num;
try {
producer.send(new ProducerRecord<String, String>(topic,msg)).get();
TimeUnit.SECONDS.sleep(2);
num++;
System.out.println(num);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new ProducerThread("test").start();
}
}
消费者就是从topic获取消息;但是kafka的consumer 还有一些特性; consumer 被归类到 consumer group 底下;每个group 底下可能有多个consumer,;
由此引申出2 个模型
队列模型
发布订阅模型
如图所示2 个client 就组成一个group;每个gruop 都有一个group.id当作唯一标识;
消费者如果宕机会从宕机的位置开始发送消息,其识别位置就是使用offset实现;consumer会定期向kafka发送offset实现位移提交。Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为 __consumer_offsets
/**
* @author lsc
* <p> </p>
*/
public class ConsumerTest {
public static void main(String[] args) {
Properties properties=new Properties();
// 设置地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// 设置group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
// 设置 offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 设置value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 设置 key
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 对于当前groupid来说,消息的offset从最早的消息开始消费
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 消费者实例
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 主题
String topic = "test";
// 订阅
kafkaConsumer.subscribe(Arrays.asList(topic));
try{
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(30));
records.forEach(record -> {
System.out.println("key:"+record.key()+" value:"+record.value()+" "+record.offset());
});
}
}finally {
kafkaConsumer.close();
}
}
}
必须指定参数 bootstrap.servers, value.deserializer, key.deserializer, group.id;其中bootstrap.servers 也可以指定多个值 ip1:port1,ip2:port2; group.id 为消费组id,通常与业务名称挂钩; value.deserializer和 key.deserializer 分别对 producer 发送的消息进行反序列化;
KafkaConsumer 对象构造器如下所示
public KafkaConsumer(Properties properties) {
this((Properties)properties, (Deserializer)null, (Deserializer)null);
}
还可以指定key,value 的反序列化
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
}
属性参数不一定是写Properties,也可以写 map;
public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
}
kafkaConsumer.subscribe(Arrays.asList(topic));
为订阅主题,也可以订阅多个主题
kafkaConsumer.subscribe(Arrays.asList("topic1","topic2"));
kafkaConsumer.poll(Duration.ofSeconds(1));
是从 topic 中获取消息;1表示超时设置,如果未拿到数据,在1秒内会进行阻塞,直到拿到数据;
回顾下,我们使用了类名的获取方式实现全类名字符串
// 设置value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 设置 key
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumer group 是一组 rebalance 协议; 其规定了comsuer group 如何订阅topic 分区 达到平衡的目的;Kafka 有三种分配策略: RoundRobin, Range,Sticky;
简单的继承 Thread类
/**
* @author lsc
* <p> </p>
*/
public class ConsumerThread extends Thread{
private final KafkaConsumer<String,String> consumer;
public ConsumerThread(String topic){
Properties properties=new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
// 设置 offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 设置
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// key序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// value序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 对于当前groupid来说,消息的offset从最早的消息开始消费
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer= new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println("key:"+record.key() + " value:" + record.value() + "offset:" + record.offset());
});
}
}
public static void main(String[] args) {
new ConsumerThread("test").start();
}
}