我开发了两个c++应用程序,它们生成和使用嵌入protobuf3消息的kafka消息(使用cppkafka)。两者都很好。生产商的相关代码是:
std::string kafkaString;
cppkafka::MessageBuilder *builder;
...
solidList->SerializeToString(&kafkaString);
builder->payload(kafkaString);
protobuf对象被序列化为字符串并作为kafka负载插入。到目前为止一切正常。现在,我正在尝试用java开发一个消费者。相关代码应为:
KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);
....
ConsumerRecords<Long, String> records = consumer.poll(100);
for (ConsumerRecord<Long, String> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...
但在编译时失败:parsefrom抱怨:solidlist.solidlist类型中的parsefrom(bytebuffer)方法不适用于参数(string)。所以,我试着用一个bytebuffer:
KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);
....
ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
for (ConsumerRecord<Long, ByteBuffer> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...
现在,错误发生在执行时,仍然发生在parsefrom():线程“main”java.lang.classcastexception中的异常:java.lang.string不能转换为java.nio.bytebuffer。我知道这是一个java.lang.string!!!所以,我回到原点,尝试将其用作字节数组:
SolidList solidList = SolidList.parseFrom(record.value().getBytes());
现在,错误出现在执行时间:线程“main”com.google.protobuf.invalidprotocolbufferexception$invalidwiretypeexception中的异常:协议消息标记的线类型无效。。
c序列化的protobuf文档状态:bool serializetostring(string output)const;:序列化消息并将字节存储在给定字符串中。注意,字节是二进制的,而不是文本;我们只将string类用作方便的容器。*
热释光;dr:因此,我应该如何解释java中的protobuf c“二进制字节”?
这似乎是相关的(相反),但没有帮助:protobufjava到c++序列化[二进制]
提前谢谢。
2条答案
按热度按时间3hvapo4f1#
尝试实现反序列化程序,并将其作为值反序列化程序传递给kafkaconsumer构造函数。可能是这样的:
cgh8pdjw2#
你可以把Kafka当作
ConsumerRecords<Long, String>
. 然后SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));