用户编写的作业比较复杂,相互之间存在依赖关系,这种依赖关系可以用有向图表示,我们称之为 工作流 。 1.JobControl实现原理: 传统做法:为每个作业创建相应的JobConf对象,并按照依赖关系依次(串行)提交各个作业。 //创建Job对象JobConf extractJobConf =
用户编写的作业比较复杂,相互之间存在依赖关系,这种依赖关系可以用有向图表示,我们称之为工作流。
1.JobControl实现原理:
传统做法:为每个作业创建相应的JobConf对象,并按照依赖关系依次(串行)提交各个作业。
//创建Job对象JobConf extractJobConf = new JobConf(ExtractJob.class);JobConf classPriorJobConf = new JobConf(classPriorJob.class);JobConf conditionalProbilityJobConf = new JobConf(conditionalProbilityJobConf.class);JobConf predictJobConf = new JobConf(predictJobConf.class);//配置JobConf//按照依赖关系依次提交作业JobClient.runJob(extractJobConf);JobClient.runJob(classPriorJobConf);JobClient.runJob(conditionalProbilityJobConf);JobClient.runJob(predictJobConf);
Configuration extractJobConf = new Configuration();Configuration classPriorJobConf = new Configuration();Configuration conditionalProbilityJobConf = new Configuration();Configuration predictJobConf = new Configuration();。。设置各个Conf//创建Job对象Job extractJob = new Job(extractJobConf);Job classPriorJob = new Job(classPriorJobConf);Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);Job predictJob = new Job(predictJobConf);//设置依赖关系,构造一个DAG作业classPriorJob.addDepending(extractJob);conditionalProbilityJob.addDepending(extractJob);predictJob.addDepending(classPriorJob);predictJob.addDepending(conditionalProbilityJob);//创建JobControl对象,由它对作业进行监控和调度JobControl jc = new JobControl("test");jc.addJob(extractJob);jc.addJob(classPriorJob);jc.addJob(conditionalProbilityJob);jc.addJob(predictJob);jc.run();
开始--》waitingJobs--》readyJobs--》runningJobs--》successfulJobs--》结束 ??????--》failedJobs????--》结束
2.ChainMapper/ChainReducer的实现原理
CMR主要为了解决线性链式Mapper而提出的,在MR中存在多个Mapper,这些Mapper像管道一样,前一个Mapper的输出结果会被重定向到下一个Mapper的输入,形成
一个流水线,形式类似于[Map+Reduce Map*]
对于任意一个MR作业 ,MR阶段可以有无限个Mapper,但Reducer只能有一个。
实例:
conf.setJobName("chain");conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);JobConf mapper1Conf = new JobConf(false);JobConf mapper2Conf = new JobConf(false);JobConf reduce1Conf = new JobConf(false);JobConf mapper3Conf = new JobConf(false);?ChainMapper.addMapper(conf,Mapper1.class,LongWritable.class,Text.class,Text.class,Text.class,true,mapper1Conf);//第七个字段是:key/value是否按值传递,如果为true:则为按值传递;如果为false:则为按引用传递。如果设置为true,需要保证key/value不会被修改ChainMapper.addMapper(conf,Mapper2.class,Text.class,Text.class,LongWritable.class,Text.class,false,mapper2Conf);ChainMapper.setReducer(conf,Reducer.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduce1Conf);ChainMapper.addMapper(conf,Mapper3.class,Text.class,Text.class,LongWritable.class,Text.class,false,mapper1Conf);JobClient.runJob(conf);
原文地址:深入解析MapReduce架构设计与实现原理–读书笔记(5)hadoop工作流, 感谢原作者分享。
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。TEL:177 7030 7066 E-MAIL:11247931@qq.com