本篇介绍MapReduce的一些高级特性,如计数器、数据集的排序和连接。计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的“”连接(join)操作。
计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器还可用于辅助诊断系统故障。对于大型分布式系统来说,获取计数器比分析日志文件容易的多。
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//统计最高气温的作业,也统计气温值缺少的记录,不规范的记录
public class MaxTemperatureWithCounters extends Configured implements Tool {
	enum Temperature {
		MiSSING, MALFORMED
	}
	static class MaxTemeratureMapperWithCounters extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {
		private NcdcRecordParser parser = new NcdcRecordParser();
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			parser.parse(value);
			if (parser.isValidTemperature()) {
				int airTemperature = parser.getAirTemperature();
				output.collect(new Text(parser.getYear()), new IntWritable(
						airTemperature));
			} else if (parser.isMa1formedTemperature()) {
				reporter.incrCounter(Temperature.MALFORMED, 1);
			} else if (parser.IsMissingTemperature()) {
				reporter.incrCounter(Temperature.MALFORMED, 1);
			}
		}
	}
	static class MaxTemperatureReduceWithCounters extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int maxValue = Integer.MIN_VALUE;
			while (values.hasNext()) {
				maxValue = Math.max(maxValue, values.next().get());
			}
			output.collect(key, new IntWritable(maxValue));
		}
	}
	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/t", "/test/output/t" }; // 给定输入输出路径
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(MaxTemeratureMapperWithCounters.class);
		conf.setCombinerClass(MaxTemperatureReduceWithCounters.class);
		conf.setReducerClass(MaxTemperatureReduceWithCounters.class);
		JobClient.runJob(conf);
		return 0;
	}
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
		System.exit(exitCode);
	}
}
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
//统计气温缺失记录所占比例
public class MissingTemperatureFields extends Configured implements Tool {
	@Override
	public int run(String[] args) throws Exception {
		String jobID = args[0];
		JobClient jobClient = new JobClient(new JobConf(getConf()));
		RunningJob job = jobClient.getJob(JobID.forName(jobID));
		if (job == null) {
			System.err.printf("No job with ID %s found.\n", jobID);
			return -1;
		}
		if (!job.isComplete()) {
			System.err.printf("Job %s is not complete.\n", jobID);
			return -1;
		}
		Counters counters = job.getCounters();
		long missing = counters
				.getCounter(MaxTemperatureWithCounters.Temperature.MiSSING);
		long total = counters.findCounter(
				"org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS")
				.getCounter();
		System.out.printf("Records with missing temperature fields:%.2f%%\n",
				100.0 * missing / total);
		return 0;
	}
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
		System.exit(exitCode);
	}
}
排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可能使用MapReduce的排序功能来组织数据。下面将讨论几种不同的数据集排序方法,以及如何控制MapReduce的排序。
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class SortDataPreprocessor extends Configured implements Tool {
	static class CleanerMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, IntWritable, Text> {
		private NcdcRecordParser parser = new NcdcRecordParser();
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<IntWritable, Text> output, Reporter reporter)
				throws IOException {
			parser.parse(value);
			if (parser.isValidTemperature()) {
				output.collect(new IntWritable(parser.getAirTemperature()),
						value);
			}
		}
	}
	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/t", "/test/input/seq" };
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setMapperClass(CleanerMapper.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);
		conf.setNumReduceTasks(0);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		SequenceFileOutputFormat.setCompressOutput(conf, true);
		SequenceFileOutputFormat
				.setOutputCompressorClass(conf, GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(conf,
				CompressionType.BLOCK);
		JobClient.runJob(conf);
		return 0;
	}
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
		System.exit(exitCode);
	}
}
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class SortByTemperatureUsingHashPartitioner extends Configured implements
		Tool {
	@Override
	public int run(String[] args) throws Exception {
		args = new String[] { "/test/input/seq", "/test/output/t" };
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setInputFormat(SequenceFileInputFormat.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		conf.setNumReduceTasks(5);//设置5个reduce任务,输出5个文件
		SequenceFileOutputFormat.setCompressOutput(conf, true);
		SequenceFileOutputFormat
				.setOutputCompressorClass(conf, GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(conf,
				CompressionType.BLOCK);
		JobClient.runJob(conf);
		return 0;
	}
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(
				new SortByTemperatureUsingHashPartitioner(), args);
		System.exit(exitCode);
	}
}
产生多个已经排好序的小文件。
MapReduce能够执行大型数据集间的“”连接(join)操作,但是从头编写相关代码来执行连接比较麻烦。也可以考虑使用一个更高级的框架,如Pig、Hive或Casading等,它们都将连接操作视为整个实现的核心部分。
其他章节也可能用到:)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
public class JobBuilder {
	public static JobConf parseInputAndOutput(Tool tool, Configuration conf,
			String[] args) {
		if (args.length != 2) {
			printUsage(tool, "<input><output>");
			return null;
		}
		JobConf jobConf = new JobConf(conf, tool.getClass());
		FileInputFormat.addInputPath(jobConf, new Path(args[0]));
		FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
		return jobConf;
	}
	public static void printUsage(Tool tool, String extraArgsUsage) {
		System.err.printf("Usage:%s [genericOptions] %s\n\n", tool.getClass()
				.getSimpleName(), extraArgsUsage);
	}
}
import org.apache.hadoop.io.Text;
public class NcdcRecordParser {
	private static final int MISSING_TEMPERATURE = 9999;
	private String year;
	private int airTemperature;
	private String quality;
	public void parse(String record) {
		year = record.substring(15, 19);
		String airTemperatureString;
		// Remove leading plus sign as parseInt doesn‘t like them
		if (record.charAt(87) == ‘+‘) {
			airTemperatureString = record.substring(88, 92);
		} else {
			airTemperatureString = record.substring(87, 92);
		}
		airTemperature = Integer.parseInt(airTemperatureString);
		quality = record.substring(92, 93);
	}
	public void parse(Text record) {
		parse(record.toString());
	}
	public boolean isValidTemperature() {
		return airTemperature != MISSING_TEMPERATURE
				&& quality.matches("[01459]");
	}
	public boolean isMa1formedTemperature() {
		return !quality.matches("[01459]");
	}
	public boolean IsMissingTemperature() {
		return airTemperature == MISSING_TEMPERATURE;
	}
	public String getYear() {
		return year;
	}
	public int getAirTemperature() {
		return airTemperature;
	}
}
MapReduce编程实战之“高级特性”,布布扣,bubuko.com
原文地址:http://blog.csdn.net/puma_dong/article/details/24440199