kafka连接debezium和mssqlserver密钥提取配置问题

tyky79it  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(343)

我正在与德贝齐姆和Kafka连接。我需要从下面的负载中提取“listid”,并能够将其分配给kafka消息键。使用连接器文件中的配置,我无法提取值。感谢您的帮助。

{
    "before": null,
    "after": {
        "listid": 19,
        "billingid": "0",
        "userid": "test",

连接器属性

key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid

key.converter.schemas.enable=false
value.converter.schemas.enable=true

所以我在继续实验,想和大家分享一个更好的问题解释和我的实验。
跟随罗宾的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
1-从终端运行kafka avro consumer
钥匙

{"LIST_ID":10058}

价值

{"before":null,"after":{"test.dbo.test.Value":{"LIST_ID":10058,"billingid 
etc...

2-我正在尝试使键的值为10058
3-这是我的转换配置

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID

我在针对mssql服务器的管道中使用debezium连接器。

j1dl9f46

j1dl9f461#

看起来像是打字错误( extractextractkey ):

transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid

相关问题