Contents
本文主要简要介绍MapReduce的思想和工作机制。参考的文献是Hadoop权威指南,本文偏向于个人看完书的笔记总结,若有不足或错误之处,还望指出。
MapReduce和关系型数据库的主要差别是:MapReduce适合一次写入、多次读取数据的应用,而关系型数据库更适合持续更新的数据集;MapReduce对于非结构化或半结构化数据非常有效。其有两个核心特性:数据本地化和无共享框架。数据本地化指的是在计算节点上存储数据,以实现数据的本地快速访问。无共享框架指的是各个任务之间彼此独立。
MapReduce作业(job)是客户端需要执行的一个工作单元,包括输入数据、MapReduce程序和配置信息。有两类节点控制作业执行过程:一个jobtracker及一系列tasktracker。jobtracker调度任务来协调所有运行在系统上的作业,tasktracker运行任务并将运行进度报告发送给jobtracker。
MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。Hadoop将作业分成若干个小任务(task),其中包括两类任务:map任务和reduce任务。Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)简称分片。Hadoop为每个分片构建一个map任务来运行用户自定义的map函数。Map任务的输出结果写入本地硬盘,而非HDFS。为了减少数据通信开销,中间结果数据进入reduce节点前需要进行合并(combine)处理,把具有同样主键的数据合并到一起避免重复传送; 一个reducer节点所处理的数据可能会来自多个map节点, 因此, map节点输出的中间结果需使用一定的策略进行适当的划分(partitioner)处理,保证相关数据发送到同一个reducer节点。
Mapreduce编程模型
MapReduce是一种编程模型,是用于处理和生成大规模数据集的相关的实现。MapReduce任务过程分为两个阶段:map阶段和reduce阶段,每个阶段均以键/值对作为输入和输出。需具体定义两个函数:map函数和reduce函数以及一些用来运行作业的代码。
以下是MapReduce的架构结构:
分别介绍以上的概念:
1. Mapper和Reducer
一个Mapper抽象类、一个Reducer抽象类,创建JobConf的执行程序,也可能有combiner抽象类,实际也就是reducer。
2. JobTracker
Master服务,主要有两个功能:调度Task运行于TaskTracker;监控是否有失败Task,若有就重新运行。一般在单独服务器上。
3. TaskTracker
Slaver服务。类似Namenode,进行心跳链接,主动发送通信接受作业,直接负责执行任务。
4. JobClient
将应用程序和configuration打包成Jar文件文件存储在HDFS上,并将路径提交到master服务上。
5. JobInProgress
跟踪调度Job,添加到Job队列,根据Jar定义的输入数据集(已经分解成FileSplit)创建一批TaskInProgress用于监控调度MapTask,创建同样数目来监控调度ReduceTask,默认为1。
6. TaskInProgress
TaskTracker运行Task,将其序列化写入相应的TaskTracker服务中,TaskTracker收到之后会创建相应的TaskTracker,调度和监控该Task。通过TaskRunner来运行Task,TaskRunner会自动装任务Jar文件并设置好环境变量,启动JavaChild进程来执行Task。有MapTask和ReduceTask。
7. MapTask
输入数据、job相关信息等组成的对象。MapTask的运行效率取决于读取输入数据的效率。输入数据越靠近TaskTracker就速度越快。分为三个级别:node-local(节点本地)、rack-local(同一机架)、off-switch(不同机架)。
8. ReduceTask
汇总Map Task的输出结果,最后生出Job的输出
Mapreduce运行机制
MapReduce将输入数据切分成若干独立数据块,map任务并行处理,排序,输入Reduce任务。输入输出都在文件系统中,整个框架负责任务的调度和监控,重新执行失败任务。计算节点通常和存储节点在一起。
MapReduce(这是MapReduce1的处理过程,MapReduce2结合了YARN,与1有所不同)的提交过程如下:
JobClient的runJob()方法是用于新建JobClient实例并调用其submitJob()方法的便捷方法。提交作业后,runJob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。JobClient的submitJob()方法所实现的作业提交过程如下:
- 作业的提交:向jobtracker请求一个新的作业ID。
- 检查作业的输出说明(如是否值得输出目录或已经存在该目录);计算作业的输入分片。
- 将运行Job所需要的资源(包括作业JAR文件、配置文件和计算所得的输入分片)复制到JobID命名的jobtracker文件系统中。
- 调用JobTracker的sumbitJob(),告知jobtracker作业准备执行。
- JobTracker接收到sumbitJob(),把此调用放入调度队列,由作业调度器调度,并对其进行初始化,包括创建作业对象,封装了任务和记录信息。
- 创建任务运行列表,首先作业调度器从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务。以及相应数量的要运行的reduce任务。
- TaskTracker执行循环,定期发送心跳调用JobTracker,汇报TaskTracker当前状态,若正常会分配一个任务。
- TaskTracke本地化作业Jar文件,通过从共享文件系统中复制到tasktracker所在的文件系统。同时。Tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。
- TaskRunner启动一个新的JVM来运行每个任务。
上述步骤涉及的点:
- 心跳响应的处理步骤:
首先查看jobTracker是否存在MapTask,若关联的block在本地磁盘,就优先执行这个mapTask。若没有则查同Rack,然后查不同Rack或其他推测式执行Task - 任务分配:
TaskTracker有固定数量的任务槽,先将map填满然后分配Reduce到TaskTracker。Reduce任务并没有什么标准选择哪个TaskTracker,因为无法考虑数据本地化。Reduce通过开启多线程取抓取Map的输出结果。 - 任务执行:
(1)TaskTracker复制作业Jar文件到本地
(2)复制文件到本地
(3)创建本地工作目录,并把jar文件内容解压到下面
(4)新建TaskRunner实例。 - 进度和状态更新:
每个Job和Task都有status,包括运行状态(running successful),Map和Reduce的进度、计算器值、状态消息和描述等。这些信息通过一定的时间间隔由ChildJVM向TaskTracker再向JobTracker汇聚。JobTracker将产生一个全局试图监听。 - MapReduce的进度组成:
并非总是可以预测,但能告诉Hadoop有几个任务在运行。 - 任务完成:
JobTracker收到完成通知,设置状态为成功,然后JobClient查询状态直到任务已经完成。JobTracker打印一条消息到控制台,最后调用jobRun()返回。
MapReduce容错
任务失败
最常见的是Map或Reduce任务的失败:子任务JVM进程会在推出前向上一级TaskTracker发送错误报告,错误报告记录在用户的错误日志。TaskTracker将task attempt标记为failed并且释放任务槽。对于流任务,如果流进程以非零退出代码退出运行,标记为fail。
若是由于JVM的bug导致子进程突然退出,。TaskTracker监控到进程一旦退出,并将此次attempt标记为failed。
超时错误:若将超时设置成0将关闭超时检测,所以长时间运行的任务永远不会被标记为failed,挂起的任务永远不会释放任务槽,导致集群性能降低。
TaskTracker失败
若是TaskTracker停止或者很少向JobTracker发送心跳,JobTracker会注意到它的发送心跳情况,将其从等待Task的队列删除,并且安排此TaskTracker的上一个成功运行的Map任务返回。
两种失败情况:
- 如果它们属于未完成作业,reduce无法获取,只要map阶段失败必须重新执行任务
- reduce阶段失败,执行未完成任务,只要Reduce执行完就会把输出写到HDFS上。
JobTracker失败
这是最严重的失败方式,特别是Hadoop存在单点故障的情况下。可以通过启动多个JobTracker,只运行一个主的,通过机制确定哪个是主的来避免这种故障。
任务反复多次失败的处理方法:
TaskTracker将子任务标记为失败后会将自身计数器减1,通过心跳告知JobTracker本地的一个任务尝试失败,JobTracker接受到失败通知将子任务重新放入等待队列让其他执行,若4次失败就放弃。Hadoop提供mapred.max.map.failures.percent解决这个问题。
若一个Job有200个MapTask,该参数设置为5,则单个Job最多允许10个MapTask失败(200*5%=10)失败。可以通过在mapred-site.xml文件中如下设置实现。
ReduceTask类似。
shuffle阶段和sort阶段
MapReduce确保每个reduce的输入都是按键值排序。系统执行排序并将map的输出提交给reducer输入的过程称为shuffle过程。
Map端的shuffle
Map函数刚开始产生输出,并不是简单地写到磁盘中,而是写到内存中,处于效率的考虑,会在内存进行预排序。当内存满时,将会把内存中的数据写入到磁盘中去。
在将数据写入到磁盘中之前,会根据数据最终要传入的reducer把数据划分成相应的分区,在每一个分区中,会对数据进行排序,如果有combiner,会对数据进一步执行combiner使得其输出结果更紧凑,减少写入到磁盘的数据。
每次内存中的数据达到溢出阈值,就会新建一个溢出文件。因此当map完成最后一个输出记录之后,会有多个溢出文件。在任务结束之前,文件将会被合并为已分区且已排序的输出文件。
reduce端的shuffle
Reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此,只要有一个任务完成,reduce任务就开始复制其输出。这就是复制阶段。一般情况下,reduce任务有少量复制线程,因此能够并行去复制map输出。默认是5个线程。
在复制过程中,如果map输出相当小,那么会被复制到reduce任务的JVM内存中,一旦内存缓冲区达到阈值或者达到map输出阈值,则合并写到磁盘中;否则,map输出被复制到磁盘。随着磁盘上副本增多,后台线程将会把它合并为更大、排好序的文件。
复制完所有的map输出后,reduce进入排序阶段,这个阶段将合并map输出,维持其顺序。
总结:
(1)copy过程
(2)merge阶段:有三种形式:内存到内存;内存到磁盘;磁盘到内存。
(3)Reducer的输入文件
性能调优
总原则:给shuffle过程尽量多地提供内存空间,同时确保map函数和reduce函数有足够内存来运行。因此编写map函数和reduce函数要尽量少用内存。
MapReduce处理过程
结合shuffl阶段,总结mapReduce整体处理过程:
- 向MapReduce框架提交一个Job
- 将Job拆分成若干个Map任务
- 分配任务,Tasktracker执行Map任务
- Map产生输出,进行Map端的shuffle,若有combiner,则执行,此时数据在内存中(一开始不是将数据写到磁盘,而是先将数据写到内存中的缓存区,缓存区大小默认100M,若超过该大小,则会spill到磁盘。)
- 将最终Map结果输出磁盘
- 进行Reduce端的shuffle:经历copy阶段,然后是merge阶段,归并reducer的输入文件。
- 执行Reduce任务
在网上看到一个图片可以很好地解释整个过程:
任务的执行
推测执行
问题:在集群中的某一节点执行任务速度很慢,导致其他任务节点完成任务之后,需要等待该节点,即集群中存在明显的“拖后腿”现象
推测执行:Hadoop在集群运行任务的过程中会不断地检测任务执行进度,在一个任务运行比预期慢的时候,他会启动另一个相同的任务作为备份,这就是推测执行。如果推测任务首先完成,则终止原任务,相反的,如果原任务首先完成,则终止推测任务。注意:推测执行只是一种优化措施,并不能是作业的运行更加可靠,也不能改变一些软件缺陷造成的心梗问题。默认情况下,推测执行是启用的,可以基于集群或者基于每个作业,单独为map和reduce任务启用或者禁用该功能。
任务JVM重用
Hadoop中有个参数是mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM。为每个task启动一个新的JVM将耗时1秒左右,对于运行时间较长(比如1分钟以上)的job影响不大,但如果都是时间很短的task,那么频繁启停JVM会有开销。如果我们想使用JVM重用技术来提高性能,那么可以将mapred.job.reuse.jvm.num.tasks设置成大于1的数。这表示属于同一job的顺序执行的task可以共享一个JVM,也就是说第二轮的map可以重用前一轮的JVM,而不是第一轮结束后关闭JVM,第二轮再启动新的JVM。那么最多一个JVM能顺序执行多少个task才关闭呢?这个值就是mapred.job.reuse.jvm.num. tasks。如果设置成-1,那么只要是同一个job的task(无所谓多少个),都可以按顺序在一个JVM上连续执行。如果task属于不同的job,那么JVM重用机制无效,不同job的task需要不同的JVM来运行。
注意:JVM重用技术不是指同一Job的两个或两个以上的task可以同时运行于同一JVM上,而是排队按顺序执行。
跳过坏记录
大型数据集十分庞杂,数据损坏十分常见,而且数据损坏的原因大不相同,因此想要在程序中完全忽略损坏数据是不现实的。而损坏的数据会导致任务执行的失败,Hadoop在遇到这种情况时,会重新执行任务,但是重新执行任务对于数据损坏无济于事。因此,最好在mapper和reducer代码中处理被损坏的记录:抛出异常来终止作业运行,使用计数器计算作业中总的坏记录数。但是在某些情况下(软件缺陷存在于第三方库),上述方法无法处理,因此可以使用Hadoop的skipping mode选项自动跳过坏记录。
因为skipping mode会导致额外的网络流量和记录错误以维护失败记录的范围,因此只有在任务执行失败两次后才会启用skipping mode。因此,针对任务失败,tasktracker将根据一下运行结果来启动任务尝试:
1任务失败
2任务失败
3开启skipping mode。任务失败,但是失败记录有tasktracker保存
4 仍然启用skipping mode。任务继续运行,但是跳过上一次尝试中失败的坏记录。
注意:每一次任务尝试,只能检测出一条坏记录,因此该机制只适用于检测个别坏记录。
作业调度器
先进先出调度器
Hadoop最早期使用的一种作业调度机制:按照作业提交的顺序进行调度。该方法简单明了,但是却没有充分考虑作业的优先级等情况,因此提出了以下两种作业调度器。
公平调度器
公平调度器的目标是:让每个用户公平共享集群。如果只有一个作业在运行,就会得到集群的所有资源。随着提交的作业越来越多,闲置的资源就会按照“让每个用户公平共享集群”的方法被分配。
作业都放在作业池中。默认情况下,每个用户拥有一个作业池。提交作业多的用户不会因此获得更多的资源。
公平调度器支持抢占机制,所以,如果一个池在特定的一段时间未能公平共享资源,那么就会终止运行池中得到过多资源的任务,把空出来的资源让给资源不足的作业池。
默认情况下,所有池的共享相等,但可以进行配置,根据作业类型提供更多或更少的共享。如果需要的话,还可以限制同时活动的作业数,以尽量减少拥堵,让工作及时完成。
容量调度器
容量调度器的原理与公平调度器有些相似,但也有一些区别。首先,容量调度是用于大型集群,它们有多个独立用户和目标应用程序。由于这个原因,容量调度能提供更大的控制和能力,提供用户之间最小容量保证并在用户之间共享多余的容量。
在容量调度中,创建的是队列而不是池,每个队列的 map 和 reduce 插槽数都可以配置。每个队列都会分配一个保证容量(集群的总容量是每个队列容量之和)。在每个队列中采用FIFO策略。而且不支持优先级抢占,一旦一个作业开始执行,在执行完之前,他的资源不会被优先级搞得的作业抢占。
队列处于监控之下;如果某个队列未使用分配的容量,那么这些多余的容量会被临时分配到其他队列中。由于队列可以表示一个人或大型组织,那么所有的可用容量都可以由其他用户重新分配使用。
与公平调度另一个区别是可以调整队列中作业的优先级。一般来说,具有高优先级的作业访问资源比低优先级作业更快。
自定义Hadoop调度器
参考文章:http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/