kafka协议:c++到java的序列化

yk9xbfzb  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(472)

我开发了两个c++应用程序,它们生成和使用嵌入protobuf3消息的kafka消息(使用cppkafka)。两者都很好。生产商的相关代码是:

std::string kafkaString;
cppkafka::MessageBuilder *builder;
...
solidList->SerializeToString(&kafkaString);
builder->payload(kafkaString);

protobuf对象被序列化为字符串并作为kafka负载插入。到目前为止一切正常。现在,我正在尝试用java开发一个消费者。相关代码应为:

KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);
....
ConsumerRecords<Long, String> records = consumer.poll(100);
  for (ConsumerRecord<Long, String> record : records) {
    SolidList solidList = SolidList.parseFrom(record.value());
    ...

但在编译时失败:parsefrom抱怨:solidlist.solidlist类型中的parsefrom(bytebuffer)方法不适用于参数(string)。所以,我试着用一个bytebuffer:

KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);
....
ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
  for (ConsumerRecord<Long, ByteBuffer> record : records) {
    SolidList solidList = SolidList.parseFrom(record.value());
    ...

现在,错误发生在执行时,仍然发生在parsefrom():线程“main”java.lang.classcastexception中的异常:java.lang.string不能转换为java.nio.bytebuffer。我知道这是一个java.lang.string!!!所以,我回到原点,尝试将其用作字节数组:

SolidList solidList = SolidList.parseFrom(record.value().getBytes());

现在,错误出现在执行时间:线程“main”com.google.protobuf.invalidprotocolbufferexception$invalidwiretypeexception中的异常:协议消息标记的线类型无效。。
c序列化的protobuf文档状态:bool serializetostring(string output)const;:序列化消息并将字节存储在给定字符串中。注意,字节是二进制的,而不是文本;我们只将string类用作方便的容器。*
热释光;dr:因此,我应该如何解释java中的protobuf c
“二进制字节”?
这似乎是相关的(相反),但没有帮助:protobufjava到c++序列化[二进制]
提前谢谢。

3hvapo4f

3hvapo4f1#

尝试实现反序列化程序,并将其作为值反序列化程序传递给kafkaconsumer构造函数。可能是这样的:

class SolidListDeserializer implements Deserializer<SolidList> {
  public SolidList deserialize(final String topic, byte[] data) {
    return SolidList.parseFrom(data);
  }
  ...
}

...

KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())
cgh8pdjw

cgh8pdjw2#

你可以把Kafka当作 ConsumerRecords<Long, String> . 然后 SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));

相关问题