标签:mapper fileinput over ati throw exit public ble protected
// map的数量与数的分片有关系
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = StringUtils.split(line, " ");
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for (LongWritable l : values) {
count ++;
}
context.write(key, new LongWritable(count));
}
}
public class WCRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WCRunner.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置reduce的数量,对应的会生成设置数量的文件,每个文件的内容是根据
// job.setPartitionerClass(HashPartitioner.class);中的Partitioner确定
job.setNumReduceTasks(10);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
public class WCRunner2 extends Configured implements Tool{
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WCRunner2.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new WCRunner2(), args);
}
}
执行: hadoop jar wc.jar com.easytrack.hadoop.mr.WCRunner2 /wordcount.txt /wc/output4
标签:mapper fileinput over ati throw exit public ble protected
原文地址:http://www.cnblogs.com/heml/p/5990133.html