kafka jdbc sink connect上出现架构异常

jaxagkaj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(953)

我正在尝试使用kafka jdbc sink connect将行插入到我的oracle表中。我的kafka主题(json)中有如下消息:;

[{"f1":"qws","f2":"zcz","f3":"SDFF","f4":"f33bfed577bcd7c4625479bd3cd13323--1132061303","f5":null,"f6":null,"f7":"ghSDAgh/akdjytfd/jhsgd","f8":"hsfgd/sdfjghsfjd/jsg","f9":null,"f10":"ASD","f11":"sdfg/vbnm","f12":"S","startTime":"2018-01-30T05:24:41.162","_startTime":"DATE","f13":219,"f14":"http://192.168.0.1:1234/asd/fgh/jkl/zxc/vbn/qwe/rty","f15":"fe80:0:0:0:7501:14d9:b44b:2a95%eth5","f16":1234,"f17":"ABCD-1234","f18":"192.168.0.1","f19":"sdfgd","dfgVO":{"fa1":null,"fa2":"formats","fa3":""qwe.rty.uiop.asd.fgh.jkl.zxc.vbn.asdf@61e97f29"","fa4":7,"fa5":79,"fa6":null,"fa7":"{}","fa8":1517289881381},"f20":null,"f21":"http-drte-1234-uik-7","f22":false,"f23":false,"f24":false}]

我有如下连接器配置;

name=jdbc-sink-2
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my_topic_1
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:user01
connection.user=USER1
connection.password=PASSWD1
auto.create=true
table.name.format=MY_TABLE_2
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
producer.retries=1

当我启动连接器,我得到下面的错误;

[2018-01-30 11:16:55,417] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:16:55,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

然后我将下面的配置添加到我现有的连接器配置中;

key.converter.schemas.enable=false
value.converter.schemas.enable=false

现在,我得到另一个错误如下;

[2018-01-30 11:36:58,118] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: MY_TABLE_2
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:36:58,123] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-01-30 11:36:58,124] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:36:58,125] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

这说明我需要修改kafka消息,比如键值模式格式。我不能修改我的Kafka消息格式,因为它是由其他人发布的。如何修复此错误?
谢谢您。

h43kikqp

h43kikqp1#

对于每个文档,如果您想使用jdbc接收器,则需要提供一个模式。您可以使用avro+schema registry或将json与嵌入的模式一起使用。您可以在这里看到预期json结构的示例。
你的数据来自哪里?如果是kafka connect源代码,您可以在启用模式的情况下使用avro或json。如果它在别处,您需要修改它以提供包含模式的数据-模式注册表提供的avro序列化器可以为您完成这项工作。

相关问题