Error reading/writing HDFS files from Python using subprocess, PIPE, and Popen

wnrlj8wa asked on 2021-06-04 in Hadoop

I am trying to read (open) and write HDFS files from a Python script, but I am getting errors. Can someone tell me what is wrong here?
Full code: sample.py


#!/usr/bin/python

from subprocess import Popen, PIPE

print "Before Loop"

cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
            stdout=PIPE)

print "After Loop 1"
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=PIPE)

print "After Loop 2"
for line in cat.stdout:
    line += "Blah"
    print line
    print "Inside Loop"
    put.stdin.write(line)

cat.stdout.close()
cat.wait()
put.stdin.close()
put.wait()

When I run:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar -file ./sample.py -mapper './sample.py' -input sample.txt -output fileRead

it runs fine, but I cannot find modifiedfile.txt, which should have been created in HDFS.
And when I run:

hadoop fs -getmerge ./fileRead/ file.txt

in file.txt I get:

Before Loop 
Before Loop 
After Loop 1    
After Loop 1    
After Loop 2    
After Loop 2

Can someone tell me what I am doing wrong?? I don't think it is reading from sample.txt at all.


jv4diomz1#

Can someone tell me what I am doing wrong??
Your sample.py is probably not a proper mapper. A mapper should take its input on stdin and write its results to stdout, e.g., blah.py:


#!/usr/bin/env python

import sys

for line in sys.stdin: # print("Blah\n".join(sys.stdin) + "Blah\n")
    line += "Blah"
    print(line)

Usage:

$ hadoop ... -file ./blah.py -mapper './blah.py' -input sample.txt -output fileRead
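To sanity-check a mapper's line handling outside Hadoop, the same loop can be run against an in-memory stream. This is a minimal sketch; the sample lines and the `mapper` helper name are invented for illustration:

```python
import io

def mapper(stdin, stdout):
    # Same logic as blah.py: append "Blah" after every input line.
    for line in stdin:
        line += "Blah"
        stdout.write(line + "\n")  # print() adds a trailing newline

# Simulate Hadoop Streaming feeding lines on stdin.
fake_in = io.StringIO("first\nsecond\n")
fake_out = io.StringIO()
mapper(fake_in, fake_out)
print(fake_out.getvalue(), end="")
```

Note that because each line read from stdin keeps its trailing newline, "Blah" lands on a line of its own (matching the commented one-liner in blah.py); to append it in place, strip the newline first with `line.rstrip("\n") + "Blah"`.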

uubf1zoe2#

Try making your put subprocess take cat's output directly, by changing this

put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=PIPE)

to this:

put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=cat.stdout)

Full script:


#!/usr/bin/python

from subprocess import Popen, PIPE

print "Before Loop"

cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
            stdout=PIPE)

print "After Loop 1"
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=cat.stdout)
put.communicate()
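The same `stdin=cat.stdout` wiring can be tried locally without a cluster. A minimal sketch, using echo and tr as stand-ins for the two hadoop fs commands (assumed to be on a POSIX system):

```python
from subprocess import Popen, PIPE

# One process produces text; the next consumes it directly via
# stdin=src.stdout, mirroring the cat -> put wiring above.
src = Popen(["echo", "hello"], stdout=PIPE)
sink = Popen(["tr", "a-z", "A-Z"], stdin=src.stdout, stdout=PIPE)
src.stdout.close()  # close the parent's copy so src can receive SIGPIPE
out, _ = sink.communicate()
src.wait()
print(out)  # b'HELLO\n'
```

Closing the parent's copy of `src.stdout` after handing it to the second process is the detail that lets the first process see a broken pipe if the consumer exits early; `subprocess` documents this pattern for replacing shell pipelines.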
