MapReduce介绍

Contents

  1. 1. Mapreduce编程模型
  2. 2. Mapreduce运行机制
  3. 3. MapReduce容错
    1. 3.1. 任务失败
    2. 3.2. TaskTracker失败
    3. 3.3. JobTracker失败
  4. 4. shuffle阶段和sort阶段
    1. 4.1. Map端的shuffle
    2. 4.2. reduce端的shuffle
    3. 4.3. 性能调优
    4. 4.4. MapReduce处理过程
  5. 5. 任务的执行
    1. 5.1. 推测执行
    2. 5.2. 任务JVM重用
    3. 5.3. 跳过坏记录
  6. 6. 作业调度器
    1. 6.1. 先进先出调度器
    2. 6.2. 公平调度器
    3. 6.3. 容量调度器
    4. 6.4. 自定义Hadoop调度器

本文主要简要介绍MapReduce的思想和工作机制。参考的文献是Hadoop权威指南,本文偏向于个人看完书的笔记总结,若有不足或错误之处,还望指出。
MapReduce和关系型数据库的主要差别是:MapReduce适合一次写入、多次读取数据的应用,而关系型数据库更适合持续更新的数据集;MapReduce对于非结构化或半结构化数据非常有效。其有两个核心特性:数据本地化和无共享框架。数据本地化指的是在计算节点上存储数据,以实现数据的本地快速访问。无共享框架指的是各个任务之间彼此独立。
MapReduce作业(job)是客户端需要执行的一个工作单元,包括输入数据、MapReduce程序和配置信息。有两类节点控制作业执行过程:一个jobtracker及一系列tasktracker。jobtracker调度任务来协调所有运行在系统上的作业,tasktracker运行任务并将运行进度报告发送给jobtracker。
MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。Hadoop将作业分成若干个小任务(task),其中包括两类任务:map任务和reduce任务。Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)简称分片。Hadoop为每个分片构建一个map任务来运行用户自定义的map函数。Map任务的输出结果写入本地硬盘,而非HDFS。为了减少数据通信开销,中间结果数据进入reduce节点前需要进行合并(combine)处理,把具有同样主键的数据合并到一起避免重复传送; 一个reducer节点所处理的数据可能会来自多个map节点, 因此, map节点输出的中间结果需使用一定的策略进行适当的划分(partitioner)处理,保证相关数据发送到同一个reducer节点。

Mapreduce编程模型

MapReduce是一种编程模型,是用于处理和生成大规模数据集的相关的实现。MapReduce任务过程分为两个阶段:map阶段和reduce阶段,每个阶段均以键/值对作为输入和输出。需具体定义两个函数:map函数和reduce函数以及一些用来运行作业的代码。
以下是MapReduce的架构结构:

分别介绍以上的概念:
1. Mapper和Reducer
一个Mapper抽象类、一个Reducer抽象类,创建JobConf的执行程序,也可能有combiner抽象类,实际也就是reducer。
2. JobTracker
Master服务,主要有两个功能:调度Task运行于TaskTracker;监控是否有失败Task,若有就重新运行。一般在单独服务器上。
3. TaskTracker
Slaver服务。类似Namenode,进行心跳链接,主动发送通信接受作业,直接负责执行任务。
4. JobClient
将应用程序和configuration打包成Jar文件文件存储在HDFS上,并将路径提交到master服务上。
5. JobInProgress
跟踪调度Job,添加到Job队列,根据Jar定义的输入数据集(已经分解成FileSplit)创建一批TaskInProgress用于监控调度MapTask,创建同样数目来监控调度ReduceTask,默认为1。
6. TaskInProgress
TaskTracker运行Task,将其序列化写入相应的TaskTracker服务中,TaskTracker收到之后会创建相应的TaskTracker,调度和监控该Task。通过TaskRunner来运行Task,TaskRunner会自动装任务Jar文件并设置好环境变量,启动JavaChild进程来执行Task。有MapTask和ReduceTask。
7. MapTask
输入数据、job相关信息等组成的对象。MapTask的运行效率取决于读取输入数据的效率。输入数据越靠近TaskTracker就速度越快。分为三个级别:node-local(节点本地)、rack-local(同一机架)、off-switch(不同机架)。
8. ReduceTask
汇总Map Task的输出结果,最后生出Job的输出

