Kafka JDBC sink with delete.enabled=true: do I have to use record_key?

hsvhsicv · posted 2021-06-04 in Kafka

I want to read multiple topics produced by Debezium CDC from a source Postgres database, where the Kafka message key holds the table's primary key. The sink connector should then perform the ETL into the target database.
When I set delete.enabled=true I cannot use pk.mode=kafka; the connector complains that I have to use record_key and specify pk.fields.
My idea is to use a regex to subscribe to all the topics I need, derive the table name from the topic name, and use the primary key carried by whichever Kafka topic is currently being read.

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
topics=pokladna.public.*use_regex
connection.url=jdbc:postgresql://localhost:5434/postgres
connection.user=postgres
connection.password=postgres
dialect.name=PostgreSqlDatabaseDialect
table.name.format=*get_table_table_name_from_topic_name
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
auto.create=true
auto.evolve=true
insert.mode=upsert
delete.enabled=true
pk.mode=kafka
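
As a side note, here is a minimal sketch (not part of my actual config) of how I imagine the two placeholder lines above could be expressed, assuming all source topics share the pokladna.public. prefix: topics.regex subscribes by pattern, and a standard RegexRouter transform strips the prefix so that table.name.format=${topic} resolves to the bare table name.

topics.regex=pokladna[.]public[.].*
table.name.format=${topic}
# chain the router after the existing unwrap transform (this replaces the transforms=unwrap line)
transforms=unwrap,route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=pokladna[.]public[.](.*)
transforms.route.replacement=$1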

With delete.enabled=true, pk.mode=record_value and pk.fields left empty, I get the following error, even during inserts (the same happens with pk.mode=kafka):

ERROR WorkerSinkTask{id=sink-postgres-perform-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled
        at io.confluent.connect.jdbc.sink.JdbcSinkConfig.<init>(JdbcSinkConfig.java:540)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.start(JdbcSinkTask.java:45)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
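
For completeness, a minimal fragment of the combination that error message asks for, assuming the Debezium message key is JSON with an embedded schema (my config above does not set any key.converter at all):

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
insert.mode=upsert
delete.enabled=true
pk.mode=record_key
# leaving pk.fields empty should make the sink use every field of the record key as the primary key
pk.fields=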

With delete.enabled=true, pk.mode=record_key and pk.fields left empty, I again get an error, even during inserts:

ERROR WorkerSinkTask{id=sink-postgres-perform-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "questions" to add missing field SinkRecordField{schema=Schema{STRING}, name='__dbz__physicalTableIdentifier', isPrimaryKey=true}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "questions" to add missing field SinkRecordField{schema=Schema{STRING}, name='__dbz__physicalTableIdentifier', isPrimaryKey=true}, as the field is not optional and does not have a default value
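
As far as I understand (an assumption on my part, since my source connector config is not shown here), __dbz__physicalTableIdentifier is the key field that Debezium's topic-routing SMT (io.debezium.transforms.ByLogicalTableRouter) appends on the source side when key.enforce.uniqueness is left at its default of true, roughly like this hypothetical source fragment:

# hypothetical source-connector fragment, names are illustrative only
transforms=route
transforms.route.type=io.debezium.transforms.ByLogicalTableRouter
transforms.route.topic.regex=(.*)
transforms.route.topic.replacement=$1
# the default key.enforce.uniqueness=true adds a __dbz__physicalTableIdentifier field to every record key
transforms.route.key.enforce.uniqueness=true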

Am I doing something wrong? Is this a bad properties-file configuration, or a bug or limitation of the Kafka JDBC sink? I am able to run the ETL into the target database with pk.mode=record_key and pk.fields=id_column_names.
I have 40 tables, though, so do I really have to create 40 properties files, each filled in with the right column names, and run the Connect sink 40 times? That sounds silly...
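
For clarity, this is roughly what one of those 40 per-table files would look like under the pk.mode=record_key / pk.fields combination that does work for me (table and column names here are illustrative, and delete.enabled is kept on since that is the goal):

name=sink-postgres-questions
topics=pokladna.public.questions
table.name.format=questions
insert.mode=upsert
delete.enabled=true
pk.mode=record_key
# one properties file per table, each listing that table's primary key column(s)
pk.fields=id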
