schema id在消息中的哪个位置?

gcxthw6b  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(481)

我想使用avro序列化kafka消息的数据,并希望将其与avro模式存储库一起使用,这样就不必在每条消息中都包含该模式。
将avro与kafka结合使用似乎是一件很流行的事情,很多blog/stack overflow问题/usergroups等都提到了发送带有消息的schema id,但是我找不到一个实际的例子来说明它应该去哪里。
我认为它应该放在Kafka的消息头的某个地方,但我找不到一个明显的地方。如果它在avro消息中,那么您必须根据一个模式对其进行解码,以获得消息内容并显示您需要根据其进行解码的模式,这有明显的问题。
我使用的是c#客户机,但任何语言的例子都很好。消息类包含以下字段:

public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }

但这些似乎都不正确。messagemetadata只有offset和partitionid。
那么,avro模式id应该放在哪里呢?

mum43rcc

mum43rcc1#

模式id实际上是在avro消息本身中编码的。看看这个,看看编码器/解码器是如何实现的。
一般来说,当你向Kafka发送avro信息时会发生什么:
编码器从要编码的对象获取模式。
编码器向架构注册表请求此架构的id。如果该模式已经注册,您将获得一个现有的id,如果没有-注册表将注册该模式并返回新的id。
对象的编码如下:[magic byte][schema id][actual message],其中magic byte只是一个 0x0 字节,用于区分这类消息,schema id是一个4字节的整数值,其余的是实际编码的消息。
当你把信息解码回来的时候会发生这样的事情:
解码器读取第一个字节并确保它是正确的 0x0 .
解码器读取接下来的4个字节并将其转换为整数值。这就是模式id的解码方式。
现在,当解码器有一个模式id时,它可能会向模式注册表询问这个id的实际模式!
如果您的密钥是avro编码的,那么您的密钥将采用上述格式。这同样适用于价值。这样,键和值可能都是avro值,并使用不同的模式。
编辑以回答评论中的问题:
实际的模式存储在模式存储库中(这实际上是模式存储库的要点—存储模式:)。avro对象容器文件格式与上述格式无关。kafkaavroencoder/decoder使用稍微不同的消息格式(但实际消息的编码方式完全相同)。
这些格式之间的主要区别在于,对象容器文件携带实际的模式,并且可能包含与该模式对应的多个消息,而上面描述的格式仅携带模式id和与该模式对应的正好一个消息。
传递对象容器文件编码的消息可能并不明显,因为一个kafka消息将包含多个avro消息。或者您可以确保一个kafka消息只包含一个avro消息,但这会导致每个消息都带有模式。
avro模式可能相当大(我见过类似600KB或更大的模式),并且在每条消息中携带模式将非常昂贵和浪费,因此这就是模式存储库的作用所在—模式只提取一次并在本地缓存,所有其他查找都只是快速的Map查找。

相关问题