Mapreduce运行机制

MapReduce将输入数据切分成若干独立数据块,map任务并行处理,排序,输入Reduce任务。输入输出都在文件系统中,整个框架负责任务的调度和监控,重新执行失败任务。计算节点通常和存储节点在一起。
MapReduce(这是MapReduce1的处理过程,MapReduce2结合了YARN,与1有所不同)的提交过程如下:
MapReduce的处理过程
JobClient的runJob()方法是用于新建JobClient实例并调用其submitJob()方法的便捷方法。提交作业后,runJob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。JobClient的submitJob()方法所实现的作业提交过程如下:

  1. 作业的提交:向jobtracker请求一个新的作业ID。
  2. 检查作业的输出说明(如是否值得输出目录或已经存在该目录);计算作业的输入分片。
  3. 将运行Job所需要的资源(包括作业JAR文件、配置文件和计算所得的输入分片)复制到JobID命名的jobtracker文件系统中。
  4. 调用JobTracker的sumbitJob(),告知jobtracker作业准备执行。
  5. JobTracker接收到sumbitJob(),把此调用放入调度队列,由作业调度器调度,并对其进行初始化,包括创建作业对象,封装了任务和记录信息。
  6. 创建任务运行列表,首先作业调度器从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务。以及相应数量的要运行的reduce任务。
  7. TaskTracker执行循环,定期发送心跳调用JobTracker,汇报TaskTracker当前状态,若正常会分配一个任务。
  8. TaskTracke本地化作业Jar文件,通过从共享文件系统中复制到tasktracker所在的文件系统。同时。Tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。
  9. TaskRunner启动一个新的JVM来运行每个任务。

上述步骤涉及的点:

  1. 心跳响应的处理步骤:
    首先查看jobTracker是否存在MapTask,若关联的block在本地磁盘,就优先执行这个mapTask。若没有则查同Rack,然后查不同Rack或其他推测式执行Task
  2. 任务分配:
    TaskTracker有固定数量的任务槽,先将map填满然后分配Reduce到TaskTracker。Reduce任务并没有什么标准选择哪个TaskTracker,因为无法考虑数据本地化。Reduce通过开启多线程取抓取Map的输出结果。
  3. 任务执行:
    (1)TaskTracker复制作业Jar文件到本地
    (2)复制文件到本地
    (3)创建本地工作目录,并把jar文件内容解压到下面
    (4)新建TaskRunner实例。
  4. 进度和状态更新:
    每个Job和Task都有status,包括运行状态(running successful),Map和Reduce的进度、计算器值、状态消息和描述等。这些信息通过一定的时间间隔由ChildJVM向TaskTracker再向JobTracker汇聚。JobTracker将产生一个全局试图监听。
  5. MapReduce的进度组成:
    并非总是可以预测,但能告诉Hadoop有几个任务在运行。
  6. 任务完成:
    JobTracker收到完成通知,设置状态为成功,然后JobClient查询状态直到任务已经完成。JobTracker打印一条消息到控制台,最后调用jobRun()返回。

MapReduce容错

任务失败

最常见的是Map或Reduce任务的失败:子任务JVM进程会在推出前向上一级TaskTracker发送错误报告,错误报告记录在用户的错误日志。TaskTracker将task attempt标记为failed并且释放任务槽。对于流任务,如果流进程以非零退出代码退出运行,标记为fail。
若是由于JVM的bug导致子进程突然退出,。TaskTracker监控到进程一旦退出,并将此次attempt标记为failed。
超时错误:若将超时设置成0将关闭超时检测,所以长时间运行的任务永远不会被标记为failed,挂起的任务永远不会释放任务槽,导致集群性能降低。

TaskTracker失败

若是TaskTracker停止或者很少向JobTracker发送心跳,JobTracker会注意到它的发送心跳情况,将其从等待Task的队列删除,并且安排此TaskTracker的上一个成功运行的Map任务返回。
两种失败情况:

  1. 如果它们属于未完成作业,reduce无法获取,只要map阶段失败必须重新执行任务
  2. reduce阶段失败,执行未完成任务,只要Reduce执行完就会把输出写到HDFS上。

