我试图把Kafka和 Camel 结合起来,却不知道如何使用一条信息。
我正在尝试使用apachecamel为apachekafka中的一个主题创建一个消费者。我使用的是apachecamel2.18.2、kafka2.10和jdk1.8。
下面是一个向名为-syncinout的主题生成消息的程序。
final String toKafka= "kafka:localhost:9092?topic=SyncInOut&groupId=group1";
final String fromKafka = "kafka:localhost:9092?topic=SyncInOut&groupId=group1&autoOffsetReset=earliest&consumersCount=1";
final String TEST_PAYLOAD = "Sync Payload InOut!";
CamelContext camelContext = new DefaultCamelContext();
ProducerTemplate template = camelContext.createProducerTemplate();
Exchange exchange = template.send(toKafka, ExchangePattern.InOut, new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody(TEST_PAYLOAD);
}
});
在执行上面的代码时,我能够使用producertemplate生成消息,并且消息被成功地推送到syncinout。但是我不能理解这个信息。在这里,我添加了一个新的路线,并试图消费。
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");
log.info("About to start route: Kafka Server -> Log ");
from(fromKafka)
.process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
String messageKey = "";
if (exchange.getIn() != null) {
Message message = exchange.getIn();
Integer partitionId = (Integer) message
.getHeader(KafkaConstants.PARTITION);
String topicName = (String) message
.getHeader(KafkaConstants.TOPIC);
if (message.getHeader(KafkaConstants.KEY) != null)
messageKey = (String) message
.getHeader(KafkaConstants.KEY);
Object data = message.getBody();
System.out.println("topicName :: "
+ topicName + " partitionId :: "
+ partitionId + " messageKey :: "
+ messageKey + " message :: "
+ data + "\n");
}
}
}).to("log:input");
}
});
camelContext.start();
上述消费者路线无法按预期工作。理想情况下,应该打印所消费的消息,但似乎什么也没有发生。似乎不是这样触发的。因此,我尝试使用下面的consumertemplate来消费消息,但也没有成功。
Exchange exchangeConsumer = consumerTemplate.receive(fromKafka);
System.out.println("Body is : "+exchangeConsumer.getOut().getBody().toString());
上面这一行永远不会完成。我不知道如何使用ApacheCamel来使用来自kafka主题的消息。我从官方文件中找不到多少帮助。任何其他帮助都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!