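The previous article walked through an example of using Curator. This article analyzes its initialization and startup in detail.
(1) The newClient method returns an object implementing the CuratorFramework interface:
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)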
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
Now look at the builder() method:
// Returns a new builder object used to construct a CuratorFramework
public static Builder builder()
{
return new Builder();
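}
As you can see, this method returns a Builder for constructing a CuratorFramework. The Builder class is defined in the CuratorFrameworkFactory.java file. Its connectString method, for example:
// Sets the address list of the ZooKeeper ensemble to connect to
public Builder connectString(String connectString)
{
ensembleProvider = new FixedEnsembleProvider(connectString);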
return this;
}
Each of the methods chained above sets a field on the current object and then returns the current Builder. The fields that can be set include:
private EnsembleProvider ensembleProvider;
private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
private RetryPolicy retryPolicy;
private ThreadFactory threadFactory = null;
private String namespace;
private String authScheme = null;
private byte[] authValue = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
The one to focus on is the final build() method:
// Builds a CuratorFramework object from the current Builder
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
It creates a CuratorFrameworkImpl instance and passes the current Builder object into it.
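The chain from newClient() through the setters to build() can be sketched without Curator on the classpath. This is only a minimal illustration of the pattern: SketchBuilder, SketchClient and BuilderSketch are hypothetical stand-ins for Builder, CuratorFrameworkImpl and CuratorFrameworkFactory, and the default values are made up for the sketch rather than taken from Curator.

```java
// Hypothetical stand-in for CuratorFrameworkImpl: it reads everything from the builder.
final class SketchClient {
    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    SketchClient(SketchBuilder builder) {
        this.connectString = builder.connectString;
        this.sessionTimeoutMs = builder.sessionTimeoutMs;
        this.connectionTimeoutMs = builder.connectionTimeoutMs;
    }
}

// Hypothetical stand-in for CuratorFrameworkFactory.Builder.
final class SketchBuilder {
    // Defaults invented for this sketch; they are not claimed to be Curator's defaults.
    String connectString;
    int sessionTimeoutMs = 60_000;
    int connectionTimeoutMs = 15_000;

    SketchBuilder connectString(String connectString) {
        this.connectString = connectString;
        return this; // returning this is what makes the calls chainable
    }

    SketchBuilder sessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        return this;
    }

    SketchBuilder connectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
        return this;
    }

    // Like build() above: hand the whole builder to the implementation class.
    SketchClient build() {
        return new SketchClient(this);
    }
}

final class BuilderSketch {
    // Mirrors the shape of newClient(): a convenience wrapper around the builder chain.
    static SketchClient newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
        return new SketchBuilder()
            .connectString(connectString)
            .sessionTimeoutMs(sessionTimeoutMs)
            .connectionTimeoutMs(connectionTimeoutMs)
            .build();
    }

    public static void main(String[] args) {
        SketchClient client = newClient("127.0.0.1:2181", 30_000, 5_000);
        System.out.println(client.connectString + " " + client.sessionTimeoutMs + " " + client.connectionTimeoutMs);
    }
}
```

Passing the builder itself to the constructor keeps the constructor signature stable as new options are added, which is presumably why build() is written this way.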
CuratorFrameworkImpl is the class that implements the CuratorFramework interface. Here is its constructor:
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
if ( builder.getAuthScheme() != null )
{
authInfo.set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
}
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
CuratorFrameworkImpl is mainly a wrapper around CuratorZookeeperClient, so the part to focus on is the second statement of the constructor, which builds the CuratorZookeeperClient object. Apart from arguments 1 and 5, every argument passed to that constructor comes straight from the builder object. Argument 1 is localZookeeperFactory, which is produced by the following method:
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
AuthInfo auth = authInfo.get();
if ( auth != null )
{
zooKeeper.addAuthInfo(auth.scheme, auth.auth);
}
return zooKeeper;
}
};
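}
The factory passed in is the ZookeeperFactory object held by the builder. Unless a different one was set on the builder, this is the DefaultZookeeperFactory class that Curator provides, defined as follows:
public class DefaultZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
It simply creates a native ZooKeeper object with new. The newZooKeeper() method of the ZookeeperFactory handed to the CuratorZookeeperClient constructor therefore returns a native ZooKeeper object, with any configured auth info already added by the wrapper shown above.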
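The wrapping done by makeZookeeperFactory() is a small decorator: delegate creation to the real factory, then apply auth info if any is set. The sketch below reproduces only that idea with plain Java. FakeHandle, HandleFactory, SketchAuth and FactoryDecoratorSketch are hypothetical names standing in for ZooKeeper, ZookeeperFactory, AuthInfo and CuratorFrameworkImpl, and the checked exception of the real interface is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the native ZooKeeper handle.
final class FakeHandle {
    final String connectString;
    final List<String> authSchemes = new ArrayList<>();

    FakeHandle(String connectString) {
        this.connectString = connectString;
    }

    void addAuthInfo(String scheme, byte[] auth) {
        authSchemes.add(scheme);
    }
}

// Hypothetical stand-in for ZookeeperFactory (no checked exception in this sketch).
interface HandleFactory {
    FakeHandle newHandle(String connectString);
}

// Hypothetical stand-in for AuthInfo.
final class SketchAuth {
    final String scheme;
    final byte[] value;

    SketchAuth(String scheme, byte[] value) {
        this.scheme = scheme;
        this.value = value;
    }
}

final class FactoryDecoratorSketch {
    // Read at handle-creation time, so auth set later still applies to new handles.
    final AtomicReference<SketchAuth> authInfo = new AtomicReference<>();

    // Same shape as makeZookeeperFactory(): wrap the actual factory, add auth afterwards.
    HandleFactory decorate(final HandleFactory actual) {
        return connectString -> {
            FakeHandle handle = actual.newHandle(connectString);
            SketchAuth auth = authInfo.get();
            if (auth != null) {
                handle.addAuthInfo(auth.scheme, auth.value);
            }
            return handle;
        };
    }

    public static void main(String[] args) {
        FactoryDecoratorSketch owner = new FactoryDecoratorSketch();
        HandleFactory wrapped = owner.decorate(FakeHandle::new);
        System.out.println(wrapped.newHandle("127.0.0.1:2181").authSchemes);
        owner.authInfo.set(new SketchAuth("digest", "user:secret".getBytes()));
        System.out.println(wrapped.newHandle("127.0.0.1:2181").authSchemes);
    }
}
```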
Argument 5 is a Watcher object whose event callback process() in turn calls the processEvent() method:
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
TimeTrace trace = client.startTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
logError("Event listener threw exception", e);
}
return null;
}
});
}
Going one level deeper, here is the constructor of the CuratorZookeeperClient class:
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
this.connectionTimeoutMs = connectionTimeoutMs;
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}
The last two lines of the constructor initialize the following two member variables:
private final ConnectionState state;
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
Next, the ConnectionState constructor:
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
The last statement creates a HandleHolder, passing the ConnectionState itself as the Watcher. The HandleHolder constructor is:
HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
{
this.zookeeperFactory = zookeeperFactory;
this.watcher = watcher;
this.ensembleProvider = ensembleProvider;
this.sessionTimeout = sessionTimeout;
this.canBeReadOnly = canBeReadOnly;
}
This is plain field assignment. The class is the holder of the ZooKeeper object, and it contains two key methods:
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}
void closeAndReset() throws Exception
{
internalClose();
// first helper is synchronized when getZooKeeper is called. Subsequent calls
// are not synchronized.
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle = null;
private volatile String connectionString = null;
@Override
public ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( zooKeeperHandle == null )
{
connectionString = ensembleProvider.getConnectionString();
zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper()
{
@Override
public ZooKeeper getZooKeeper() throws Exception
{
return zooKeeperHandle;
}
@Override
public String getConnectionString()
{
return connectionString;
}
};
return zooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
return connectionString;
}
};
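}
As you can see, the class provides a getZooKeeper() method that returns the ZooKeeper object, while closeAndReset() initializes the helper object. The first Helper creates the ZooKeeper object lazily inside a synchronized block and then replaces itself with a second Helper that just returns the cached handle, so a HandleHolder holds only one ZooKeeper object between calls to closeAndReset().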
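The helper-swapping trick in closeAndReset() is easier to see in isolation. The following is a reduced sketch, not Curator's code: LazyHolderSketch and LazyHolderDemo are hypothetical names, a java.util.function.Supplier stands in for the ZookeeperFactory, and the internalClose() step that closes the previous handle is deliberately omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, generic reduction of HandleHolder: holds one lazily created value.
final class LazyHolderSketch<T> {
    private interface Helper<V> {
        V get();
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper; // null until closeAndReset() is called

    LazyHolderSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        Helper<T> current = helper;
        return (current != null) ? current.get() : null;
    }

    // Installs a synchronized first-use helper; nothing is created yet.
    void closeAndReset() {
        helper = new Helper<T>() {
            private volatile T handle = null;

            @Override
            public T get() {
                synchronized (this) {
                    if (handle == null) {
                        handle = factory.get(); // created at most once per reset
                    }
                    final T created = handle;
                    // Swap in a trivial helper so later calls skip the lock.
                    helper = () -> created;
                    return created;
                }
            }
        };
    }
}

final class LazyHolderDemo {
    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        LazyHolderSketch<String> holder =
            new LazyHolderSketch<>(() -> "handle-" + creations.incrementAndGet());
        holder.closeAndReset();
        System.out.println(holder.get());
        System.out.println(holder.get());
        System.out.println("factory calls: " + creations.get());
    }
}
```

Callers that arrive while the first helper is still installed serialize on its lock and all see the same handle. Once the swap has happened, get() is a plain volatile read followed by a field return.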
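To summarize: the newClient() method of CuratorFrameworkFactory returns a CuratorFrameworkImpl object, the class that implements the CuratorFramework interface. That object contains a CuratorZookeeperClient, which contains a ConnectionState, which in turn contains a HandleHolder. The HandleHolder obtains the native ZooKeeper object through the DefaultZookeeperFactory passed down layer by layer from the outermost level and keeps it as a single instance. Every layer has a getZooKeeper() method, and a call from the outside ultimately reaches HandleHolder to fetch the ZooKeeper object.
Within this structure, HandleHolder is the holder of the ZooKeeper object, while the ConnectionState class wrapped around it is the core: it manages the ZooKeeper connection state and responds to ZooKeeper watch callbacks. ConnectionState registers that callback by passing itself in when it creates the HandleHolder.
So the watcher actually registered on the native ZooKeeper object during initialization is the process() method of the ConnectionState class. Here it is:
@Override
public void process(WatchedEvent event)
{
// Call process() on each Watcher in the parentWatchers container
for ( Watcher parentWatcher : parentWatchers )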
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
boolean wasConnected = isConnected.get();
boolean newIsConnected = wasConnected;
if ( event.getType() == Watcher.Event.EventType.None )
{
newIsConnected = checkState(event.getState(), wasConnected);
}
// If the connection state has changed, store the new value and reset the connection start time
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}
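}
This watch callback does two main things:
(1) it calls every Watcher in the parentWatchers container once;
(2) it checks and updates isConnected, the ZooKeeper connection state maintained by the ConnectionState class.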
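Those two responsibilities can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions: ConnectionTrackerSketch and its nested EventType, State, Event and Listener types are hypothetical, and the state check is reduced to "connected if and only if the state is SYNC_CONNECTED", which is much less than what the real checkState() handles.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of ConnectionState.process(): fan out, then track the flag.
final class ConnectionTrackerSketch {
    // Reduced event model: only NONE events carry a session state change.
    enum EventType { NONE, NODE_DATA_CHANGED }
    enum State { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

    static final class Event {
        final EventType type;
        final State state;

        Event(EventType type, State state) {
            this.type = type;
            this.state = state;
        }
    }

    interface Listener {
        void process(Event event);
    }

    private final Queue<Listener> parentListeners = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private volatile long connectionStartMs;

    void addParent(Listener listener) {
        parentListeners.offer(listener);
    }

    boolean isConnected() {
        return isConnected.get();
    }

    long connectionStartMs() {
        return connectionStartMs;
    }

    void process(Event event) {
        // (1) forward the raw event to every registered parent listener
        for (Listener parent : parentListeners) {
            parent.process(event);
        }
        // (2) update the connection flag, but only for session-level events
        boolean wasConnected = isConnected.get();
        boolean nowConnected = wasConnected;
        if (event.type == EventType.NONE) {
            // Assumption of this sketch: a much simpler rule than the real checkState()
            nowConnected = (event.state == State.SYNC_CONNECTED);
        }
        if (nowConnected != wasConnected) {
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }
}
```

Node-level events such as a data change pass through step (1) unchanged and leave the connection flag alone, matching the EventType.None check in the method above.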
So which Watchers are in the parentWatchers container? At this point there is only the one passed in when the CuratorFrameworkImpl constructor creates the CuratorZookeeperClient object, shown again here:
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
The processEvent() method simply invokes the event callback of every CuratorListener in the listener list. This is related to executing ZooKeeper operations asynchronously and is not covered in detail here; the following example shows how such a listener is used:
public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception
{
// this is one method of getting event/async notifications
CuratorListener listener = new CuratorListener()
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
// examine event for details
}
};
client.getCuratorListenable().addListener(listener);
// set data for the given node asynchronously. The completion notification
// is done via the CuratorListener.
client.setData().inBackground().forPath(path, payload);
}
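(2) Starting up with the CuratorFramework start() method:
The start method of CuratorFramework calls the start method of the CuratorZookeeperClient object, which in turn calls the start method of ConnectionState, which finally calls a reset method:
private synchronized void reset() throws Exception
{
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
zooKeeper.closeAndReset();
zooKeeper.getZooKeeper(); // initiate connection
}
The last two statements are the important ones. Calling closeAndReset() on zooKeeper, the HandleHolder object, installs the Helper that is used to obtain the ZooKeeper object. Calling getZooKeeper() once right away creates the ZooKeeper object up front, which starts the connection and spares later API calls the cost of creating it.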
Original article: http://blog.csdn.net/jiyiqinlovexx/article/details/42406125