标签:
--------------------------------------------------------------------------------------
[版权申明:本文系作者原创,转载请注明出处]
文章出处:http://www.cnblogs.com/sdksdk0/p/5585192.htmlString create(String path, byte[] data, List<ACL> acl, CreateMode createMode) Stat exists(String path, boolean watch) void delete(String path, int version) List<String> getChildren(String path, boolean watch) List<String> getChildren(String path, boolean watch) Stat setData(String path, byte[] data, int version) byte[] getData(String path, boolean watch, Stat stat) void addAuthInfo(String scheme, byte[] auth) Stat setACL(String path, List<ACL> acl, int version) List<ACL> getACL(String path, Stat stat)
ZooKeeper zk = null;
@Before
public void init() throws Exception{
zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() {
//监听事件发生时的回调方法
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath());
System.out.println(event.getType());
}
});
}@Test
public void testZkNode() throws Exception {
String path = zk.create("/eclipse", "指令汇科技".getBytes("utf-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建了一个永久节点: " + path);
zk.close();
}@Test
public void testGet() throws Exception {
//监听器的注册只能生效一次
byte[] data = zk.getData("/eclipse", true, new Stat());
System.out.println(new String(data,"utf-8"));
Thread.sleep(Long.MAX_VALUE);
}
@Test
public void testSet() throws UnsupportedEncodingException, KeeperException, InterruptedException{
zk.setData("/eclipse", "谁是英雄".getBytes("utf-8"), -1);
zk.close();
}
private ZooKeeper zk;
private String groupNode = "grpnode";
private String subNode = "sub";
// 向zookeeper注册信息
public void connectZK(String name) throws KeeperException, InterruptedException, IOException {
zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() {
//监听事件发生时的回调方法
@Override
public void process(WatchedEvent event) {
}
});
String path = zk.create("/" + groupNode + "/" + subNode, name.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("服务器上线,创建了一个子节点: " + path);
}// 业务处理逻辑
public void handle() throws Exception {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
if(args.length==0){
System.err.println("参数个数不对,请附加服务器名作为参数来启动.....");
System.exit(1);
}
// 去向zookeeper注册本服务器信息
AppServer server = new AppServer();
server.connectZK(args[0]);
server.handle();
}private volatile List<String> servers;
private ZooKeeper zk;
//使用zk的监听器功能触发服务器更新的动作
public void connectZK() throws IOException, KeeperException, InterruptedException{
zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() {
//监听事件发生时的回调方法
@Override
public void process(WatchedEvent event) {
if("/grpnode".equals(event.getPath()) && event.getType()==EventType.NodeChildrenChanged ){
//触发更新服务器列表的动作
try {
updateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
updateServerList();
}//动态获取服务器列表
public void updateServerList() throws KeeperException, InterruptedException, UnsupportedEncodingException{
ArrayList<String> serverList=new ArrayList<String>();
//监听子节点,并且对父节点注册监听器
List<String> childer=zk.getChildren("/grpnode", true);
//遍历子节点
for(String child:childer){
byte[] data=zk.getData("/grpnode/"+child,false, new Stat());
String server=new String(data,"utf-8");
//将获取到的服务器名称存入list
serverList.add(server);
}
//把暂存的list放到全局的list中
servers=serverList;
System.out.println("最新的在线服务器是:"+serverList);
}
//客户端的业务功能
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException{
AppClient client=new AppClient();
client.connectZK();
client.handle();
}总结:Zookeeper 作为 Hadoop 项目中的一个子项目,是 Hadoop 集群管理的一个必不可少的模块,它主要用来控制集群中的数据,如它管理 Hadoop 集群中的 NameNode,还有 Hbase 中 Master Election、Server 之间状态同步等。 Zoopkeeper 提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型。
标签:
原文地址:http://blog.csdn.net/sdksdk0/article/details/51674871