I. Preparation
1.1 Set up a distributed Hadoop system. I used 3 virtual machines to build a simple Hadoop cluster:
Linux 5.5 64-bit, Hadoop 2.6.0
192.168.19.201 h1 (master)
192.168.19.202 h2 (slave1)
192.168.19.203 h3 (slave2)
1.2 Prepare a file of website visitor IPs
Since this is just an experiment, a simple txt file is enough, e.g.:
vim a.txt
10.0.0.1
10.0.0.2
10.0.0.3
10.0.0.2
10.0.0.5
10.0.0.1
10.0.0.5
10.0.0.1
Put the file into HDFS:
hadoop fs -put a.txt /user
1.3 Set up a Java IDE and import the required Hadoop JARs (not covered here).
II. MapReduce
2.1 First, understand the relationship between IP, PV, and UV.
In short, PV (page views) counts every hit: if the same IP clicks 10 times, the PV count is 10.
UV (unique visitors) counts distinct visitors: no matter how many times the same IP visits within a day, it counts only once.
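The distinction can be checked with a tiny in-memory example (plain Java, independent of Hadoop; the sample IPs are the eight hits from a.txt above):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class PvUvExample {
    public static void main(String[] args) {
        // The eight hits from a.txt above
        List<String> hits = Arrays.asList(
                "10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.2",
                "10.0.0.5", "10.0.0.1", "10.0.0.5", "10.0.0.1");
        int pv = hits.size();                // PV: every hit counts
        int uv = new HashSet<>(hits).size(); // UV: each distinct IP counts once
        System.out.println("pv=" + pv + ", uv=" + uv); // pv=8, uv=4
    }
}
```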
2.2 Java code
package com.mapreduce.pvuv;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IpUv {

    // Job 1 mapper: emit each IP (the whole input line) as the key
    public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new Text("1"));
        }
    }

    // Job 1 reducer: the shuffle groups identical IPs, so writing the key
    // once per reduce call deduplicates the IPs
    public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text("1"));
        }
    }

    // Job 2 mapper: map every deduplicated record to the single key "uv"
    public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text("uv"), new Text("1"));
        }
    }

    // Job 2 reducer: count the records under "uv" -- that count is the UV
    public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (Text val : values) {
                sum++;
            }
            context.write(key, new Text(String.valueOf(sum)));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: IpUv <input dir> <output dir>");
            return;
        }
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.168.19.201:9001");

        String inputDir = args[0];
        Path outputDir = new Path(args[1]);

        // Job 1: deduplicate the IPs
        Job job1 = Job.getInstance(conf, "ipuv1");
        job1.setJarByClass(IpUv.class);
        job1.setMapperClass(Map1.class);
        job1.setReducerClass(Reduce1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job1, inputDir);
        FileOutputFormat.setOutputPath(job1, outputDir);

        // Job 2: count the distinct IPs, reading job 1's output
        if (job1.waitForCompletion(true)) {
            Job job2 = Job.getInstance(conf, "ipuv2");
            job2.setJarByClass(IpUv.class);
            job2.setMapperClass(Map2.class);
            job2.setReducerClass(Reduce2.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job2, outputDir);
            FileOutputFormat.setOutputPath(job2, new Path(outputDir + "-2"));
            job2.waitForCompletion(true);
        }
    }
}
There is more than one way to compute PV and UV with Hadoop; this is the simplest and easiest to follow.
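The logic of the two chained jobs can be sketched in plain in-memory Java (a minimal sketch, not the Hadoop code itself): stage 1 collapses the hits to distinct IPs, and stage 2 counts them all under one key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class TwoStageUvSketch {
    // Stage 1 (Map1/Reduce1): grouping identical IPs and writing each key
    // once yields the set of distinct IPs
    static Set<String> dedupe(List<String> hits) {
        return new TreeSet<>(hits); // sorted, like reducer key order
    }

    // Stage 2 (Map2/Reduce2): every record is remapped to the key "uv",
    // and a single reduce call counts the records
    static long countUv(Set<String> distinctIps) {
        long sum = 0;
        for (String ip : distinctIps) {
            sum++;
        }
        return sum;
    }

    public static void main(String[] args) {
        // The eight hits from a.txt above
        List<String> hits = Arrays.asList(
                "10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.2",
                "10.0.0.5", "10.0.0.1", "10.0.0.5", "10.0.0.1");
        System.out.println("uv\t" + countUv(dedupe(hits))); // uv	4
    }
}
```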
2.3 Run
Package the program as a jar and put it in any directory on the master:
hadoop jar /home/hadoop/ipuv.jar com.mapreduce.pvuv.IpUv /user /output
Check the result:
hadoop fs -cat /output-2/part-r-00000
uv 4
Computing website UV with a small MapReduce Java program on a distributed Hadoop system
Original article: http://10425580.blog.51cto.com/10415580/1680356