标签:
本文主要介绍下二次排序的实现方式
我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序。
下面就具体说一下二次排序的实现方式
1. 自定义一个key
为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段
首先,实现WritableComparable接口,因为key是可序列化并且可以比较的
其次,重载相关的方法,例如序列化、反序列化相关的方法write、readFields。重载在分区的时候要用到的hashcode方法,注意后面会说道一个partitioner类,也是用来分区的,用hashcode方法和partitioner类进行分区都是可以的,使用其中的一个即可。重载排序用的compareTo方法,这个就是真正对排序起作用的方法。
2. 分区函数类
上面定义了一个新的key,那么我现在做分发,到底按照什么样的规则进行分发是在分区函数类中定义的,这个类要继承Partitioner类,重载其中的分区方法getPartition,在main函数里给job添加上即可,例如:job.setPartitionerClass(partitioner.class)
这个类的作用跟key的hashcode方法的作用一样,所以如果在hashcode方法中写了分区的方法,这个分区类是可以省掉的
3. 比较函数类
这个类决定着key的排序规则,是一个比较器,需要继承WritableComparator类,并且重载其中的compare方法。在main函数里给job添加上即可,例如:job.setSortComparatorClass(KeyComparator.class)
这个类的作用跟自定义key的compareTo方法一样,如果在自定义的key中重载的compareTo方法,则这个类可省略。
4. 分组函数类
通过分区类,我们重新定义了key的分区规则,但是多个key不同的也可以进入到一个reducer中,所以我们需要分组函数类来定义什么样的key做为一组来执行,因为也涉及到比较,所以这个类也需要继承WritableComparator,并且重载其中的compare方法,在main函数中加入即可,例如:job.setPartitionerClass(partitioner.class);
下面是具体实现的代码
public class SecondSortTest {
private static String input = "/dsap/rawdata/secondSortTest/result3";
private static String output = "/dsap/rawdata/secondSortTest/result6";
public static class Mapper1 extends Mapper<Object, Text, Pair, Text> {
private Pair pair = new Pair();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] segs = value.toString().split("\\s+");
pair.set(Float.parseFloat(segs[0]), Float.parseFloat(segs[1]));
context.write(pair, new Text(segs[1]));
}
}
public static class Reducer2 extends Reducer<Pair, Text, Text, Text> {
public void reduce(Pair key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.write(new Text(key.toString()), new Text("==========="));
for (Text text : values) {
context.write(new Text(key.toString()), text);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
/** 判断输出路径是否存在,如果存在,则删除 */
FileSystem hdfs = FileSystem.get(conf);
Job job = new Job(conf, "secondSortTest");
job.setJarByClass(SecondSortTest.class);
FileInputFormat.addInputPath(job, new Path(input));
if (hdfs.exists(new Path(output)))
hdfs.delete(new Path(output));
FileOutputFormat.setOutputPath(job, new Path(output));
job.setGroupingComparatorClass(GroupingComparator.class);
job.setNumReduceTasks(19);
job.setMapperClass(Mapper1.class);
job.setReducerClass(Reducer2.class);
job.setMapOutputKeyClass(Pair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
public static class partitioner extends Partitioner<Pair, Text> {
@Override
public int getPartition(Pair key, Text value, int numPartitions) {
return Math.abs((int) (key.getFirst() * 127)) % numPartitions;
}
}
static class Pair implements WritableComparable<Pair> {
private float first;
private float second = 0;
@Override
public void readFields(DataInput in) throws IOException {
first = in.readFloat();
second = in.readFloat();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(first);
out.writeFloat(second);
}
@Override
public int hashCode() {
return (int) (first * 127);
}
// 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
@Override
public int compareTo(Pair o) {
if (first != o.first) {
return first - o.first > 0 ? 1 : -1;
} else if (second != o.second) {
return second - o.second > 0 ? 1 : -1;
}
return 0;
}
public void set(float left, float right) {
first = left;
second = right;
}
public float getFirst() {
return first;
}
public float getSecond() {
return second;
}
@Override
public String toString() {
return "Pair [first=" + first + ", second=" + second + "]";
}
}
static class GroupingComparator implements RawComparator<Pair> {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
b2, s2, Integer.SIZE / 8);
}
@Override
public int compare(Pair o1, Pair o2) {
float first1 = o1.getFirst();
float first2 = o2.getFirst();
return first1 - first2 > 0 ? 1 : -1;
}
}
}
参考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html
标签:
原文地址:http://blog.csdn.net/nwpuwyk/article/details/42687289