Running a Python MapReduce program on Hadoop outputs only half of my data

dgenwo3n posted on 2021-05-27 in Hadoop

I am running a simple MapReduce program on Hadoop that computes the min, max, median, and standard deviation of the values in one column of a dataset. When I run it locally on my machine, the final output is computed from all the values in the dataset. When I run it on Hadoop, however, the output covers only about half of the values in that column. The code is below:
mapper.py


#!/usr/bin/env python3

import sys
import csv

# Load data

data = csv.DictReader(sys.stdin)

# Emit the selected column as tab-separated key/value pairs for reducer.py

for row in data:
    for col, value in row.items():
        if col == sys.argv[1]:  # sys.argv[1] names the column to extract
            print('%s\t%s' % (col, value))

reducer.py


#!/usr/bin/env python3

import sys
import statistics

key = None           # key parsed from the current input line
current_key = None   # key of the group currently being accumulated
num_list = []        # values collected for current_key
for line in sys.stdin:
    # Remove leading and trailing whitespace
    line = line.strip()

    # Parse input 
    key, value = line.split('\t', 1)

    # Convert the value string to float
    try:
        value = float(value)
    except ValueError:
        # Skip the value
        continue

    # Hadoop delivers reducer input sorted by key, so a key change marks
    # the end of the previous group.
    if current_key == key:
        num_list.append(value)
    else:
        if current_key:
            print("Num. of Data Points %s\t --> Max: %s\t Min: %s\t Median: %s\t Standard Deviation: %s" \
                % (len(num_list), max(num_list), min(num_list), statistics.median(num_list), statistics.pstdev(num_list)))
        num_list.clear()
        num_list.append(value)
        current_key = key

# Emit the final group (the extra check guards against empty input)

if current_key == key and num_list:
    print("Num. of Data Points %s\t --> Max: %s\t Min: %s\t Median: %s\t Standard Deviation: %s" \
                % (len(num_list), max(num_list), min(num_list), statistics.median(num_list), statistics.pstdev(num_list)))
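
Aside: the reducer above assumes its input arrives sorted by key, which Hadoop's shuffle guarantees. To reproduce the pipeline locally, a sort is needed between the two stages. A minimal harness sketch, assuming both scripts are executable and that dataset.csv and col3 are stand-ins for the real file and column names:

import subprocess

# Run the mapper over the input file (stand-in names, see above).
with open("dataset.csv", "rb") as src:
    mapped = subprocess.run(["./mapper.py", "col3"], stdin=src,
                            capture_output=True, check=True).stdout

# Hadoop's shuffle sorts map output by key before the reduce phase;
# locally, a plain byte-wise sort of the lines plays that role.
shuffled = b"".join(sorted(mapped.splitlines(keepends=True)))

subprocess.run(["./reducer.py"], input=shuffled, check=True)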

Hadoop log:

2019-12-02 23:54:40,705 INFO mapreduce.Job: Running job: job_1575141442909_0026
2019-12-02 23:54:47,903 INFO mapreduce.Job: Job job_1575141442909_0026 running in uber mode : false
2019-12-02 23:54:47,906 INFO mapreduce.Job:  map 0% reduce 0%
2019-12-02 23:54:54,019 INFO mapreduce.Job:  map 100% reduce 0%
2019-12-02 23:54:59,076 INFO mapreduce.Job:  map 100% reduce 100%
2019-12-02 23:55:00,115 INFO mapreduce.Job: Job job_1575141442909_0026 completed successfully
2019-12-02 23:55:00,253 INFO mapreduce.Job: Counters: 54
        File System Counters
                FILE: Number of bytes read=139868
                FILE: Number of bytes written=968967
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=501097
                HDFS: Number of bytes written=114
                HDFS: Number of read operations=11
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
                HDFS: Number of bytes read erasure-coded=0
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=7492
                Total time spent by all reduces in occupied slots (ms)=2767
                Total time spent by all map tasks (ms)=7492
                Total time spent by all reduce tasks (ms)=2767
                Total vcore-milliseconds taken by all map tasks=7492
                Total vcore-milliseconds taken by all reduce tasks=2767
                Total megabyte-milliseconds taken by all map tasks=7671808
                Total megabyte-milliseconds taken by all reduce tasks=2833408
        Map-Reduce Framework
                Map input records=10408
                Map output records=5203
                Map output bytes=129456
                Map output materialized bytes=139874
                Input split bytes=220
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=139874
                Reduce input records=5203
                Reduce output records=1
                Spilled Records=10406
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=80
                CPU time spent (ms)=2790
                Physical memory (bytes) snapshot=676896768
                Virtual memory (bytes) snapshot=8266964992
                Total committed heap usage (bytes)=482344960
                Peak Map Physical memory (bytes)=253210624
                Peak Map Virtual memory (bytes)=2755108864
                Peak Reduce Physical memory (bytes)=173010944
                Peak Reduce Virtual memory (bytes)=2758103040
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=500877
        File Output Format Counters
                Bytes Written=114
2019-12-02 23:55:00,254 INFO streaming.StreamJob: Output directory: data/output

Local output:


# Data Points: 10407     Max: 89.77682042        Min: 13.87331897        Median: 46.44807153     Standard Deviation: 11.156280347146872

Hadoop output:


# Data Points: 5203      Max: 89.77682042        Min: 13.87331897        Median: 46.202181       Standard Deviation: 11.28118280525746

As you can see, the number of data points in the Hadoop output is almost exactly half of the total in the local output. I have tried different datasets of various sizes and the result is always half... Am I doing something wrong, or missing something?

ukqbszuj1#

I figured out why I was getting this result. The cause is that Hadoop split my input data in two for two separate mappers, as I suspected. However, only the first half of the data contains the dataset's column header, so when a mapper reads the second half, the specified column is never found.
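The failure mode is easy to reproduce: csv.DictReader treats the first line of whatever stream it is given as the header, so a split that starts mid-file silently promotes a data row to field names (the column names below are invented for illustration):

import csv
import io

split_with_header = io.StringIO("temp,humidity\n42.0,7.5\n")
split_without_header = io.StringIO("43.1,8.2\n44.6,9.0\n")

print(next(csv.DictReader(split_with_header)))
# {'temp': '42.0', 'humidity': '7.5'}

print(next(csv.DictReader(split_without_header)))
# {'43.1': '44.6', '8.2': '9.0'} -- no 'temp' key, so the mapper emits nothing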
I removed the existing header from the dataset and set the field names when reading the data, which solved the problem:

data = csv.DictReader(sys.stdin, fieldnames=("col1", "col2", "col3", "col4", "col5"))
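
Alternatively, the header can stay in the file if each mapper filters it out itself. A sketch of that variant, reusing the placeholder field names above and the argv-based column selection from the original mapper; the float() check drops the stray header row in whichever split it lands, along with any other non-numeric cells:

#!/usr/bin/env python3

import sys
import csv

target = sys.argv[1]
fieldnames = ("col1", "col2", "col3", "col4", "col5")  # placeholders as above

for row in csv.DictReader(sys.stdin, fieldnames=fieldnames):
    value = row.get(target)
    if value is None:
        continue
    try:
        float(value)  # the original header line fails this and is skipped
    except ValueError:
        continue
    print('%s\t%s' % (target, value))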
