前面我们详细介绍了拦截器的原理,可以参考kafka系列之Producer 拦截器(06),其实拦截器的在很多技术中都有,关于拦截器的应用场景我们在前面一节中也介绍过了,这一节我们直接看一下消费者端拦截器的使用。我觉得为了学习kafka ,你可以打开kafka 的源码包,看看都有什么,你可以从kafka-client 包开始
我们今天要介绍的ConsumerInterceptor
就是在consumer 包下面,我么前面介绍的很多东西你都可以在这里看到响应的代码,例如分区的分配策略、再均衡监听器等等。
/** * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc. * 其实这一句已经说明了它的主要使用场景,而且告诉你它是个插件,也就是可有可无的,主要用在第三方组件里,用来记录和监控 */
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/** * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)} * 发生的时机:在返回给客户端之前,也就是poll() 方法之前 * <p> * This method is allowed to modify consumer records, in which case the new records will be returned. * 这个方法允许你修改records(记录集合),然后信息的记录集合被返回 * There is no limitation on number of records that could be returned from this * method. I.e., the interceptor can filter the records or generate new records. * 没有返回记录条数上的限制,你可以在这里可以可以过滤或者是生成新的记录 */
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/** * This is called when offsets get committed. * 当offset 被调教之后条用 */
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/** * This is called when interceptor is closed */
public void close();
}
onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
这里我们注意到一个问题,那就是ConsumerInterceptor.onConsume
处理的对象是ConsumerRecords,这个Producer 拦截器的参数ProducerRecord不一样,而且从名称上我们就可以看出来这是个集合,那是什么集合呢
它呢其实就是ConsumerRecord的一个集合,这是因为我们生产者生产记录是一条条生产的,而我们的消费者是一批批消费的。
这就是我们的拦截器实现
public class ChangeValueInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
if (records == null || records.isEmpty()) {
return records;
}
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
if ("kingcall".equals(record.key())) {
record = new ConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), "刀光剑影江湖情,摧枯拉朽浪滔滔。功名利禄拂衣去,山高水远路迢迢");
}
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
List<ConsumerRecord<String, String>> consumerRecords = newRecords.getOrDefault(topicPartition, new ArrayList<ConsumerRecord<String, String>>());
consumerRecords.add(record);
newRecords.put(topicPartition, consumerRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
下面是我们的consumer 代码
public class ConsumerInterceptorDemo {
private static KafkaConsumer<String, String> consumer;
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
@Test
public void changeValue() {
Properties configs = initConfig();
List<String> interceptors = new ArrayList<>();
// 拦截器 1
interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.ChangeValueInterceptor");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
}
}
}
当我们尝试发送两条消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "你好祖国", "你好祖国");
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "kingcall", "kingcall");
输出如下:
这个怎么玩呢,前面我们在学习Producer 拦截器的时候记录下了,总共发送的数据条数,这里我们只要计算出来总的记录延迟,我们就可以算出来平均延时,从了解我们当前的处理能力,是否可以满足业务的需求
定义拦截器
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private static Jedis jedis;
static {
jedis = new Jedis("localhost");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalMessageCount"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
jedis.close();
jedis = null;
}
@Override
public void configure(Map<String, ?> configs) {
}
}
使用拦截器
@Test
public void avgLatency() {
Properties configs = initConfig();
List<String> interceptors = new ArrayList<>();
// 拦截器 1
interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.AvgLatencyConsumerInterceptor");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
}
}
结果:
“
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114675696
内容来源于网络,如有侵权,请联系作者删除!