Running a Hadoop streaming MapReduce job: PipeMapRed.waitOutputThreads(): subprocess failed with code 1

k0pti3hp  posted 10 months ago in Hadoop

I am using Hadoop 3.3.4 and trying to run a MapReduce program written in Python that ranks pages using Google's PageRank algorithm.
I am trying to run this on my own Hadoop cluster. I launch the job with the following command:
mapred streaming -files mapper.py,reducer.py -input /user/hadoop/input/input.txt -output /user/hadoop/output -mapper ./reducer.py -reducer ./mapper.py
But I get the following error:

2023-07-14 17:19:01,076 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at Master/192.168.1.10:8032
2023-07-14 17:19:01,343 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/hadoop/.staging/job_1689368595696_0003
2023-07-14 17:19:01,790 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-14 17:19:01,934 INFO mapreduce.JobSubmitter: number of splits:20
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1689368595696_0003
2023-07-14 17:19:02,149 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-14 17:19:02,293 INFO conf.Configuration: resource-types.xml not found
2023-07-14 17:19:02,294 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-07-14 17:19:02,351 INFO impl.YarnClientImpl: Submitted application application_1689368595696_0003
2023-07-14 17:19:02,408 INFO mapreduce.Job: The url to track the job: http://Master:8088/proxy/application_1689368595696_0003/
2023-07-14 17:19:02,410 INFO mapreduce.Job: Running job: job_1689368595696_0003
2023-07-14 17:19:09,539 INFO mapreduce.Job: Job job_1689368595696_0003 running in uber mode : false
2023-07-14 17:19:09,540 INFO mapreduce.Job:  map 0% reduce 0%
2023-07-14 17:19:33,742 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000002_0, Status : FAILED
[2023-07-14 17:19:29.868]Container killed on request. Exit code is 137
[2023-07-14 17:19:30.046]Container exited with a non-zero exit code 137. 
[2023-07-14 17:19:30.080]Killed by external signal

2023-07-14 17:19:33,830 INFO mapreduce.Job: Task Id : attempt_1689368595696_0003_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

I also added #!/usr/bin/env python3 at the top of both of my files, mapper.py and reducer.py.
Here is my mapper.py:

#!/usr/bin/env python3

import sys
import networkx as nx

# Create a directed graph
G = nx.DiGraph()

# Dictionary to store the mapping of page_id to page_link
page_link_dict = {}

# Read the input and store lines in a list
lines = sys.stdin.readlines()

# Process each line
for line in lines:
    page_infos = line.strip().split('\t')
    page_id = int(page_infos[0])
    page_link = page_infos[2]
    # Add the node to the graph
    G.add_node(page_id)

    # Store the mapping of page_id to page_link in the dictionary
    page_link_dict[page_id] = page_link
    
for line in lines:
    page_id, page_title, page_link, links = line.strip().split('\t')
    # Split the links
    pages_links = links.split(',')
    for page_link in pages_links:
        # Search for the id of the linked page and add an edge to it
        for linked_page_id, link in page_link_dict.items():
            if page_link == link:
                G.add_edge(int(page_id), linked_page_id)
                
# Output the graph as adjacency list
for node in G.nodes():
    neighbors = ','.join(map(str, G.neighbors(node)))
    sys.stdout.write(f'{node}\t{neighbors}\n')


Here is my reducer.py:

#!/usr/bin/env python3

import sys
import networkx as nx

# Create a directed graph
G = nx.DiGraph()

# Read the adjacency list from mapper output and add edges to the graph
for line in sys.stdin:
    page_infos = line.strip().split('\t')
    node_id = page_infos[0]
    node_id = int(node_id)
    G.add_node(node_id)
    if len(page_infos) == 2:
        neighbors = page_infos[1]
        neighbors = neighbors.split(',')
        for neighbor_id in neighbors:
            G.add_edge(node_id, int(neighbor_id))

# Run the PageRank algorithm
pagerank_scores = nx.pagerank(G)

# Write the output to stdout
for page_id, rank in pagerank_scores.items():
    sys.stdout.write(f'{page_id}\t{rank}\n')


I also tried testing the code locally on my machine by running the following commands:
cat input.txt | ./mapper.py > file.txt
cat file.txt | ./reducer.py
It works fine there; the sample results below show each page ID with its corresponding rank score:

10  8.883661079121364e-05
9   6.371139303094697e-05
97  7.724393979460297e-05
152 0.0002934058532326906
145 0.00011016393468028126
11  8.886938479913977e-05
13  8.887866372994127e-05
12  6.371139303094697e-05
more results after that ...


Finally, I also tested a small wordcount MapReduce program in Python to check my Hadoop configuration, and that worked fine. I installed all the packages needed to run the program on my master, and I also installed them on my 2 slaves, although I do not know whether that was necessary.
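For reference, the kind of small streaming wordcount test mentioned above could look like the following sketch (hypothetical file names wc_mapper.py and wc_reducer.py, not the poster's actual scripts):

#!/usr/bin/env python3
# wc_mapper.py: emit one (word, 1) pair per token read from stdin
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print(f'{word}\t1')

#!/usr/bin/env python3
# wc_reducer.py: sum counts per word, relying on Hadoop sorting mapper output by key
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f'{current_word}\t{current_count}')
        current_word, current_count = word, int(count)
if current_word is not None:
    print(f'{current_word}\t{current_count}')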

**UPDATE!!:** For this project I switched to the Apache Spark framework with graphframes, since it is a better fit for graphs, and I got the results I wanted.


jgovgodb1#

My problem was that I had installed the dependency with pip install networkx, which did not install the module for the system/root Python, so I added sudo to make it work: sudo pip install networkx.
After that, the job executed successfully.
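When a streaming task only reports "PipeMapRed.waitOutputThreads(): subprocess failed with code 1", the underlying Python exception is easy to miss. A minimal sketch (hypothetical, not the original scripts) of guarding the import at the top of mapper.py and reducer.py so that a missing module shows up in the task attempt's stderr log:

#!/usr/bin/env python3
import sys

try:
    import networkx as nx
except ImportError as exc:
    # Hadoop streaming captures the subprocess's stderr in the task attempt log,
    # so a module missing on a worker node becomes easy to spot there.
    sys.stderr.write(f'failed to import networkx: {exc}\n')
    sys.exit(1)

With a guard like this, the task logs reachable from the job tracking URL show the actual ImportError instead of only the exit code.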
However, the results I get are still not similar to the ones I get when running the scripts locally, so I suspect I am missing something in the MapReduce logic; I will open another post about that in case someone can help.

**UPDATE:** The MapReduce paradigm in Hadoop is designed primarily for batch processing and does not naturally support iterative algorithms such as PageRank, which need multiple iterations and global communication between nodes.

In the case of PageRank, each iteration depends on the results of the previous one and needs all the nodes (all the page IDs in my input.txt), so frameworks such as Apache Spark are a better fit. Spark provides an in-memory distributed computing model that allows iterative processing and efficient data sharing across iterations.
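Since the post mentions moving to Spark with graphframes, here is a rough sketch of what PageRank looks like there (toy data and an assumed app name; GraphFrames expects a vertex DataFrame with an id column and an edge DataFrame with src/dst columns):

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('pagerank-sketch').getOrCreate()

# Toy graph: vertices are page ids, edges are links between pages.
vertices = spark.createDataFrame([(1,), (2,), (3,)], ['id'])
edges = spark.createDataFrame([(1, 2), (2, 3), (3, 1)], ['src', 'dst'])

g = GraphFrame(vertices, edges)

# Fixed number of iterations; a convergence tolerance (tol=) can be used instead.
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.select('id', 'pagerank').show()

The graphframes package has to be made available to Spark (for example via spark-submit --packages), and the exact package coordinates depend on the Spark and Scala versions in use.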
