我们将搜索集群托管在弹性云上,并从数据流(gcp)调用它。这项工作在dev中运行得很好,但是当我们部署到prod时,我们会在客户端看到大量的连接超时。
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "main.py", line 159, in process
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
return func(*args, params=params, headers=headers,**kwargs)
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
body=body,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
raise e
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)
我在elasticsearch客户端中将超时设置增加到300秒,如下所示,但似乎没有帮助。
self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)
查看部署https://cloud.elastic.co/deployments//metrics cpu和内存使用率很低(低于10%),搜索响应时间也只有200毫秒。这里的瓶颈是什么?我们如何避免这种超时?
如下面的日志所示,大多数请求都失败,连接超时,而成功的请求很快收到响应:
我尝试ssh到我们遇到连接错误的vm中。 netstat
显示大约有60个连接到ElasticSearchip地址。当我从vm curl 到elasticsearch地址时,我能够重现超时。我可以很好地 curl 到其他网址。另外,我可以从本地 curl 到elasticsearch,所以问题只是vm和ElasticsSerch服务器之间的连接。
数据流(计算引擎)或elasticsearch对并发连接的数量有限制吗?我在网上找不到任何信息。
3条答案
按热度按时间ahy6op9u1#
编辑:这是红鲱鱼。关闭与等待无关。我再次遇到同样的问题,大多数连接现在都处于已建立状态:/
虽然下面的两个答案都很有见地,但我不认为他们回答了这个问题。
经过进一步的研究,我发现elasticsearch py(或urllib3)与dataflow结合后,会以某种方式保留连接
CLOSE_WAIT
状态。一旦连接达到这个状态,这些连接就卡住了(操作系统不会释放这些套接字,因为操作系统认为应用程序代码会关闭它),所以在运行作业之后,连接池中的所有连接都处于关闭等待状态,因此我无法建立任何新的连接。如果我不使用连接池并为每个pardo示例化elasticsaerch客户机,它就会变得有价值,不知何故连接会更快地卡住。我在这里报告了这个问题https://github.com/elastic/elasticsearch-py/issues/1459 但老实说,这个问题似乎更深层次的堆栈,因为我有类似的问题,当我直接使用
requests
包的连接池(我相信在引擎盖下也使用了urllib3)。sqxo8psd2#
数据流对传出连接的数量没有限制。它在引擎盖下使用一个k8s集群,每个python线程都存在于自己的docker容器中。
对elasticcloud的api调用是速率限制的(看看响应头中的x-rate-limit-{interval,limit,remaining}字段)。
有了dataflow,如果你做了很多并行作业和/或googlecloud扩展了你的作业节点使之更快,那么很容易达到api速率限制。
dataflow/apache beam作业中可能的解决方法:
1-(无需代码)播放(数据流执行参数)[https://cloud.google.com/dataflow/docs/guides/specifying-exec-params]限制并发处理线程的数量。
需要调整的三个参数是:
max_num_workers
:正在运行的工作示例(计算机)的最大数目。number_of_worker_harness_threads
:默认情况下,示例的每个cpu有1 thead。machine_type
:您将使用的示例类型。2-对代码执行速率限制。请参阅apachebeam使用apachebeam进行及时(有状态)处理
ghhkc1vu3#
我做了一点关于elasticsearch连接器的研究。有两个原则,您可以尝试确保您的连接器是尽可能有效的。
注意:如另一个答案中所建议的那样,设置最大工作线程数可能不会有太大帮助(目前)-让我们提高您的beam/elastic群集资源的利用率,如果我们开始达到这两个资源的限制,那么我们可以考虑限制工作线程数-但是现在,您可以尝试改进连接器。
对外部服务使用批量请求
您提供的代码为进入dofn的每个元素发出一个单独的搜索请求。正如您所注意到的,这可以正常工作,但会导致管道花费太多时间等待每个元素的外部请求,因此您对往返的等待将是o(n)。
很高兴的是
Elasticsearch
客户有msearch
方法,该方法应允许您批量执行搜索。你可以这样做:重用线程安全的客户机
这个
Elasticsearch
客户端也是线程安全的!因此,与其每次都创建一个新的,不如这样做:
这应该确保为每个worker保留一个客户端,并且不会创建太多到elasticsearch的连接,因此不会收到拒绝消息。
让我知道如果这两个帮助,或者如果我们需要尝试进一步的改进!