无法使用apache camel使用kafka消息

wwodge7n  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(274)

我试图把Kafka和 Camel 结合起来,却不知道如何使用一条信息。
我正在尝试使用apachecamel为apachekafka中的一个主题创建一个消费者。我使用的是apachecamel2.18.2、kafka2.10和jdk1.8。
下面是一个向名为-syncinout的主题生成消息的程序。

final String toKafka= "kafka:localhost:9092?topic=SyncInOut&groupId=group1";
final String fromKafka = "kafka:localhost:9092?topic=SyncInOut&groupId=group1&autoOffsetReset=earliest&consumersCount=1";

final String TEST_PAYLOAD = "Sync Payload InOut!";

CamelContext camelContext = new DefaultCamelContext();
ProducerTemplate template = camelContext.createProducerTemplate();
    Exchange exchange = template.send(toKafka, ExchangePattern.InOut, new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody(TEST_PAYLOAD);
        }
    });

在执行上面的代码时,我能够使用producertemplate生成消息,并且消息被成功地推送到syncinout。但是我不能理解这个信息。在这里,我添加了一个新的路线,并试图消费。

camelContext.addRoutes(new RouteBuilder() {
             public void configure() {
                 PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
                 pc.setLocation("classpath:application.properties");

                 log.info("About to start route: Kafka Server -> Log ");

                 from(fromKafka)
                 .process(new Processor() {
                     @Override
                     public void process(Exchange exchange)
                             throws Exception {
                         String messageKey = "";
                         if (exchange.getIn() != null) {
                             Message message = exchange.getIn();
                             Integer partitionId = (Integer) message
                                     .getHeader(KafkaConstants.PARTITION);
                             String topicName = (String) message
                                     .getHeader(KafkaConstants.TOPIC);
                             if (message.getHeader(KafkaConstants.KEY) != null)
                                 messageKey = (String) message
                                         .getHeader(KafkaConstants.KEY);
                             Object data = message.getBody();

                             System.out.println("topicName :: "
                                     + topicName + " partitionId :: "
                                     + partitionId + " messageKey :: "
                                     + messageKey + " message :: "
                                     + data + "\n");
                         }
                     }
                 }).to("log:input");
             }
         });
         camelContext.start();

上述消费者路线无法按预期工作。理想情况下,应该打印所消费的消息,但似乎什么也没有发生。似乎不是这样触发的。因此,我尝试使用下面的consumertemplate来消费消息,但也没有成功。

Exchange exchangeConsumer = consumerTemplate.receive(fromKafka);
    System.out.println("Body is : "+exchangeConsumer.getOut().getBody().toString());

上面这一行永远不会完成。我不知道如何使用ApacheCamel来使用来自kafka主题的消息。我从官方文件中找不到多少帮助。任何其他帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题