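import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class PutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
    TableName tn = TableName.valueOf("test010");
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(tn)) {
      Put put = new Put(Bytes.toBytes("ROW1"));
      put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));
      put.addColumn(Bytes.toBytes("CF2"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));
      table.put(put);
      System.out.println("done!");
    }
  }
}

This article focuses on how a put is carried, step by step, to the server side and how the server then invokes it. First, it is worth reviewing the type hierarchy around Connection, as shown in the figure below:

[Figure: Connection class hierarchy]

HConnectionImplementation is the class that actually handles the connection to the servers. To operate on a table's data, such as the put in the example above, we first need a Table instance, which we obtain from the connection:

public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {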
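  if (managed) {
    throw new NeedUnmanagedConnectionException();
  }
  return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
}

Table is really just an operations interface; the actual implementation class is HTable. HTable performs data-level operations, such as inserts and deletes, on a single HBase table. HTable is currently internal to HBase, and the public-facing interface is Table. Once we have the HTable instance, the operation itself can be executed:

/**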
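 * {@inheritDoc}
 * @throws IOException
 */
@Override
public void put(final Put put) throws IOException {
  getBufferedMutator().mutate(put);
  if (autoFlush) {
    flushCommits();
  }
}

This is HTable's implementation of put. It makes a chain of calls, which we will go through one at a time, starting with getBufferedMutator(). That method returns the table's BufferedMutator, and the mutate() call on it is handled by doMutate():

private void doMutate(Mutation m) throws InterruptedIOException,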
    RetriesExhaustedWithDetailsException {
  if (closed) {
    throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
  }
  if (!(m instanceof Put) && !(m instanceof Delete)) {
    throw new IllegalArgumentException("Pass a Delete or a Put");
  }
  // This behavior is highly non-intuitive... it does not protect us against
  // 94-incompatible behavior, which is a timing issue because hasError, the below code
  // and setter of hasError are not synchronized. Perhaps it should be removed.
  if (ap.hasError()) {
    writeAsyncBuffer.add(m);
    backgroundFlushCommits(true);
  }
  if (m instanceof Put) {
    validatePut((Put) m);
  }
  currentWriteBufferSize += m.heapSize();
  writeAsyncBuffer.add(m);
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
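The buffering pattern in doMutate() and HTable.put() can be modeled in a few lines of plain Java. This is a simplified sketch, not HBase code. FakeMutation and SimpleWriteBuffer are hypothetical names. Sizes are supplied by hand instead of being computed by heapSize(). The flush hands the whole batch to a callback that stands in for AsyncProcess. Only two things follow the excerpts: the size accounting (currentWriteBufferSize against writeBufferSize) and the autoFlush check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Put or Delete: only its estimated size matters here.
class FakeMutation {
  final String row;
  final long heapSize;

  FakeMutation(String row, long heapSize) {
    this.row = row;
    this.heapSize = heapSize;
  }
}

// Simplified model of the client-side write buffer (not the real BufferedMutatorImpl).
class SimpleWriteBuffer {
  private final long writeBufferSize;                 // flush threshold, in bytes
  private final boolean autoFlush;                    // like HTable's autoFlush flag
  private final Consumer<List<FakeMutation>> sender;  // hypothetical stand-in for AsyncProcess
  private final List<FakeMutation> writeAsyncBuffer = new ArrayList<>();
  private long currentWriteBufferSize = 0;

  SimpleWriteBuffer(long writeBufferSize, boolean autoFlush,
                    Consumer<List<FakeMutation>> sender) {
    this.writeBufferSize = writeBufferSize;
    this.autoFlush = autoFlush;
    this.sender = sender;
  }

  // Mirrors HTable.put(): buffer the mutation, then flush right away if autoFlush is on.
  void put(FakeMutation m) {
    doMutate(m);
    if (autoFlush) {
      flush();
    }
  }

  // Mirrors the size accounting in doMutate(): add to the buffer, flush past the threshold.
  private void doMutate(FakeMutation m) {
    currentWriteBufferSize += m.heapSize;
    writeAsyncBuffer.add(m);
    if (currentWriteBufferSize > writeBufferSize) {
      flush();
    }
  }

  // Sends everything buffered as one batch and resets the size counter.
  void flush() {
    if (writeAsyncBuffer.isEmpty()) {
      return;
    }
    sender.accept(new ArrayList<>(writeAsyncBuffer));
    writeAsyncBuffer.clear();
    currentWriteBufferSize = 0;
  }

  int bufferedCount() {
    return writeAsyncBuffer.size();
  }
}
```

Consider a 100-byte threshold and three 40-byte mutations. The first two stay in the buffer, and the third pushes the total to 120, so all three go out as one batch. With autoFlush enabled, every put is sent on its own, and the model loses the batching benefit.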
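The line that does the real work is writeAsyncBuffer.add(m). It simply appends the operation to an asynchronous write buffer. The buffer is flushed only once a condition is met: here, when currentWriteBufferSize exceeds writeBufferSize, and HTable.put() also calls flushCommits() when autoFlush is set. Only at flush time is the operation actually executed. Batching writes this way is mainly meant to improve the system's throughput. Next, let's look inside the flush operation:

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  try {
    if (!synchronous) {
      ap.submit(tableName, writeAsyncBuffer, true, null, false);
      if (ap.hasError()) {
        LOG.debug(tableName + ": One or more of the operations have failed -"
            + " waiting for all operation in progress to finish (successfully or not)");
      }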
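    }
    // ...

The flush can be submitted either asynchronously or synchronously, as selected by the synchronous argument. In doMutate, the normal path calls backgroundFlushCommits(false), so submission is asynchronous by default. A synchronous flush is forced only when ap.hasError() reports an earlier failure. Here ap is an instance of AsyncProcess, which uses multiple threads to send requests asynchronously and collects the servers' responses through Future objects. This part of the process is fairly involved, so I will continue with it in the next article.

HBase 1.0.0 Source Code Analysis: Request Processing Flow, Using the Put Operation as an Example (Part 1)
Original article: http://blog.csdn.net/youmengjiuzhuiba/article/details/45024679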
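The AsyncProcess details are left for the next part. Meanwhile, the pattern described above can serve as a rough mental model: worker threads send the requests, and the caller later collects the results through Future. The following sketch uses only java.util.concurrent. SimpleAsyncSubmitter and fakeRpc are hypothetical names. The real AsyncProcess also does things this sketch omits, such as grouping operations by region server and retrying failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of asynchronous batch submission (not the real AsyncProcess).
class SimpleAsyncSubmitter implements AutoCloseable {
  private final ExecutorService pool;
  private final List<Future<Integer>> inFlight = new ArrayList<>();

  SimpleAsyncSubmitter(int threads) {
    this.pool = Executors.newFixedThreadPool(threads);
  }

  // Queues one batch and returns immediately; a worker thread performs the "RPC".
  void submit(List<String> rows) {
    final List<String> batch = new ArrayList<>(rows); // copy: the caller may clear its buffer
    Callable<Integer> task = () -> fakeRpc(batch);
    inFlight.add(pool.submit(task));
  }

  // Hypothetical stand-in for the server call: reports every row as written.
  private static int fakeRpc(List<String> batch) {
    return batch.size();
  }

  // Blocks until every queued batch has finished; returns the number of rows acknowledged.
  int waitForAll() throws InterruptedException, ExecutionException {
    int total = 0;
    for (Future<Integer> f : inFlight) {
      total += f.get();
    }
    inFlight.clear();
    return total;
  }

  @Override
  public void close() {
    pool.shutdown();
  }
}
```

submit() returns as soon as the task is queued, so the caller can keep filling its buffer. waitForAll() is the point where the caller blocks, loosely comparable to the synchronous flush path.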