理论教育 深入了解MapReduce工作原理

深入了解MapReduce工作原理

时间:2023-06-14 理论教育 版权反馈
【摘要】:一个Partitioner 对应一个Reduce 作业,也就是说有几个Reduce 就会有几个Partitioner。Partitioner 用来决定Map 产生的key 由哪个Reduce 来处理,默认处理采用对key 进行hash 运算后再对Reduce 数取模。这种方式只是为了平均Reduce 的处理能力,但在实际中可能会出现“数据倾斜”的情况。

深入了解MapReduce工作原理

MapReduce 按照执行的先后顺序,大致分为输入分片、map、partitioner、sort、combiner、和reduce 阶段,其中partitioner、sort 和combiner 又通常被称为shuffle,工作流程如图4-4 所示:

图4-4 MapReduce 工作流程

1.输入分片(input split):在进行Map 计算之前,MapReduce 会根据输入文件计算输入分片,每个输入分片会启动一个Map 任务,也就说Map 任务个数是由输入分片决定的,map 个数的计算公式如下:splitsize=max(minimumsize,min(maximumsize,blocksize))。

blocksize 为文件块大小,minimumsize 和maximumsize 可以在配置文件中设置,没有设置的话splitsize 的大小默认等于blocksize,即128M。假设有三个输入文件,大小分别是64M,129M 和256M,那么MapReduce 会把64M 文件划分为一个输入分片,129M 则是两个输入分片(128M 和1M),256M 也是两个输入分片(都是128M),一共有五个输入分片,分别启动五个Map 任务,但是每个Map 任务运算的数据量不一样。

2. Map 阶段:通过自定义map()函数读入数据分片,输出的key/value 对放到环形内存缓冲区,这个缓冲区专门用来存放Map 的输出结果,默认大小是100M,并且在配置文件里为这个缓冲区设定一个阀值(默认是0.80,缓冲区大小和阀值都是可配置的)。如果缓冲区的内存达到了阀值的时候,会把内容以一个临时文件的方式存放到磁盘,这个过程叫spill。另外的20%内存可以继续写入要写进磁盘的数据,写入磁盘和写入内存操作是互不干扰的,如果缓存区满了,map 就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作。

3. Partitioner 阶段:对map 产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce 处理。一个Partitioner 对应一个Reduce 作业,也就是说有几个Reduce 就会有几个Partitioner。Partitioner 用来决定Map 产生的key 由哪个Reduce 来处理,默认处理采用对key 进行hash 运算后再对Reduce 数取模。这种方式只是为了平均Reduce 的处理能力,但在实际中可能会出现“数据倾斜”的情况。某个key 有大量相同数据,比如一个key 有20W 的数据,而其他所有key 加起来才有不到200条数据,要是由两个Reduce 处理的话,会造成两个任务处理量严重不均衡,为了避免这种情况就需要用户自定义Partitioner。

4. Sort 阶段:根据key 按照字典升续排序。

5. Combiner 阶段:Combiner 阶段可以由用户选择是否执行,是一个本地化的Reduce 操作。它是在Map 生成中间文件前做一个合并重复key 值的操作,这样可以减少磁盘存储和网络传输量。不是所有操作都要使用Combiner,使用原则是Combiner 的输出不能影响到Reduce 的最终输入。例如:如果计算只是求总数,最大值或最小值可以使用Combiner,但是求平均值就不能使用Combiner,否则Reduce 会求出比真实值大的结果。

6. Reduce 阶段:需要用户自定义实现,可以没有,比如只是对原始数据做一些格式转换。在Reduce 执行之前会将从各个Map 中Partitioner 相同的数据合并排序,执行的结果保存在HDFS 上。

下面通过一个单词计数案例来理解各个过程。

1.读取文件,文件划片后默认会将文件按行分割形成<key,value>,其中key 的值是行偏移量,如图4-5 所示:

图4-5 读取文件

2.执行自定义map,比如将1 中的value 分割成一个个单词,每个单词做新key/value 键值对的key,1 作为value,如图4-6 所示:(www.daowen.com)

图4-6 map 操作

3.得到map 方法输出的<key,value>对后,Mapper 会执行shuffle 过程,即先排序后合并,在此默认reduce 的执行数量为1,得到Mapper 的最终输出结果,如下图4-7 所示:

图4-7 shuffle 操作

4. Reducer 先对从Mapper 接收的数据进行合并排序,再交由用户自定义的reduce 方法进行处理,得到新的<key,value>对,并将其输出到HDFS 上,如下图4-8所示:

图4-8 reduce 操作

Hadoop1.x MapReduce 框架由JobTracker 和TaskTracker 共同组成。其中JobTracker是Master 节点上的主控进程,负责调度一个作业的所有任务并监控它们的执行。TaskTracker 是slave 节点上的从进程,负责执行由JobTracker 指派的任务,具体工作流程如下:

1.用户提交作业后,JobTracker 首先检查作业的输出目录是否存在,如果存在返回异常,如果不存在作业将被提交。

2.计算作业的输入划分并实例化作业为其分配ID,将程序jar 包、作业配置文件、分片信息等上传到HDFS。这些文件都存放在JobTracker 专门为该作业创建的文件夹中,文件夹名为该作业的Job ID。

3.作业被JobTracker 放入作业队列,作业调度器按照相关算法对其进行调度(常用调度算法有先进先出,计算能力和公平调度算法)。

4.作业被调度器调度执行时,会根据输入文件进行输入划分,为每个划分创建一个Map 任务,并将Map 任务分配给TaskTracker 执行。根据“数据本地化”原则,Map任务会被分配给含有该Map 处理的文件块的TaskTracker 上,同时将程序jar 文件从共享文件系统复制到该TaskTracker 上,最后新建一个TaskTracker 实例来运行任务。

5. TaskTracker 每隔一段时间会给JobTracker 发送一个心跳,告诉JobTracker 它依然在运行,同时心跳包还携带很多的信息,比如当前Map 任务完成的进度等信息。待所有Task 执行完毕后,整个作业执行成功。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