Kafka Connect tasks randomly die with an NPE

Posted by qncylg1j on 2021-06-08 in Kafka

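I started running a custom Kafka Connect plugin that dumps data from Kafka into Elasticsearch, and periodically my tasks die for no apparent reason (anywhere from a few hours to a day later).
This is what I see in the connector status: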

curl -s http://localhost:8083/connectors/my-custom-sink/status | python -m json.tool
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "MASKED_IP1:8083"
    },
    "name": "my-custom-sink",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
            "worker_id": "MASKED_IP1:8083"
        },
        {
            "id": 1,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
            "worker_id": "MASKED_IP2:8083"
        },
        {
            "id": 2,
            "state": "RUNNING",
            "worker_id": "MASKED_IP3:8083"
        }
    ]
}
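Because the connector itself stays RUNNING while individual tasks flip to FAILED, it can help to pull the failed tasks out of the status response automatically. The Python sketch below assumes the response shape shown above; SAMPLE_STATUS is a trimmed copy of that response with the traces shortened to two lines, and the function name failed_tasks is hypothetical.

```python
import json

# Trimmed copy of the /connectors/my-custom-sink/status response above.
# Traces are shortened to the exception plus the top stack frame.
SAMPLE_STATUS = r'''
{
  "connector": {"state": "RUNNING", "worker_id": "MASKED_IP1:8083"},
  "name": "my-custom-sink",
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "MASKED_IP1:8083",
     "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n"},
    {"id": 1, "state": "FAILED", "worker_id": "MASKED_IP2:8083",
     "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n"},
    {"id": 2, "state": "RUNNING", "worker_id": "MASKED_IP3:8083"}
  ]
}
'''


def failed_tasks(status):
    """Return (task id, worker id, exception, top stack frame) for each FAILED task."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        # The trace is one string with embedded newlines; split and trim it.
        lines = [line.strip() for line in task.get("trace", "").splitlines()]
        exception = lines[0] if lines else ""
        top_frame = lines[1] if len(lines) > 1 else ""
        failures.append((task["id"], task.get("worker_id"), exception, top_frame))
    return failures


if __name__ == "__main__":
    status = json.loads(SAMPLE_STATUS)
    for task_id, worker, exception, frame in failed_tasks(status):
        print(f"task {task_id} on {worker}: {exception} {frame}")
```

To use it against a live worker, replace json.loads(SAMPLE_STATUS) with json.load(sys.stdin) (and import sys), then pipe the curl output into the script.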

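Any insight would be appreciated (i.e., what is Kafka trying to do when it throws the NPE?).
Update: this is Kafka 0.10.2.0.
This is what I see in the worker log right before the task dies: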

INFO  2017-07-30 07:38:17,146  [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.10.2.0
INFO  2017-07-30 07:38:17,146  [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 576d93a8dc0cf421
INFO  2017-07-30 07:38:17,172  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN:9092 (id: 2147483643 rack: null) for group my-group.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,224  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,224  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,226  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Finished reading KafkaBasedLog for topic connect-configs
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Started KafkaBasedLog for topic connect-configs
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Started KafkaConfigBackingStore
INFO  2017-07-30 07:38:17,233  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder started
INFO  2017-07-30 07:38:17,239  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN1:9092 (id: 2147483643 rack: null) for group my-group.
INFO  2017-07-30 07:38:17,241  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: (Re-)joining group my-group
INFO  2017-07-30 07:38:20,868  [CLASSPATH traversal thread.] org.reflections.Reflections: Reflections took 3600 ms to scan 1 urls, producing 7612 keys and 36389 values 
INFO  2017-07-30 07:38:22,511  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Successfully joined group my-group with generation 804011
INFO  2017-07-30 07:38:22,516  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Joined group and got assignment: Assignment{error=0, leader='connect-1-29930623-f17a-43ca-bed9-7f8a6fb5317a', leaderUrl='http://MASKED_IP:8083/', offset=2, connectorIds=[], taskIds=[my-group]}
WARN  2017-07-30 07:38:22,521  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Catching up to assignment's config offset.
INFO  2017-07-30 07:38:22,521  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Current config state offset -1 is behind group assignment 2, reading to end of config log
INFO  2017-07-30 07:38:22,846  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished reading to end of log and updated config snapshot, new config log offset: 2
INFO  2017-07-30 07:38:22,846  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting connectors and tasks using config offset 2
INFO  2017-07-30 07:38:22,849  [pool-6-thread-1] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting task my-group
INFO  2017-07-30 07:38:22,849  [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Creating task my-group
INFO  2017-07-30 07:38:22,851  [pool-6-thread-1] org.apache.kafka.connect.runtime.ConnectorConfig: ConnectorConfig values: 
        connector.class = IndexerSinkConnector
        key.converter = null
        name = my-group
        tasks.max = 3
        transforms = null
        value.converter = null

ERROR 2017-07-30 07:38:22,856  [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Failed to start task my-group
java.lang.NullPointerException: null
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)
        at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1500(DistributedHerder.java:101)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:848)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:844)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
INFO  2017-07-30 07:38:22,890  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished starting connectors and tasks
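Reading the trace bottom-up, the herder is starting a task: Worker.startTask builds a TaskConfig from the task's configuration map, and the NPE inside the AbstractConfig constructor is consistent with that map being null. The ConnectorConfig values are logged just before the failure, so the connector config was found; the task config was presumably missing. The Python model below is a simplified sketch of that sequence under those assumptions, not Kafka's actual code: ConfigSnapshot, SimplifiedConfig, start_task and the task IDs my-group-0 and my-group-2 are all hypothetical, and Python raises AttributeError where Java raises NullPointerException.

```python
from typing import Dict, Mapping, Optional

Config = Dict[str, str]


class SimplifiedConfig:
    """Stand-in for AbstractConfig: the constructor walks the keys of the map
    it is given, so a None map fails immediately (Python's analog of the NPE)."""

    def __init__(self, originals: Optional[Mapping[str, str]]):
        for key in originals.keys():  # AttributeError if originals is None
            if not isinstance(key, str):
                raise TypeError(f"config key {key!r} is not a string")
        self.originals = dict(originals)


class ConfigSnapshot:
    """Hypothetical stand-in for the worker's snapshot of the config topic."""

    def __init__(self, connector_configs: Dict[str, Config], task_configs: Dict[str, Config]):
        self.connector_configs = connector_configs
        self.task_configs = task_configs

    def task_config(self, task_id: str) -> Optional[Config]:
        # Returns None when the snapshot holds no config for this task.
        return self.task_configs.get(task_id)


def start_task(snapshot: ConfigSnapshot, connector: str, task_id: str) -> SimplifiedConfig:
    # The connector config is present (it is what gets logged as ConnectorConfig values)...
    SimplifiedConfig(snapshot.connector_configs[connector])
    # ...but a missing task config is passed through unchecked as None.
    return SimplifiedConfig(snapshot.task_config(task_id))


if __name__ == "__main__":
    snapshot = ConfigSnapshot(
        connector_configs={"my-group": {"connector.class": "IndexerSinkConnector", "tasks.max": "3"}},
        # Hypothetical: only one task's config made it into the snapshot.
        task_configs={"my-group-2": {"topics": "example-topic"}},
    )
    print(start_task(snapshot, "my-group", "my-group-2").originals)
    try:
        start_task(snapshot, "my-group", "my-group-0")
    except AttributeError as exc:
        print("task my-group-0 failed:", exc)
```

The point of the model is that nothing between the snapshot lookup and the config constructor checks for a missing entry, so the failure surfaces deep in the config class rather than as a clear "no config for task" error.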
Answer 1, by s71maibg:

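The NPE originates here: https://github.com/apache/kafka/blob/0.10.2.0/clients/src/main/java/org/apache/kafka/common/config/abstractconfig.java#l52 (the AbstractConfig constructor). Does your config topic have more than one partition? The config topic is only allowed to have a single partition. Kafka only guarantees ordering within a partition, so with several partitions a worker can presumably read the config records out of order and end up with no configuration for some tasks, which is then passed to TaskConfig as null. This requirement is strictly enforced starting with 0.11, so later versions should not run into this problem.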
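To answer the question about the config topic (connect-configs in the log above), one option is to check its partition count with kafka-topics.sh --describe. The sketch below parses that tool's text output; SAMPLE_DESCRIBE is hypothetical sample text, and since the layout differs between Kafka versions, the parser looks for the PartitionCount field first and falls back to counting per-partition lines. The function names are hypothetical.

```python
import re
from typing import Optional

# Hypothetical sample of `kafka-topics.sh --describe --topic connect-configs`
# output; the exact layout varies between Kafka versions.
SAMPLE_DESCRIBE = (
    "Topic:connect-configs\tPartitionCount:3\tReplicationFactor:3\tConfigs:\n"
    "\tTopic: connect-configs\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n"
    "\tTopic: connect-configs\tPartition: 1\tLeader: 2\tReplicas: 2,3,1\tIsr: 2,3,1\n"
    "\tTopic: connect-configs\tPartition: 2\tLeader: 3\tReplicas: 3,1,2\tIsr: 3,1,2\n"
)


def partition_count(describe_output: str) -> Optional[int]:
    """Read the count from the topic summary line, falling back to counting
    per-partition lines; return None if neither is present."""
    match = re.search(r"PartitionCount:\s*(\d+)", describe_output)
    if match:
        return int(match.group(1))
    partitions = re.findall(r"\bPartition:\s*\d+", describe_output)
    return len(partitions) or None


def check_config_topic(describe_output: str) -> str:
    count = partition_count(describe_output)
    if count is None:
        return "could not determine the partition count"
    if count == 1:
        return "ok: config topic has a single partition"
    return f"problem: config topic has {count} partitions; it must have exactly 1"


if __name__ == "__main__":
    print(check_config_topic(SAMPLE_DESCRIBE))
```

Feed it the real output of kafka-topics.sh --describe --topic connect-configs (plus whatever connection flag your Kafka version requires). A count above 1 matches the situation described in this answer.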
