elasticsearch/dataflow-60次并发连接后连接超时

t30tvxxf  于 2021-06-10  发布在  ElasticSearch
关注(0)|答案(3)|浏览(594)

我们将搜索集群托管在弹性云上,并从数据流(gcp)调用它。这项工作在dev中运行得很好,但是当我们部署到prod时,我们会在客户端看到大量的连接超时。

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers,**kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)

我在elasticsearch客户端中将超时设置增加到300秒,如下所示,但似乎没有帮助。

self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)

查看部署https://cloud.elastic.co/deployments//metrics cpu和内存使用率很低(低于10%),搜索响应时间也只有200毫秒。这里的瓶颈是什么?我们如何避免这种超时?
如下面的日志所示,大多数请求都失败,连接超时,而成功的请求很快收到响应:

我尝试ssh到我们遇到连接错误的vm中。 netstat 显示大约有60个连接到ElasticSearchip地址。当我从vm curl 到elasticsearch地址时,我能够重现超时。我可以很好地 curl 到其他网址。另外,我可以从本地 curl 到elasticsearch,所以问题只是vm和ElasticsSerch服务器之间的连接。
数据流(计算引擎)或elasticsearch对并发连接的数量有限制吗?我在网上找不到任何信息。

ahy6op9u

ahy6op9u1#

编辑:这是红鲱鱼。关闭与等待无关。我再次遇到同样的问题,大多数连接现在都处于已建立状态:/
虽然下面的两个答案都很有见地,但我不认为他们回答了这个问题。
经过进一步的研究,我发现elasticsearch py(或urllib3)与dataflow结合后,会以某种方式保留连接 CLOSE_WAIT 状态。一旦连接达到这个状态,这些连接就卡住了(操作系统不会释放这些套接字,因为操作系统认为应用程序代码会关闭它),所以在运行作业之后,连接池中的所有连接都处于关闭等待状态,因此我无法建立任何新的连接。如果我不使用连接池并为每个pardo示例化elasticsaerch客户机,它就会变得有价值,不知何故连接会更快地卡住。
我在这里报告了这个问题https://github.com/elastic/elasticsearch-py/issues/1459 但老实说,这个问题似乎更深层次的堆栈,因为我有类似的问题,当我直接使用 requests 包的连接池(我相信在引擎盖下也使用了urllib3)。

sqxo8psd

sqxo8psd2#

数据流对传出连接的数量没有限制。它在引擎盖下使用一个k8s集群,每个python线程都存在于自己的docker容器中。
对elasticcloud的api调用是速率限制的(看看响应头中的x-rate-limit-{interval,limit,remaining}字段)。
有了dataflow,如果你做了很多并行作业和/或googlecloud扩展了你的作业节点使之更快,那么很容易达到api速率限制。
dataflow/apache beam作业中可能的解决方法:
1-(无需代码)播放(数据流执行参数)[https://cloud.google.com/dataflow/docs/guides/specifying-exec-params]限制并发处理线程的数量。
需要调整的三个参数是: max_num_workers :正在运行的工作示例(计算机)的最大数目。 number_of_worker_harness_threads :默认情况下,示例的每个cpu有1 thead。 machine_type :您将使用的示例类型。
2-对代码执行速率限制。请参阅apachebeam使用apachebeam进行及时(有状态)处理

ghhkc1vu

ghhkc1vu3#

我做了一点关于elasticsearch连接器的研究。有两个原则,您可以尝试确保您的连接器是尽可能有效的。
注意:如另一个答案中所建议的那样,设置最大工作线程数可能不会有太大帮助(目前)-让我们提高您的beam/elastic群集资源的利用率,如果我们开始达到这两个资源的限制,那么我们可以考虑限制工作线程数-但是现在,您可以尝试改进连接器。

对外部服务使用批量请求

您提供的代码为进入dofn的每个元素发出一个单独的搜索请求。正如您所注意到的,这可以正常工作,但会导致管道花费太多时间等待每个元素的外部请求,因此您对往返的等待将是o(n)。
很高兴的是 Elasticsearch 客户有 msearch 方法,该方法应允许您批量执行搜索。你可以这样做:

class PredictionFn(beam.DoFn):
    def __init__(self, ...):
      self.buffer = []
    ...
    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) > BATCH_SIZE:
          return self.flush()

    def flush(self):
        result = []

        # Perform the search requests for user ids
        user_ids = [uid for cid, did, uid in self.buffer]
        user_ids_request = self._build_uid_reqs(user_ids)

        resp = es.msearch(body=user_ids_request)

        user_id_and_device_id_lists = []
        for r, elm in zip(resp['responses'], self.buffer):
          if len(r["hits"]["hits"]) == 0:
            continue
          # Get new device_id_list
          user_id_and_device_id_lists.append((elm[2],  # User ID
                                              device_id_list))

        device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
        device_ids_request = self._build_device_id_reqs(device_id_lists)

        resp = es.msearch(body=device_ids_request)

        resp = self.elasticsearch.search(index="sessions", body={"query": {"match": {"userId": user_id }}})
        # Handle the result, output anything necessary

    def _build_uid_reqs(self, uids):
      # Relying on this answer: https://stackoverflow.com/questions/28546253/how-to-create-request-body-for-python-elasticsearch-msearch/37187352
      res = []
      for uid in uids:
        res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
        res.append(json.dumps({"query": {"match": {"userId": uid }}}))  # Request BODY

      return '\n'.join(res)

重用线程安全的客户机

这个 Elasticsearch 客户端也是线程安全的!
因此,与其每次都创建一个新的,不如这样做:

class PredictionFn(beam.DoFn):
    CLIENT = None

    def init_elasticsearch(self):
        if PredictionFn.CLIENT is not None:
          return PredictionFn.CLIENT
        es_host = fetch_host()
        http_auth = fetch_auth()
        PredictionFn.CLIENT = Elasticsearch([es_host], http_auth=http_auth, 
            timeout=300, sniff_on_connection_fail=True,
            retry_on_timeout=True, max_retries=2,
            maxsize=5) # 5 connections per client
        return PredictionFn.CLIENT

这应该确保为每个worker保留一个客户端,并且不会创建太多到elasticsearch的连接,因此不会收到拒绝消息。
让我知道如果这两个帮助,或者如果我们需要尝试进一步的改进!

相关问题