MapReduce 计算框架 —— 执行流程详解

x33g5p2x  于2021-11-21 转载在 MapReduce  
字(4.3k)|赞(0)|评价(0)|浏览(426)

与这篇博文有点关系的可以参考下:

① MapReduce 计算框架 —— 执行流程详解

② 在Linux环境实现wordcount:mapper,reducer的代码创建,脚本实现map,reduce

③ Linux实现 map 返回列表形式操作

④ Linux hadoop 脚本实现 reduce合并数据

1、Map 框架解释

MapReduce 计算框架: 首先根据分片大小划分文件块,即输入切片(InputSplit);交给map处理,每一个切片对应一个map;map输出的数据,放入环形溢写缓冲区(默认100M,达到80M进行溢写,写入到本地文件)。

写入本地文件时候,涉及到分区(partition)、排序(sort)、溢写到磁盘本地(spill to disk);

那如何分区(partition)?

用哈希(hash)取模操作,设置 reduce数量为3,例如对 100,50,30,10 的数据分区。

100%3=1
50%3=2
30%3=0
10%3=1

分区有0、1、2区,对号入座;sort(快速排序),再溢写到磁盘后, 进行数据合并(merge)。

数据合并的好处:对数据进行了压缩,① 减少数据写入磁盘的数据量,② 减少网络传输的数据量。

2、Reduce 框架解释

reduce从哪里拉取map数据?
我们知道 tasktracker与 jobtracker同步心跳,进行网络通信;在抓取过程 fetch中,jobtracker知道所有的宏观信息,reduce可以从 jobtracker获取信息,进而获取 map数据。

在reduce中,也有跟map一样的环形缓冲区,溢写后进行排序(归并排序),再进行数据合并(merge),好处有:①减少数据量 ② 提高执行效率;merge后进行reduce汇总,聚合,最后输出(output)到本地(hdfs)。

MapReduce作业执行流程分三个板块:Map,Shuffle(核心),Reduce。

为什么会Shuffle涉及到排序,合并等操作呢?

这是因为我们的带宽是有限的,要尽可能在传输过程中进行数据的压缩,减少带宽压力!!!

3、小知识——企业集群规划

原则:
(1)节点(服务器)数量一般是奇数;
(2)主从一般不会部署在同一个节点上;
(3)mysql常存储业务基础数据(用户注册信息,登录信息等),要保证数据不丢失,主结点进行对外服务,从节点进行备份。

规划:
节点的安装规划;
组件规划(版本之间的兼容性,版本的稳定性 hadoop 1.x -> hadoop 2.x);
数据目录规划(软件目录,数据存储目录,日志目录等);

企业要记录好,在接手工作时节省时间。

4、框架的执行细节

从本地HDFS读取文件File,每个文件切分多个一定大小(默认64M)的Block(默认3个备份)存储在多个节点(DataNode)上;
块Block是数据存储的基本单位,二进制文件,存储在节点DN中。

为什么hdfs中块不能太大,不能太小?

  • 块太大
    (1)块太大,降低执行速度;
    (2)受限于网络传输,若是块太大,传输失败,恢复代价太大,无响应等情况。
  • 块太小
    (1)NameNode内存资源有限;
    (2)频繁的文件传输消耗带宽,cpu资源,导致任务执行变慢。

InputFormat: 验证输入的格式是否符合定义;
(1)数据分割,把输入文件切分成多个输入切片(InputSplit);
(2)记录读取器(block->record),把块Block二进制转换为Map可以识别的格式(键值对)。

Split:实际上每个 split包含后一个Block中的开头部分的数据,Split可以解诀记录跨Block问题,对数据完整性得到保证。

RecordReader: 每读取一条记录,调用一次map函数。

【注意】: 对压缩文件的map处理是:压缩文件不能被切分,一个压缩文件就对应一个map;在 map端是压缩状态,在reduce会自动解压缩, 这是框架自带的功能,格式一般为.zip;这也是压缩数据,减少带宽的方法。

Shuffle: Partition, Sort, Spill, Meger, Combiner,
Copy, Memery, Disk。

