我有一个进程,它接收输入数据,对其进行处理并输出数据in.log包含数据何时传入以及数据的类型。out.log包含处理数据的时间和数据的类型。所以。。。in.log包含in time idout.log包含超时id现在,作为使用python使用hadoop流进行处理的一部分,我想将这两个文件连接起来,并提供intime和out time的差异以及数据的id。例如:2秒id1233秒id112关于如何使用python实现这一点,有什么建议吗?
t40tm48m1#
看一看 MRjob 运行hadoop作业的帮助程序包。为这个任务编写一个map/reduce将是一个相当简单的过程,它可以按照下面的代码行进行
MRjob
from datetime import datetime from MRJob import MRJob class JoinJob(MRJob): fmt = '%Y-%M-%d' def steps(self): return [self.mr(mapper=self.mapper, reducer=self.reducer)] def mapper(self, rec_time, rec_id): yield rec_id, rec_time def reducer(self, rec_id, datetime_strs): datetimes = map(lambda x: datetime.strptime(x, self.fmt), datetime_strs) delta_secs = (max(datetimes) - min(datetimes)).total_seconds() yield rec_id, delta_secs if __name__ == '__main__': JoinJob.run()
1条答案
按热度按时间t40tm48m1#
看一看
MRjob
运行hadoop作业的帮助程序包。为这个任务编写一个map/reduce将是一个相当简单的过程,它可以按照下面的代码行进行