
MapReduce: Inverted Index


Environment:
  Hadoop 1.x, CentOS 6.5, a simulated distributed cluster built on three virtual machines.

  Data: any number of text files, in any format (I used four .java source files).

Goal:

  From the given text files, extract how many times each word occurs in each file and assemble an inverted index, in the following format:

  Ant FaultyWordCount.java : 1 , WordCount.java : 1

Approach:

  The program has to track three things per record: the word, the file name, and the occurrence count. The word serves as the key, so we define a custom Writable class that bundles the file name and the count into the value. A worked trace of the data flow follows the steps below.

  1. Split each line of text into words and emit records of the form K/V = word / filename : 1.

  2. Use a Combiner so that each map task pre-sums the counts for its own file, reducing the amount of data shuffled to the reducers.

  3. In the Reducer, merge the per-file frequencies of the same word, as passed along by the Combiner.
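
To make the data flow concrete, suppose the word Ant occurs twice in WordCount.java and once in FaultyWordCount.java (the counts here are illustrative, and the order of entries in the final value may vary):

  Map output:      (Ant, WordCount.java : 1), (Ant, WordCount.java : 1), (Ant, FaultyWordCount.java : 1)
  Combiner output: (Ant, WordCount.java : 2), (Ant, FaultyWordCount.java : 1)
  Reduce output:   Ant  WordCount.java : 2 , FaultyWordCount.java : 1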

Key points: the program relies on a Combiner and a custom Writable class. When implementing the Writable, note that it must provide a no-argument constructor: Hadoop instantiates Writables by reflection during deserialization and then calls readFields(), so without one the job fails at runtime.

Main driver class:

package ren.snail;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new Main(), args);
        System.exit(result);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration configuration = getConf();
        Job job = new Job(configuration, "InvertIndex");
        job.setJarByClass(Main.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

        job.setMapperClass(InvertMapper.class);
        job.setCombinerClass(InvertCombiner.class);     // set the Combiner class
        job.setReducerClass(InvertReducer.class);
        job.setMapOutputKeyClass(Text.class);           // mapper/combiner emit (Text, FileFreqWritable)
        job.setMapOutputValueClass(FileFreqWritable.class);
        job.setOutputKeyClass(Text.class);              // the reducer emits (Text, Text)
        job.setOutputValueClass(Text.class);
        // Return the job status to ToolRunner instead of calling System.exit()
        // here, so main() reports it through the normal exit path.
        return job.waitForCompletion(true) ? 0 : 1;
    }

}
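
To submit the job, a typical invocation looks like this (the jar name and HDFS paths below are my placeholders, not from the original post):

  hadoop jar invertindex.jar ren.snail.Main /user/hadoop/invert/input /user/hadoop/invert/output

Because the driver extends Configured and implements Tool, ToolRunner strips generic options such as -D key=value from the command line before the remaining arguments reach run().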

Custom Writable class:

package ren.snail;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class FileFreqWritable implements Writable {
    Text documentID;
    IntWritable frequency;

    // A no-argument constructor is required: Hadoop creates the instance by
    // reflection and then populates it through readFields().
    public FileFreqWritable() {
        this.documentID = new Text();
        this.frequency = new IntWritable();
    }

    public FileFreqWritable(Text id, IntWritable freq) {
        this.documentID = id;
        this.frequency = freq;
    }

    public void set(String id, int freq) {
        this.documentID.set(id);
        this.frequency.set(freq);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in the same order they were written.
        documentID.readFields(in);
        frequency.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        documentID.write(out);
        frequency.write(out);
    }

    public Text getDocumentID() {
        return documentID;
    }

    public IntWritable getFrequency() {
        return frequency;
    }

    @Override
    public String toString() {
        return documentID.toString() + " : " + frequency.get();
    }
}
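
As a quick sanity check, the following is a minimal round-trip sketch (my own test scaffold, not part of the original post) that serializes a FileFreqWritable into Hadoop's in-memory buffers and reads it back, exercising the no-argument constructor the same way the framework does:

package ren.snail;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class FileFreqWritableDemo {
    public static void main(String[] args) throws Exception {
        FileFreqWritable original = new FileFreqWritable();
        original.set("WordCount.java", 3);

        // Serialize into an in-memory buffer.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance, created the way Hadoop
        // would create it: via the no-argument constructor.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        FileFreqWritable copy = new FileFreqWritable();
        copy.readFields(in);

        System.out.println(copy);   // prints: WordCount.java : 3
    }
}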

Mapper:

package ren.snail;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertMapper extends Mapper<LongWritable, Text, Text, FileFreqWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Replace every run of non-alphanumeric characters with a single space.
        String data = value.toString().replaceAll("[^a-zA-Z0-9]+", " ");
        String[] values = data.split(" ");
        // The input split tells us which file this line came from.
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String filename = fileSplit.getPath().getName();
        for (String temp : values) {
            if (temp.isEmpty()) {   // split(" ") can yield an empty leading token
                continue;
            }
            context.write(new Text(temp), new FileFreqWritable(new Text(filename), new IntWritable(1)));
        }
    }
}

Combiner:

package ren.snail;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertCombiner extends Reducer<Text, FileFreqWritable, Text, FileFreqWritable> {
    @Override
    public void reduce(Text key, Iterable<FileFreqWritable> values, Context context)
            throws IOException, InterruptedException {
        // Within one map task every value for a key carries the same file name,
        // so remember the first document ID and sum the counts.
        int count = 0;
        String id = "";
        for (FileFreqWritable temp : values) {
            if (count == 0) {
                id = temp.getDocumentID().toString();
            }
            // Accumulate the carried frequency (not just the record count), so the
            // combiner stays correct if it is re-applied to its own output.
            count += temp.getFrequency().get();
        }
        context.write(key, new FileFreqWritable(new Text(id), new IntWritable(count)));
    }
}
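
A combiner's input and output types must match the map output types (here Text / FileFreqWritable), and Hadoop may run it zero, one, or several times per key, so its logic has to stay correct when re-applied to its own output. That is why the count above is accumulated from getFrequency() rather than by counting records. Taking the document ID from the first value is safe only because each input split here is a single file, so every value for a key within one map task carries the same file name.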

Reducer:

package ren.snail;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertReducer extends Reducer<Text, FileFreqWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<FileFreqWritable> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate the per-file "filename : count" entries for this word.
        StringBuilder value = new StringBuilder();
        for (FileFreqWritable fileFreqWritable : values) {
            if (value.length() > 0) {
                value.append(" , ");   // separator between entries, no trailing delimiter
            }
            value.append(fileFreqWritable.toString());
        }
        context.write(key, new Text(value.toString()));
    }
}

Admittedly, my Reduce implementation cuts a corner: it trusts that the combiner has already merged the per-file counts. Hadoop treats the combiner as an optional optimization and may skip it, in which case the reducer would emit a separate "filename : 1" entry for every occurrence. A more defensive version aggregates the counts per file in the reducer itself, as sketched below.
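
Here is a hedged sketch of such a reducer (my suggestion, not the original code; the class name RobustInvertReducer is mine):

package ren.snail;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RobustInvertReducer extends Reducer<Text, FileFreqWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<FileFreqWritable> values, Context context)
            throws IOException, InterruptedException {
        // Re-aggregate the counts per file, whether or not the combiner ran.
        Map<String, Integer> perFile = new LinkedHashMap<String, Integer>();
        for (FileFreqWritable v : values) {
            String file = v.getDocumentID().toString();
            Integer sum = perFile.get(file);
            perFile.put(file, sum == null ? v.getFrequency().get() : sum + v.getFrequency().get());
        }

        // Join the entries as "filename : count", separated by " , ".
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> e : perFile.entrySet()) {
            if (out.length() > 0) {
                out.append(" , ");
            }
            out.append(e.getKey()).append(" : ").append(e.getValue());
        }
        context.write(key, new Text(out.toString()));
    }
}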


Original article: http://www.cnblogs.com/ren-jie/p/5399447.html
