MapReduce中的shuffle详解

前言

shuffle在某些情况下,表达的意义为reduce任务获取map输出的这部分过程,也就是通常意义上的“map-shuffle-reduce”。如果是这种情况下,shuffle表示的是将map输出的键值对,按照key值进行了一个groupby操作,最后得到< key, list[key] >的过程。而在这里,shuffle包含了从map输出到reducer的整个过程,包括了写入磁盘、分区、排序等步骤,有利于理解工作机制,优化MapReduce程序。

shuffle简述

shuffle

由上图(引用自shuffle和排序)所示,shuffle是一个横跨map task 和reduce task的过程,它是map 和 reduce的一个数据桥梁,负责将map输出作为输入传给reducer。在map端包括了写入缓冲区,溢出到磁盘,分区与排序等步骤;在reduce端则包括了复制数据、归并数据、reduce阶段等步骤。

map端

map函数产生输出时,出于效率的原因并不会直接写入硬盘,而是先放到一个环形内存缓存区中,并将缓存的数据按照key值进行一个预排序。缓冲区默认为100MB,该值可以通过io.sort.mb属性来调整。一旦达到阈值(io.sort.spill.percent, 默认为0.8),则后台进程开始将内容溢出到磁盘。溢出过程中,map输出仍写入缓冲区,在此期间缓冲区被填满,则会将map阻塞,知道该溢出过程结束。

每个溢出过程都会产生一个文件存到mapred.local.dir属性指定的目录中,在上图中,共产生了三个溢出到磁盘的文件。在溢出到磁盘之前,会根据reducer的数量划分分区,如图中共划分了3个分区,每个分区中,都按键进行内排序,如果指定了combiner,则在排序后的输出上进行,以减少写到磁盘和传递给reducer的数据。上图中每个溢出文件都有3个分区,每个分区内数据都是排好序的。

当map任务结束后,会将溢出到磁盘的文件进行一个合并,如图中,将3个文件合并成了一个文件,合并好的文件中每个分区内的数据也是排好序的。在map输出到磁盘时,可以通过设置mapred.compress.map.output为true和指定mapred.map.output.compression.codec指定压缩格式,这样可以加快溢出到磁盘的速度。

reducer是通过http方式获得输出文件的分区的,如上图中,第一个reducer获取了第一个分区。

reduce端

reducer会将各个map task上最后溢出的那个文件的对应分区复制到本地,由于map任务的完成时间可能不同,因此只要一个任务完成,reduce任务就开始复制其输出。reduce可以并行的复制map的输出,默认为5个线程,可以通过设置mapred.reduce.parallel.copies属性来改变。对于指定的作业,jobtracker(或App master)知道map输出和tasktracker的映射关系。reducer线程定期询问jobtracker以获取map输出的位置,直到获取全部的输出位置

复制完所有map输出后,就进入到上图中的“sort phase”,但它并不是一个严格意义上的排序过程,可以将它理解成归并排序中的merge过程,将若干个排好序的序列,归并成一个有序文件。这个过程根据合并因子(io.sort.factor设置,默认为10)进行,如果有50个map的输出,而合并因子为10,则每次最多合并10个map输出,因此最后会有5个中间文件。

在reduce阶段,直接将上面的5个中间文件合并成一个已排序的文件输入给reduce函数,最后的合并不需要磁盘的读写,只需要内存和磁盘片段的配合即可。

在“sort phase”时,由于最后一趟的合并总是将结果直接输入给reduce,而没有磁盘写入过程,因此可以据此进行优化。比如如果合并因子为10,有40个文件,此时不会再四趟中每次合并10个文件而得到4个文件,相反,第一趟只合并4个文件,随后的三趟每次合并10个文件,在最后一趟中,4个已合并的文件(4,10,10,10)和6个未合并的文件合并给reduce函数,这并不改变合并次数,却使得合并过程中磁盘只写入了4+10+10+10=34个文件,从而减少了磁盘的数据量。

结束语

本文是结合《Hadoop权威指南》shuffle和排序章节整理的读书笔记,方便日后的查阅。shuffle是MapReduce中数据传输的核心步骤,深入理解该部分的实现细节,有助于MapReduce程序的优化,通过减少网络上的数据传输,可以有效的提高MapReduce程序的执行效率。