Code tested on: Hadoop 2.4
Use case: on the Reducer side, keys arrive sorted but values do not. If you also need the values sorted, the technique described here does the job.
Worked example:
Take the following input records (one "symbol,value" pair per line):
a,5
b,7
c,2
c,9
a,3
a,1
b,10
b,3
c,1
A plain MapReduce job would produce output along these lines:
a 1
a 3
a 5
b 3
b 10
b 7
c 1
c 9
c 2
The keys are sorted, but the values are not. With the technique described in this post, the output becomes:
a 1
a 3
a 5
b 3
b 7
b 10
c 1
c 2
c 9
Now both the keys and the values are sorted.
How secondary sort works:
1) Define a custom key type and fold the value into the key;
2) The shuffle sorts by key, so the embedded values get sorted along the way;
3) Two problems follow from this:
a. records that share the same original key may be shipped to different Reducers;
b. records are grouped differently on the Reducer side (each distinct (key, value) pair forms its own group);
Both problems are fixed by supplying a custom Partitioner and a custom GroupingComparator that look only at the original key, as sketched below.
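As a rough sketch of the data flow on the sample records (illustrative only, not actual program output):

input "a,5"  ->  map emits key = (a, 5), value = NullWritable
input "a,3"  ->  map emits key = (a, 3), value = NullWritable
shuffle sort order: (a,1) < (a,3) < (a,5) < (b,3) < (b,7) < (b,10) < ...
custom Partitioner          ->  hashes the symbol only, so every "a" key reaches the same Reducer
custom GroupingComparator   ->  compares the symbol only, so all "a" keys share one reduce() call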
Example code:
Driver class:
package fz.secondarysort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SortDriver extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new SortDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job1 = Job.getInstance(getConf(), "secondary sort");
        job1.setJarByClass(getClass());
        if (args.length != 5) {
            System.err.println("Usage: <input> <output> <numReducers> <useSecondarySort> <useGroupComparator>");
            System.exit(-1);
        }
        Path out = new Path(args[1]);
        out.getFileSystem(getConf()).delete(out, true); // remove old output, if any
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, out);
        if ("true".equals(args[3]) || "false".equals(args[3])) {
            if ("true".equals(args[3])) { // with secondary sort
                job1.setMapperClass(Mapper1.class);
                job1.setReducerClass(Reducer1.class);
                job1.setMapOutputKeyClass(CustomKey.class);
                job1.setMapOutputValueClass(NullWritable.class);
                job1.setOutputKeyClass(CustomKey.class);
                job1.setOutputValueClass(NullWritable.class);
                if ("true".equals(args[4])) {
                    job1.setGroupingComparatorClass(CustomGroupComparator.class);
                } else if ("false".equals(args[4])) {
                    // do nothing: grouping falls back to the key's full compareTo()
                } else {
                    System.err.println("Wrong Group Comparator argument!");
                    System.exit(-1);
                }
                job1.setPartitionerClass(CustomPartitioner.class);
            } else { // without secondary sort
                job1.setMapperClass(Mapper2.class);
                job1.setReducerClass(Reducer2.class);
                job1.setMapOutputKeyClass(Text.class);
                job1.setMapOutputValueClass(IntWritable.class);
                job1.setOutputKeyClass(Text.class);
                job1.setOutputValueClass(IntWritable.class);
            }
        } else {
            System.err.println("The fourth argument should be 'true' or 'false'");
            System.exit(-1);
        }
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        job1.setNumReduceTasks(Integer.parseInt(args[2]));
        boolean job1success = job1.waitForCompletion(true);
        if (!job1success) {
            System.out.println("The secondary sort job failed!");
            return -1;
        }
        return 0;
    }
}
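To launch the job on a cluster, something along these lines works (the jar name and paths are placeholders; substitute whatever your build and HDFS layout actually use):

hadoop jar secondarysort.jar fz.secondarysort.SortDriver /input/data.txt /output/sorted 1 true true

The third argument sets the number of Reducers; the last two toggle the secondary sort and the grouping comparator, matching the argument checks in run() above.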
Mapper1 (secondary sort mapper):
package fz.secondarysort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper with secondary sort: packs both fields of each record into the
 * composite key, so the shuffle sorts on (symbol, value).
 * @author fansy
 *
 */
public class Mapper1 extends
        Mapper<LongWritable, Text, CustomKey, NullWritable> {
    private static final String COMMA = ",";
    private CustomKey newKey = new CustomKey(); // reused across map() calls to avoid object churn

    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(COMMA);
        newKey.setSymbol(values[0]);
        newKey.setValue(Integer.parseInt(values[1]));
        cxt.write(newKey, NullWritable.get());
    }
}
Reducer1 (secondary sort reducer):
package fz.secondarysort;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Secondary sort Reducer
 * @author fansy
 *
 */
public class Reducer1 extends
        Reducer<CustomKey, NullWritable, CustomKey, NullWritable> {
    private Logger log = LoggerFactory.getLogger(Reducer1.class);

    public void setup(Context cxt) {
        log.info("reducer1*********************in setup()");
    }

    public void reduce(CustomKey key, Iterable<NullWritable> values, Context cxt) throws IOException, InterruptedException {
        log.info("reducer1******* in reduce()");
        // Note: as the values iterator advances, the framework updates the
        // contents of 'key' in place, so each write() emits the current
        // composite key, values included, in sorted order.
        for (NullWritable v : values) {
            log.info("key:" + key + "-->value:" + v);
            cxt.write(key, v);
        }
        log.info("reducer1****** in reduce() *******end");
    }
}
Mapper2 (no secondary sort):
package fz.secondarysort;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper without secondary sort: the symbol stays the key, the number stays the value.
 * @author fansy
 *
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final String COMMA = ",";

    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(COMMA);
        Text newKey = new Text(values[0]);
        cxt.write(newKey, new IntWritable(Integer.parseInt(values[1])));
    }
}
Reducer2 (no secondary sort):
package fz.secondarysort;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reducer without secondary sort
 * @author fansy
 *
 */
public class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
    private Logger log = LoggerFactory.getLogger(Reducer2.class);

    public void setup(Context cxt) {
        log.info("reducer2*********************in setup()");
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context cxt) throws IOException, InterruptedException {
        log.info("reducer2******* in reduce()");
        for (IntWritable v : values) {
            log.info("key:" + key + "-->value:" + v);
            cxt.write(key, v);
        }
        log.info("reducer2****** in reduce() *******end");
    }
}
CustomKey (composite key type):
package fz.secondarysort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite key: symbol is the original key, value is the original value.
 * @author fansy
 *
 */
public class CustomKey implements WritableComparable<CustomKey> {
    private int value;
    private String symbol;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.value);
        out.writeUTF(this.symbol);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readInt();
        this.symbol = in.readUTF();
    }

    @Override
    public int compareTo(CustomKey o) {
        // sort by symbol first, then by value; Integer.compare avoids the
        // overflow a plain subtraction could hit on extreme values
        int result = this.symbol.compareTo(o.symbol);
        return result != 0 ? result : Integer.compare(this.value, o.value);
    }

    @Override
    public String toString() {
        return this.symbol + "\t" + this.value;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
}
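A quick way to convince yourself the key orders as intended is to sort a handful of keys locally. The class below (CustomKeySortCheck, a name made up for this sketch) rebuilds the sample records from the top of the post:

package fz.secondarysort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Throwaway check: sort the sample records in memory via CustomKey.compareTo().
public class CustomKeySortCheck {
    public static void main(String[] args) {
        String[] records = {"a,5", "b,7", "c,2", "c,9", "a,3", "a,1", "b,10", "b,3", "c,1"};
        List<CustomKey> keys = new ArrayList<CustomKey>();
        for (String r : records) {
            String[] parts = r.split(",");
            CustomKey k = new CustomKey();
            k.setSymbol(parts[0]);
            k.setValue(Integer.parseInt(parts[1]));
            keys.add(k);
        }
        Collections.sort(keys); // uses CustomKey.compareTo()
        for (CustomKey k : keys) {
            System.out.println(k); // prints a 1, a 3, a 5, b 3, b 7, b 10, c 1, c 2, c 9
        }
    }
}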
Custom GroupComparator:
package fz.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Compares only the symbol (the original key), so all values for one
 * symbol are grouped into a single reduce() call.
 * @author fansy
 *
 */
public class CustomGroupComparator extends WritableComparator {

    protected CustomGroupComparator() {
        super(CustomKey.class, true); // true: create key instances for deserialization
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CustomKey ak = (CustomKey) a;
        CustomKey bk = (CustomKey) b;
        return ak.getSymbol().compareTo(bk.getSymbol());
    }
}
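The contrast between full-key ordering and group comparison can also be checked locally. This throwaway class (GroupCompareCheck, an invented name) must live in the fz.secondarysort package, since the comparator's constructor is protected:

package fz.secondarysort;

// Throwaway check: the same pair of keys is unequal under the full composite
// ordering but equal under the grouping comparator.
public class GroupCompareCheck {
    public static void main(String[] args) {
        CustomKey k1 = new CustomKey();
        k1.setSymbol("a");
        k1.setValue(1);
        CustomKey k2 = new CustomKey();
        k2.setSymbol("a");
        k2.setValue(5);
        System.out.println(k1.compareTo(k2)); // negative: (a,1) sorts before (a,5)
        System.out.println(new CustomGroupComparator().compare(k1, k2)); // 0: same group
    }
}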
Custom Partitioner:
package fz.secondarysort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitions on the original key (the symbol) only, so records that share
 * a symbol are all shipped to the same Reducer.
 * @author fansy
 *
 * @param <K1>
 * @param <V1>
 */
public class CustomPartitioner<K1, V1> extends Partitioner<K1, V1> {

    @Override
    public int getPartition(K1 key, V1 value, int numPartitions) {
        CustomKey keyK = (CustomKey) key;
        // hash only the symbol; & Integer.MAX_VALUE keeps the result non-negative
        Text tmpValue = new Text(keyK.getSymbol());
        return (tmpValue.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
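And a similar spot check for the Partitioner (again an invented class, PartitionCheck, in the same package), confirming that keys sharing a symbol land in the same partition no matter what value they carry:

package fz.secondarysort;

import org.apache.hadoop.io.NullWritable;

// Throwaway check: two keys with the same symbol but different values must
// be routed to the same partition.
public class PartitionCheck {
    public static void main(String[] args) {
        CustomPartitioner<CustomKey, NullWritable> p = new CustomPartitioner<CustomKey, NullWritable>();
        CustomKey k1 = new CustomKey();
        k1.setSymbol("a");
        k1.setValue(1);
        CustomKey k2 = new CustomKey();
        k2.setSymbol("a");
        k2.setValue(99);
        System.out.println(p.getPartition(k1, NullWritable.get(), 3)); // some partition in [0, 3)
        System.out.println(p.getPartition(k2, NullWritable.get(), 3)); // the same partition as k1
    }
}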
Output without secondary sort: (screenshot not reproduced here)
Output with secondary sort plus the custom grouping comparator: (screenshot not reproduced here)
In both runs the Reducer sees exactly three groups; with secondary sort, the values inside each group also come out sorted.
If the secondary sort job runs without the grouping comparator, the result looks like this: (screenshot not reproduced here)
Here there are more than three groups, which is usually a problem: the records for key a, for example, can no longer be consolidated, because every distinct (a, value) pair becomes its own group.
The custom Partitioner was active in the runs above as well; no difference shows because there is only one Reducer, but with several Reducers it matters.
Summary: secondary sort orders not just the keys but the values too, i.e. the values each Reducer group receives arrive already sorted, which can make certain downstream operations more efficient.
Share, grow, be happy.
Please credit this blog when reposting: http://blog.csdn.net/fansy1990
Hadoop programming tips (9): secondary sort (sorting values)
Original article: http://blog.csdn.net/fansy1990/article/details/38302615