标签:
maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014;
Hadoop2.6.4,HBase1.1.2
源码下载:https://github.com/fansy1990/ssh_v3/releases
部署参考:http://blog.csdn.net/fansy1990/article/details/51356583
数据下载:http://download.csdn.net/detail/fansy1990/9540865 或 http://pan.baidu.com/s/1dEVeJz7本系统就是使用客观的方法来验证伪钞。本系统采用的方案是基于冠字号的,每张人民币的冠字号是唯一的,如果有一个大表可以把所有的人民币以及人民币对应的操作(在什么时间、什么地点存入或获取)记录下来,这样在进行存取时就可以根据冠字号先查询一下,看当前冠字号对应的纸币在大表中的保存的情况,这样就可以确定当前冠字号对应的纸币是否是伪钞了(这里假设在大表中的所有冠字号对应的钞票都是真钞)。
下面对应存储场景:
| 存/取 | 最近状态(表中有无) | 真钞/伪钞 |
场景1 | 存 | 有 | 伪钞 |
场景2 | 存 | 无 | 真钞 |
场景3 | 取 | 有(此时没有无状态) | 真钞 |
目前,基于传统数据库存储数据一般在千万级别(受限于查询等性能),但是如果要存储所有钞票的信息以及其被存储或获取的记录信息,那么传统数据库肯定是不能胜任的。所以本系统是基于HBase的。
? 存储万级用户信息;
? 存储百万级别钞票信息;
? 支持前端业务每秒500+实时查询请求;
? 数据存储和计算能够可扩展;
? 提供统一接口,支持前端相关查询业务;
? 数据层:包括基础数据MySQL、文档、Web数据等;
? 数据处理层:主要是数据的加载,包括MR加载方式、Java API加载模式、Sqoop加载模式等;
? 数据存储层:主要是HBase存储,包括钞票的所有信息以及用户信息等;
? 数据服务层:主要是对外提供查询、存储等接口服务;
? 数据应用层:存取钞系统,在存钞时设计到伪钞识别;其他应用系统;
create ‘records‘,{NAME=>‘info‘,VERSIONS=>1000},SPLITS =>[‘AAAM9999‘,‘AAAZ9999‘,‘AABM9999‘]主键/列簇 | 字段名称 | 字段含义 | 字段值举例 | 备注 |
rowkey | - | 表主键(钞票冠字号) | AAAA0000 |
|
timestamp | - | 时间戳 | 1414939140000 | long型(可以存储用户操作的时间) |
info | - | 列簇 | - | who、when、where做了哪些操作 |
exist | 是否存在 | 1 | 如果用户是存储行为,那么在行为结束后,该值为1 | |
uid | 用户ID | 4113281991XXXX9919 |
| |
bank | 存取钞银行 | SPDBCNSH | 银行编号 |
create ‘user‘,{NAME=>‘info‘},SPLITS =>[‘4113281990XXXX0000‘,‘4113281991XXXX0000‘,‘4113281992XXXX0000‘]主键/列簇 | 字段名称 | 字段含义 | 字段值举例 | 备注 |
Rowkey | - | 用户主键(身份证号) | 4113281991XXXX9919 |
|
Timestamp | - | 时间戳 | 1414939140000 | long型 |
info | - | 列簇 | - | 用户信息 |
name | 用户名 | JACO |
| |
gender | 用户性别 | femail |
| |
bank | 用户注册银行 | SPDBCNSH | 银行编号 | |
address | 用户住址 | EXX-O94-1319151759 |
| |
| birthday | 用户出生年月 | 1981-10-20 09:12 |
|
package ssh.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import ssh.util.HadoopUtils;
/**
* Job Driver驱动类
*
* @author fansy
*
*/
public class ImportToHBase extends Configured implements Tool {
public static final String SPLITTER = "SPLITTER";
public static final String COLSFAMILY = "COLSFAMILY";
public static final String DATEFORMAT = "DATEFORMAT";
@Override
public int run(String[] args) throws Exception {
if (args.length != 5) {
System.err
.println("Usage:\n demo.job.ImportToHBase <input> <tableName> <splitter> <rk,ts,col1:q1,col2:q1,col2:q2> <date_format>");
return -1;
}
if (args[3] == null || args[3].length() < 1) {
System.err.println("column family can‘t be null!");
return -1;
}
Configuration conf = getConf();
conf.set(SPLITTER, args[2]);
conf.set(COLSFAMILY, args[3]);
conf.set(DATEFORMAT, args[4]);
TableName tableName = TableName.valueOf(args[1]);
Path inputDir = new Path(args[0]);
String jobName = "Import to " + tableName.getNameAsString();
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(ImportMapper.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(ImportMapper.class);
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(),
null, job);
job.setNumReduceTasks(0);
HadoopUtils.setCurrJob(job);// 设置外部静态Job
return job.waitForCompletion(true) ? 0 : 1;
}
}
主类的run方法中使用的是传统的MR导入HBase的代码,只是设置了额外的参数,这里主类参数意思解释如下:package ssh.mr;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper类,接收HDFS数据,写入到HBase表中
* @author fansy
*
*/
public class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
private static final String COMMA = ",";
private static final String COLON=":";
private String splitter = null;
// private String colsStr = null;
private int rkIndex =0; // rowkey 下标
private int tsIndex =1; // timestamp下标
private boolean hasTs = false; // 原始数据是否有timestamp
private SimpleDateFormat sf = null;
private ArrayList<byte[][]> colsFamily= null;
private Put put =null;
ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
@Override
protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
splitter = context.getConfiguration().get(ImportToHBase.SPLITTER,",");
String colsStr = context.getConfiguration().get(ImportToHBase.COLSFAMILY,null);
sf = context.getConfiguration().get(ImportToHBase.DATEFORMAT,null)==null
? new SimpleDateFormat("yyyy-MM-dd HH:mm")
:new SimpleDateFormat(context.getConfiguration().get(ImportToHBase.DATEFORMAT));
String[] cols = colsStr.split(COMMA, -1);
colsFamily =new ArrayList<>();
for(int i=0;i< cols.length;i++){
if("rk".equals(cols[i])){
rkIndex= i;
colsFamily.add(null);
continue;
}
if("ts".equals(cols[i])){
tsIndex = i;
colsFamily.add(null);
hasTs = true; // 原始数据包括ts
continue;
}
colsFamily.add(getCol(cols[i]));
}
}
/**
* 获取 family:qualifier byte数组
* @param col
* @return
*/
private byte[][] getCol(String col) {
byte[][] fam_qua = new byte[2][];
String[] fam_quaStr = col.split(COLON, -1);
fam_qua[0]= Bytes.toBytes(fam_quaStr[0]);
fam_qua[1]= Bytes.toBytes(fam_quaStr[1]);
return fam_qua;
}
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split(splitter, -1);
if(words.length!=colsFamily.size()){
System.out.println("line:"+value.toString()+" does not compatible!");
return ;
}
rowkey.set(getRowKey(words[rkIndex]));
put = getValue(words,colsFamily,rowkey.copyBytes());
context.write(rowkey, put);
}
/**
* 获取Put值
* @param words
* @param colsFamily
* @param bs
* @return
*/
private Put getValue(String[] words, ArrayList<byte[][]> colsFamily, byte[] bs) {
Put put = new Put(bs);
for(int i=0;i<colsFamily.size();i++){
if(colsFamily.get(i)==null){// rk 或ts
continue;// 下一个 列
}
if(words[i]==null || words[i].length()==0) {
// 不添加,直接往下一个value
continue;
}
// 日期异常的记录同样添加
if(hasTs){// 插入包含时间的数据
put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1],
getLongFromDate(words[tsIndex]), Bytes.toBytes(words[i]));
}else{// 不包含时间的数据
put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1],
Bytes.toBytes(words[i]));
}
}
return put;
}
private long getLongFromDate(String dateStr) {
try{
return sf.parse(dateStr).getTime();
}catch(ParseException e){
System.out.println(dateStr+" 转换失败!");
return 0;
}
}
/**
* 获取rowkey byte数组
* @param rowKey
* @return
*/
private byte[] getRowKey(String rowKey) {
return Bytes.toBytes(rowKey);
}
}
Mapper是整个流程的核心,主要负责进行数据解析、并从HDFS导入到HBase表中的工作,其各个部分功能如下:
? setup():获取输入数据字段分隔符,获取列簇、列名,获取rowkey列标,获取ts格式及列标(如果没有的话,就按照插入数据的时间设置);
? map():解析、过滤并提取数据(需要的字段数据),生成Put对象,写入HBase;
使用Java API来操作HBase数据库,完成实时HBase数据库更新,包括冠字号查询、存取款等功能。
分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990
标签:
原文地址:http://blog.csdn.net/fansy1990/article/details/51583080