hadoop流:python连接不同的文件

uqxowvwt  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(266)

我有一个进程,它接收输入数据,对其进行处理并输出数据
in.log包含数据何时传入以及数据的类型。out.log包含处理数据的时间和数据的类型。所以。。。in.log包含in time id
out.log包含超时id
现在,作为使用python使用hadoop流进行处理的一部分,我想将这两个文件连接起来,并提供intime和out time的差异以及数据的id。
例如:
2秒id123
3秒id112
关于如何使用python实现这一点,有什么建议吗?

t40tm48m

t40tm48m1#

看一看 MRjob 运行hadoop作业的帮助程序包。为这个任务编写一个map/reduce将是一个相当简单的过程,它可以按照下面的代码行进行

from datetime import datetime
from MRJob import MRJob

class JoinJob(MRJob):
    fmt = '%Y-%M-%d'
    def steps(self):
        return [self.mr(mapper=self.mapper, 
                        reducer=self.reducer)]
    def mapper(self, rec_time, rec_id):
        yield rec_id, rec_time

    def reducer(self, rec_id, datetime_strs):
        datetimes = map(lambda x: datetime.strptime(x, self.fmt), 
                            datetime_strs)
        delta_secs = (max(datetimes) - min(datetimes)).total_seconds()
        yield rec_id, delta_secs

if __name__ == '__main__':
    JoinJob.run()

相关问题