标签:tab context vat [1] employee config 员工 form 公积金
1、Mapper类
package cn.sjq.bigdata.mr.salary;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 求部门员工薪资总和,部门平均薪资 Map类
* 数据格式:
* SEQ EMPLOYEE_NAME COMPANY_NAME DEPT_NAME SUB_TOTAL COM_FUND_PAYAMT
1 张三 xx有限公司 客户服务中心 5887 798
* @author songjq
*
*/
public class DeptSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text tkey = new Text();
private Text tvalue = new Text();
/*
* 实现Mapper类map方法
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//获取一行
/ 1 张三 xx有限公司 客户服务中心 5887 798
String line = value.toString();
//分词操作
String[] fileds = StringUtils.split(line, ",");
//将数据通过context传送到reduce
//设置部门作为key
tkey.set(fileds[3]);
//设置薪资及公积金作为value
tvalue.set(fileds[4]+","+fileds[5]);
context.write(tkey, tvalue);
}
}
2、Reducer类
package cn.sjq.bigdata.mr.salary;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce side of the "total and average salary per department" job.
 *
 * <p>Receives, per department, values of the form {@code "<salary>,<fund>"} and
 * writes one record whose value is
 * {@code ",<totalSalary>,<avgSalary>,<totalFund>,<avgFund>"} (the leading comma
 * is part of the emitted value).
 *
 * <p>Amounts are accumulated as {@code double}: single-precision {@code float}
 * only has about 7 significant digits and loses precision once a department's
 * total grows large. Values that cannot be parsed are skipped and counted under
 * the {@code DeptSalary/MALFORMED_VALUES} counter.
 *
 * @author songjq
 */
public class DeptSalaryReducer extends Reducer<Text, Text, Text, Text> {

    /** Counter group / name used to report skipped values. */
    private static final String COUNTER_GROUP = "DeptSalary";
    private static final String COUNTER_MALFORMED = "MALFORMED_VALUES";

    // Reused across reduce() calls to avoid allocating a new Text per key.
    private final Text tvalue = new Text();

    /**
     * Aggregates salary and fund amounts for one department.
     *
     * @param key    department name
     * @param values {@code "salary,fund"} pairs emitted for this department
     * @param ctx    task context used to emit output and update counters
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context ctx)
            throws IOException, InterruptedException {
        double totalSalary = 0;
        double totalFund = 0;
        long count = 0;

        for (Text v : values) {
            // Each value looks like "5887,798": salary first, fund second.
            String[] parts = StringUtils.split(v.toString(), ",");
            if (parts.length < 2) {
                ctx.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
                continue;
            }

            double salary;
            double fund;
            try {
                salary = Double.parseDouble(parts[0]);
                fund = Double.parseDouble(parts[1]);
            } catch (NumberFormatException notANumber) {
                // Skip the bad value rather than failing the whole job.
                ctx.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
                continue;
            }

            totalSalary += salary;
            totalFund += fund;
            count++;
        }

        // Every value was malformed: emit nothing rather than NaN averages.
        if (count == 0) {
            return;
        }

        double avgSalary = totalSalary / count;
        double avgFund = totalFund / count;

        tvalue.set("," + totalSalary + "," + avgSalary + "," + totalFund + "," + avgFund);
        ctx.write(key, tvalue);
    }
}
3、提交Job主类
package cn.sjq.bigdata.mr.salary;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the "total and average salary per department" job: configures the
 * mapper, reducer and key/value types, then submits the job and waits for it.
 *
 * <p>Usage: {@code DeptSalaryMain [inputPath [outputPath]]}. When an argument is
 * omitted the original hard-coded path is used, so running without arguments
 * behaves as before.
 *
 * @author songjq
 */
public class DeptSalaryMain {

    /** Input path used when no first argument is given. */
    private static final String DEFAULT_INPUT = "D:\\test\\salary\\salary.csv";
    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "D:\\test\\salary\\output";

    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]} overrides
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration());

        // The jar containing this class is the one shipped with the job.
        job.setJarByClass(DeptSalaryMain.class);

        job.setMapperClass(DeptSalaryMapper.class);
        job.setReducerClass(DeptSalaryReducer.class);

        // Map output types (department -> "salary,fund").
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final output types (department -> aggregated figures).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Propagate the job result so callers/scripts can detect a failed run.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
2018-07-25期 MapReduce求部门工资总和及平均工资
标签:tab context vat [1] employee config 员工 form 公积金
原文地址:http://blog.51cto.com/2951890/2150253