Dataproc Hadoop MapReduce - can't get it to work

kzmpq1sx  asked on 2021-05-27  in Hadoop

I am essentially trying to run my first Hadoop MapReduce job. I have to use Hadoop and MapReduce because I am doing this for a class project. I want to use Python for the mapper and reducer, since I am very familiar with the language and so are my classmates. I figured the easiest way to set this up was with Google Dataproc, so that is what I am running as well. I will describe what I did and which resources I used, but I am fairly new to this and may be missing something.
Dataproc configuration (screenshots of the cluster setup, not reproduced here):
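Since the screenshots do not survive here, a roughly equivalent cluster could be created with something like the following sketch, based on the setup described in the answer below (Dataproc 1.4-ubuntu18 plus the jupyter.sh init action); the region, worker count, and init-action path are assumptions and may need adjusting:

gcloud dataproc clusters create data-604 \
    --region us-central1 \
    --image-version 1.4-ubuntu18 \
    --num-workers 2 \
    --initialization-actions gs://dataproc-initialization-actions/jupyter/jupyter.sh
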
I can then connect to the master node with SSH. I have the mapper.py and reducer.py files stored in a Google Cloud Storage bucket.
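For reference, copying the scripts into the bucket can be done with something like the following (the bucket name is the one used in the commands later in this post):

gsutil cp mapper.py reducer.py gs://data-604-hadoop/
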
The mapper and reducer code comes from this blog post by Michael Noll, modified to work with Python 3.
mapper.py:


#!/usr/bin/env python

"""mapper.py"""

import sys

# input comes from STDIN (standard input)

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

reducer.py:


#!/usr/bin/env python

"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!

if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)

Finally, I SSH into the master node and check the Python version:

hduser@data-604-m:~$ python
Python 3.7.3 (default, Mar 27 2019, 22:11:17) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

I run the following command (adapted from here):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

The result is as follows:

hduser@data-604-m:~$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar     -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py     -mapper mapper.py     -reducer reducer.py     -input gs://data-604-hadoop/books/pg20417.txt     -output gs://data-604-hadoop/output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.9.2.jar] /tmp/streamjob4601880105330541890.jar tmpDir=null
19/11/12 02:10:46 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:47 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:49 INFO mapred.FileInputFormat: Total input files to process : 1
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: number of splits:15
19/11/12 02:10:49 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573523684358_0002
19/11/12 02:10:50 INFO impl.YarnClientImpl: Submitted application application_1573523684358_0002
19/11/12 02:10:50 INFO mapreduce.Job: The url to track the job: http://data-604-m:8088/proxy/application_1573523684358_0002/
19/11/12 02:10:50 INFO mapreduce.Job: Running job: job_1573523684358_0002
19/11/12 02:10:58 INFO mapreduce.Job: Job job_1573523684358_0002 running in uber mode : false
19/11/12 02:10:58 INFO mapreduce.Job:  map 0% reduce 0%
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:19 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:20 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:28 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:30 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:37 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:39 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:40 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:48 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:49 INFO mapreduce.Job:  map 80% reduce 0%
19/11/12 02:11:50 INFO mapreduce.Job:  map 100% reduce 100%
19/11/12 02:11:50 INFO mapreduce.Job: Job job_1573523684358_0002 failed with state FAILED due to: Task failed task_1573523684358_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0
19/11/12 02:11:50 INFO mapreduce.Job: Counters: 14
        Job Counters 
                Failed map tasks=19
                Killed map tasks=14
                Killed reduce tasks=5
                Launched map tasks=22
                Other local map tasks=14
                Rack-local map tasks=8
                Total time spent by all maps in occupied slots (ms)=885928
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=221482
                Total vcore-milliseconds taken by all map tasks=221482
                Total megabyte-milliseconds taken by all map tasks=453595136
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
19/11/12 02:11:50 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

I really don't know what to do at this point. I have spent a lot of time on this and I feel like I have hit a brick wall, because I can't tell what is wrong.
I have also tried:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper ./mapper.py \
    -reducer ./reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

with similar results.
Thanks for any help.
Update: I have tried a few more things without success. I moved the Python scripts onto the Hadoop cluster itself and tested them locally with head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py, and they work. In a comment below I mentioned that I looked at my shebang line and changed it, but that didn't work either.
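For reference, that local smoke test on the master node looks like this (assuming the scripts have been made executable and mobydick.txt is any local plain-text file):

chmod +x mapper.py reducer.py
head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py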

Answer 1, by pdkcd3nj:

There are a few different things going on here, but the main one is that you cannot necessarily assume that each mapper/reducer task (which runs as a YARN container) sees the same system environment as the shell you are logged in with. In most circumstances, many elements are intentionally different (the Java classpath, for example). Normally this works as intended for Java-based MapReduce programs, because you end up with consistent environments between the driver program running under the hadoop jar command and the executor code running in containers on the worker nodes. Hadoop Streaming is a bit of an odd one out here, since it is not quite a first-class citizen in normal Hadoop usage.
In any case, the main problem you are hitting is that when you log in to the cluster your default python is the conda distribution with Python 3.7, but the default python used to spawn the mapper/reducer tasks in your YARN environment is actually Python 2.7. This is an unfortunate consequence of some legacy compatibility considerations in Dataproc. You can see this by hacking your mapper into a dump of whatever environment information you need; for example, try running the following while SSHed into the Dataproc cluster:

echo foo > foo.txt
hdfs dfs -mkdir hdfs:///foo
hdfs dfs -put foo.txt hdfs:///foo/foo.txt
echo '#!/bin/bash' > info.sh
echo 'which python' >> info.sh
echo 'python --version 2>&1' >> info.sh
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
     -Dmapred.reduce.tasks=0 \
     -Dmapred.map.tasks=1 \
     -files info.sh \
     -input hdfs:///foo \
     -output hdfs:///info-output \
     -mapper ./info.sh

Your local environment will show a different python version from what ends up in hdfs:///info-output:

$ bash info.sh
/opt/conda/bin/python
Python 3.7.3
$ hdfs dfs -cat hdfs:///info-output/*
/usr/bin/python 
Python 2.7.15+

This means you can either make your mapper/reducer compatible with Python 2.7, or explicitly specify /opt/conda/bin/python in your shebang. On my setup, which replicated yours (Dataproc 1.4-ubuntu18 plus the jupyter.sh init action), the following worked for me:


#!/opt/conda/bin/python

"""mapper.py"""

import sys

# input comes from STDIN (standard input)

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

And the reducer:


#!/opt/conda/bin/python

"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!

if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)
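
As an aside, the reason the subprocess most likely exits with code 1 under Python 2.7 is the f-string, which is a syntax error before Python 3.6. If you would rather make the scripts 2.7-compatible than change the shebang, the %-formatting already present in the commented-out lines works in both versions, e.g. in the mapper:

print('%s\t%s' % (word, 1))

(For the reducer's print_out lambda you would also need from __future__ import print_function, or a plain function, since print is a statement rather than a function in Python 2.7.)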

One other thing to keep in mind, though, is that the jupyter.sh initialization action is a bit outdated, and you should use the actual Dataproc Jupyter optional component instead. In any case, since you will probably want to run info.sh anyway to determine the relevant python environment to use in mapper.py and reducer.py, I outlined the steps above first.
For example, a vanilla Dataproc 1.4-debian9 cluster without the jupyter.sh init action actually uses /opt/conda/default/bin/python as the logged-in default python rather than /opt/conda/bin/python, and a Dataproc 1.3-debian9 image defaults to /usr/bin/python with Python 2.7.
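If you do recreate the cluster with the Jupyter optional component instead of the init action, cluster creation might look something like the following sketch (the cluster name and region are placeholders; adjust the image version and components to your needs):

gcloud dataproc clusters create data-604 \
    --region us-central1 \
    --image-version 1.4-ubuntu18 \
    --optional-components=ANACONDA,JUPYTER \
    --enable-component-gateway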
