`

Hadoop OutputFormat浅析

 
阅读更多

转自:http://hi.baidu.com/_kouu/blog/item/dd2f08fd25da09e0fc037f15.html

在Hadoop中,OutputFormat和InputFormat是相对应的两个东西。相比于InputFormat,OutputFormat似乎没有那么多细节。InputFormat涉及到对输入数据的解析和划分,继而影响到Map任务的数目,以及Map任务的调度(见Hadoop InputFormat浅析)。而OutputFormat似乎像其字面意思那样,仅仅是完成对输出数据的格式化。

对于输出数据的格式化,这个应该没什么值得多说的。根据需要,OutputFormat爱把输出写成什么格式就写成什么格式、爱把输出写到数据库就写到数据库、爱把输出通过网络发给其他服务就发给其他服务...


不过,OutputFormat所做的事情其实并不限于此。OutputFormat类包含如下三个方法:
RecordWriter  getRecordWriter(TaskAttemptContext context);
void  checkOutputSpecs(JobContext context);
OutputCommitter  getOutputCommitter(TaskAttemptContext context);

其中:
checkOutputSpecs是在JobClient提交Job之前被调用的(在使用InputFomat进行输入数据划分之前),用于检测Job的输出路径。比如,FileOutputFormat通过这个方法来确认在Job开始之前,Job的Output路径并不存在,然后该方法又会重新创建这个Output路径。这样一来,就能确保Job结束后,Output路径下的东西就是且仅是该Job输出的。

getRecordWriter用于返回一个RecordWriter的实例,Reduce任务在执行的时候就是利用这个实例来输出Key/Value的。(如果Job不需要Reduce,那么Map任务会直接使用这个实例来进行输出。)

RecordWriter有如下两个方法:

void  write(K key, V value);
void  close(TaskAttemptContext context);
前者负责将Reduce输出的Key/Value写成特定的格式,后者负责对输出做最后的确认并关闭输出。
前面提到的OutputFormat的字面含义,其实就是由这个RecordWriter来实现的。

而第三个方法,getOutputCommitter则用于返回一个OutputCommitter的实例。(在Hadoop-0.20中,MapReduce有两套API。getOutputCommitter是在NewAPI中才提供的,OldAPI里面并没有。不过OldAPI同样有OutputCommtter这个东西,只是不能通过OutputFormat来定制而已。)

OutputCommitter用于控制Job的输出环境,它有下面几个方法:
void  setupJob(JobContext jobContext);
void  commitJob(JobContext jobContext);
void  abortJob(JobContext jobContext, JobStatus.State state);
void  setupTask(TaskAttemptContext taskContext);
boolean  needsTaskCommit(TaskAttemptContext taskContext);
void  commitTask(TaskAttemptContext taskContext);
void  abortTask(TaskAttemptContext taskContext);

Job开始被执行之前,框架会调用OutputCommitter.setupJob()为Job创建一个输出路径;

如果Job成功完成,框架会调用OutputCommitter.commitJob()提交Job的输出;

如果Job失败,框架会调用OutputCommitter.abortJob()撤销Job的输出;

对应于Job下的每一个Task,同样牵涉创建、提交和撤销三个动作,分别由OutputCommitter.setupTask()、OutputCommitter.commitTask()、OutputCommitter.abortTask()来完成。而一个Task可能没有输出,从而也就不需要提交,这个可以通过OutputCommitter.needsTaskCommit()来判断;


具体OutputCommitter的这些方法里面完成了什么样的操作,这是由具体的OutputCommitter来定制的,可以任意去实现。比如,FileOutputCommitter完成了如下操作:

setupJob - mkdir ${mapred.output.dir}/_temporary
commitJob - touch ${mapred.output.dir}/_SUCCESS && rm -r ${mapred.output.dir}/_temporary
abortJob - rm -r ${mapred.output.dir}/_temporary
setupTask - <nothing>
needsTaskCommit - test -d ${mapred.output.dir}/_temporary/_${TaskAttemptID}
commitTask - mv ${mapred.output.dir}/_temporary/_${TaskAttemptID}/* ${mapred.output.dir}/
abortTask - rm -r ${mapred.output.dir}/_temporary/_${TaskAttemptID}

(注意,上面这些路径都是HDFS上的,不是某个TaskTracker本地机器上的。)

其中的逻辑是:Job执行的时候,Task的输出放到Output路径下的_temporary目录的以TaskAttemptID命名的子目录中。只有当Task成功了,相应的输出才会被提交到Output路径下。而只有当整个Job都成功了,才会在Output路径下放置_SUCCESS文件。_SUCCESS文件的存在表明了Output路径下的输出信息是正确且完整的;而如果_SUCCESS文件不存在,Output下的信息也依然是正确的(这已经由commitTask保证了),但是不一定是完整的(可能只包含部分Reduce的输出)。

与之对应,FileOutputFormat会让它所创建的RecordWriter将输出写到${mapred.output.dir}/_temporary/_${TaskAttemptID}/下。当然,Map和Reduce任务也可以自己向这个路径put数据。

接下来就是到在哪里去执行这些方法的问题了。

一个Job被提交到JobTracker后会生成若干的Map和Reduce任务,这些任务会被分派到TaskTracker上。对于每一个Task,TaskTracker会使用一个子JVM来执行它们。那么对于Task的setup/commit/abort这些操作,自然应该在执行Task的子JVM里面去完成:

当一个Task被关联到一个子JVM后,在任务初始化阶段,OutputCommitter.setupTask()会被调用;

当一个任务执行成功完成了之后,脱离子JVM之前,OutputCommitter.commitTask()会被调用。不过这里还有两个细节:1、需要先调用OutputCommitter.needsTaskCommit()来确定是否有输出需要提交;2、提交之前还有一个同步逻辑,需要由JobTracker同意提交后才能提交。因为Hadoop有推测执行的逻辑,一个Task可能在多个TaskTracker上同时执行,但是它们之中最多只有一个能得到提交,否则可能导致结果的错乱;

当一个任务执行失败时,OutputCommitter.abortTask()会被调用。这个调用很特殊,它不大可能在执行任务的子JVM里面完成。因为执行任务的子JVM里面跑的是用户提供的Map/Reduce代码,Hadoop框架是无法保证这些代码的稳定性的,所以任务的失败往往伴随着子JVM的异常退出(这也就是为什么要用子JVM来执行Map和Reduce任务的原因,否则异常退出的可能就是整个框架了)。于是,对于失败的任务,JobTracker除了要考虑它的重试之外,还要为其生成一个cleanup任务。这个cleanup任务像普通的Map和Reduce任务一样,会被分派到TaskTracker上去执行(不一定分派到之前执行该任务失败的那个TaskTracker上,因为输出是在HDFS上,是全局的)。而它的执行逻辑主要就是调用OutputCommitter.abortTask();


而对于Job的setup/commit/abort,则显然不能使用上面的逻辑。

从时间上说,OutputCommitter.setupJob()应该在所有Map和Reduce任务执行之前被调用、OutputCommitter.commitJob()应该在所有Map和Reduce任务执行之后被调用、而OutputCommitter.abortJob()应该在Job确认失败之后被调用;

从地点上说,可能调用这些方法的地方无外乎JobClient、JobTracker、或TaskTracker;

JobClient应该第一个被排除,因为Job的执行并不依赖于JobClient。JobClient在提交完Job之后就可以退出了,它的退出并不会影响Job的继续执行(如果不退出则可以接收JobTracker的进度反馈)。所以,不可能依靠JobClient在Job成功以后来调用OutputCommitter.commitJob();

JobTracker呢?貌似是个合适的地方,因为JobTracker明确知道Job的开始与结束、成功与失败。但是实际上还是不能由JobTracker来调用这些方法。就像前面说到的OutputCommitter.abortTask()一样,既然JobTracker知道了Task的失败,却不直接为它清理输出,而是通过生成一个对应的cleanup任务来完成清理工作。为什么要这样做呢?其实原因很简单,因为OutputCommitter是独立于Hadoop框架,可以由用户自己定制的。Hadoop框架不能保证用户定制代码的稳定性,当然不能让它直接在JobTracker上执行。必须启动一个新的JVM来执行这些方法,那么正好TaskTracker上已经有这样的逻辑了。

所以,对于Job的setup/commit/abort,跟OutputCommitter.abortTask()类似,JobTracker会生成对应的setup任务和cleanup任务。在初始化Job的时期将Job的setup任务分派给TaskTracker,TaskTracker执行这个setup任务所要做的事情就是调用OutputCommitter.setupJob();在Job结束时,Job的cleanup任务将分派给TaskTracker,TaskTracker执行这个cleanup任务所要做的事情就是根据Job的执行结果是成功或是失败,来调用OutputCommitter.commitJob()或OutputCommitter.abortJob()。

为了保证OutputCommitter.setupJob()在所有Map和Reduce任务执行之前被调用,在JobTracker上,Job的初始化被分成了两个步骤:一是为Job生成一堆任务,二是将setup任务分派给TaskTracker去执行,并等待它执行完成。在这之后,初始化才算完成,Map和Reduce任务才能得到分派。

 

可见,在Job执行的过程中,除了我们关注的Map和Reduce任务之外,还会有一些隐藏的setup和cleanup任务。不过这些任务都有一个共同点,它们都可以是用户定制的。

分享到:
评论

相关推荐

    hadoop原理浅析及安装.doc

    hadoop原理浅析及安装.doc

    大数据云计算技术 Hadoop应用浅析(共16页).rar

    大数据云计算技术 Hadoop应用浅析(共16页).rar

    亿赞普Hadoop应用浅析 共16页.pptx

    一 IZP Hadoop集群现状 Hadoop应用 Hadoop集群维护及出现的问题

    大数据云计算技术 Hadoop应用浅析(共16页).pptx

    一:IZP Hadoop集群现状 集群规模 共大、小 2个集群:数据中心和实验室集群 数据中心: 1台NameNode, 1台SecondNameNode, 1台JobTracker,100来台DataNode 共100多台高配服务器; 数据中心又分为10多个机架,每个机架...

    探索HadoopOutputFormat

    Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。...OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而

    hadoop2.7.3 hadoop.dll

    在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path追加%HADOOP_HOME%\bin,有可能出现如下错误: org.apache.hadoop.io.nativeio.NativeIO$Windows....

    大数据云计算技术 Hadoop的相关技术与应用案例分享 全套PPT 共9套PPT课件.rar

    大数据云计算技术 Hadoop应用浅析(共16页).pptx 大数据云计算技术 Hadoop运维杂记(共21页).pptx 大数据云计算技术 暴风集团基于hadoop的数据平台总体架构简介(共18页).ppt 大数据云计算技术 淘宝网Hadoop与...

    Hadoop HDFS和MapReduce架构浅析.pdf

    Hadoop HDFS和MapReduce架构浅析.pdf 更多资源请点击:https://blog.csdn.net/weixin_44155966

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf...

    Hadoop下载 hadoop-2.9.2.tar.gz

    Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo 的工程师 Doug Cutting 和 Mike Cafarella Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo...

    Hadoop下载 hadoop-3.3.3.tar.gz

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...

    Hadoop权威指南 中文版

    本书从hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍hado叩这一高性能处理海量数据集的理想工具。全书共14章,3个附录,涉及的主题包括:haddoop简介:mapreduce简介:hadoop分布式文件系统;hadoop的i...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    (hadoop HDFS 和 Mapreduce 架构浅析

    Hadoop 是一个基于 Java 的分布式密集数据处理 和数据分析的软件框架。Hadoop 在很大程度上是受 Google 在 2004 年白皮书中阐述的 MapReduce 技术的 启发。MapReduce 工作原理是将任务分解为成百上千 个小任务,然后...

    hadoop配置资源 ,hadoop-3.0.0,hadoop.dll,winutils

    调用保存文件的算子,需要配置Hadoop依赖 将文件夹中的 hadoop-3.0.0 解压到电脑任意位置 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’ winutils.exe,并放入Hadoop解压...

    hadoop-3.3.4 版本(最新版)

    Apache Hadoop (hadoop-3.3.4.tar.gz)项目为可靠、可扩展的分布式计算开发开源软件。官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程...

    hadoop2.7.3 Winutils.exe hadoop.dll

    hadoop2.7.3 Winutils.exe hadoop.dll

    hadoop的dll文件 hadoop.zip

    hadoop的dll文件 hadoop.zip

    Hadoop集群pdf文档

    Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期)_SecureCRT使用 Hadoop_Hadoop集群(第5期)_Hadoop安装配置 Hadoop_Hadoop...

    hadoop_tutorial hadoop入门经典

    hadoop_tutorial hadoop入门经典 Hadoop 是一个能够对大量数据进行分布式处理的软件框架。Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。...

Global site tag (gtag.js) - Google Analytics