python—hadoop中拆分和Map任务的数量

5f0d552i  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(293)

我对map reduce编程是新手,我已经用python编写了我的算法,我需要在'n'数据集上运行相同程序(我的算法)的'n'map示例。因为我的代码是用python编写的,所以我在代码中使用hadoopstreaming。
Hadoop流媒体文档建议如下-http://hadoop.apache.org/docs/r1.2.1/streaming.html#how+do+i+process+files%2c+one+per+map%3f,“生成一个包含输入文件的完整hdfs路径的文件。每个Map任务将获得一个文件名作为输入。“
因此,我为每个数据集文件创建了一个带有路径的文本文件。为了测试,我写了一个字数计算程序-http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ . 在我的map函数中,在进行实际的字数计算之前,我已经编写了下面的代码

for line in sys.stdin:
    # obtain filename from file list
    filename = line.rstrip('\n')
    localfilename = ntpath.basename(filename)
    os.environ("hadoop dfs -get"+line+ " " + localfilename)

问题1。因此,我的理解是,每一行都将作为一个分割给我的map函数,因此分割的数目应该是主文件中分割的数目或行数。我有三个文件名在我的主文件,但我可以看到有两个分裂创建。为什么会这样?
问题2。我的工作失败了,我不知道为什么,在哪里检查这些日志文件?
问题3。除此之外,我还有另一个选项来处理我的需求,将所有三个数据集放在一个文件中,并用特定的分隔符将其分隔,然后可以设置conf.set(“textinputformat.record.delimiter”,“specific delimiter”),但问题是它必须用java来完成。而且,在许多论坛中,编写自定义的记录阅读器是为了实现这一点。因为我不擅长java,所以我正在用python编写我的实现,到底是要设置这个参数,还是不用编写java代码就可以实现?
第四季度。在hadoop中有没有其他的选项可以满足我的需求?

hduser@master:~/code$ hadoop jar /usr/local/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper "python $PWD/fileprocess.py" -reducer "python $PWD/reduce.py" -input final.txt -output output.txt
14/09/16 05:27:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/home/hduser/tmp/hadoop-unjar4045267665479713934/] [] /tmp/streamjob4078572719514334736.jar tmpDir=null
14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:26 INFO client.RMProxy: Connecting to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:31 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/16 05:27:31 INFO mapreduce.JobSubmitter: number of splits:2
14/09/16 05:27:31 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
14/09/16 05:27:31 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/09/16 05:27:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1410171456875_0012
14/09/16 05:27:34 INFO impl.YarnClientImpl: Submitted application application_1410171456875_0012 to ResourceManager at master/10.0.0.4:8032
14/09/16 05:27:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1410171456875_0012/
14/09/16 05:27:35 INFO mapreduce.Job: Running job: job_1410171456875_0012
14/09/16 05:27:51 INFO mapreduce.Job: Job job_1410171456875_0012 running in uber mode : false
14/09/16 05:27:51 INFO mapreduce.Job:  map 0% reduce 0%
14/09/16 05:28:11 INFO mapreduce.Job: Task Id : attempt_1410171456875_0012_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:429)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
dsf9zpds

dsf9zpds1#

问题1:hadoop会将每个文件按其认为合适的方式拆分,并且不能保证哪一行放在哪里。您需要将行放入单独的文件中,以确保它们由单独的Map器处理。
例如,如果您有三个文件名,而不是将它们全部放在一个文件中 /TEMP/files 文件您应该在子文件夹中创建三个文件,每个文件都有一个文件名,然后将它们添加到作业中,如下所示: -input /TEMP/files/* . 那会给你你想要的行为。
请注意,您将无法获取数据的任何位置。获取第一个文件引用的Map器可能需要从另一个节点获取它。根据集群的大小,对于正在处理的大多数文件,您可能更需要访问网络。
问题2:命令行输出只告诉您java容器的故障,而没有告诉您python的实际错误。要获取该信息,请转到“工作追踪器”页面: http://localhost:50030/jobtracker.jsp 从那里你可以在失败的工作下找到你的工作。单击该页上失败的任务,然后在“任务日志”列中选择一个选项。在那里您将看到python脚本的stderr输出。
你在用os.environ做一些奇怪的事情。您应该使用子进程来执行命令。例如:

from subprocess import call
call(["/usr/bin/hadoop", "dfs", "-get", line, localfilename])

问题3:我不太清楚这里有什么要求。你说的是被上面的文件引用的实际文件,然后你将直接通过-进入你的Map?您正在手动处理它们,因此它们的格式无关紧要,因为它们不会被传递到map/reduce。
问题4:看起来有些文件需要并行处理,但不需要使用map/reduce。基本上,您只想利用hadoop集群和大量cpu这一事实。这很好,可以工作,但除了将工作洗牌到从属服务器之外,您并没有真正使用hadoop。

相关问题