扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容主要讲解“MapReduce的output输出过程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce的output输出过程是什么”吧!
成都创新互联公司专业为企业提供赤坎网站建设、赤坎做网站、赤坎网站设计、赤坎网站制作等企业网站建设、网页设计与制作、赤坎企业网站模板建站服务,十余年赤坎做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
//--------------------------ReduceTask.java public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); if (this.isMapOrReduce()) { this.copyPhase = this.getProgress().addPhase("copy"); this.sortPhase = this.getProgress().addPhase("sort"); this.reducePhase = this.getProgress().addPhase("reduce"); } TaskReporter reporter = this.startReporter(umbilical); boolean useNewApi = job.getUseNewReducer(); //reducetask初始化工作 this.initialize(job, this.getJobID(), reporter, useNewApi); if (this.jobCleanup) { this.runJobCleanupTask(umbilical, reporter); } else if (this.jobSetup) { this.runJobSetupTask(umbilical, reporter); } else if (this.taskCleanup) { this.runTaskCleanupTask(umbilical, reporter); } else { this.codec = this.initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = this.conf.getCombinerClass(); CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null; Class extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class); shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles); shuffleConsumerPlugin.init(shuffleContext); rIter = shuffleConsumerPlugin.run(); this.mapOutputFilesOnDisk.clear(); this.sortPhase.complete(); this.setPhase(Phase.REDUCE); this.statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); //开始运行reducetask if (useNewApi) { this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } shuffleConsumerPlugin.close(); this.done(umbilical, reporter); }
和MapTask类似,主要有 this.initialize() 以及 this.runNewReducer() 这两个方法。做了初始化以及开始运行task的操作。
//----------------------------------------ReduceTask.java public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException { //创建上下文对象 this.jobContext = new JobContextImpl(job, id, reporter); this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter); //修改reducetask的状态为运行中 if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) { this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING); } if (useNewApi) { if (LOG.isDebugEnabled()) { LOG.debug("using new api for output committer"); } //反射获取outputformat类对象。getOutputFormatClass这个方法在JobContextImpl中。 //默认是TextOutputFormat.class this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job); this.committer = this.outputFormat.getOutputCommitter(this.taskContext); } else { this.committer = this.conf.getOutputCommitter(); } //获取输出路径 Path outputPath = FileOutputFormat.getOutputPath(this.conf); if (outputPath != null) { if (this.committer instanceof FileOutputCommitter) { FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext)); } else { FileOutputFormat.setWorkOutputPath(this.conf, outputPath); } } this.committer.setupTask(this.taskContext); Class extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class); this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf); LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree); if (this.pTree != null) { this.pTree.updateProcessTree(); this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime(); } }
主要就是初始化上下文对象,获取outputformat对象。
//-----------------------------------------------ReduceTask.java privatevoid runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException { //匿名内部类,用于构建key,value的迭代器 rIter = new RawKeyValueIterator() { public void close() throws IOException { rIter.close(); } public DataInputBuffer getKey() throws IOException { return rIter.getKey(); } public Progress getProgress() { return rIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rIter.getValue(); } public boolean next() throws IOException { boolean ret = rIter.next(); reporter.setProgress(rIter.getProgress().getProgress()); return ret; } }; TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //反射获取Reducer对象 org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job); //获取RecordWriter对象,用于将结果写入到文件中 org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", this.isSkipping()); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); //创建reduceContext对象,用于reduce任务 org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass); //开始运行reduce try { reducer.run(reducerContext); } finally { //关闭输出流 trackedRW.close(reducerContext); } }
可以看到,主要做了以下工作:
1)获取reducer对象,用于运行run() ,也就是运行reduce方法
2)创建 RecordWriter对象
3)创建reduceContext
4)开始运行reducer中的run
//--------------------------------------NewTrackingRecordWriter.java static class NewTrackingRecordWriterextends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter real; private final org.apache.hadoop.mapreduce.Counter outputRecordCounter; private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter; private final List fsStats; NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException { this.outputRecordCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List matchedStats = null; if (reduce.outputFormat instanceof FileOutputFormat) { matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration()); } this.fsStats = matchedStats; long bytesOutPrev = this.getOutputBytes(this.fsStats); //通过outputFormat创建RecordWriter对象 this.real = reduce.outputFormat.getRecordWriter(taskContext); long bytesOutCurr = this.getOutputBytes(this.fsStats); this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } ..................... }
重点的就是通过outputFormat.getRecordWriter来创建 RecordWriter 对象。
上面也说到,outputFormat默认就是 TextOutputFormat,所以下面看看
TextOutputFormat.getRecordWriter()
public class TextOutputFormatextends FileOutputFormat { public TextOutputFormat() { } //可以看到,返回的是静态内部类TextOutputFormat.LineRecordWriter public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); //key和value的分隔符,默认是 \t String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); //分为压缩和非压缩输出 if (!isCompressed) { //获取输出路径 Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); //创建输出流 FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator); } else { Class extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); //返回LineRecordWriter对象 return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } //这里就是 LineRecordWriter 类 protected static class LineRecordWriter implements RecordWriter { private static final byte[] NEWLINE; protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text)o; this.out.write(to.getBytes(), 0, to.getLength()); } else { this.out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } //将KV输出 public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (!nullKey || !nullValue) { //先写key if (!nullKey) { this.writeObject(key); } //接着写入key和value之间的分隔符 if (!nullKey && !nullValue) { this.out.write(this.keyValueSeparator); } //最后写入value if (!nullValue) { this.writeObject(value); } //接着写入新的一行 this.out.write(NEWLINE); } } public synchronized void close(Reporter reporter) throws IOException { this.out.close(); } static { NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); } } }
可以看到,最终返回的RecordWriter对象是 LineRecordWriter 类型的。
接着回到3中,看 reduceContext这个对象的类
protected staticReducer .Context createReduceContext(Reducer reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException { ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); Reducer .Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext); return reducerContext; }
可以看到reducerContext是一个ReduceContextImpl类对象。
下面看看ReduceContextImpl 这个类的构造方法
//---------------------------------ReduceContextImpl.java public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException { //父类是 TaskInputOutputContextImpl,把outputformat对象传递进去了 super(conf, taskid, output, committer, reporter); this.input = input; this.inputKeyCounter = inputKeyCounter; this.inputValueCounter = inputValueCounter; this.comparator = comparator; this.serializationFactory = new SerializationFactory(conf); this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass); this.keyDeserializer.open(this.buffer); this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass); this.valueDeserializer.open(this.buffer); this.hasMore = input.next(); this.keyClass = keyClass; this.valueClass = valueClass; this.conf = conf; this.taskid = taskid; }
这里面,它继续调用了父类的构造方法,把outputformat对象传递进去了。
继续看看父类 TaskInputOutputContextImpl
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter) { //可以看到这里的output就是recordWriter对象 super(conf, taskid, reporter); this.output = output; this.committer = committer; } //这里的逻辑其实就是先读取KV到 this.key和this.value中,如果没有KV就返回false,否则返回true public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; //调用recordWriter的write方法,将KV输出,默认是LineRecordWriter这个类 public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { this.output.write(key, value);
可以看到,这里有3个抽象方法(在子类ReduceContextImpl中实现了逻辑,和RecordWriter无关),以及write这个具体方法。分别用于获取KV以及将结果KV写入。write这个写入方法,就是调用的 recordWriter的write方法,也就是5中创建的LineRecordWriter对象中的write方法,将KV输出。
public void run(Reducer.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKey()) { this.reduce(context.getCurrentKey(), context.getValues(), context); Iterator iter = context.getValues().iterator(); if (iter instanceof ValueIterator) { ((ValueIterator)iter).resetBackupStore(); } } } finally { this.cleanup(context); } }
可以看到,这里就是调用6中创建的 reduceContext中的方法来获取KV。而且在reduce方法中,我们会通过 context.write(key,value)来将结果KV输出。调用的其实就是 LineRecordWriter对象中的write方法。
到此,相信大家对“MapReduce的output输出过程是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流