camel文件读写错误

9njqaruj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(247)

我尝试将apachecamel与kafka集成,并编写了一个示例程序来读取文件并写入kafka主题。但这样做的时候,我发现了一个错误。我可以用相反的方式读Kafka的主题,然后写进一个文件。

堆栈跟踪

org.apache.kafka.common.errors.serializationexception:无法将类org.apache.camel.component.file.genericfile的值转换为value.serializer[#0-file://c用法:%5cshare%5cinput]kafkaproducer警告未设置消息键或分区键[#0-file://c:%5c共享%5c输入]genericfileoncompletion警告回滚文件策略:org.apache.camel.component.file.strategy。genericfilerenameprocessstrategy@7127845b 对于文件:genericfile[c:\share\input\file.txt][#0-file://c:%5cshare%5cinput]defaulterrorhandler错误传递失败(exchangeid:id-l8-cwbl462-49953-1480494317350-0-21上的messageid:id-l8-cwbl462-49953-1480494317350-0-21):编号:l8-cwbl462-49953-1480494317350-0-22)。传递尝试后耗尽:1捕获到:org.apache.kafka.common.errors.serializationexception:无法将类org.apache.camel.component.file.genericfile的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.stringserializer
代码
@contextname(“mycdicamelcontext”)公共类myroutes扩展了routebuilder{

@Inject
 @Uri("file:C:\\share\\input?fileName=file.txt&noop=true")
 private Endpoint inputEndpoint;

 @Inject
 @Uri("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
 private Endpoint resultEndpoint;

@Override
public void configure() throws Exception {
    from(inputEndpoint)
         .to(resultEndpoint);
}

}

bvn4nwqk

bvn4nwqk1#

在添加了一个新的处理器之后,它为我工作了

public void configure() throws Exception {
        from(inputEndpoint).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setBody(exchange.getIn().getBody(),String.class);
                exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                exchange.getIn().setHeader(KafkaConstants.KEY, "1");
            }
        })
             .to(resultEndpoint);
    }

相关问题