标签:
其实这个例子都是书上的,我也只是拿过来理解学习下。
WordCount是Hadoop中的Hello, world,这是我听得最多的一个表述。
下面是WordCount.java的源码
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/* 这个类实现Mapper接口中的map方法,
* 输入参数中的value是文本文件中的一行,
* 利用StringTokenizer将这个字符串拆成单词,
* 然后将输出结果<单词,1>
* 写入到org.apache.hadoop.mapred.OutputCollector中
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/* 代码中LongWritable, IntWritable, Text
* 均是Hadoop中实现的用于封装Java数据类型的类,
* 这些类都能够被串行化从而便于在分布式环境中进行数据交换,
* 可以将它们分别视为long, int, String的替代品
*/
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
// 将每一行变为字符串, 并进行分析, 最后变为一个Iterator
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/* 这个类实现Reducer接口中的reduce方法, 输入参数中的key, values是由Map任务输出的
* 中间结果, values是一个Iterator
*/
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
/* 遍历这个Iterator, 就可以得到属于同一个key的所有的values.
* 此处, key是一个单词, values是词频
*/
// 依次获得每个词的词频
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 这个配置从何而来, 往哪里去呢
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
// 新建一个job, 并读取配置文件conf, 不知道是不是读取安装目录下的配置文件
Job job = new Job(conf, "word count");
// 下面这几行都是设置编译好的类
job.setJarByClass(WordCount.class);
// 实现map函数, 完成输入的<key, value>对到中间结果的映射
job.setMapperClass(TokenizerMapper.class);
// 实现combine函数, 将中间结果的重复key进行合并
job.setCombinerClass(IntSumReducer.class);
// 实现reduce函数, 对中间结果进行合并, 形成最终结果
job.setReducerClass(IntSumReducer.class);
// 输出的最终结果中key的类型
job.setOutputKeyClass(Text.class);
// 输出的最终结果中value的类型
job.setOutputValueClass(IntWritable.class);
// 设定job的输入目录, job运行时会处理输入目录下的所有文件
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 设定job的输出目录, job的最终结果会写入输出目录下
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}由于要编译执行,用的是hadoop命令,就来看看hadoop吧
HADOOP_HOME/bin/hadoop
有这样三段话
# part 1
# 设置java命令的路径所在
JAVA=$JAVA_HOME/bin/java
# part 2
# 假如我们在hadoop后面接的是jar, 则会进行一系列设置
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
# 后面这一大段是处在文件的最后
# 主要分成安全模式和非安全模式下
# 安全模式下会执行一些设置, 才会运行
# 非安全模式下直接运行
# 说到底最后还是都是在java虚拟机上运行的
# part 3
# Check to see if we should start a secure datanode
if [ "$starting_secure_dn" = "true" ]; then
if [ "$HADOOP_PID_DIR" = "" ]; then
HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid"
else
HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
fi
if [[ $JSVC_HOME ]]; then
JSVC="$JSVC_HOME/jsvc"
else
if [ "$JAVA_PLATFORM" = "Linux-amd64-64" ]; then
JSVC_ARCH="amd64"
else
JSVC_ARCH="i386"
fi
JSVC="$HADOOP_HOME/libexec/jsvc.${JSVC_ARCH}"
fi
if [[ ! $JSVC_OUTFILE ]]; then
JSVC_OUTFILE="$HADOOP_LOG_DIR/jsvc.out"
fi
if [[ ! $JSVC_ERRFILE ]]; then
JSVC_ERRFILE="$HADOOP_LOG_DIR/jsvc.err"
fi
exec "$JSVC" -Dproc_$COMMAND -outfile "$JSVC_OUTFILE" -errfile "$JSVC_ERRFILE" -pidfile "$HADOOP_SECURE_DN_PID" -nodetach -user "$HADOOP_SECURE_DN_USER" -cp "$CLASSPATH" $JAVA_HEAP_MAX $HADOOP_OPTS org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@"
else
# run it
exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"
fi最后这个java执行的命令好长一串啊, 不过分析下觉得
前面的-Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS感觉是对java虚拟机的设置
后面的-classpath则是执行需要那些jar包
"$CLASSPATH" 是根据命令设置的一些jar所在地
$CLASS 如果针对于hadoop运行MapReduce这里的就是org.apache.hadoop.util.RunJar这个jar包
总的过程包括有:
Map类的实现;
Reduce类的实现;
Job的创建以及设置;
运行Job。
标签:
原文地址:http://www.cnblogs.com/tuhooo/p/5486933.html