如何模拟elasticsearch.helpers.bulk

qzwqbdag  于 2021-06-15  发布在  ElasticSearch
关注(0)|答案(1)|浏览(355)

我正在测试一个使用 bulk 索引一些文档。
这是我的密码:

import mock
import unittest
import json

from elasticsearch.helpers import bulk

from ingestion.ingestor import Ingestor

class TestIngestor(unittest.TestCase):

    def setUp(self):
        self.ingestor = Ingestor()

    @mock.patch("elasticsearch.helpers.bulk", mock.MagicMock(return_value=True))
    def test_ingestor(self):
        with open("tests/data/sample_payload.json", "r") as reader:
            sample_messages = json.loads(reader.read())["Messages"]

        actions = self.ingestor.ingest(sample_messages)

        self.assertEqual(len(actions), 10)

然而,嘲笑似乎不起作用。。。当我运行它时,我会得到一个很长的连接拒绝错误列表。
我该怎么修?

6za6bjd0

6za6bjd01#

结果发现我的补丁错了。。。下面是我如何修复它的:

@mock.patch("elasticsearch.Elasticsearch.bulk", 
             mock.MagicMock(return_value={"items":[]}))

相关问题