Data mismatch after Kafka JDBC sink connector

Posted by i34xakig on 2021-06-04 in Kafka

I am trying to use kafka-connect-jdbc, but some values in the database differ from the values in the Kafka messages.
Sink configuration:

{
    "name": "test-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",

        "connection.url": "jdbc:postgresql://localhost:5432/test_db?user=test&password=test",
        "dialect.name": "PostgreSqlDatabaseDialect",

        "topics.regex": "test.public.(.*)",

        "transforms": "dropPrefix, unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",

        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex": "test.public.(.*)",
        "transforms.dropPrefix.replacement": "public.test_$1",

        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "batch.size": "1"
    }
}
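
To check which table name the `dropPrefix` transform produces for a given topic, the routing can be approximated in a few lines of Python. This is only a sketch: it assumes RegexRouter requires the regex to match the whole topic name and replaces `$1` with the first capture group. `route_topic` is a hypothetical helper, not part of Kafka Connect.

```python
import re

def route_topic(topic, regex=r"test.public.(.*)", replacement=r"public.test_\g<1>"):
    """Approximate the dropPrefix RegexRouter from the sink config (hypothetical helper)."""
    # Assumption: the regex must match the entire topic name;
    # otherwise the topic is left unchanged.
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Python writes the first capture group as \g<1>; the connector config uses $1.
    return match.expand(replacement)

print(route_topic("test.public.test"))
```

Under these assumptions the topic `test.public.test` is routed to `public.test_test`. Note that the dots in the regex are unescaped, so each one matches any character, not only a literal dot.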

Message in the Kafka topic:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"test_id"},{"type":"string","optional":true,"field":"test_b_c"},{"type":"string","optional":true,"field":"test_a_t"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"test_p"},{"type":"int64","optional":true,"field":"test_d_id"}],"optional":true,"name":"test.public.test.value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"test_c_id"},{"type":"string","optional":true,"field":"test_b_c"},{"type":"string","optional":true,"field":"test_a_t"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"test_p"},{"type":"int64","optional":true,"field":"test_id"}],"optional":true,"name":"test.public.test.value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txid"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"test.public.test.envelope"},"payload":{"before":null,"after":{"id":4441,"test_c_id":3606,"test_c":"qwerty","test_a_t":null,"test_p":["qwe","asd","zxc"],"test_id":22827},"source":{"version":"1.2.2.final","connector":"postgresql","name":"test","ts_ms":1599543319277,"snapshot":"false","db":"test","schema":"public","table":"test","txid":3914206,"lsn":108940649328,"xmin":null},"op":"u","ts_ms":1599543319509,"transaction":null}}
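
For reference, the row the sink should write is the `after` part of this envelope, which is what the `unwrap` transform is configured to extract. The following is a simplified model in Python, not the Debezium implementation: `extract_new_state` is a hypothetical helper, it works on plain dicts instead of Connect structs, and the sample envelope is a trimmed copy of the message above.

```python
def extract_new_state(envelope):
    """Return the row state a sink would see after unwrapping (hypothetical helper)."""
    payload = envelope["payload"]
    # Assumption: for creates and updates the unwrapped value is the "after" struct;
    # for deletes "after" is null, so None is returned.
    return payload.get("after")

# Trimmed copy of the sample message (only a few columns kept).
envelope = {
    "payload": {
        "before": None,
        "after": {"id": 4441, "test_c_id": 3606, "test_id": 22827},
        "op": "u",
    }
}
print(extract_new_state(envelope))
```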
Example:

Kafka                  Sink DB
---------------------------------------
test_c_id: 3606        test_c_id: 22632
test_b_c: QWERTY       test_b_c: null

Roughly 5-10% of the data is mismatched, and all of the mismatches appear in just 2 of the 20 columns.
Does anyone know what could cause this?
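
One way to narrow this down is to compare the unwrapped Kafka value with the sink row column by column and count which columns differ. A minimal sketch in Python, assuming both sides have already been loaded as dicts: `diff_columns` is a hypothetical helper, and the sample rows reuse the values from the example above, with the `id` taken from the sample message on the assumption that it is the same row.

```python
def diff_columns(kafka_row, db_row):
    """Return {column: (kafka_value, db_value)} for columns whose values differ."""
    columns = sorted(set(kafka_row) | set(db_row))
    return {
        col: (kafka_row.get(col), db_row.get(col))
        for col in columns
        if kafka_row.get(col) != db_row.get(col)
    }

# Values from the example table; the id is assumed to identify the same row.
kafka_row = {"id": 4441, "test_c_id": 3606, "test_b_c": "QWERTY"}
db_row = {"id": 4441, "test_c_id": 22632, "test_b_c": None}
print(diff_columns(kafka_row, db_row))
```

Running this over a batch of records would show whether the same two columns are always affected and whether the wrong values come from other rows.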
