使用python代码无法在mapreduce中获得预期的输出

n9vozmp4  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(313)

运行此代码以在hadoop中将我的数据聚集在大约10k+的csv文件中。我正在使用googledataproc集群来运行这个代码。请告诉我如何才能得到我的预期产出。最后一件事可能是逻辑问题或者功能问题。


# !/usr/bin/env python3

"""mapper.py"""
import sys

# Get input lines from stdin

for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()

    # Split it into tokens
    #tokens = line.split()

    #Get probability_mass values
    for probability_mass in line:
        print("None\t{}".format(probability_mass))

# !/usr/bin/env python3

"""reducer.py"""
import sys
from collections import defaultdict

counts = defaultdict(int)

# Get input from stdin

for line in sys.stdin:
    #Remove spaces from beginning and end of the line
    line = line.strip()

    # skip empty lines
    if not line:
        continue  

    # parse the input from mapper.py
    k,v = line.split('\t', 1)
    counts[v] += 1

total = sum(counts.values())
probability_mass = {k:v/total for k,v in counts.items()}
print(probability_mass)

我的csv文件看起来像这样。

probability_mass
10
10
60
10
30
Expected output Probability of each number

{10: 0.6, 60: 0.2, 30: 0.2}

but result still show like this 
{1:0} {0:0} {3:0} {6:0} {1:0} {6:0}

我将在nano中保存此命令,然后运行此命令。

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-n \
-files mapper.py,reducer.py \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-input /tmp/data.csv \
-output /tmp/output
q3qa4bjr

q3qa4bjr1#

您将行拆分为单个字符,这就解释了为什么会得到1、3、6、0等作为Map键。
不循环,只打印值的行;你的Map绘制者不需要超过这个

import sys
for line in sys.stdin:
    print("None\t{}".format(line.strip()))

然后,在reducer中,您将一个int除以一个更大的int,这将导致向下舍入到最接近的int,即0。
您可以通过将dict更改为store float来解决这个问题

counts = defaultdict(float)

或者让总和浮动

total = float(sum(counts.values()))

如前所述,这不是hadoop的问题,因为您可以在本地对其进行测试和调试

cat data.txt | python mapper.py | sort -n | python reducer.py

相关问题