我用python编写了一个consumer,如下所示:
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('test',
group_id='',
bootstrap_servers=['kafka:9092'])
schema = """
{
"namespace":"com.martinkl.bottledwater.dbschema.public",
"type":"record",
"name":"test",
"fields":[
{"name":"id","type":["int", "null"]},
{"name":"value","type":["string", "null"]}
]
}
"""
schema = avro.schema.parse(schema)
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello
似乎一切正常,但当我运行insert data to postgres时:
postgres=# insert into test (value) values('hello world!');
使用者的输出为空:
$ python consumer_bottledwater-pg.py
{u'id': 0, u'value': u''}
请帮我把它修好。先谢谢你。
2条答案
按热度按时间hujrc8aj1#
非常感谢@martin kleppmann。我照你说的做了。它工作正常。
在python kafka avro上查看详细信息
dgjrabp22#
瓶装水向Kafka发布的avro编码的消息以5字节的头作为前缀。第一个字节始终为零(保留供将来使用),接下来的4个字节是一个大端32位数字,指示模式的id。
在您的示例中,您已经在python应用程序中硬编码了模式,但是一旦上游数据库模式发生变化,这种方法就会崩溃。这就是为什么瓶装水最好与模式注册一起使用。当您阅读来自kafka的消息时,您首先对头进行解码以查找模式id,如果您以前没有看到该模式id,则查询注册表以查找该模式。然后可以使用该模式解码消息的其余部分。模式可以缓存在使用者中,因为注册表保证特定id的模式是不可变的。
您还可以查看模式注册中心附带的kafkaavrodeserializer的代码,以了解如何在java中进行这种解码。在python中也可以这样做。