/**
 * Define the comparator that controls how the keys are sorted before they
 * are passed to the {@link Reducer}.
 * @param cls the raw comparator
 * @see #setCombinerKeyGroupingComparatorClass(Class)
 */
public void setSortComparatorClass(Class<? extends RawComparator> cls)
    throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputKeyComparatorClass(cls);
}
/**
 * Set the {@link RawComparator} comparator used to compare keys.
 * @param theClass the {@link RawComparator} comparator used to
 *        compare keys.
 * @see #setOutputValueGroupingComparator(Class)
 */
// Sets the comparator used to compare keys; the theClass argument is that comparator.
public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
  setClass(JobContext.KEY_COMPARATOR, theClass, RawComparator.class);
}
/**
 * Get the {@link RawComparator} comparator used to compare keys.
 *
 * @return the {@link RawComparator} comparator used to compare keys.
 */
// Looks up the comparator used to compare keys and returns it as a RawComparator.
public RawComparator getOutputKeyComparator() {
  // getClass returns null when the KEY_COMPARATOR property has no value.
  Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
  // If a class is configured, create an instance of it through reflection.
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  // Otherwise, use the default comparator for the map output key class.
  return WritableComparator.get(
      getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
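The lookup order in getOutputKeyComparator (a configured class first, a default otherwise) can be sketched in plain Java without Hadoop on the classpath. This is only an illustration under stated assumptions: the `conf` map, the `key.comparator.class` property name, and the `ComparatorLookup` and `ReverseStrings` classes are hypothetical stand-ins rather than Hadoop APIs, and `Comparator.naturalOrder()` plays the role of the `WritableComparator.get(...)` fallback.

```java
import java.util.Comparator;
import java.util.Map;

// Hypothetical stand-in for a user-supplied comparator class.
class ReverseStrings implements Comparator<String> {
    @Override
    public int compare(String a, String b) {
        return b.compareTo(a);
    }
}

// Hypothetical sketch of "use the configured class if present, else a default".
class ComparatorLookup {
    // Made-up property name, used by this sketch only.
    static final String KEY = "key.comparator.class";

    @SuppressWarnings("unchecked")
    static Comparator<String> lookup(Map<String, String> conf) {
        String className = conf.get(KEY);
        if (className == null) {
            // Nothing configured: fall back to the natural ordering.
            return Comparator.naturalOrder();
        }
        try {
            // Something configured: instantiate it reflectively.
            Class<?> cls = Class.forName(className);
            return (Comparator<String>) cls.asSubclass(Comparator.class)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

With an empty map, `lookup` returns the natural ordering; after putting `ReverseStrings.class.getName()` under `ComparatorLookup.KEY`, it returns the reversed one.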
/**
 * Compare logical range, st i, j MOD offset capacity.
 * Compare by partition, then by key.
 * @see IndexedSortable#compare
 */
public int compare(final int mi, final int mj) {
  final int kvi = offsetFor(mi % maxRec);
  final int kvj = offsetFor(mj % maxRec);
  final int kvip = kvmeta.get(kvi + PARTITION);
  final int kvjp = kvmeta.get(kvj + PARTITION);
  // sort by partition
  if (kvip != kvjp) {
    return kvip - kvjp;
  }
  // sort by key
  return comparator.compare(kvbuffer,
      kvmeta.get(kvi + KEYSTART),
      kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
      kvbuffer,
      kvmeta.get(kvj + KEYSTART),
      kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}
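The "partition first, then key" ordering above can be tried out on a much simplified model. The sketch below assumes a flat `int[]` of metadata with three slots per record (partition, key start, key length) pointing into one byte buffer; this layout and the `PartitionThenKeySort` class are hypothetical and far simpler than Hadoop's real buffer, and the byte comparison is a plain unsigned lexicographic one standing in for the configured comparator.

```java
import java.util.Arrays;

// Hypothetical, simplified model of "metadata ints pointing into a byte buffer".
class PartitionThenKeySort {
    // Layout of one metadata record in this sketch: {partition, keyStart, keyLength}.
    static final int PARTITION = 0, KEYSTART = 1, KEYLEN = 2, NMETA = 3;

    // Lexicographic comparison of two byte ranges, treating bytes as unsigned.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int a = b1[s1 + i] & 0xff;
            int b = b2[s2 + i] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return l1 - l2;
    }

    // Returns record indices ordered by partition first, then by key bytes.
    static Integer[] sortedOrder(byte[] buffer, int[] meta) {
        int records = meta.length / NMETA;
        Integer[] order = new Integer[records];
        for (int i = 0; i < records; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (i, j) -> {
            int mi = i * NMETA;
            int mj = j * NMETA;
            int pi = meta[mi + PARTITION];
            int pj = meta[mj + PARTITION];
            if (pi != pj) {
                return Integer.compare(pi, pj);   // sort by partition
            }
            return compareBytes(                  // then sort by key
                buffer, meta[mi + KEYSTART], meta[mi + KEYLEN],
                buffer, meta[mj + KEYSTART], meta[mj + KEYLEN]);
        });
        return order;
    }
}
```

Only the index array is reordered; the serialized keys stay where they are in the buffer, which is the point of comparing through metadata.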
/** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
 *
 * <p>The default implementation reads the data into two {@link
 * WritableComparable}s (using {@link
 * Writable#readFields(DataInput)}), then calls {@link
 * #compare(WritableComparable,WritableComparable)}.
 */
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1);                   // parse key1
    key1.readFields(buffer);

    buffer.reset(b2, s2, l2);                   // parse key2
    key2.readFields(buffer);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2);                   // compare them
}

/** Compare two WritableComparables.
 *
 * <p> The default implementation uses the natural ordering, calling {@link
 * Comparable#compareTo(Object)}.
 */
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
  return a.compareTo(b);
}

/**
 * Get the key class for the map output data. If it is not set, use the
 * (final) output key class. This allows the map output key class to be
 * different than the final output key class.
 *
 * @return the map output key class.
 */
public Class<?> getMapOutputKeyClass() {
  Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
  if (retv == null) {
    retv = getOutputKeyClass();
  }
  return retv;
}

public interface WritableComparable<T> extends Writable, Comparable<T>

/**
 * A serializable object which implements a simple, efficient, serialization
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
 * framework implements this interface.</p>
 */
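To make the difference between the default byte-level compare (deserialize both keys, then compare) and an overridden one (compare straight from the bytes) concrete, here is a small stand-alone sketch for a 4-byte big-endian int key. It uses only java.io and java.nio; the `IntKeyCompare` class and its method names are hypothetical, chosen for this illustration, and are not part of Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch: two ways to compare serialized int keys.
class IntKeyCompare {
    // Serialize an int as 4 big-endian bytes, the same layout DataOutput.writeInt produces.
    static byte[] serialize(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Default-style path: deserialize both keys, then compare the values.
    static int compareByDeserializing(byte[] b1, int s1, int l1,
                                      byte[] b2, int s2, int l2) {
        try {
            int k1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1)).readInt();
            int k2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2)).readInt();
            return Integer.compare(k1, k2);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Decode a big-endian int in place, without creating streams or key objects.
    static int readInt(byte[] b, int s) {
        return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
             | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
    }

    // Optimized-style path: compare straight from the serialized bytes.
    static int compareRaw(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }
}
```

Both paths give the same ordering; the second one just skips the stream and object work on every comparison, which is what the "optimization hook" is for.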
public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {}

/**
 * <p>Implementations typically implement a static <code>read(DataInput)</code>
 * method which constructs a new instance, calls {@link #readFields(DataInput)}
 * and returns the instance.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *
 *       public int compareTo(MyWritableComparable o) {
 *         int thisValue = this.counter;
 *         int thatValue = o.counter;
 *         return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
 *       }
 *
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
 *         return result;
 *       }
 *     }
 * </pre></blockquote></p>
 */

class WritableComparator implements RawComparator, Configurable

/** A Comparator for {@link WritableComparable}s.
 *
 * <p>This base implementation uses the natural ordering.  To define alternate
 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
 *
 * <p>One may optimize compare-intensive operations by overriding
 * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
 * provided to assist in optimized implementations of this method.
 */
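The write/readFields/compareTo shape described above, together with the idea of defining an alternate ordering, can be exercised without Hadoop. The sketch below is an assumption-laden illustration: `CounterKey` is a hypothetical class written against java.io only (it does not implement Hadoop's `WritableComparable`), `roundTrip` is a helper invented here to check that write and readFields mirror each other, and `DESCENDING` simply reverses the natural ordering where a real job would override the comparator's compare method instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;

// Hypothetical key type following the write/readFields/compareTo shape.
class CounterKey implements Comparable<CounterKey> {
    int counter;
    long timestamp;

    CounterKey() { }

    CounterKey(int counter, long timestamp) {
        this.counter = counter;
        this.timestamp = timestamp;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    // Static factory in the style the Javadoc describes: new instance, readFields, return.
    static CounterKey read(DataInput in) throws IOException {
        CounterKey k = new CounterKey();
        k.readFields(in);
        return k;
    }

    @Override
    public int compareTo(CounterKey o) {
        return Integer.compare(counter, o.counter); // natural ordering: ascending counter
    }

    // Serialize then deserialize, to check that write and readFields mirror each other.
    static CounterKey roundTrip(CounterKey k) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            k.write(new DataOutputStream(bytes));
            return read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Alternate ordering: descending counter, obtained by reversing the natural one.
    static final Comparator<CounterKey> DESCENDING = Comparator.reverseOrder();
}
```

The fields must be read back in exactly the order they were written; swapping the two reads in `readFields` would compile but silently corrupt both values.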
Original article: http://www.cnblogs.com/xuanlvshu/p/5748098.html