kafka connect hdfs接收器,用于使用jsonconverter的json格式

dpiehjr4  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(335)

在json中生成/使用kafka。使用以下属性保存到json格式的hdfs:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

制作人:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"

消费者:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

问题1:

key.converter.schemas.enable=true

value.converter.schemas.enable=true

获取异常:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

问题2:
启用上述两个属性不会引发任何问题,但不会在hdfs上写入任何数据。
如有任何建议,我们将不胜感激。
谢谢

alen0pnh

alen0pnh1#

转换器指的是如何将数据从Kafka主题转换为连接器解释并写入hdfs。hdfs连接器仅支持在avro中写入hdfs或在开箱即用的情况下写入hdfs。您可以在这里找到有关如何将格式扩展为json的信息。如果您进行这样的扩展,我鼓励您将其贡献给连接器的开源项目。

slsn1g29

slsn1g292#

对于要写入hdfs的输入json格式消息,请设置以下属性

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

相关问题