



package cn.itcast.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class SortMR {

    // FlowBean is used as the map output key, so the framework sorts the records by
    // FlowBean's compareTo(); FlowBean therefore has to implement WritableComparable.
    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // Take one line of data, split out the fields, wrap them in a FlowBean
        // and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            String phoneNB = fields[0];
            long u_flow = Long.parseLong(fields[1]);
            long d_flow = Long.parseLong(fields[2]);

            context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String phoneNB = key.getPhoneNB();
            context.write(new Text(phoneNB), key);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // The class that contains the main method; here it is this class itself
        job.setJarByClass(SortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // setOutputKeyClass/setOutputValueClass describe both map and reduce output;
        // when the map output types differ, declare them separately, as below
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // These two paths are exactly the last two arguments of the "hadoop jar ..." command
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with the job status (0 on success, 1 on failure)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
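Both jobs reuse cn.itcast.hadoop.mr.flowsum.FlowBean, which the original post does not show. For the sort job above to work, the bean must implement WritableComparable so Hadoop can serialize it and sort the map output by it. The following is only a minimal sketch of what such a bean could look like; the field names, the no-arg constructor, and the descending sort on total flow are assumptions, not the author's original code:

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Minimal sketch of a FlowBean; the real class used in the course may differ.
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Hadoop instantiates the bean via reflection during deserialization,
    // so a no-arg constructor is required
    public FlowBean() {}

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() { return phoneNB; }
    public long getUp_flow() { return up_flow; }
    public long getD_flow() { return d_flow; }
    public long getS_flow() { return s_flow; }

    // Serialization order must match the deserialization order below
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    // Sort order used when the bean is the map output key;
    // descending total flow is assumed here
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.s_flow, this.s_flow);
    }

    // Controls how the bean is rendered when written out by TextOutputFormat
    @Override
    public String toString() {
        return up_flow + "\t" + d_flow + "\t" + s_flow;
    }
}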

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

/**
 * Aggregate the raw traffic log and write the per-user totals of different
 * provinces to different output files.
 * Two mechanisms have to be customized:
 * 1. the partitioning logic, by writing our own Partitioner;
 * 2. the number of concurrent reduce tasks.
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Take one line of data
            String line = value.toString();

            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");

            // Pick out the fields we need
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);

            // Wrap them as a key/value pair and emit
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            long up_flow_counter = 0;
            long d_flow_counter = 0;

            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }

            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of concurrent reduce tasks; it should match the number of partitions.
        // If it is larger, the trailing output files come out empty; if smaller, the job errors out;
        // if it is 1, no partitioning happens at all.
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
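For reference, without setPartitionerClass the job would fall back to Hadoop's default HashPartitioner, which spreads keys over reduce tasks purely by hash, so records of one province prefix would end up scattered across output files. Its behavior is essentially the following (an illustrative reproduction with a made-up class name, not part of the original post):

package cn.itcast.hadoop.mr.areapartition;

import org.apache.hadoop.mapreduce.Partitioner;

// Illustration only: mirrors what the default HashPartitioner does.
// The same key always lands in the same reduce task, but the task is chosen
// by hash rather than by phone-number prefix.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

The custom AreaPartitioner below replaces this with a lookup on the phone-number prefix: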
package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone-number prefix from the key and look it up in the area dictionary;
        // different provinces get different partition numbers, unknown prefixes go to partition 5
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }
}
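The partition number returned must be smaller than the number of reduce tasks set in the driver (6 here). A quick way to sanity-check the lookup logic locally, outside the cluster; this little harness is not part of the original post, and the expected values in the comments follow from the areaMap above:

package cn.itcast.hadoop.mr.areapartition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, Object> p = new AreaPartitioner<>();
        // 135 and 139 are in the dictionary; 150 is not and falls into partition 5
        System.out.println(p.getPartition(new Text("13512345678"), null, 6)); // 0
        System.out.println(p.getPartition(new Text("13998765432"), null, 6)); // 4
        System.out.println(p.getPartition(new Text("15000001111"), null, 6)); // 5
    }
}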

Traffic summarization (custom jar running aggregation, sorting and partitioning on a Hadoop cluster): the aggregation part
Original post: http://www.cnblogs.com/xiaoxiao5ya/p/c23cd7c85104ae4bc5875c798d81fb2e.html