在使用kafka的应用程序上打开的文件太多错误

hec6srdp  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我正在用kafka和spark流媒体构建一个应用程序。输入数据来自第三方流媒体,并发表在Kafka主题上。这段代码展示了流代理模块:它是我从流中获取结果的方式,以及我如何将它们发送给kafkapublisher(它只显示了一个草图):

def on_result_response(self,*args):
    self.kafkaPublisher.pushMessage(str(args[0]))

kafkapublisher是通过以下两种方法实现的:

class KafkaPublisher:

def __init__(self,address,port,topic):
    self.kafka = KafkaClient(str(address)+":"+str(port))
    self.producer = SimpleProducer(self.kafka)
    self.topic=topic

def pushMessage(self,message):
    self.producer.send_messages(self.topic, message)
    self.producer = SimpleProducer(self.kafka, async=True)

该应用程序由以下主体启动:

from StreamProxy import StreamProxy

streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)  #seconds of streaming

在一些批处理(10秒左右)之后,它会启动以下异常:
thread-2354中的异常:traceback(最近一次调用last):file“/usr/lib/python2.7/threading.py”,第801行,在\uu bootstrap\u inner file“/usr/lib/python2.7/threading.py”中,第754行,在运行文件“/usr/local/lib/python2.7/dist packages/kafka/producer/base.py”中,第164行,在发送上游文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第649行,在发送生成请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第253行,在发送代理感知请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第74行,在第236行的“\u get\u conn file”/usr/local/lib/python2.7/dist packages/kafka/conn.py”中,连接错误:[errno 24]打开的文件太多
线程thread-2355中出现异常:traceback(最近一次调用last):file“/usr/lib/python2.7/threading.py”,第801行,在\uu bootstrap\u inner file“/usr/lib/python2.7/threading.py”中,第754行,在运行文件“/usr/local/lib/python2.7/dist packages/kafka/producer/base.py”中,第164行,在发送上游文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第649行,在发送生成请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第253行,在发送代理感知请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第74行,在第236行的“\u get\u conn file”/usr/local/lib/python2.7/dist packages/kafka/conn.py”中,连接错误:[errno 24]打开的文件太多
请注意,同一条消息有许多不同的异常,问题肯定出在发布者方面。

ffvjumwh

ffvjumwh1#

尝试删除行:

self.producer = SimpleProducer(self.kafka, async=True)

相关问题