码迷,mamicode.com
首页 > 其他好文 > 详细

【Hadoop基础教程】7、Hadoop之一对多关联查询

时间:2015-03-05 19:30:56      阅读:268      评论:0      收藏:0      [点我收藏+]

标签:hadoop   教程   大数据   mapreduce   

我们都知道一个地址拥有着多家公司,本案例将通过两种类型输入文件:address类(地址)和company类(公司)进行一对多的关联查询,得到地址名(例如:Beijing)与公司名(例如:Beijing JD、Beijing Red Star)的关联信息。

开发环境


硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点)
软件环境:Java 1.7.0_45、hadoop-1.2.1

1、 Map过程


首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容并存入< key,value>例如<0,” 1:Beijing”>。Map过程首先按照输入文件的类型不同对输入信息进行不同的处理,例如,对于address类型输入文件将value值(”1:Beijing”)处理成<”1”,”address:Beijing”>,对于company类型输入文件将value值(”Beijing Red Star:1”)处理成<”1”,”company:Beijing Red Star”>,如图所示:

技术分享

Map端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        Text addressId = new Text();
        Text info = new Text();
        String[] line = value.toString().split(":");// 获取文件的每一行数据,并以":"分割
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (line.length < 2) {
            return;
        }
        if (path.indexOf("company") >= 0) {//处理company文件的value信息: "Beijing Red Star:1"
            addressId.set(line[1]);//"1"
            info.set("company" + ":" + line[0]);//"company:Beijing Red Star"
            context.write(addressId,info);//<key,value> --<"1","company:Beijing Red Star">
        } else if (path.indexOf("address") >= 0) {//处理adress文件的value信息:"1:Beijing"
            addressId.set(line[0]);//"1"
            info.set("address" + ":" + line[1]);//"address:Beijing"
            context.write(addressId,info);//<key,value> --<"1","address:Beijing">
        }
    }
}

2、 Reduce过程


Reduce过程首先对输入< key,values>即<”1”,[“company:Beijing Red Star”,”company:Beijing JD”,”address:Beijing”]>的values值进行遍历获取到单元信息value(例如”company:Beijing Red Star”),然后根据value中的标识符(company和address)将公司名和地址名分别存入到company集合和address集合,最后对company集合和address集合进行笛卡尔积运算得到company与address的关系,并进行输出,如图所示。

技术分享

Reduce端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static class ReduceClass extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {
        List<String> companys = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
//["company:Beijing Red Star","company:Beijing JD","address:Beijing"]
        Iterator<Text> it = values.iterator();
        while(it.hasNext()){
            String value = it.next().toString();//"company:Beijing Red Star"
            String[] result = value.split(":");
            if(result.length >= 2){
                if(result[0].equals("company")){
                    companys.add(result[1]);
                }else if(result[0].equals("address")){
                    addresses.add(result[1]);
                }
            }
        }
        // 求笛卡尔积
        if(0 != companys.size()&& 0 != addresses.size()){
            for(int i=0;i<companys.size();i++){
                for(int j=0;j<addresses.size();j++){
                    context.write(new Text(companys.get(i)), new Text(addresses.get(j)));//<key,value>--<"Beijing JD","Beijing">
                }
            }
        }
    }
}

3、 驱动实现


驱动核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "company Join address");
        //设置Job入口类
        job.setJarByClass(CompanyJoinAddress.class);
        // 设置Map和Reduce处理类
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//companyTableDir
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//addressTableDir
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//out
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

4、部署运行


1)启动Hadoop集群

[hadoop@K-Master ~]$ start-dfs.sh
[hadoop@K-Master ~]$ start-mapred.sh
[hadoop@K-Master ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode

2)部署源码

#设置工作环境
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
#部署源码
将CompanyJoinAddress文件夹拷贝到/usr/hadoop/workspace/MapReduce/ 路径下;

… 你可以直接 下载 CompanyJoinAddress

3)编译文件

#切换工作目录
[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/MapReduce/CompanyJoinAddress
#编译文件
[hadoop@K-Master CompanyJoinAddress]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin src/com/zonesion/tablejoin/CompanyJoinAddress.java 
[hadoop@K-Master CompanyJoinAddress]$ ls bin/com/zonesion/tablejoin/* -la
-rw-rw-r-- 1 hadoop hadoop 1909 8月   1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress.class
-rw-rw-r-- 1 hadoop hadoop 2199 8月  1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class
-rw-rw-r-- 1 hadoop hadoop 2242 8月   1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class

4)打包jar文件

[hadoop@K-Master CompanyJoinAddress]$ jar -cvf CompanyJoinAddress.jar -C bin/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class(in = 2273) (out= 951)(deflated 58%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class(in = 2242) (out= 1029)(deflated 54%)
adding: com/zonesion/tablejoin/CompanyJoinAddress.class(in = 1909) (out= 983)(deflated 48%)

5)上传输入文件

#创建company输入文件夹
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/company/
#创建address输入文件夹
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/address/
#上传文件到company输入文件夹
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/company* CompanyJoinAddress/input/company/
#上传文件到address输入文件夹
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/address* CompanyJoinAddress/input/address/

6)运行Jar文件

[hadoop@K-Master CompanyJoinAddress]$ hadoop jar CompanyJoinAddress.jar com.zonesion.tablejoin.CompanyJoinAddress CompanyJoinAddress/input/company/  CompanyJoinAddress/input/address/ CompanyJoinAddress/output
14/08/01 10:50:05 INFO input.FileInputFormat: Total input paths to process : 4
14/08/01 10:50:05 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/01 10:50:05 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/01 10:50:05 INFO mapred.JobClient: Running job: job_201408010921_0008
14/08/01 10:50:06 INFO mapred.JobClient:  map 0% reduce 0%
14/08/01 10:50:09 INFO mapred.JobClient:  map 50% reduce 0%
14/08/01 10:50:10 INFO mapred.JobClient:  map 100% reduce 0%
14/08/01 10:50:17 INFO mapred.JobClient:  map 100% reduce 100%
14/08/01 10:50:17 INFO mapred.JobClient: Job complete: job_201408010921_0008
14/08/01 10:50:17 INFO mapred.JobClient: Counters: 29
......

7)查看输出结果

[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -ls CompanyJoinAddress/output
Found 3 items
-rw-r--r--   1 hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup  0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_logs
-rw-r--r-- 1 hadoop supergroup 241 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/part-r-00000
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -cat CompanyJoinAddress/output/part-r-00000
Beijing Red Star        Beijing
Beijing Rising          Beijing
Back of Beijing     Beijing
Beijing JD          Beijing
xiaomi              Beijing
Guangzhou Honda     Guangzhou
Guangzhou Development Bank  Guangzhou
Shenzhen Thunder        Shenzhen
Tencent             Shenzhen
aiplay              hangzhou
huawei              wuhan

您可能喜欢

【Hadoop基础教程】5、Hadoop之单词计数
【Hadoop基础教程】6、Hadoop之单表关联查询
【Hadoop基础教程】7、Hadoop之一对多关联查询
【Hadoop基础教程】8、Hadoop之一对多关联查询
【Hadoop基础教程】9、Hadoop之倒排索引

【Hadoop基础教程】7、Hadoop之一对多关联查询

标签:hadoop   教程   大数据   mapreduce   

原文地址:http://blog.csdn.net/andie_guo/article/details/44086183

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!