将s3连接器与kafka的landoop docker容器一起使用时出错

w46czmvw  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(381)

使用以下配置创建接收器连接器时

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-west-2
topics.dir=topics
flush.size=3
schema.compatibility=NONE
topics=my_topic
tasks.max=1
s3.part.size=5242880
format.class=io.confluent.connect.s3.format.avro.AvroFormat

# added after comment

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=my-bucket

运行它我得到以下错误

org.apache.kafka.connect.errors.DataException: coyote-test-avro
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 91319
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:192)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

但是我的主题有一个模式,我可以看到使用docker容器提供的ui landoop/fast-data-dev . 即使我尝试将原始数据写入s3

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE

以及移除 schema.generator.class ,同样的错误出现了,即使我认为这不应该使用avro模式。
为了能够写入s3,我设置了环境变量 AWS_ACCESS_KEY_ID 以及 AWS_SECRET_ACCESS_KEY 在我的容器,但似乎无论如何,问题来之前,这一点。
我想版本可能有问题,如上所述,我使用的是容器 landoop/fast-data-dev 在docker机器中(它在mac本机docker机器中不工作),生产和消费完美地工作。这是关于部分

我查看了连接日志,但找不出任何有用的信息,但是如果您能告诉我应该查找什么,我将添加相关行(所有日志都太大)

3b6akqbq

3b6akqbq1#

每个主题消息都必须编码为avro,由schema注册表指定。
转换器查看原始Kafka数据(键和值)的字节2-5,将其转换为整数(在您的情况下,是错误中的id),并在注册表中进行查找。
如果不是avro,或者是其他坏数据,那么要么是这里的错误,要么是关于的错误 invalid magic byte .
这个错误不是连接错误。如果添加 print-key 财产。
假设是这样,一种解决方案是将密钥serde更改为使用字节数组反序列化器,以便跳过avro查找
否则,由于您无法删除kafka中的消息,因此这里唯一的解决方案是找出生产者发送错误数据的原因,修复它们,然后将connect consumer组移动到具有有效数据的最新偏移量,等待无效数据在主题上过期,或者完全移动到新主题

相关问题