码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop 系列(四)自定义数据类

时间:2020-04-19 23:46:13      阅读:104      评论:0      收藏:0      [点我收藏+]

标签:jar包   exists   override   html   add   log   参数   split   date()   

一:自定义数据类

为什么需要自定义数据类

        上一篇文章(https://www.cnblogs.com/wuxiaolong4/p/12733518.html)里,我们自己写了一个 MapReduce 的 Top N 示例。从代码中可以看出,里面用 | 作分隔符,这种方法不好:容易因为自己的失误导致读取字段出错,或者把自己搞晕。这时候我们就需要自定义数据类,定义属于自己的实体类,有点像操作数据库时用的 entity。

怎么定义自己的数据类

class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {

public int compareTo(MinMaxtemperature otherMinMaxtemperature) {
if(date.equals(otherMinMaxtemperature.getDate())) return 1;
else return 0;
}

public void write(DataOutput out) throws IOException {
out.writeInt(max);
out.writeInt(min);
}

public void readFields(DataInput in) throws IOException {
max = in.readInt();
min = in.readInt();
}


private String date = "";
private int min =0 ;
private int max =0 ;

public String getDate() {
return date;
}

public void setDate(String date) {
this.date = date;
}

public int getMin() {
return min;
}

public void setMin(int min) {
this.min = min;
}

public int getMax() {
return max;
}

public void setMax(int max) {
this.max = max;
}

@Override
public String toString() {
return "MinMaxtemperature{" +
"date=‘" + date + ‘\‘‘ +
", min=" + min +
", max=" + max +
‘}‘;
}
}

 

怎么使用自己的类

数据:

2020040112 1
2020040113 3
2020040114 4
2020040115 5
2020040116 6
2020040117 7
2020040118 8
2020040119 9
2020040312 1
2020040313 3
2020040314 4
2020040315 5
2020040316 6
2020040317 7
2020040318 8
2020040319 9
2020040412 1
2020040413 3
2020040414 4
2020040415 5
2020040416 6
2020040417 7
2020040418 8
2020040419 9

代码1 :输出最高温度和最低温度:

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {

private String date = "";
private int min =0 ;
private int max =0 ;

public MinMaxtemperature() {
}
public MinMaxtemperature(String date, int max, int min) {
this.date = date;
this.min = min;
this.max = max;
}

public int compareTo(MinMaxtemperature otherMinMaxtemperature) {
if(date.equals(otherMinMaxtemperature.getDate())) return 1;
else return 0;
}

public void write(DataOutput out) throws IOException {
out.writeInt(max);
out.writeInt(min);
}

public void readFields(DataInput in) throws IOException {
max = in.readInt();
min = in.readInt();
}

public String getDate() {
return date;
}

public void setDate(String date) {
this.date = date;
}

public int getMin() {
return min;
}

public void setMin(int min) {
this.min = min;
}

public int getMax() {
return max;
}

public void setMax(int max) {
this.max = max;
}

@Override
public String toString() {
return "MinMaxtemperature{" +
"date=‘" + date + ‘\‘‘ +
", min=" + min +
", max=" + max +
‘}‘;
}
}

/**
 * Tracks the running min/max temperature over all records seen by this mapper
 * and emits a single aggregate from {@link #cleanup}.
 */
class WordcountMapper extends Mapper<LongWritable, Text, Text, MinMaxtemperature> {
// Sentinel initialization so the first real temperature always replaces it.
// The original started both fields at 0, so a positive minimum was never
// recorded, and it never called setDate, so cleanup emitted an empty key.
private MinMaxtemperature dataMap =
new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Input line format: "<yyyyMMddHH> <temperature>", whitespace-separated.
String[] line = value.toString().split("\\s+");
// Strip the trailing hour digits to get the day-level date (yyyyMMdd).
String date = line[0].substring(0, line[0].length() - 2);
int temperature = Integer.parseInt(line[1]);

dataMap.setDate(date);
if (temperature > dataMap.getMax()) {
dataMap.setMax(temperature);
}
if (temperature < dataMap.getMin()) {
dataMap.setMin(temperature);
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// Emit one aggregate per mapper after the whole input split is consumed.
context.write(new Text(dataMap.getDate()), dataMap);
}
}
/**
 * Folds the per-mapper aggregates into a single global min/max pair and emits
 * it once from {@link #cleanup}.
 */
class WordcountReducer extends Reducer<Text, MinMaxtemperature, Text, Text> {

// Sentinel initialization so incoming values always replace it. The original
// started min at 0, which hid any minimum greater than zero.
private MinMaxtemperature dataMap =
new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

@Override
protected void reduce(Text key, Iterable<MinMaxtemperature> values, Context context) throws IOException, InterruptedException {
String date = key.toString();
System.out.println("reduce:" + date);
for (MinMaxtemperature value : values) {
if (value.getMax() > dataMap.getMax()) {
dataMap.setMax(value.getMax());
}
if (value.getMin() < dataMap.getMin()) {
dataMap.setMin(value.getMin());
}
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// Single global result, written after every group has been reduced.
context.write(new Text("max:min"), new Text(dataMap.getMax() + ":" + dataMap.getMin()));
}
}
/**
 * Job driver: wires mapper/reducer, clears the previous output directory on
 * the local filesystem, and runs the job to completion.
 */
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///"); // run against the local filesystem
FileSystem fs = FileSystem.get(conf);
String outputPath = "/software/java/data/output/";
// Hadoop refuses to start if the output directory already exists.
if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true);

Job job = Job.getInstance(conf);
job.setJarByClass(WordcountDriver.class);
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// Map output (Text -> MinMaxtemperature) differs from the final reduce
// output (Text -> Text), so both pairs must be declared.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MinMaxtemperature.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
FileOutputFormat.setOutputPath(job, new Path(outputPath));

// Submit the job (config + jar) and block until it finishes.
boolean res = job.waitForCompletion(true);
// Propagate failure to the caller; the original discarded the result and
// always exited with status 0 even when the job failed.
System.exit(res ? 0 : 1);
}

}
代码2 分组内输出最高温度和最低温度:
 

       代码3和代码4就不写了,因为差不多。

Hadoop 系列(四)自定义数据类

标签:jar包   exists   override   html   add   log   参数   split   date()   

原文地址:https://www.cnblogs.com/wuxiaolong4/p/12735016.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!