我正在运行一个python脚本来收集来自新闻提供者的数据,并在flume.conf文件中获取这个脚本。
我的flume.conf文件:
newsAgent.sources = r1
newsAgent.sinks = spark
newsAgent.channels = MemChannel
# Describe/configure the source
newsAgent.sources.r1.type = exec
newsAgent.sources.r1.command = python path_to/data_collector.py
# Describe the sink
newsAgent.sinks.spark.type = avro
newsAgent.sinks.spark.channel = memoryChannel
newsAgent.sinks.spark.hostname = localhost
newsAgent.sinks.spark.port = 4040
# Use a channel which buffers events in memory
newsAgent.channels.MemChannel.type = memory
newsAgent.channels.MemChannel.capacity = 10000
newsAgent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
newsAgent.sources.r1.channels = MemChannel
newsAgent.sinks.spark.channel = MemChannel
python脚本在sunalation中运行良好,我可以看到json数据被打印出来。但当我通过flume执行时,数据会下沉,从而触发低于警告的消息。
警告信息
18/08/04 07:36:20 WARN HttpParser: Illegal character 0x0 in state=START
for buffer HeapByteBuffer@5ae61d8b[p=1,l=8192,c=8192,r=8191]= . {\x00<<<\x00\x00\x01\x00\x00\x00\x06\x00\x00\x000\x86\xAa\xDa\xE2\xC4T...ing town", "sum>>>}
18/08/04 07:36:20 WARN HttpParser: bad HTTP parsed: 400 Illegal character 0x0 for HttpChannelOverHttp@46691f53{r=0,c=false,a=IDLE,uri=null}
数据采集器.py
def process():
for k, v in news_source.items():
feeds = feedparser.parse(v)
for e in feeds.entries:
doc = json.dumps(
{"news_provider": k, "title": e.title.strip(), "summary": BeautifulSoup(e.summary, 'lxml').text.strip(),
"id": e.id.strip(), "published": e.published if e.has_key('published') else None})
print("%s"%doc)
流媒体脚本
def func():
sc = SparkContext(master="local[*]", appName="App")
ssc = StreamingContext(sc, 300)
flume_strm = FlumeUtils.createStream(ssc, "localhost", 9999)
lines = flume_strm.map(lambda v: json.loads(v[1]))
lines.pprint()
ssc.start()
ssc.awaitTermination()
使用的命令
bin/flume-ng agent --conf conf --conf-file libexec/conf/test.conf --name Agent -Dflume.root.logger=INFO,console
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 path_to/streaming_script.py
我无法摆脱这些警告消息,我希望使用pprint()在spark日志中打印相同的json数据,稍后我可以相应地处理这些消息。
在读取流式内容时是否缺少任何特定配置?我需要指定任何特定的编码器吗?
谢谢你的帮助。
1条答案
按热度按时间piwo6bdm1#
我一定和你看过同样的教程。我尝试了很多不同的选择。大多数没有成功。不过,我找到了一个解决方法:在flume.conf中使用exec源代码,并完全按照您所做的方式调用脚本。但是,在python脚本中,将数据写入一个文件。然后在脚本(data\u collector.py)停止执行之前“cat”文件。
我认为这是因为exec源代码需要“流式”数据,而仅仅打印输出将不起作用。
我的设置与你的非常相似:
stream.py(删除逻辑以便于理解):
这里是我的data_collector.py(注意最后一行的“cat”命令):
这是我的flume.conf:
所以基本上在我的data_collector.py中,我只需执行任何需要执行的逻辑,将其写入一个名为exec.txt的文件,然后立即“cat”该文件。它工作。。。祝你好运