如何用嵌入式模式反序列化kafka中的avro

iq0todco  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(262)

我从一个Kafka主题接收二进制avro文件,我必须反序列化它们。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的avro文件分离是一种更好的做法,但是我不能控制生产者,我不能改变这一点。
我的代码运行在apachestorm之上。首先,我创建一个阅读器:

mDatumReader = new GenericDatumReader<GenericRecord>();

稍后我尝试反序列化消息而不声明架构:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);

但是当一个消息到达时,我得到一个错误:

Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]

我看到的所有答案都是关于使用其他格式,改变传递给Kafka的信息或其他东西。我无法控制这些事情。
我的问题是 bytes[] 在二进制消息中嵌入了模式,如何在不声明模式的情况下反序列化avro文件,以便读取它。

wfsdck30

wfsdck301#

添加maven依赖性

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.9.1</version>
    <type>maven-plugin</type>
</dependency>

创建一个如下所示的文件

{"namespace": "tachyonis.space",
   "type": "record",
   "name": "Avro",
   "fields": [
      {"name": "Id", "type": "string"},
    ]
  }

将以上内容另存为src/main/resources中的avro.avsc。
在eclipse或任何ide中run>maven生成源代码,这些源代码创建avro.java到包文件夹[namespace]tachyonis.space

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
KafkaConsumer<String, Avro> consumer = new KafkaConsumer<>(props);

消费者/生产者必须在同一台机器上运行。否则,您需要在windows/linux中配置hosts文件,并将所有组件配置属性从localhost更改为Map到实际的ip地址,以便广播给生产者/消费者。否则会出现网络连接问题等错误

Connection to node -3 (/127.0.0.1:9092) could not be established. Broker may not be available
wljmcqd8

wljmcqd82#

对于datumreader/writer,没有嵌入式模式这样的东西。当我第一次看到阿夫罗和Kafka的时候,也是我的误解。但是avro序列化程序的源代码清楚地表明,在使用genericdatumwriter时没有嵌入任何模式。
datafilewriter会在文件的开头编写模式,然后使用genericdatumwriter添加genericrecords。
既然您在开始时说有一个模式,我假设您可以读取它,将其转换为一个模式对象,然后将其传递给genericdataumreader(schema)构造函数。想知道消息是如何序列化的。也许datafilewriter被用来写入byte[]而不是实际的文件,那么您可以使用datafilereader来反序列化数据吗?

相关问题