合流kafka到s3连接失败,错误为线程[kafkabasedlog工作线程]中出现意外异常-

b5buobof  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(287)

我在ec2上设置了汇合(4.0)连接器,它从kafka读取数据并写入s3。
独立的尝试进行得很顺利:
bin/connect standalone等/standalone/example-connect-worker.properties等/standalone/example-connect-s3-sink.properties
但是,分布式版本不断出现故障

[2018-01-30 21:26:05,860] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1097)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

我只想先使用连接器类等于filestreamsinkconnector
sink conf文件如下所示:

name=local-file-sink

# connector.class=FileStreamSink

connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=tests3

s3.bucket=tests3
s3.prefix=tests3
s3.endpoint=http://localhost:9090
s3.path_style=true
local.buffer.dir=/tmp/connect-system-test

谢谢!

x3naxklr

x3naxklr1#

启动分布式连接工作进程时 ./bin/connect-distributed 只能通过命令行提供辅助对象的属性。
要通过将连接器的配置发布到worker的rest端点来加载连接器,可以使用 curl 或同等命令。
例如: curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors 哪里 config.json 是包含连接器属性的文件。
更多信息请点击此处:https://docs.confluent.io/current/connect/managing.html#distributed-示例

相关问题