从confluent platform(docker)开始,我有以下设置:
MySQL DB ----------> Debezium MySQL Source Connector --> Kafka (topic: X)
Kafka (topic: X) --> Confluent JDBC Sink Connector ----> Postgres DB
(我在docker compose文件中将键和值转换器更改为avro。)
我在合流模式注册中心的帮助下测试模式演化。我将遵循以下步骤:
首先更新接收器数据库:添加新列 至postgres db表x: `alter table X add column int;` 测试管道。好 啊。 在control center上更新x:的架构http://localhost:9021/,添加可选字段
的 int
使用默认值键入 null
. 现在,topicx有两个模式版本。我保留了默认的向后兼容性。
再次测试管道。好 啊。
更新源数据库:添加新列 `` 到mysql db表x: alter table X add column int;
通过更新源表的记录来测试管道。我在kafka connect服务日志中看到了堆栈跟踪。不好。
[2020-08-11 05:58:45,014] INFO Unable to find fields
[SinkRecordField{schema=Schema{INT32}, name='abc', isPrimaryKey=false}]
among column names [... list of columns ...] (io.confluent.connect.jdbc.sink.DbStructure)
[2020-08-11 05:58:45,017] ERROR WorkerSinkTask{id=test-sink-0}
Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted.
Error: Table "X" is missing fields
([SinkRecordField{schema=Schema{INT32}, name='abc', isPrimaryKey=false}])
and auto-evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask)
我再次检查了架构 X
在汇合控制中心,现在它有3个版本。版本2和版本3是相同的。更新源表后的新消息是否创建了新的相同架构版本?
关于自动进化的错误消息,我的理解是我们应该保持禁用,因为我们将依赖模式注册表来控制进化。
您能帮助我理解为什么confluent jdbc connector即使在更新模式之后也找不到新字段吗?我错过了什么?
暂无答案!
目前还没有任何答案,快来回答吧!