我尝试将apachecamel与kafka集成,并编写了一个示例程序来读取文件并写入kafka主题。但这样做的时候,我发现了一个错误。我可以用相反的方式读Kafka的主题,然后写进一个文件。
堆栈跟踪
org.apache.kafka.common.errors.serializationexception:无法将类org.apache.camel.component.file.genericfile的值转换为value.serializer[#0-file://c用法:%5cshare%5cinput]kafkaproducer警告未设置消息键或分区键[#0-file://c:%5c共享%5c输入]genericfileoncompletion警告回滚文件策略:org.apache.camel.component.file.strategy。genericfilerenameprocessstrategy@7127845b 对于文件:genericfile[c:\share\input\file.txt][#0-file://c:%5cshare%5cinput]defaulterrorhandler错误传递失败(exchangeid:id-l8-cwbl462-49953-1480494317350-0-21上的messageid:id-l8-cwbl462-49953-1480494317350-0-21):编号:l8-cwbl462-49953-1480494317350-0-22)。传递尝试后耗尽:1捕获到:org.apache.kafka.common.errors.serializationexception:无法将类org.apache.camel.component.file.genericfile的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.stringserializer
代码
@contextname(“mycdicamelcontext”)公共类myroutes扩展了routebuilder{
@Inject
@Uri("file:C:\\share\\input?fileName=file.txt&noop=true")
private Endpoint inputEndpoint;
@Inject
@Uri("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
private Endpoint resultEndpoint;
@Override
public void configure() throws Exception {
from(inputEndpoint)
.to(resultEndpoint);
}
}
1条答案
按热度按时间bvn4nwqk1#
在添加了一个新的处理器之后,它为我工作了