Tags:
maven:3.3.9, jdk:1.7, Struts2:2.3.24.1, hibernate:4.3.6, spring:4.2.5, MySQL:5.1.34, Junit:4, MyEclipse:2014;
Hadoop:2.6.4, HBase:1.1.2
Source code download: https://github.com/fansy1990/ssh_v3/releases
Deployment reference: http://blog.csdn.net/fansy1990/article/details/51356583
Data download: http://download.csdn.net/detail/fansy1990/9540865 or http://pan.baidu.com/s/1dEVeJz7

The job submission action acquires an MRLock before launching the MapReduce job:

public void submitJob() {
Map<String, Object> jsonMap = new HashMap<String, Object>();
if (HadoopUtils.getMrLock().equals(MRLock.NOTLOCKED)) {// not locked, so the job can be submitted
// acquire the lock first
HadoopUtils.setMrLock(MRLock.LOCKED);
// clear the MR job cache
HadoopUtils.initMRCache();
// submit the job in a background thread
new Thread(new Hdfs2HBaseRunnable(hdfsFile, tableName,
colDescription, splitter, dateFormat)).start();
jsonMap.put("flag", "true");
jsonMap.put("jobId", HadoopUtils.getJobId());
} else {
jsonMap.put("flag", "false");
jsonMap.put("msg", "已经在运行MR程序,请确认!");
}
Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
return;
}

An MRLock is used here: the lock is taken on submission so that, while a job is still running, another request cannot submit a duplicate job (which would break the progress monitoring). On the frontend, the job is submitted via AJAX and the returned jobId is then used to subscribe to progress events:

ret = callByAJax("hadoop/hadoop_submitJob.action",
{hdfsFile:hdfs, tableName:table, colDescription:colDescription, splitter:splitter, dateFormat:dateFormat});
if(ret.flag=="false"){// 提交任务失败
$.messager.alert(‘提示‘,ret.msg,‘warn‘);
return ;
}
$.messager.progress({
title: 'Notice',
msg: 'Importing data...',
interval: 0 // disable auto update of the progress value
});
// the ret returned by hadoop_submitJob.action contains the jobId: ret.jobId
if(typeof(EventSource)!=="undefined")
{
console.info("jobId:"+ret.jobId);
var source=new EventSource("hadoop/hadoop_getMRProgress.action"+"?jobId="+ ret.jobId );
source.onmessage=function(event)
{
console.info(event.data);
// if event.data contains "error", parse the value after the colon, display it, and report the job failure
if(event.data.indexOf("error") > -1){
source.close();
$.messager.progress('close');
$.messager.alert('Notice', "Job failed!", 'warn');
return;
}
// if event.data equals "success", report success; in all other cases event.data is the progress value
if(event.data == "success"){
source.close();
$.messager.progress(‘close‘);
$.messager.alert(‘提示‘,"任务运行成功!",‘warn‘);
}
var bar = $.messager.progress(‘bar‘);
bar.progressbar(‘setValue‘, event.data);
};
}

On the server side, getJobId() waits until the submitted job has actually been assigned an ID:

public static String getJobId() {
long start = System.currentTimeMillis();
while (noJobId()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(" Getting job id ...");
}
long end = System.currentTimeMillis();
log.info("获取jobId,耗时:" + (end - start) * 1.0 / 1000 + "s");
return currJob.getJobID().toString();
}

private static boolean noJobId() {
return currJob == null || currJob.getJobID() == null;
}

/**
* Check whether any of the given banknote serial numbers are suspected counterfeit serial numbers
*
* @param stumbers comma-separated serial numbers
* @return a map with "exist" and "notExist" entries
* @throws IllegalArgumentException
* @throws IOException
*/
public Map<String, String> checkStumbersExist(String stumbers)
throws IllegalArgumentException, IOException {
String[] stumbersArr = StringUtils.split(stumbers, Utils.COMMA);
Connection connection = HadoopUtils.getHBaseConnection();
Table table = connection.getTable(TableName
.valueOf(Utils.IDENTIFY_RMB_RECORDS));
Map<String, String> map = new HashMap<>();
Get get = null;
try {
List<Get> gets = new ArrayList<>();
for (String stumber : stumbersArr) {
get = new Get(stumber.trim().getBytes());
gets.add(get);
}
Result[] results = table.get(gets);
String exist;
StringBuffer existStr = new StringBuffer();
StringBuffer notExistStr = new StringBuffer();
for (int i = 0; i < results.length; i++) {
exist = new String(results[i].getValue(Utils.FAMILY,
Utils.COL_EXIST));
if ("1".equals(exist)) {
existStr.append(stumbersArr[i]).append(Utils.COMMA);
} else if ("0".equals(exist)) {
notExistStr.append(stumbersArr[i]).append(Utils.COMMA);
} else {
log.info("冠字号:" + stumbersArr[i] + "值 exist字段值异常!");
}
}
if (existStr.length() > 0) {
map.put("exist", existStr.substring(0, existStr.length() - 1));
} else {
map.put("exist", "nodata");
}
if (notExistStr.length() > 0) {
map.put("notExist",
notExistStr.substring(0, notExistStr.length() - 1));
} else {
map.put("notExist", "nodata");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
table.close(); // release the per-request Table handle
}
return map;
}

/**
 * Query data for the given rowkeys, returning up to the given number of versions
* @param tableName
* @param cfs
* @param rowkeys
* @param versions
* @return
* @throws IOException
*/
public List<HBaseTableData> getTableCertainRowKeyData(String tableName,
String cfs, String rowkeys, int versions) throws IOException {
String[] stumbersArr = StringUtils.split(rowkeys, Utils.COMMA);
Connection connection = HadoopUtils.getHBaseConnection();
Table table = connection.getTable(TableName
.valueOf(tableName));
List<HBaseTableData> list = new ArrayList<>();
Get get = null;
try {
List<Get> gets = new ArrayList<>();
for (String stumber : stumbersArr) {
get = new Get(stumber.trim().getBytes());
get.setMaxVersions(versions);
gets.add(get);
}
Result[] results = table.get(gets);
Cell[] cells;
for (int i = 0; i < results.length; i++) {
cells = results[i].rawCells();
list.addAll(getHBaseTableDataListFromCells(cells));
}
return list;
} catch (Exception e) {
e.printStackTrace();
} finally {
table.close();
}
return null;
}

The withdrawal flow hands out banknotes by flipping their exist flag, as follows (a sketch of step 2's checkAndPut call follows this list):
1) Given the requested number of serial numbers, num, randomly look up num*3 records whose op_www:exist column value for the serial number (rowkey) is 1;
2) Use HBase checkAndPut to update the op_www:exist value to 0, and return the rowkeys, i.e. the serial numbers, that were actually updated;
3) If fewer than num serial numbers were updated among the num*3 records, randomly look up more records whose op_www:exist value is 1 and update them, until num serial numbers have been returned.
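The post does not show the withdrawal code itself, so the following is a minimal sketch of steps 1)-3). It assumes candidates are drawn with the random rowkey generator Utils.generateRowKey from the test code below, and takes the op_www family and exist qualifier from the description above; in HBase 1.x, Table.checkAndPut(row, family, qualifier, expectedValue, put) performs the atomic test-and-set.

/** Minimal sketch of the withdrawal steps above (assumed helper: Utils.generateRowKey). */
public static List<String> withdraw(Table table, int num) throws IOException {
    byte[] family = Bytes.toBytes("op_www");
    byte[] qualifier = Bytes.toBytes("exist");
    List<String> taken = new ArrayList<>();
    while (taken.size() < num) {
        // step 1: randomly pick num*3 candidate serial numbers (rowkeys)
        for (byte[] rowkey : Utils.generateRowKey(num * 3)) {
            Put put = new Put(rowkey);
            put.addColumn(family, qualifier, Bytes.toBytes("0"));
            // step 2: atomic test-and-set -- succeeds only if exist is currently "1",
            // so two concurrent withdrawals can never hand out the same banknote
            if (table.checkAndPut(rowkey, family, qualifier, Bytes.toBytes("1"), put)) {
                taken.add(Bytes.toString(rowkey));
                if (taken.size() == num) {
                    break;
                }
            }
        }
        // step 3: if fewer than num succeeded, the while loop draws another batch
    }
    return taken;
}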
package stumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadTest {
// private static String FAMILY ="info";
public static void main(String[] args) throws IOException {
long size =10000;
get(Utils.getConn(),Utils.generateRowKey(size));
}
public static void get(Connection connection,List<byte[]> rowkeys) throws IOException {
System.out.println(new Date()+":开始读取记录...");
long start =System.currentTimeMillis();
Table table = connection.getTable(TableName.valueOf(Utils.TABLE));
Get get = null ;
long count =0;
try{
for(byte[] rowkey :rowkeys){
count ++;
// get = new Get(Bytes.toBytes(""));
get = new Get(rowkey);
table.get(get);
if(count%1000==0){
System.out.println("count:"+count);
}
}
long end = System.currentTimeMillis();
System.out.println(new Date()+":"+rowkeys.size()+"条记录,读取耗时:"+(end-start)*1.0/1000+"s");
}catch(Exception e){
e.printStackTrace();
}finally{
table.close();
}
}
}
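ReadTest issues one RPC per row. For comparison, here is a sketch (not in the original post) of a batched variant that reuses the table.get(List<Get>) pattern already shown in checkStumbersExist above; it additionally needs the org.apache.hadoop.hbase.client.Result import.

// Sketch: batched random read; gets are grouped per region server instead of one RPC per row.
public static void batchGet(Connection connection, List<byte[]> rowkeys) throws IOException {
    Table table = connection.getTable(TableName.valueOf(Utils.TABLE));
    try {
        List<Get> gets = new ArrayList<>(rowkeys.size());
        for (byte[] rowkey : rowkeys) {
            gets.add(new Get(rowkey));
        }
        long start = System.currentTimeMillis();
        Result[] results = table.get(gets);
        long end = System.currentTimeMillis();
        System.out.println(new Date() + ": " + results.length + " records, batched read took: "
                + (end - start) * 1.0 / 1000 + "s");
    } finally {
        table.close();
    }
}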
package stumer;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
public class ReadThread implements Runnable {
private List<byte[]> rks;
private Table table;
public ReadThread(Table table ,List<byte[]> rks) {
this.table = table;
this.rks = rks;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+new Date()+": start reading records...");
long start =System.currentTimeMillis();
Get get = null ;
long count =0;
try{
for(byte[] rowkey :rks){
count ++;
// get = new Get(Bytes.toBytes(""));
get = new Get(rowkey);
table.get(get);
if(count%1000==0){
System.out.println(Thread.currentThread().getName()+" count:"+count);
}
}
long end = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName()+" "+new Date()
+": "+rks.size()+" records, read took: "+(end-start)*1.0/1000+"s");
}catch(Exception e){
e.printStackTrace();
}
}
}
package stumer;
import java.io.IOException;
public class ReadThreadTest {
public static void main(String[] args) throws IOException {
long dataSize =500;
int threadSize = 20;
for(int i=0;i<threadSize;i++){
new Thread(new ReadThread(Utils.getTable(), Utils.generateRowKey(dataSize))).start();
}
}
}
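Note that every Utils.getTable() call above opens its own Connection (see Utils.getConn() below), so the 20 threads create 20 connections. In the HBase 1.x client a Connection is heavyweight and thread-safe while a Table is lightweight and not thread-safe, so a common pattern (a sketch, not from the original post; needs the Connection and TableName imports) is one shared Connection with a Table per thread:

// Sketch: share a single Connection; give each thread its own Table instance.
Connection conn = Utils.getConn();
for (int i = 0; i < threadSize; i++) {
    new Thread(new ReadThread(
            conn.getTable(TableName.valueOf(Utils.TABLE)),
            Utils.generateRowKey(dataSize))).start();
}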
package stumer;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class Utils {
public static String TABLE = "records";
private static DecimalFormat df = new DecimalFormat( "0000" );
public static String[] crownSizePrefixes =null;
static Random random = new Random();
static {
crownSizePrefixes = new String[26*2];
for (int i = 0; i < crownSizePrefixes.length/2; i++) {
crownSizePrefixes[i] = "AAA" + (char) (65 + i);
crownSizePrefixes[i+26] = "AAB" + (char) (65 + i);
}
}
/**
* Format 0~9999 as 0000~9999
* @param num
* @return
*/
public static String formatCrownSizeSuffix(int num){
return df.format(num);
}
public static Table getTable() throws IOException{
return getConn().getTable(TableName.valueOf(TABLE));
}
public static String getRandomCrownSize(){
return crownSizePrefixes[random.nextInt(crownSizePrefixes.length)]
+formatCrownSizeSuffix(random.nextInt(10000));
}
public static Connection getConn() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.master", "node2:16000");// 指定HMaster
conf.set("hbase.rootdir", "hdfs://node1:8020/hbase");// 指定HBase在HDFS上存储路径
conf.set("hbase.zookeeper.quorum", "node2,node3,node4");// 指定使用的Zookeeper集群
conf.set("hbase.zookeeper.property.clientPort", "2181");// 指定使用Zookeeper集群的端口
Connection connection = ConnectionFactory.createConnection(conf);// 获取连
return connection;
}
public static List<byte[]> generateRowKey(long size){
System.out.println(new Date()+"开始生成"+size +"条记录...");
long start =System.currentTimeMillis();
List<byte[]> rowkeys = new ArrayList<>();
for(int i=0;i<size;i++){
rowkeys.add(Bytes.toBytes(Utils.getRandomCrownSize()));
}
long end =System.currentTimeMillis();
System.out.println(new Date()+":"+rowkeys.size()+"条记录,生成耗时:"+(end-start)*1.0/1000+"s");
return rowkeys;
}
}
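Utils.getConn() builds a brand-new Connection on every call, which is expensive. A cached variant (a sketch under the assumption that one shared connection is acceptable for the whole JVM; getCachedConn and cachedConn are names introduced here) could be added to Utils like this:

// Sketch: lazily cache one shared Connection (double-checked locking on the volatile field).
private static volatile Connection cachedConn;

public static Connection getCachedConn() throws IOException {
    if (cachedConn == null) {
        synchronized (Utils.class) {
            if (cachedConn == null) {
                cachedConn = getConn(); // reuses the configuration shown above
            }
        }
    }
    return cachedConn;
}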
Original article: http://blog.csdn.net/fansy1990/article/details/51583401