我正在用kafka和spark流媒体构建一个应用程序。输入数据来自第三方流媒体,并发表在Kafka主题上。这段代码展示了流代理模块:它是我从流中获取结果的方式,以及我如何将它们发送给kafkapublisher(它只显示了一个草图):
def on_result_response(self,*args):
self.kafkaPublisher.pushMessage(str(args[0]))
kafkapublisher是通过以下两种方法实现的:
class KafkaPublisher:
def __init__(self,address,port,topic):
self.kafka = KafkaClient(str(address)+":"+str(port))
self.producer = SimpleProducer(self.kafka)
self.topic=topic
def pushMessage(self,message):
self.producer.send_messages(self.topic, message)
self.producer = SimpleProducer(self.kafka, async=True)
该应用程序由以下主体启动:
from StreamProxy import StreamProxy
streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20) #seconds of streaming
在一些批处理(10秒左右)之后,它会启动以下异常:
thread-2354中的异常:traceback(最近一次调用last):file“/usr/lib/python2.7/threading.py”,第801行,在\uu bootstrap\u inner file“/usr/lib/python2.7/threading.py”中,第754行,在运行文件“/usr/local/lib/python2.7/dist packages/kafka/producer/base.py”中,第164行,在发送上游文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第649行,在发送生成请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第253行,在发送代理感知请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第74行,在第236行的“\u get\u conn file”/usr/local/lib/python2.7/dist packages/kafka/conn.py”中,连接错误:[errno 24]打开的文件太多
线程thread-2355中出现异常:traceback(最近一次调用last):file“/usr/lib/python2.7/threading.py”,第801行,在\uu bootstrap\u inner file“/usr/lib/python2.7/threading.py”中,第754行,在运行文件“/usr/local/lib/python2.7/dist packages/kafka/producer/base.py”中,第164行,在发送上游文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第649行,在发送生成请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第253行,在发送代理感知请求文件“/usr/local/lib/python2.7/dist-packages/kafka/client.py”的第74行,在第236行的“\u get\u conn file”/usr/local/lib/python2.7/dist packages/kafka/conn.py”中,连接错误:[errno 24]打开的文件太多
请注意,同一条消息有许多不同的异常,问题肯定出在发布者方面。
1条答案
按热度按时间ffvjumwh1#
尝试删除行: