从图中可以清楚的看到当客户端向服务器端发送一个请求的时候,最开始是被RPCServer中的Listener所监听到的,如下面的代码所示,HBase的RpcServer启动的时候会启动几个处理线程:
responder.start();
listener.start();
scheduler.start();其中,Responder线程负责数据的request的回复工作,listener负责监听客户端的请求,scheduler负责具体call的调度工作 while (running) {
SelectionKey key = null;
try {
selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
key = null;
}
}listener的实现过程中运用了java nio的一些特性,主要是每个Listener线程又会管理这一个Reader的线程池,这些Reader具体负责从Socket Channel中读取数据,并解析数据中的相关项,进而构造出可运行的CallRunner: Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize,
traceInfo);
scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));RpcScheduler调用对应的RpcExcutor进行相应的处理,RpcExcutor中启动了多个处理线程,这些线程从队列中取出任务并且执行, protected void startHandlers(final String nameSuffix, final int numHandlers,
final List<BlockingQueue<CallRunner>> callQueues,
final int qindex, final int qsize, final int port) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
for (int i = 0; i < numHandlers; i++) {
final int index = qindex + (i % qsize);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
consumerLoop(callQueues.get(index));
}
});
t.setDaemon(true);
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
",queue=" + index + ",port=" + port);
t.start();
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
handlers.add(t);
}
} protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
double handlerFailureThreshhold =
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
try {
while (running) {
try {
CallRunner task = myQueue.take();
try {
activeHandlerCount.incrementAndGet();
task.run(); resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status); if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();到此结束就简要介绍了一个完整的服务端RPC处理流程,该流程中涉及到的相关的类的关系如下图所示:原文地址:http://blog.csdn.net/youmengjiuzhuiba/article/details/44750631