JobTracker失败

这是最严重的失败方式,特别是Hadoop存在单点故障的情况下。可以通过启动多个JobTracker,只运行一个主的,通过机制确定哪个是主的来避免这种故障。
任务反复多次失败的处理方法:
TaskTracker将子任务标记为失败后会将自身计数器减1,通过心跳告知JobTracker本地的一个任务尝试失败,JobTracker接受到失败通知将子任务重新放入等待队列让其他执行,若4次失败就放弃。Hadoop提供mapred.max.map.failures.percent解决这个问题。
若一个Job有200个MapTask,该参数设置为5,则单个Job最多允许10个MapTask失败(200*5%=10)失败。可以通过在mapred-site.xml文件中如下设置实现。

ReduceTask类似。

shuffle阶段和sort阶段

MapReduce确保每个reduce的输入都是按键值排序。系统执行排序并将map的输出提交给reducer输入的过程称为shuffle过程。

Map端的shuffle

Map函数刚开始产生输出,并不是简单地写到磁盘中,而是写到内存中,处于效率的考虑,会在内存进行预排序。当内存满时,将会把内存中的数据写入到磁盘中去。
在将数据写入到磁盘中之前,会根据数据最终要传入的reducer把数据划分成相应的分区,在每一个分区中,会对数据进行排序,如果有combiner,会对数据进一步执行combiner使得其输出结果更紧凑,减少写入到磁盘的数据。
每次内存中的数据达到溢出阈值,就会新建一个溢出文件。因此当map完成最后一个输出记录之后,会有多个溢出文件。在任务结束之前,文件将会被合并为已分区且已排序的输出文件。

reduce端的shuffle

Reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此,只要有一个任务完成,reduce任务就开始复制其输出。这就是复制阶段。一般情况下,reduce任务有少量复制线程,因此能够并行去复制map输出。默认是5个线程。
在复制过程中,如果map输出相当小,那么会被复制到reduce任务的JVM内存中,一旦内存缓冲区达到阈值或者达到map输出阈值,则合并写到磁盘中;否则,map输出被复制到磁盘。随着磁盘上副本增多,后台线程将会把它合并为更大、排好序的文件。
复制完所有的map输出后,reduce进入排序阶段,这个阶段将合并map输出,维持其顺序。
总结:
(1)copy过程
(2)merge阶段:有三种形式:内存到内存;内存到磁盘;磁盘到内存。
(3)Reducer的输入文件

性能调优

总原则:给shuffle过程尽量多地提供内存空间,同时确保map函数和reduce函数有足够内存来运行。因此编写map函数和reduce函数要尽量少用内存。

MapReduce处理过程

结合shuffl阶段,总结mapReduce整体处理过程:

  1. 向MapReduce框架提交一个Job
  2. 将Job拆分成若干个Map任务
  3. 分配任务,Tasktracker执行Map任务
  4. Map产生输出,进行Map端的shuffle,若有combiner,则执行,此时数据在内存中(一开始不是将数据写到磁盘,而是先将数据写到内存中的缓存区,缓存区大小默认100M,若超过该大小,则会spill到磁盘。)
  5. 将最终Map结果输出磁盘
  6. 进行Reduce端的shuffle:经历copy阶段,然后是merge阶段,归并reducer的输入文件。
  7. 执行Reduce任务
    在网上看到一个图片可以很好地解释整个过程:

任务的执行

推测执行

问题:在集群中的某一节点执行任务速度很慢,导致其他任务节点完成任务之后,需要等待该节点,即集群中存在明显的“拖后腿”现象
推测执行:Hadoop在集群运行任务的过程中会不断地检测任务执行进度,在一个任务运行比预期慢的时候,他会启动另一个相同的任务作为备份,这就是推测执行。如果推测任务首先完成,则终止原任务,相反的,如果原任务首先完成,则终止推测任务。注意:推测执行只是一种优化措施,并不能是作业的运行更加可靠,也不能改变一些软件缺陷造成的心梗问题。默认情况下,推测执行是启用的,可以基于集群或者基于每个作业,单独为map和reduce任务启用或者禁用该功能。

任务JVM重用

