标签:那伊抹微笑 hbase mapreduce 妳那伊抹微笑
通过 `Configuration config = HBaseConfiguration.create();` 创建配置;Mapper 需要继承 `TableMapper`。
// Skeleton for a read-only MapReduce job with an HBase table as the input source
// (assumes `config` is an HBaseConfiguration and `tableName` is defined earlier).
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs (a full scan would churn the region server block cache)
// set other scan attrs
...
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper
null, // mapper output key class (null: nothing is emitted from the mapper)
null, // mapper output value class (null: nothing is emitted from the mapper)
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
public class MyMapper extends TableMapper<Text, LongWritable> { public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException { // process data for the row from the Result instance.
package com.itdog8.cloud.hbase.mr.test;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * TestHBaseAsSourceMapReduceMainClass
 *
 * <p>MapReduce job that uses an HBase table as its input source and writes the
 * mapper output as text to a temporary HDFS path supplied as {@code args[0]}.</p>
 *
 * @author 那伊抹微笑
 * @date 2015-07-30 18:00:21
 */
public class TestHBaseAsSourceMapReduceMainClass {

    private static final Log _log = LogFactory.getLog(TestHBaseAsSourceMapReduceMainClass.class);

    private static final String JOB_NAME = "TestHBaseAsSourceMapReduce";

    // HDFS output path; overwritten from args[0] in main().
    private static String tmpPath = "/tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce";
    // Input HBase table, "namespace:table" form. (Typo `hbaseInputTble` fixed.)
    private static String hbaseInputTable = "itdog8:test_1";

    /**
     * Mapper over HBase rows; emits a fixed (key, value) Text pair per row.
     */
    public static class ExampleSourceMapper extends TableMapper<Text, Text> {

        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        /**
         * @param key     row key of the current HBase row
         * @param value   the Result holding the row's cells; real business logic
         *                would extract cell values from it here
         * @param context used to emit the (k, v) pair to the shuffle
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.get());
            try {
                // set value (placeholder payload from the original example)
                k.set("望咩望");
                v.set("食屎啦你");
                // context write to reducer
                context.write(k, v);
            } catch (Exception e) {
                // Log with row context and preserved cause instead of swallowing
                // the failure via e.printStackTrace().
                _log.error("Failed to emit output for rowkey [" + rowkey + "]", e);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] = HDFS output path (must not already exist)
     * @throws Exception on job setup or submission failure
     */
    public static void main(String[] args) throws Exception {
        // Guard: original indexed args[0] unchecked and would throw
        // ArrayIndexOutOfBoundsException when launched without arguments.
        if (args.length < 1) {
            _log.warn("Usage: " + JOB_NAME + " <tmpOutputPath>");
            System.exit(1);
        }

        // hbase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // batch and caching
        Scan scan = new Scan();
        scan.setCaching(10000);
        scan.setCacheBlocks(false);
        scan.setMaxVersions(1);

        // set hadoop speculative execution to false
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // tmp index path: refuse to run against an existing output directory
        tmpPath = args[0];
        Path tmpIndexPath = new Path(tmpPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(tmpIndexPath)) {
            // Deliberately do NOT delete the path (fs.delete would be dangerous).
            _log.warn("The hdfs path [" + tmpPath + "] existed, please change a path.");
            return;
        }

        // Job && conf (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(conf, JOB_NAME);
        job.setJarByClass(TestHBaseAsSourceMapReduceMainClass.class);
        TableMapReduceUtil.initTableMapperJob(hbaseInputTable, scan, ExampleSourceMapper.class,
                Text.class, Text.class, job);
        // job.setReducerClass(MyReducer.class); // plug in custom reduce logic here

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tmpIndexPath);

        int success = job.waitForCompletion(true) ? 0 : 1;
        System.exit(success);
    }
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑
标签:那伊抹微笑 hbase mapreduce 妳那伊抹微笑
原文地址:http://blog.csdn.net/u012185296/article/details/47279419