我正在使用confluentinc/cp kafka connect docker映像。我正试图用elasticsearch id将json文件发送给kafka。
{"_id":10000725, "_source": {"createdByIdentity":"tu_adminn","createdBy":"Admin Testuser"}}
这是我的连接器
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "andrii",
"key.ignore": "false",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"name": "elasticsearch-sink"
}
}
当我使用key.ignore=true时,它会生成一些奇怪的id。如何准确地传递我的id和来源?
1条答案
按热度按时间eblbsuwk1#
根据文件:
如果您指定
key.ignore=true
然后kafka connect将使用消息的kafka主题、分区和偏移量的复合键——这就是您看到的“奇怪的id”。如果您想为创建的elasticsearch文档使用自己的id,可以设置
key.ignore=false
Kafka连接将使用Kafka消息的密钥作为标识。如果您的Kafka消息没有合适的密钥,您需要设置它。一种选择是使用类似ksql的东西:
免责声明:我为confluent工作,这是一家支持开源ksql项目的公司