如何模拟Kafka Producer和Producer.Send方法进行Python单元测试

njthzxwz  于 2022-09-21  发布在  Kafka
关注(0)|答案(1)|浏览(246)

我是一个新手,正在尝试编写一个涉及到Kafka的单元测试。我有一个带有函数的类,该函数调用另一个函数来初始化Kafka Producer,然后调用Producer.end()。我想在我的单元测试中嘲笑Kafka制作人。

下面是我的代码,我想为Product_Kafka_Message方法编写单元测试。

class KafkaProducerIntface

    def __init__(self, topic, .....)
        self.bootstrap_server = bootstrap_server
        self.producer = None
        self.post_topic = topic
    ....

    def produce_kafka_message(self, key, value, headers)

        self.__initialize_producer__(retries=3)
        future = self.producer.send(self.topic, key=key, value=value, headers=headers)
        self.producer.flush()

    def __initialize_producer__(self, retries=3)
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_server, acks='all', retries=retries)
f87krz0w

f87krz0w1#

您可以对其使用mock lib并模仿produce_kafka_message方法:

with mock.patch.object(KafkaProducerIntface, "produce_kafka_message") as mock_produce_kafka_message:
    ... # do some code here
    mock_produce_kafka_message.assert_called_once_with(key, value, headerss)

您也可以对KafkaProducer类本身执行相同的操作:

with mock.patch("kafka.path.KafkaProducer") as mock_kafka_producer:
     interface = KafkaProducerIntface(...)
     interface.produce_kafka_message(key, value, headers)

     mock_kafka_producer.send.assert_called_once()
     mock_kafka_producer.flush()

顺便说一句。另外,请检查您是要在发送后运行刷新,还是只运行.poll(0):)

相关问题