消费者阅读\uuu消费者\u偏移量传递不可读的消息

wwodge7n  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(315)

我试图从uu consumer_uoffset主题中消费,因为这似乎是检索有关消费者的kafka度量(如消息延迟等)的最简单方法。理想的方法是从jmx访问它,但想先尝试一下,返回的消息似乎是加密的或不可读的形式。也尝试添加stringdeserializer属性。有人对如何纠正这个问题有什么建议吗?再次提到这是一个复制品
重复消费单元偏移
没有帮助,因为它没有引用我的问题,即在java中以字符串形式读取消息。也更新了代码,尝试使用kafka.client consumer创建一个consumerrecord。

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);

consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");  
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(
       consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
   consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream stream : streams) {

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

    while (it.hasNext()) {
         try {

             String mesg = new String(it.next().message());
             System.out.println( mesg);

代码更改:

try {       
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");   
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 

    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records)

                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
          kconsumer.close();
    }

以及下面信息的快照;在图像底部:

ncecgwcz

ncecgwcz1#

虽然可以直接从 __consumer_offsets 主题,这不是推荐的或最简单的方法。
如果可以使用kafka 2.0,最好使用adminclient API来描述组:
listconsumergroupoffsets():查找特定组的所有偏移
descripbeconsumergroups():查找组成员的详细信息
万一,你绝对想直接读表格 __consumer_offset ,则需要对记录进行解码以使其可读。可以使用 GroupMetadataManager 班级:
可以使用groupmetadatamanager.readmessagekey()对消息键进行解码并检索此项所引用的主题分区。这可以返回2种类型的对象,对于消费者的位置,您只感兴趣 OffsetKey 物体。
readoffsetmessagevalue()可用于解码消息值(对于 OffsetKey )找到偏移量信息。
您链接的问题的答案包含执行所有这些操作的基本代码。
还要注意的是,不应将记录反序列化为字符串,而应将它们保留为原始字节,以便这些方法能够正确解码它们。

相关问题