Ingesting data into Elasticsearch with the Elasticsearch sink connector keeps timing out and eventually requires a manual restart

Posted by omtl5h9j on 2021-06-13 in ElasticSearch

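We are working on a PoC in which we produce messages to a Kafka topic (currently about 2 million, eventually about 130 million) and want to query that data through Elasticsearch. For this we built a small PoC that feeds the data into Elasticsearch via the Confluent Elasticsearch sink connector (latest version) on Kafka Connect 6.0.0. However, we run into many timeouts, and eventually the task fails with a message saying that it must be restarted: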

ERROR WorkerSinkTask{id=transactions-elasticsearch-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.net.SocketTimeoutException: Read timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
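The manual restart mentioned in this error is usually issued through the Kafka Connect REST API, via `GET /connectors/<name>/status` and `POST /connectors/<name>/tasks/<id>/restart`. The Python sketch below assumes a worker reachable at `http://connect:8083` (a hypothetical host name) and shows how failed task ids can be picked out of a status response and restarted; it only automates the restart and does nothing about the underlying timeouts.

```python
import json
import urllib.request

# Hypothetical Kafka Connect REST endpoint; adjust to the worker's host/port.
CONNECT_URL = "http://connect:8083"


def failed_task_ids(status: dict) -> list[int]:
    """Return the ids of tasks whose state is FAILED in a /status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_urls(connector: str, status: dict, base_url: str = CONNECT_URL) -> list[str]:
    """Build one POST /connectors/<name>/tasks/<id>/restart URL per failed task."""
    return [
        f"{base_url}/connectors/{connector}/tasks/{task_id}/restart"
        for task_id in failed_task_ids(status)
    ]


def restart_failed_tasks(connector: str, base_url: str = CONNECT_URL) -> list[str]:
    """Fetch the connector status, then POST a restart for every failed task."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        status = json.load(resp)
    urls = restart_urls(connector, status, base_url)
    for url in urls:
        # Empty body; the restart endpoint takes no payload.
        req = urllib.request.Request(url, data=b"", method="POST")
        urllib.request.urlopen(req).close()
    return urls
```

For example, `restart_failed_tasks("transactions-elasticsearch-connector")` would restart task 3 in the situation reported by the error above.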

My sink connector configuration is as follows:

{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elasticsearch:9200",
  "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
  "value.converter" : "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url" : "http://schema-registry:8081",
  "topics": "transactions,trades",
  "type.name": "transactions",
  "tasks.max" : "4",
  "batch.size" : "50",
  "max.buffered.events" : "500",
  "max.buffered.records" : "500",
  "flush.timeout.ms" : "100000",
  "linger.ms" : "50",
  "max.retries" : "10",
  "connection.timeout.ms" : "2000",
  "name": "transactions-elasticsearch-connector",
  "key.ignore": "true",
  "schema.ignore": "false",
  "transforms" : "ExtractTimestamp",
  "transforms.ExtractTimestamp.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field" : "MSG_TS"
}
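"Read timed out" failures in this connector are governed by `read.timeout.ms`, which the configuration above does not set, so the connector default applies. The Python sketch below merges the timing-related keys from the configuration with assumed defaults (believed to match the Confluent Elasticsearch sink connector documentation for recent versions; check the version actually installed) and derives how many attempts each batch gets.

```python
import json

# Timing-related keys copied from the connector configuration above;
# the remaining keys are omitted because they do not affect timing.
CONFIG_JSON = """
{
  "tasks.max": "4",
  "batch.size": "50",
  "max.buffered.records": "500",
  "flush.timeout.ms": "100000",
  "linger.ms": "50",
  "max.retries": "10",
  "connection.timeout.ms": "2000"
}
"""

# ASSUMPTION: defaults of the Confluent Elasticsearch sink connector for keys
# the config does not set; verify against the installed connector version.
ASSUMED_DEFAULTS = {
    "read.timeout.ms": 3000,
    "retry.backoff.ms": 100,
    "max.in.flight.requests": 5,
}


def effective_timing(config: dict) -> dict:
    """Overlay the explicit (string-valued) settings on the assumed defaults."""
    merged = dict(ASSUMED_DEFAULTS)
    for key, value in config.items():
        merged[key] = int(value)
    return merged


def attempts_per_batch(settings: dict) -> int:
    """One initial attempt plus max.retries retries."""
    return settings["max.retries"] + 1


settings = effective_timing(json.loads(CONFIG_JSON))
print(settings["read.timeout.ms"], attempts_per_batch(settings))  # 3000 11
```

The derived 11 attempts per batch agrees with the `attempt 1/11` counter in the log excerpt below, which suggests `max.retries` is being picked up as configured while the read timeout stays at its default.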

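Unfortunately, even when no messages are being produced and the Elasticsearch sink connector is started manually, the task still shuts down and has to be restarted. I have experimented with various batch sizes, linger windows, retry settings, etc., without success. Note that we run only a single Kafka broker, a single Elasticsearch connector, and a single Elasticsearch instance, each in a Docker container.
We also see many timeout messages like these: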

[2020-12-08 13:23:34,107] WARN Failed to execute batch 100534 of 50 records with attempt 1/11, will attempt retry after 43 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:34,116] WARN Failed to execute batch 100536 of 50 records with attempt 1/11, will attempt retry after 18 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:34,132] WARN Failed to execute batch 100537 of 50 records with attempt 1/11, will attempt retry after 24 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:36,746] WARN Failed to execute batch 100539 of 50 records with attempt 1/11, will attempt retry after 0 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,139] WARN Failed to execute batch 100536 of 50 records with attempt 2/11, will attempt retry after 184 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,155] WARN Failed to execute batch 100534 of 50 records with attempt 2/11, will attempt retry after 70 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,160] WARN Failed to execute batch 100537 of 50 records with attempt 2/11, will attempt retry after 157 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:39,681] WARN Failed to execute batch 100540 of 50 records with attempt 1/11, will attempt retry after 12 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:39,750] WARN Failed to execute batch 100539 of 50 records with attempt 2/11, will attempt retry after 90 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:40,231] WARN Failed to execute batch 100534 of 50 records with attempt 3/11, will attempt retry after 204 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:40,322] WARN Failed to execute batch 100537 of 50 records with attempt 3/11, will attempt retry after 58 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
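The retry warnings can be grouped per batch to see how the connector backs off. The Python sketch below parses the lines shown above (without any terminal color prefix) and checks each delay against an assumed randomized exponential backoff bound of `retry.backoff.ms * 2^(attempt - 1)`, with `retry.backoff.ms` assumed to be at its default of 100 ms; this bound is an inference from the logged numbers, not confirmed connector behavior.

```python
import re
from collections import defaultdict

# Matches the BulkProcessor retry warnings; search() ignores any line prefix.
RETRY_RE = re.compile(
    r"Failed to execute batch (\d+) of (\d+) records with attempt (\d+)/(\d+), "
    r"will attempt retry after (\d+) ms"
)

# The warning lines from the log excerpt above, in their original order.
LOG = """\
[2020-12-08 13:23:34,107] WARN Failed to execute batch 100534 of 50 records with attempt 1/11, will attempt retry after 43 ms. Failure reason: Read timed out
[2020-12-08 13:23:34,116] WARN Failed to execute batch 100536 of 50 records with attempt 1/11, will attempt retry after 18 ms. Failure reason: Read timed out
[2020-12-08 13:23:34,132] WARN Failed to execute batch 100537 of 50 records with attempt 1/11, will attempt retry after 24 ms. Failure reason: Read timed out
[2020-12-08 13:23:36,746] WARN Failed to execute batch 100539 of 50 records with attempt 1/11, will attempt retry after 0 ms. Failure reason: Read timed out
[2020-12-08 13:23:37,139] WARN Failed to execute batch 100536 of 50 records with attempt 2/11, will attempt retry after 184 ms. Failure reason: Read timed out
[2020-12-08 13:23:37,155] WARN Failed to execute batch 100534 of 50 records with attempt 2/11, will attempt retry after 70 ms. Failure reason: Read timed out
[2020-12-08 13:23:37,160] WARN Failed to execute batch 100537 of 50 records with attempt 2/11, will attempt retry after 157 ms. Failure reason: Read timed out
[2020-12-08 13:23:39,681] WARN Failed to execute batch 100540 of 50 records with attempt 1/11, will attempt retry after 12 ms. Failure reason: Read timed out
[2020-12-08 13:23:39,750] WARN Failed to execute batch 100539 of 50 records with attempt 2/11, will attempt retry after 90 ms. Failure reason: Read timed out
[2020-12-08 13:23:40,231] WARN Failed to execute batch 100534 of 50 records with attempt 3/11, will attempt retry after 204 ms. Failure reason: Read timed out
[2020-12-08 13:23:40,322] WARN Failed to execute batch 100537 of 50 records with attempt 3/11, will attempt retry after 58 ms. Failure reason: Read timed out
"""


def retries_by_batch(log: str) -> dict[int, list[tuple[int, int]]]:
    """Map batch id -> list of (attempt, retry delay in ms) in log order."""
    batches: dict[int, list[tuple[int, int]]] = defaultdict(list)
    for line in log.splitlines():
        m = RETRY_RE.search(line)
        if m:
            batch, _size, attempt, _total, delay = map(int, m.groups())
            batches[batch].append((attempt, delay))
    return dict(batches)


def within_backoff_bound(attempt: int, delay_ms: int, backoff_ms: int = 100) -> bool:
    """ASSUMPTION: the delay is drawn from [0, backoff_ms * 2**(attempt - 1)]."""
    return 0 <= delay_ms <= backoff_ms * 2 ** (attempt - 1)


for batch, entries in sorted(retries_by_batch(LOG).items()):
    print(batch, entries, all(within_backoff_bound(a, d) for a, d in entries))
```

On this sample every delay fits the assumed bound, and the 204 ms delay before batch 100534's fourth attempt exceeds the 200 ms ceiling of the previous step, which points to a growing rather than a fixed backoff.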

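Any idea what we can improve to make the whole chain reliable? For our purposes it does not need to be very fast, as long as all messages reliably end up in Elasticsearch without the connector tasks having to be restarted every time.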
