ConfluentSchemaRegistry:jdbc接收器连接器无法在更新的架构中标识新列

kcugc4gi  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(197)

从confluent platform(docker)开始,我有以下设置:

MySQL DB ----------> Debezium MySQL Source Connector --> Kafka (topic: X)
Kafka (topic: X) --> Confluent JDBC Sink Connector ----> Postgres DB

(我在docker compose文件中将键和值转换器更改为avro。)
我在合流模式注册中心的帮助下测试模式演化。我将遵循以下步骤:
首先更新接收器数据库:添加新列 至postgres db表x: `alter table X add column int;` 测试管道。好 啊。 在control center上更新x:的架构http://localhost:9021/,添加可选字段int 使用默认值键入 null . 现在,topicx有两个模式版本。我保留了默认的向后兼容性。
再次测试管道。好 啊。
更新源数据库:添加新列 `` 到mysql db表x: alter table X add column int; 通过更新源表的记录来测试管道。我在kafka connect服务日志中看到了堆栈跟踪。不好。

[2020-08-11 05:58:45,014] INFO Unable to find fields 
[SinkRecordField{schema=Schema{INT32}, name='abc', isPrimaryKey=false}] 
among column names [... list of columns ...] (io.confluent.connect.jdbc.sink.DbStructure)

[2020-08-11 05:58:45,017] ERROR WorkerSinkTask{id=test-sink-0} 
Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted. 

Error: Table "X" is missing fields 
([SinkRecordField{schema=Schema{INT32}, name='abc', isPrimaryKey=false}]) 
and auto-evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask)

我再次检查了架构 X 在汇合控制中心,现在它有3个版本。版本2和版本3是相同的。更新源表后的新消息是否创建了新的相同架构版本?
关于自动进化的错误消息,我的理解是我们应该保持禁用,因为我们将依赖模式注册表来控制进化。
您能帮助我理解为什么confluent jdbc connector即使在更新模式之后也找不到新字段吗?我错过了什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题