Kafka消费者从多个主题阅读

yrefmtwq  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(386)

我对Kafka很陌生。我正在创建两个主题和出版这两个主题从两个生产者。我有一个消费者,它使用来自这两个主题的消息。这是因为我想按优先级处理。
我从这两个主题都得到了一个流,但只要我开始迭代 ConsumerItreator 在任何一条小溪中,它都会阻塞那里。因为它是在文档中写的,所以它将被阻止,直到它收到新消息。
有没有人知道如何从一个Kafka消费者的两个主题和两条流中阅读?

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
                topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
                ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();

                while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
                {
                    byte[] bytes = highPrioerityIterator.next().message();
                    Object obj = null;
                    CLoudDataObject thunderDataObject = null;
                    try
                    {

                        obj = SerializationUtils.deserialize(bytes);
                        if (obj instanceof CLoudDataObject)
                        {
                            thunderDataObject = (CLoudDataObject) obj;
                            System.out.println(thunderDataObject);
                            // TODO Got the Thunder object here, now write code to send it to Thunder service.
                        }

                    }
                    catch (Exception e)
                    {
                    }
                }
vs3odd8k

vs3odd8k1#

如果您不想先处理低优先级的消息,然后再处理高优先级的消息,那么设置consumer.timeout.ms属性并捕获consumertimeoutexception以检测高优先级的流是否到达最后一条可用的消息呢?默认情况下,它被设置为-1以阻止直到新消息到达(http://kafka.apache.org/07/configuration.html)
下面解释了一种同时处理具有不同优先级的多个流的方法。
Kafka需要多线程编程。在您的例子中,这两个主题的流需要由流的线程处理。因为每个线程将独立运行以处理消息,所以一个阻塞流(线程)不会影响其他流。
java的线程池实现可以帮助创建多线程应用程序。您可以在这里找到示例实现:
https://cwiki.apache.org/confluence/display/kafka/consumer+group+example
关于执行的优先级,可以调用thread.currentthread.setpriority方法,根据线程的服务kafka主题来确定线程的适当优先级。

相关问题