Partitioner: 决定数据由哪个Reducer处理,从而进行分区;比如采用Hash法,有n个Reducer, 那么数据{ 'are’:1}的key,"are”对n进行取模,返回m,而生成 {partition, key, value} 格式。

MemoryBuffer: 内存缓冲区,每个map的结果和partition处理的key,value结果都保存在缓存中。

  • 缓冲区大小: 默认100M;
  • 溢写阈值: 100M* 0.8 = 80M
缓冲区中的数据: partition key value三元组数据,例如:
{"1","are":1}
{"2","at":1}
{"1","we":1}

Spill: 内存缓冲区达到阈值时,溢写spill线程 锁住这80M
的缓冲区,开始将数据写出到本地磁盘中,然后释放内存。

每次溢写都生成一个数据文件,溢出的数据到磁盘前会对数据进行 key排序 sort以及合并 combiner;发送相同 Reduce的 key数量,会拼接到一起,减少 partition的索引数量。

Sort: 缓冲区数据按照key进行排序。

{"1","are",1}
{"2","at",1}
....
{"1","we",1}
{"1","are",1}
--- sort后 ---
{"1","are",1}
{"1","are",1}
{"1","we",1}
...
{"2","at",1}

Combiner: 数据合并,相同的key的数据,value值合并, 减少输出传输量。
Combiner函数事实上是reducer函数,满足 combiner处理不影响{sum,max等} 最终reduce的结果时,可以极大提升性能。

{"1","are",1}
{"1","are",1}
{"1","we",1}
---Combiner---
{"1","are",2}
{"1","we",1}

Reduce: 多个 reduce任务输入的数据都属于不同的 partition,因此结果数据的key不会重复。合并reduce的输出文件即可得到最终的结果。

5、运行模型

MapReduce: map,reduce是进程,多进程运行模式;
Spark: 多线程运行模式。

6、Streaming 简介

MapReduce 和 HDFS采用Java实现,默认支持Java编程接口;
Streaming 框架允许让任何语言编写的 map,reduce程序能够在 hadoop集群上运行;
map / reduce 程序只要遵循从标准输入 stdin读,写出到标准输出 stdout即可。

6.1 Streaming 优点

开发效率高 :
(1)方便移植Hadoop平台,只需按照一定的格式从标准输入读取数据、向标准输出写数据就可 ;
(2)原有的单机程序稍加改动就可以在Hadoop平台进行分布式处理 cat input | mapper | sort | reducer > output

程序运行效率高
对于CPU密集的计算,有些语言如C/C++编写的程序可能比用Java编写的程序效率更高一些 ;

便于平台进行资源控制
Streaming 框架中通过limit等方式可以灵活地限制应用程序使用的内存等资源。

6.2 Streaming 快速入门

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \

-input /user/test/input \ 
-output /user/test/output \
-mapper "python mapper.py"\
-reducer "python reducer.py" \
-file mapper.sh \ 
-jobconf mapred.job.name="xxx

input : 指定作业的输入文件的HDFS路径,支持使用*通配符,支持指定多个文件或目录,可多次使用。

output : 指定作业的输出文件的HDFS路径,路径必须不存在,并且具备执行作业用户有创建该目录的权限,只能使用一次。

mapper: 运行指定的mapper程序。
reducer: 运行指定的reducer程序。

file: 打包文件到提交的作业中
(1) map和reduce的执行文件;
(2) map和reduce要用输入的文件,如配置文件;
类似的配置还有-cacheFile, -cacheArchive分别用于向计算节点分发HDFS文件和HDFS压缩文件。

jobconf: 提交作业的一些配置属性
常见配置:
(1) mapred.map.tasks: map task数目;
(2) nmmapraepd.predeurc.ep.tyas: reduce task数目;
(3) stream.num.map.output.key.fields: 指定map task 输出记录中key所占的域数目;
(4) num.key.fields.for.partition 指定对key分出来的前几部分做 partition而不是整个key。

Streaming常见配置名称
mapred.job.name作业名
mapred.job.priority作业优先级
mapred.job.map.capacity最多同时运行map任务数
mapred.job.reduce.capacity最多同时运行reduce任务数
mapred.task.timeout任务没有响应(输入输出)的最大时间
mapred.compress.map.outputmap的输出是否压缩
mapred.map.output.compression.codecmap的输出压缩方式
mapred.output.compressreduce的输出是否压缩
mapred.output. compression. codecreduce的输出压缩方式

相关文章