调试hadoop流程序

ni65a41a  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(305)

我有表格里的数据

id,    movieid      , date,    time
 3710100, 13502, 2012-09-10, 12:39:38.000

现在基本上我想做的是。。
我想知道,一部电影在早上7点到11点之间每隔30分钟看几次
所以基本上。。
这部电影在两人之间看了多少次

6 and 6:30
  6:30 and 7
   7 and 7:30
   ...
   10:30-11

所以我写了mapper和reducer来实现这个。

mapper.py

# !/usr/bin/env python

import sys

# input comes from STDIN (standard input)

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    line = line.split(",")
    #print line

    print '%s\t%s' % (line[1], line)

异径管.py


# !/usr/bin/env python

import sys
import datetime
from collections import defaultdict

def convert_str_to_date(time_str):
    try:
        timestamp =   datetime.datetime.strptime(time_str, '%Y-%m-%d:%H:%M:%S.000')  #00:23:51.000

        return timestamp

    except Exception,inst:

        pass

def is_between(time, time1,time2):
    return True if time1 <= time < time2 else False

def increment_dict(data_dict, se10,date_time):
    start_time = datetime.datetime(date_time.year,date_time.month,date_time.day, 07,00,00)
    times = [start_time]
    for i in range(8):
        start_time += datetime.timedelta(minutes = 30 )
        times.append(start_time)
    for i in range(len(times) -1 ):
        if is_between(date_time, times[i], times[i+1]):
            data_dict[se10][i] += 1

keys = [0,1,2,3,4,5,6,7]

data_dict = defaultdict(dict)

# input comes from STDIN

def initialize_entry(se10):
    for key in keys:
        data_dict[se10][key] = 0

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py

    se10, orig_data = line.split('\t')
    initialize_entry(se10)
    parse_line = orig_data.split(",")

    datestr = parse_line[2].replace(" ","").replace("'","")
    timestr = parse_line[3].replace(" ","").replace("'","")

    date_time = datestr + ":" + timestr

    time_stamp = convert_str_to_date(date_time)

    increment_dict(data_dict, se10,time_stamp)

for key, secondary_key in data_dict.items():
    for skey, freq in secondary_key.items():
        print key,"," ,skey,",",freq

如果我执行以下操作,上面的代码运行得很好

cat input.txt | python mapper.py | sort | python reducer.py

但当我把它部署到集群上时。不能说工作已经结束了。。这个原因还不清楚。
请帮忙。
谢谢。

tyu7yeag

tyu7yeag1#

好吧,我想出来了。。
主要问题是我的工作本地计算机是基于windows的。。而集群是基于linux的。。
所以我不得不把dos写的文件转换成unix。。

ou6hu8tu

ou6hu8tu2#

阅读jobhistory中的日志通常是个好主意,如中所述https://stackoverflow.com/a/24509826/1237813 . 它应该给你更多的细节,为什么工作失败。
关于行尾,hadoop streaming在默认情况下用于分割行的类是textinputformat。它曾经打破了windows的新线,但自2006年以来,它应该工作得很好。
这使得mapper和reducer脚本成为可能的问题源。python3使用了一种称为universalnewlines的东西,它应该可以在unix和windows的新行中开箱即用。在Python2.7中,需要显式地打开它。
在linux和MacOSX上,您可以像这样启用通用换行符来重新打开stdin sys.stdin = open('/dev/stdin', 'U') . 我手头没有windows计算机可供尝试,但以下三种系统都适用:

import os
import sys

# reopen sys.stdin

os.fdopen(sys.stdin.fileno(), 'U')

for line in sys.stdin:
    …

相关问题