// Tags: sum java string map job for config [] sys
package com.asin.hdp.inverted;
import java.io.IOException;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndexCombine {
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   
        Job job = Job.getInstance(conf);    
        job.setJarByClass(InvertedIndexCombine.class);
        job.setMapperClass(invertedMapper.class);   
         job.setCombinerClass(invertedCombine.class);    
        job.setReducerClass(invertedReduce.class);
        job.setOutputKeyClass(Text.class);   
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("e:/a.txt"));   
         FileInputFormat.addInputPath(job, new Path("e:/b.txt"));    
        FileInputFormat.addInputPath(job, new Path("e:/c.txt"));    
        FileOutputFormat.setOutputPath(job, new Path("e:/outputCombine"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class invertedMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override   
         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)    
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();   
            Path path = split.getPath();    
            String name = path.getName().replace("e:/", "");
            StringTokenizer token = new StringTokenizer(value.toString(), " ");   
            while (token.hasMoreTokens()) {
                context.write(new Text(name + "\t" + token.nextToken()), new Text("1"));   
            }
        }   
    }
public static class invertedCombine extends Reducer<Text, Text, Text, Text> {
        @Override   
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)    
                throws IOException, InterruptedException {
            String line = key.toString();   
            String[] split = line.split("\t");
            int sum = 0;   
            for (Text text : values) {    
                 sum += Integer.parseInt(text.toString());    
             }    
            context.write(new Text(split[1]), new Text(split[0] + ":" + sum));
        }   
    }
    public static class invertedReduce extends Reducer<Text, Text, Text, Text> {   
        @Override    
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)    
                 throws IOException, InterruptedException {
            String val = "";   
            for (Text text : values) {    
                 val += text + "\t";    
            }
            context.write(new Text(key), new Text(val));   
        }    
    }
}
// Tags: sum java string map job for config [] sys
// Original article: http://www.cnblogs.com/datadev/p/7052895.html