我开始运行一个自定义的kafka connect插件,将kafka的数据转储到elasticsearch中,然后周期性地我的任务无缘无故地消失(几小时到一天之后)
我在连接状态下看到:
curl -s http://localhost:8083/connectors/my-custom-sink/status | python -m json.tool
{
"connector": {
"state": "RUNNING",
"worker_id": "MASKED_IP1:8083"
},
"name": "my-custom-sink",
"tasks": [
{
"id": 0,
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
"worker_id": "MASKED_IP1:8083"
},
{
"id": 1,
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
"worker_id": "MASKED_IP2:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "MASKED_IP3:8083"
}
]
}
任何洞察都将受到赞赏(即Kafka抛出npe时试图做什么)
更新:这是Kafka0.10.2.0
这是我在工人死前的日志里看到的
INFO 2017-07-30 07:38:17,146 [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.10.2.0
INFO 2017-07-30 07:38:17,146 [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 576d93a8dc0cf421
INFO 2017-07-30 07:38:17,172 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN:9092 (id: 2147483643 rack: null) for group my-group.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,224 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,224 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,226 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Finished reading KafkaBasedLog for topic connect-configs
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Started KafkaBasedLog for topic connect-configs
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Started KafkaConfigBackingStore
INFO 2017-07-30 07:38:17,233 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder started
INFO 2017-07-30 07:38:17,239 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN1:9092 (id: 2147483643 rack: null) for group my-group.
INFO 2017-07-30 07:38:17,241 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: (Re-)joining group my-group
INFO 2017-07-30 07:38:20,868 [CLASSPATH traversal thread.] org.reflections.Reflections: Reflections took 3600 ms to scan 1 urls, producing 7612 keys and 36389 values
INFO 2017-07-30 07:38:22,511 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Successfully joined group my-group with generation 804011
INFO 2017-07-30 07:38:22,516 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Joined group and got assignment: Assignment{error=0, leader='connect-1-29930623-f17a-43ca-bed9-7f8a6fb5317a', leaderUrl='http://MASKED_IP:8083/', offset=2, connectorIds=[], taskIds=[my-group]}
WARN 2017-07-30 07:38:22,521 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Catching up to assignment's config offset.
INFO 2017-07-30 07:38:22,521 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Current config state offset -1 is behind group assignment 2, reading to end of config log
INFO 2017-07-30 07:38:22,846 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished reading to end of log and updated config snapshot, new config log offset: 2
INFO 2017-07-30 07:38:22,846 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting connectors and tasks using config offset 2
INFO 2017-07-30 07:38:22,849 [pool-6-thread-1] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting task my-group
INFO 2017-07-30 07:38:22,849 [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Creating task my-group
INFO 2017-07-30 07:38:22,851 [pool-6-thread-1] org.apache.kafka.connect.runtime.ConnectorConfig: ConnectorConfig values:
connector.class = IndexerSinkConnector
key.converter = null
name = my-group
tasks.max = 3
transforms = null
value.converter = null
ERROR 2017-07-30 07:38:22,856 [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Failed to start task my-group
java.lang.NullPointerException: null
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)
at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
INFO 2017-07-30 07:38:22,890 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished starting connectors and tasks
1条答案
按热度按时间s71maibg1#
npe起源于此https://github.com/apache/kafka/blob/0.10.2.0/clients/src/main/java/org/apache/kafka/common/config/abstractconfig.java#l52 配置主题中有多个分区吗?配置主题只允许有一个分区。这个要求从0.11开始严格执行,所以将来的版本不应该碰到这个问题。