推自己的id合流Kafka连接ElasticSearchdocker

wkftcu5l  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

我正在使用confluentinc/cp kafka connect docker映像。我正试图用elasticsearch id将json文件发送给kafka。

{"_id":10000725, "_source": {"createdByIdentity":"tu_adminn","createdBy":"Admin Testuser"}}

这是我的连接器

{
  "name": "test-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "andrii",
    "key.ignore": "false",
    "schema.ignore": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "test-type",
    "name": "elasticsearch-sink"
  }
}

当我使用key.ignore=true时,它会生成一些奇怪的id。如何准确地传递我的id和来源?

eblbsuwk

eblbsuwk1#

根据文件:
如果您指定 key.ignore=true 然后kafka connect将使用消息的kafka主题、分区和偏移量的复合键——这就是您看到的“奇怪的id”。
如果您想为创建的elasticsearch文档使用自己的id,可以设置 key.ignore=false Kafka连接将使用Kafka消息的密钥作为标识。
如果您的Kafka消息没有合适的密钥,您需要设置它。一种选择是使用类似ksql的东西:

CREATE STREAM target AS SELECT * FROM source PARTITION BY _id

免责声明:我为confluent工作,这是一家支持开源ksql项目的公司

相关问题