
HBase - MapReduce - Example of Using HBase as an Input Source | 那伊抹微笑



Post author: 那伊抹微笑
CSDN blog: http://blog.csdn.net/u012185296
itdog8 thread: http://www.itdog8.com/thread-203-1-1.html
Post title: HBase - MapReduce - Example of Using HBase as an Input Source | 那伊抹微笑
Signature: The greatest distance in the world is neither the ends of the earth nor the farthest shore, but that I stand right in front of you and you cannot feel my presence
Tech focus: Flume+Kafka+Storm+Redis/HBase+Hadoop+Hive+Mahout+Spark ... cloud computing
Repost policy: Reposting is allowed, but you must clearly mark the original source, author information, and copyright notice with a hyperlink. Thank you for your cooperation!
QQ group: 214293307, tech sharing (looking forward to learning and improving together with you)



1 Official Example Code
The following is an example of using HBase as a MapReduce source. Note that there is only a Mapper instance and no Reducer, and the Mapper emits nothing. The job is configured as follows...
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);   // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
    tableName,       // input HBase table name
    scan,            // Scan instance to control CF and attribute selection
    MyMapper.class,  // mapper
    null,            // mapper output key
    null,            // mapper output value
    job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
...and the mapper needs to extend TableMapper...

public class MyMapper extends TableMapper<Text, LongWritable> {
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // process data for the row from the Result instance.
    }
}
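The map body above is left empty in the official example. As a minimal sketch of what "process data for the row from the Result instance" could look like, the hypothetical mapper below reads one cell per row; the column family "cf" and qualifier "q" are placeholders for your own schema, not names from the official example:

public class MyMapper extends TableMapper<Text, LongWritable> {
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // Fetch a single cell from this row ("cf"/"q" are placeholder names).
        byte[] cell = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
        if (cell != null) {
            String rowKey = Bytes.toString(row.get());
            String cellValue = Bytes.toString(cell);
            // ... per-row logic goes here; this job emits nothing, which is
            // why it was configured with NullOutputFormat above.
        }
    }
}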

2 My Reference Code
package com.itdog8.cloud.hbase.mr.test;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * TestHBaseAsSourceMapReduceMainClass
 *
 * @author 那伊抹微笑
 * @date 2015-07-30 18:00:21
 *
 */
public class TestHBaseAsSourceMapReduceMainClass {
 private static final Log _log = LogFactory.getLog(TestHBaseAsSourceMapReduceMainClass.class);
 
 private static final String JOB_NAME = "TestHBaseAsSourceMapReduce";
 private static String tmpPath = "/tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce";
 private static String hbaseInputTable = "itdog8:test_1";
 
 public static class ExampleSourceMapper extends TableMapper<Text, Text> {
  private Text k = new Text();
  private Text v = new Text();
 
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
   super.setup(context);
  }

  @Override
  protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
   String rowkey = Bytes.toString(key.get());
   
   // All you need here is familiarity with the Result API; after that it is just your business logic
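   // For example, you can iterate every cell in the row via value.rawCells()
   // and decode each Cell with CellUtil.cloneQualifier() / CellUtil.cloneValue(),
   // or fetch a single column with value.getValue(family, qualifier).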
   try {
   
    // set value
    k.set("望咩望");
    v.set("食屎啦你");
   
    // context write to reducer
    context.write(k, v);
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
   super.cleanup(context);
  }
 
 }
 
 public static void main(String[] args) throws Exception {
  // hbase configuration
  Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com");
  conf.set("hbase.zookeeper.property.clientPort", "2181");

  // batch and caching
  Scan scan = new Scan();
  scan.setCaching(10000);
  scan.setCacheBlocks(false);
  scan.setMaxVersions(1);
 
  // set hadoop speculative execution to false
  conf.setBoolean("mapred.map.tasks.speculative.execution", false);
  conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
 
  // tmp output path (fall back to the default above when no argument is given)
  if (args.length > 0) {
   tmpPath = args[0];
  }
  Path tmpIndexPath = new Path(tmpPath);
  FileSystem fs = FileSystem.get(conf);
  if(fs.exists(tmpIndexPath)) {
//	 fs.delete(tmpIndexPath, true); // dangerous
//	 _log.info("delete tmp index path : " + tmpIndexPath.getName());
   _log.warn("The hdfs path ["+tmpPath+"] existed, please change a path.");
   return;
  }

 
  // Job && conf
  Job job = new Job(conf, JOB_NAME);
  job.setJarByClass(TestHBaseAsSourceMapReduceMainClass.class);
 
  TableMapReduceUtil.initTableMapperJob(hbaseInputTable, scan, ExampleSourceMapper.class, Text.class, Text.class, job);
//	 job.setReducerClass(MyReducer.class); // your own reduce logic
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  FileOutputFormat.setOutputPath(job, tmpIndexPath);
 
  int exitCode = job.waitForCompletion(true) ? 0 : 1;

  System.exit(exitCode);
 }
}
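To run the reference job, package the class into a jar and submit it with the Hadoop launcher, passing the temporary output path as the single expected argument, for example: hadoop jar itdog8-hbase-mr.jar com.itdog8.cloud.hbase.mr.test.TestHBaseAsSourceMapReduceMainClass /tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce (the jar name here is a placeholder). TableMapReduceUtil.initTableMapperJob serializes the Scan into the job configuration and, by default, also ships the HBase dependency jars with the job.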


Copyright notice: This is an original article by the blogger and may not be reposted without the blogger's permission.



Original post: http://blog.csdn.net/u012185296/article/details/47279419