Hadoop中有个参数是mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM。为每个task启动一个新的JVM将耗时1秒左右,对于运行时间较长(比如1分钟以上)的job影响不大,但如果都是时间很短的task,那么频繁启停JVM会有开销。如果我们想使用JVM重用技术来提高性能,那么可以将mapred.job.reuse.jvm.num.tasks设置成大于1的数。这表示属于同一job的顺序执行的task可以共享一个JVM,也就是说第二轮的map可以重用前一轮的JVM,而不是第一轮结束后关闭JVM,第二轮再启动新的JVM。那么最多一个JVM能顺序执行多少个task才关闭呢?这个值就是mapred.job.reuse.jvm.num. tasks。如果设置成-1,那么只要是同一个job的task(无所谓多少个),都可以按顺序在一个JVM上连续执行。如果task属于不同的job,那么JVM重用机制无效,不同job的task需要不同的JVM来运行。
注意:JVM重用技术不是指同一Job的两个或两个以上的task可以同时运行于同一JVM上,而是排队按顺序执行。

跳过坏记录

大型数据集十分庞杂,数据损坏十分常见,而且数据损坏的原因大不相同,因此想要在程序中完全忽略损坏数据是不现实的。而损坏的数据会导致任务执行的失败,Hadoop在遇到这种情况时,会重新执行任务,但是重新执行任务对于数据损坏无济于事。因此,最好在mapper和reducer代码中处理被损坏的记录:抛出异常来终止作业运行,使用计数器计算作业中总的坏记录数。但是在某些情况下(软件缺陷存在于第三方库),上述方法无法处理,因此可以使用Hadoop的skipping mode选项自动跳过坏记录。
因为skipping mode会导致额外的网络流量和记录错误以维护失败记录的范围,因此只有在任务执行失败两次后才会启用skipping mode。因此,针对任务失败,tasktracker将根据一下运行结果来启动任务尝试:
1任务失败
2任务失败
3开启skipping mode。任务失败,但是失败记录有tasktracker保存
4 仍然启用skipping mode。任务继续运行,但是跳过上一次尝试中失败的坏记录。
注意:每一次任务尝试,只能检测出一条坏记录,因此该机制只适用于检测个别坏记录。

作业调度器

先进先出调度器

Hadoop最早期使用的一种作业调度机制:按照作业提交的顺序进行调度。该方法简单明了,但是却没有充分考虑作业的优先级等情况,因此提出了以下两种作业调度器。

公平调度器

公平调度器的目标是:让每个用户公平共享集群。如果只有一个作业在运行,就会得到集群的所有资源。随着提交的作业越来越多,闲置的资源就会按照“让每个用户公平共享集群”的方法被分配。
作业都放在作业池中。默认情况下,每个用户拥有一个作业池。提交作业多的用户不会因此获得更多的资源。
公平调度器支持抢占机制,所以,如果一个池在特定的一段时间未能公平共享资源,那么就会终止运行池中得到过多资源的任务,把空出来的资源让给资源不足的作业池。
默认情况下,所有池的共享相等,但可以进行配置,根据作业类型提供更多或更少的共享。如果需要的话,还可以限制同时活动的作业数,以尽量减少拥堵,让工作及时完成。

容量调度器

容量调度器的原理与公平调度器有些相似,但也有一些区别。首先,容量调度是用于大型集群,它们有多个独立用户和目标应用程序。由于这个原因,容量调度能提供更大的控制和能力,提供用户之间最小容量保证并在用户之间共享多余的容量。
容量调度中,创建的是队列而不是池,每个队列的 map 和 reduce 插槽数都可以配置。每个队列都会分配一个保证容量(集群的总容量是每个队列容量之和)。在每个队列中采用FIFO策略。而且不支持优先级抢占,一旦一个作业开始执行,在执行完之前,他的资源不会被优先级搞得的作业抢占。
队列处于监控之下;如果某个队列未使用分配的容量,那么这些多余的容量会被临时分配到其他队列中。由于队列可以表示一个人或大型组织,那么所有的可用容量都可以由其他用户重新分配使用。
与公平调度另一个区别是可以调整队列中作业的优先级。一般来说,具有高优先级的作业访问资源比低优先级作业更快。

自定义Hadoop调度器

参考文章:http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/