This post walks through how Hadoop MapReduce's grouping comparator is defined and how it reaches the reduce side. The new-API entry point is Job.setGroupingComparatorClass, which delegates to JobConf:

/**
 * Define the comparator that controls which keys are grouped together
 * for a single call to
 * {@link Reducer#reduce(Object, Iterable,
 *                       org.apache.hadoop.mapreduce.Reducer.Context)}
 * @param cls the raw comparator to use
 * @throws IllegalStateException if the job is submitted
 * @see #setCombinerKeyGroupingComparatorClass(Class)
 */
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                       ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueGroupingComparator(cls);
}

The JobConf setter it calls carries the javadoc that explains the semantics:

/**
 * Set the user defined {@link RawComparator} comparator for
 * grouping keys in the input to the reduce.
 *
 * <p>This comparator should be provided if the equivalence rules for keys
 * for sorting the intermediates are different from those for grouping keys
 * before each call to
 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
 *
 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
 *
 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
 * how keys are sorted, this can be used in conjunction to simulate
 * <i>secondary sort on values</i>.</p>
 *
 * <p><i>Note</i>: This is not a guarantee of the reduce sort being
 * <i>stable</i> in any sense. (In any case, with the order of available
 * map-outputs to the reduce being non-deterministic, it wouldn't make
 * that much sense.)</p>
 *
 * @param theClass the comparator class to be used for grouping keys.
 *                 It should implement <code>RawComparator</code>.
 * @see #setOutputKeyComparatorClass(Class)
 * @see #setCombinerKeyGroupingComparator(Class)
 */
public void setOutputValueGroupingComparator(
    Class<? extends RawComparator> theClass) {
  setClass(JobContext.GROUP_COMPARATOR_CLASS, theClass, RawComparator.class);
}

The matching getter:

/**
 * Get the user defined {@link WritableComparable} comparator for
 * grouping keys of inputs to the reduce.
 *
 * @return comparator set by the user for grouping values.
 * @see #setOutputValueGroupingComparator(Class) for details.
 */
public RawComparator getOutputValueGroupingComparator() {
  Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (theClass == null) {
    return getOutputKeyComparator();
  }
  return ReflectionUtils.newInstance(theClass, this);
}
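To make the "secondary sort on values" note in the javadoc concrete, here is a minimal sketch. The composite-key layout ("naturalKey\tsecondaryField") and the class name NaturalKeyGroupingComparator are illustrative assumptions of mine, not code from the Hadoop tree:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical setup: the map emits Text keys of the form "naturalKey\tsecondaryField".
// The sort comparator orders the full composite key; this grouping comparator
// compares only the natural-key part, so all secondary fields of one natural key
// arrive, already sorted, in a single reduce() call.
public class NaturalKeyGroupingComparator extends WritableComparator {
  public NaturalKeyGroupingComparator() {
    // createInstances=true: the raw compare(byte[],...) deserializes both keys
    // and then calls the object-level compare() below.
    super(Text.class, true);
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    String naturalA = a.toString().split("\t", 2)[0];
    String naturalB = b.toString().split("\t", 2)[0];
    return naturalA.compareTo(naturalB);
  }
}

With Text keys the default lexicographic sort already orders the composite key, so in this sketch only the grouping side needs wiring in the driver:

// In the job driver (new API):
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);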
On the reduce side, ReduceTask looks the comparator up and hands it to the reducer runner:

RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
  runNewReducer(job, umbilical, reporter, rIter, comparator,
                keyClass, valueClass);
} else {
  runOldReducer(job, umbilical, reporter, rIter, comparator,
                keyClass, valueClass);
}

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException, InterruptedException,
                            ClassNotFoundException {
  // wrap value iterator to report progress.
  final RawKeyValueIterator rawIter = rIter;
  rIter = new RawKeyValueIterator() {
    public void close() throws IOException {
      rawIter.close();
    }
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }
    public Progress getProgress() {
      return rawIter.getProgress();
    }
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }
    public boolean next() throws IOException {
      boolean ret = rawIter.next();
      reporter.setProgress(rawIter.getProgress().getProgress());
      return ret;
    }
  };
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
  job.setBoolean("mapred.skip.on", isSkipping());
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
      createReduceContext(reducer, job, getTaskID(),
                          rIter, reduceInputKeyCounter,
                          reduceInputValueCounter,
                          trackedRW, committer,
                          reporter, comparator,
                          keyClass, valueClass);
  try {
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}

createReduceContext passes the comparator straight into the ReduceContextImpl that backs the reducer's Context:

@SuppressWarnings("unchecked")
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                    Configuration job,
                    org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                    RawKeyValueIterator rIter,
                    org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                    org.apache.hadoop.mapreduce.Counter inputValueCounter,
                    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
                    org.apache.hadoop.mapreduce.OutputCommitter committer,
                    org.apache.hadoop.mapreduce.StatusReporter reporter,
                    RawComparator<INKEY> comparator,
                    Class<INKEY> keyClass, Class<INVALUE> valueClass
                    ) throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      reduceContext =
          new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
              rIter, inputKeyCounter, inputValueCounter, output, committer,
              reporter, comparator, keyClass, valueClass);
  // ... (remainder of createReduceContext elided)

The ReduceContextImpl constructor stores the comparator and wires up the key/value deserializers:

public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                         RawKeyValueIterator input,
                         Counter inputKeyCounter,
                         Counter inputValueCounter,
                         RecordWriter<KEYOUT,VALUEOUT> output,
                         OutputCommitter committer,
                         StatusReporter reporter,
                         RawComparator<KEYIN> comparator,
                         Class<KEYIN> keyClass,
                         Class<VALUEIN> valueClass
                         ) throws InterruptedException, IOException {
  super(conf, taskid, output, committer, reporter);
  this.input = input;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;
  this.comparator = comparator;
  this.serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);
  hasMore = input.next();
  this.keyClass = keyClass;
  this.valueClass = valueClass;
  this.conf = conf;
  this.taskid = taskid;
}
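reducer.run(reducerContext) then drives the familiar reduce loop over this context. The sketch below is a simplification of Reducer#run from memory (the shipped version also resets the mark/reset backup store after each group), but it shows where the grouping comparator takes effect:

// Simplified shape of org.apache.hadoop.mapreduce.Reducer#run
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // nextKey() consumes key/value pairs whose keys the grouping comparator
    // reports as equal, so each loop iteration is exactly one reduce() call.
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
  } finally {
    cleanup(context);
  }
}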
/**
 * Advance to the next key/value pair.
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!hasMore) {
    key = null;
    value = null;
    return false;
  }
  firstValue = !nextKeyIsSame;
  DataInputBuffer nextKey = input.getKey();
  currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                    nextKey.getLength() - nextKey.getPosition());
  buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
  key = keyDeserializer.deserialize(key);
  DataInputBuffer nextVal = input.getValue();
  buffer.reset(nextVal.getData(), nextVal.getPosition(),
               nextVal.getLength() - nextVal.getPosition());
  value = valueDeserializer.deserialize(value);
  currentKeyLength = nextKey.getLength() - nextKey.getPosition();
  currentValueLength = nextVal.getLength() - nextVal.getPosition();
  if (isMarked) {
    backupStore.write(nextKey, nextVal);
  }
  hasMore = input.next();
  if (hasMore) {
    nextKey = input.getKey();
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                       currentRawKey.getLength(),
                                       nextKey.getData(),
                                       nextKey.getPosition(),
                                       nextKey.getLength() - nextKey.getPosition()
                                       ) == 0;
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true;
}

Finally, back in JobConf.getOutputValueGroupingComparator(): when no grouping comparator class was configured, the fallback is the sort comparator itself (otherwise the configured class is instantiated via ReflectionUtils.newInstance(theClass, this)):

if (theClass == null) {
  return getOutputKeyComparator();
}

/**
 * Get the {@link RawComparator} comparator used to compare keys.
 *
 * @return the {@link RawComparator} comparator used to compare keys.
 */
public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  return WritableComparator.get(getMapOutputKeyClass()
      .asSubclass(WritableComparable.class), this);
}
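The nextKeyIsSame computation in nextKeyValue() above is where consecutive composite keys get folded into one group. Here is a small, hedged demo of that raw comparison, reusing the NaturalKeyGroupingComparator sketched earlier; the serialize() helper and the sample keys are mine, not Hadoop API:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;

public class NextKeyIsSameDemo {
  public static void main(String[] args) throws IOException {
    NaturalKeyGroupingComparator cmp = new NaturalKeyGroupingComparator();
    byte[] k1 = serialize(new Text("user42\t2016-07-01"));
    byte[] k2 = serialize(new Text("user42\t2016-07-02"));
    // The same raw compare(...) == 0 test that nextKeyValue() performs:
    boolean nextKeyIsSame =
        cmp.compare(k1, 0, k1.length, k2, 0, k2.length) == 0;
    System.out.println(nextKeyIsSame);  // true: both values reach one reduce() call
  }

  // Writable serialization, matching the raw key bytes the reduce side compares.
  private static byte[] serialize(Text t) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    t.write(new DataOutputStream(baos));
    return baos.toByteArray();
  }
}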
Original post: http://www.cnblogs.com/xuanlvshu/p/5748428.html