avro-反序列化pojo

46scxncf  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(394)

我是新来的阿夫罗和Kafka花了最后几天发送Kafka的主题序列化数据。。。没有成功。
让我来解释一下我想要达到的目标:
在生产者方面,我通过soap接收数据并发送Kafka主题的内容。我使用cxf从wsdl生成pojo,并编写了相应的模式。我要做的是序列化cxf未编组的对象,并在kafka主题中发送它们。
在web上的大多数示例中,avro记录是使用已知的模式(或数据类型)生成的,但在这种情况下,我不知道序列化数据时将使用哪个模式。因此,我动态获取消息类型(通过cxf拦截器)并按以下方式序列化:

// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);

EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);

// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);

ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();

KeyedMessage< String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);

通过这种方式,我可以发送有关主题的数据,但无法从传入消息中获取模式。
有没有办法:
阅读来自kafka主题的消息并获取用于序列化的模式?
在消费和反序列化时将通用记录Map到pojo?
当数据类型未知时,发送Kafka主题的avro记录的“最佳”实践是什么?
也许我在阅读avro文档时遗漏了一些东西,没有按预期使用它。
谢谢你的帮助。。。

lvjbypge

lvjbypge1#

发送到Kafka主题的消息应该对模式和avro记录进行编码。如果在每条消息中发送模式的开销太大,则改为发送模式的标识符。消息使用者可以使用标识符从模式注册表检索完整的模式定义。例如,此序列化kafka消息的代码在消息的第一个字节中写入架构标识符:

ByteArrayOutputStream out = new ByteArrayOutputStream();

schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
  writer = new SpecificDatumWriter<Object>(schema);
} else {
  writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();

byte[] bytes = out.toByteArray();
out.close();
return bytes;

相关问题