postgresql jdbc sink raise error null(array)类型没有到sql数据库列类型的Map

n7taea2i  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(342)

我在尝试使用kafka jdbc sink复制数据库时遇到问题。当我将服务器运行到一个表上有数组数据类型时,它会给出以下错误

...
Caused by: org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type
...

我想保留相同的数组条件,不想像对SQLServer那样将其转换为字符串(因为SQLServer不允许数组数据类型)。
这是我的连接配置:

{"name" :"pgsink_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max":"1",
            "topics":"'$table'",
            "connection.url":"jdbc:postgresql://",
            "connection.user":"",
            "connection.password":"",
            "transforms":"unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "delete.handling.mode":"drop",
            "auto.create":"true",
            "auto.evolve":"true",
            "insert.mode":"upsert",
            "pk.fields":" '$pk'",
            "pk.mode":"record_key",
            "delete.enabled":"true",
            "destination.table.format":"public.'$table'",
            "connection.attempts":"60",
            "connection.backoff.ms":"100000"

}}

我的kafka源代码来自debezium,因为我想保留相同的数据类型,所以我不将smt放入源代码中。这是源配置:

{
"name":"pg_prod",
    "config":{
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name":"wal2json_streaming",
        "database.hostname":"",
        "database.port":"",
        "database.user":"",
        "database.password":"",
        "database.dbname":"",
        "database.server.name":"",
    "database.history.kafka.bootstrap.servers": "",
    "database.history.kafka.topic": "",
        "transforms":"unwrap,reroute",
    "table.whitelist":"public.table",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.unwrap.drop.tombstones": "false",
        "decimal.handling.mode":"double",
        "time.precision.mode":"connect",
    "transforms.reroute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.reroute.regex":"postgres.public.(.*)",
    "transforms.reroute.replacement":"$1",
    "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,
    "kafkaPartition": "0",
    "snapshot.delay.ms":"1000",
    "schema.refresh.mode":"columns_diff_exclude_unchanged_toast"
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题