Sorting is a default behavior in Hadoop: during the shuffle, keys are compared byte by byte, i.e. in dictionary order (see the sketch after the list below). Hadoop jobs commonly use four kinds of sorting:
1) Partial sort
2) Total sort
3) Auxiliary sort (grouping)
4) Secondary sort
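To see why dictionary order is rarely what you want for numeric keys, here is a minimal sketch (the class name is made up for illustration):

import org.apache.hadoop.io.Text;

public class DefaultSortDemo {
    public static void main(String[] args) {
        // The shuffle compares Text keys byte by byte, so numeric strings
        // do not order numerically: '1' < '4' makes "1320" sort before "400".
        int cmp = new Text("1320").compareTo(new Text("400"));
        System.out.println(cmp < 0); // true: "1320" < "400" lexicographically
    }
}

This is why the sample below wraps the numeric fields in a custom WritableComparable key instead of sorting on their string form.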
Data preview: at this stage the traffic has only been aggregated; no partitioning or sorting has been applied yet.
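One aggregated line looks like this (tab-separated: phone, up flow, down flow, total; this is the sample row documented in the Mapper below, remaining rows elided):

13480253104	120	1320	1440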

package it.dawn.YARNPra.基本用法.排序;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author Dawn
 * @date 2019年5月7日09:04:04
 * @version 1.0
 * Implements WritableComparable directly, so the bean is both
 * serializable and usable as a sortable map-output key.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long dfFlow;
    private long flowSum;

    // No-arg constructor, required for deserialization
    public FlowBean() {}

    // Convenience constructor; the total is derived from the two parts
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    // Serialize the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    // Deserialize in exactly the order used by write()
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }

    // Sort in descending order of total flow. Returning 0 for equal totals
    // keeps the Comparable contract and lets the framework group equal keys
    // into a single reduce call.
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.flowSum, this.flowSum);
    }
}
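As a quick sanity check of the serialization logic, a round-trip through write()/readFields() should reproduce the bean. A minimal sketch (the test class name is hypothetical, not part of the original post):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean(120, 1320);

        // Serialize with write(), then read back with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // expected: 120	1320	1440
    }
}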
package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Dawn
 * @date 2019年5月7日09:24:06
 * @version 1.0
 *
 * Input (one line of the aggregated file):
 *   13480253104	120	1320	1440
 * Output <k2, v2>:
 *   <FlowBean(upFlow, dfFlow), phone number>
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1: read the line
        String line = value.toString();
        // 2: split on tabs
        String[] fields = line.split("\t");
        // 3: pick out the up-flow and down-flow columns
        long upFlow = Long.parseLong(fields[1]);
        long dfFlow = Long.parseLong(fields[2]);
        // 4: emit <bean, phone> so the shuffle sorts on the bean
        context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
    }
}
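A common Hadoop idiom, shown here as an optional variant rather than as the post's code, is to reuse one Writable instance across map() calls; context.write() serializes the key and value immediately, so this is safe and avoids a per-record allocation. The sketch assumes the same package and imports as FlowSortMapper above:

public class FlowSortMapperReuse extends Mapper<LongWritable, Text, FlowBean, Text> {

    private final FlowBean outKey = new FlowBean();
    private final Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // FlowBean's setters do not recompute the total, so set all three fields
        outKey.setUpFlow(Long.parseLong(fields[1]));
        outKey.setDfFlow(Long.parseLong(fields[2]));
        outKey.setFlowSum(outKey.getUpFlow() + outKey.getDfFlow());
        outValue.set(fields[0]);
        context.write(outKey, outValue);
    }
}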
package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Swap key and value back to <phone, bean>. Iterate over all values:
        // beans with the same total flow arrive grouped under one key.
        for (Text phone : v3) {
            context.write(phone, k3);
        }
    }
}
package it.dawn.YARNPra.基本用法.排序;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        // 1: take the first three digits of the phone number
        String phoneThree = value.toString().substring(0, 3);
        // 2: map the known prefixes to partitions 0-3
        if ("135".equals(phoneThree)) {
            return 0;
        } else if ("137".equals(phoneThree)) {
            return 1;
        } else if ("138".equals(phoneThree)) {
            return 2;
        } else if ("139".equals(phoneThree)) {
            return 3;
        }
        // every other prefix goes to partition 4
        return 4;
    }
}
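A quick local check of the prefix-to-partition mapping (the phone numbers and class name here are made up for illustration):

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        FlowSortPartitioner p = new FlowSortPartitioner();
        // 135/137/138/139 prefixes map to partitions 0-3
        System.out.println(p.getPartition(new FlowBean(100, 200), new Text("13512340000"), 5)); // 0
        // any other prefix falls through to partition 4
        System.out.println(p.getPartition(new FlowBean(100, 200), new Text("18212340000"), 5)); // 4
    }
}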
package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Dawn
 * @date 2019年5月7日09:22:12
 * @version 1.0
 * Goal: partition the aggregated data and sort within each partition.
 */
public class FlowSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1: configuration and job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2: set the driver class
        job.setJarByClass(FlowSortDriver.class);

        // 3: set the Mapper and Reducer classes
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // 4: map output types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5: reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Custom partitioner; the reduce-task count must cover its five partitions
        job.setPartitionerClass(FlowSortPartitioner.class);
        job.setNumReduceTasks(5);

        // 6: input and output paths (input is the earlier aggregation job's output)
        FileInputFormat.setInputPaths(job, new Path("f:/temp/流量统计结果/out1/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("f:/temp/流量统计结果/out2"));

        // 7: submit the job
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? "success" : "fail");
    }
}
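With five reduce tasks and the partitioner above, the output directory should contain one file per partition, each sorted in descending order of total flow:

f:/temp/流量统计结果/out2/_SUCCESS
f:/temp/流量统计结果/out2/part-r-00000   (phones starting with 135)
f:/temp/流量统计结果/out2/part-r-00001   (137)
f:/temp/流量统计结果/out2/part-r-00002   (138)
f:/temp/流量统计结果/out2/part-r-00003   (139)
f:/temp/流量统计结果/out2/part-r-00004   (all other prefixes)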


Original post: https://www.cnblogs.com/hidamowang/p/10828686.html