Python:Linux hadoop 脚本实现 reduce合并数据

x33g5p2x  于2021-11-21 转载在 Python  
字(2.5k)|赞(0)|评价(0)|浏览(411)

与这篇博文有点关系的可以参考下:

① MapReduce 计算框架 —— 执行流程详解

② 在Linux环境实现wordcount:mapper,reducer的代码创建,脚本实现map,reduce

③ Linux实现 map 返回列表形式操作

④ Linux hadoop 脚本实现 reduce合并数据

1、准备数据

有 a_join.txt 数据
user_id order_id

aaa1    123
aaa2    123
aaa3    123
aaa4    123
aaa5    123
aaa6    123
aaa7    123
aaa8    123
aaa9    123
aaa10   123
aaa11   123

有 b_join.txt 数据
user_id amount

aaa1    hadoop
aaa2    hadoop
aaa3    hadoop
aaa4    hadoop
aaa5    hadoop
aaa6    hadoop
aaa7    hadoop
aaa8    hadoop
aaa9    hadoop
aaa10   hadoop
aaa11   hadoop

使得两个数据,以key合成新的数据。
user_id order_id amount

aaa1 123 hadoop
aaa2 123 hadoop

怎么办?

可以先把 a_join 转为 map_a;

aaa1	1	123
aaa2	1	123

同理,b_join 转为 map_b;

aaa1	2	hadoop
aaa2	2	hadoop

2、创建 map.py函数

vi map_a.py

#!/usr/local/bin/python
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')

    key = ss[0]
    val = ss[1]

    print ("%s\t1\t%s" % (key, val))

把输出数据复制到 a1 中

vi map_b.py

#!/usr/local/bin/python
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')

    key = ss[0]
    val = ss[1]

    print ("%s\t2\t%s" % (key, val))

把输出数据复制到 b2 中

要对两个map函数的返回结果文件a1,b2 进行reduce操作。

3、创建 red.py函数

import sys

val_1 = ""

for line in sys.stdin:
    key, flag, val = line.strip().split('\t')

    if flag == '1':
        val_1 = val
    elif flag == '2' and val_1 != "":
        val_2 = val
        print ("%s\t%s\t%s" % (key, val_1, val_2))
        val_1 = ""

如上图,已经把两个文件reduce成,我们想要的结果。

4、脚本实现需求

创建 run.sh

set -e -x

HADOOP_CMD="/usr/hadoop/hadoop-2.7.3/bin/hadoop"
# hdfs输入路径
INPUT_FILE_PATH_A="/test/a_join.txt"
INPUT_FILE_PATH_B="/test/b_join.txt"

# hdfs输出路径
OUTPUT_A_PATH="/output/a"
OUTPUT_B_PATH="/output/b"

OUTPUT_JOIN_PATH="/output/join"

# 已经有输出路径就进行删除
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_A_PATH $OUTPUT_B_PATH $OUTPUT_JOIN_PATH
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_JOIN_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_A_PATH \
    -mapper "python map_a.py" \
    -file ./map_a.py \

# Step 2.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_B \
    -output $OUTPUT_B_PATH \
    -mapper "python map_b.py" \
    -file ./map_b.py \

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $OUTPUT_A_PATH,$OUTPUT_B_PATH \
    -output $OUTPUT_JOIN_PATH \
    -mapper "cat" \
    -reducer "python red_join.py" \
    -file ./red_join.py \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1
-mapper "cat" 
意思是把前面两个map输出结果作为red的输入;

-jobconf stream.num.map.output.key.fields=2 
意思是: 组合key=(aaa1	1), value=123;

-jobconf num.key.fields.for.partition=1
意思是:使用数据的第一列作为partition。

查看是否已经上次输入文件hadoop fs -ls /test ,没有就上传文件 hadoop fs -put ./a_join.txt b_join.txt /test/

查看输出文件是否存在,hadoop fs -ls /output
执行脚本命令sh -x run.sh

查看输出目录的 /output/join

hadoop fs -ls /output/join
hadoop fs -cat /output/join | head

脚本实现完成!!!

【注意细节】:
(1)修改别人代码前,必须备份!!!
(2)查看别人代码时,习惯使用 :q! 强制退出不保存

相关文章