我试图从uu consumer_uoffset主题中消费,因为这似乎是检索有关消费者的kafka度量(如消息延迟等)的最简单方法。理想的方法是从jmx访问它,但想先尝试一下,返回的消息似乎是加密的或不可读的形式。也尝试添加stringdeserializer属性。有人对如何纠正这个问题有什么建议吗?再次提到这是一个复制品
重复消费单元偏移
没有帮助,因为它没有引用我的问题,即在java中以字符串形式读取消息。也更新了代码,尝试使用kafka.client consumer创建一个consumerrecord。
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext()) {
try {
String mesg = new String(it.next().message());
System.out.println( mesg);
代码更改:
try {
// errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , "test");
consumerProps.put("bootstrap.servers", servers);
consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
//ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
// consumerConfig);
//Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//topicCountMap.put(topic, new Integer(1));
//Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
kconsumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = kconsumer.poll(10);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
kconsumer.close();
}
以及下面信息的快照;在图像底部:
1条答案
按热度按时间ncecgwcz1#
虽然可以直接从
__consumer_offsets
主题,这不是推荐的或最简单的方法。如果可以使用kafka 2.0,最好使用adminclient API来描述组:
listconsumergroupoffsets():查找特定组的所有偏移
descripbeconsumergroups():查找组成员的详细信息
万一,你绝对想直接读表格
__consumer_offset
,则需要对记录进行解码以使其可读。可以使用GroupMetadataManager
班级:可以使用groupmetadatamanager.readmessagekey()对消息键进行解码并检索此项所引用的主题分区。这可以返回2种类型的对象,对于消费者的位置,您只感兴趣
OffsetKey
物体。readoffsetmessagevalue()可用于解码消息值(对于
OffsetKey
)找到偏移量信息。您链接的问题的答案包含执行所有这些操作的基本代码。
还要注意的是,不应将记录反序列化为字符串,而应将它们保留为原始字节,以便这些方法能够正确解码它们。