标签:
// 构造方法public LeaderLatch(CuratorFramework client, String latchPath)public LeaderLatch(CuratorFramework client, String latchPath, String id)public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)// 查看当前LeaderLatch实例是否是leaderpublic boolean hasLeadership()// 尝试让当前LeaderLatch实例称为leaderpublic void await() throws InterruptedException, EOFExceptionpublic boolean await(long timeout, TimeUnit unit) throws InterruptedException
public class LeaderLatchExample{private static final int CLIENT_QTY = 10;private static final String PATH = "/examples/leader";public static void main(String[] args) throws Exception{List<CuratorFramework> clients = Lists.newArrayList();List<LeaderLatch> examples = Lists.newArrayList();try{for (int i = 0; i < CLIENT_QTY; ++i){CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));clients.add(client);client.start();LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);examples.add(example);example.start();}System.out.println("LeaderLatch初始化完成!");Thread.sleep(10 * 1000);// 等待Leader选举完成LeaderLatch currentLeader = null;for (int i = 0; i < CLIENT_QTY; ++i){LeaderLatch example = examples.get(i);if (example.hasLeadership()){currentLeader = example;}}System.out.println("当前leader:" + currentLeader.getId());currentLeader.close();examples.get(0).await(10, TimeUnit.SECONDS);System.out.println("当前leader:" + examples.get(0).getLeader());System.out.println("输入回车退出");new BufferedReader(new InputStreamReader(System.in)).readLine();}catch (Exception e){e.printStackTrace();}finally{for (LeaderLatch exampleClient : examples){System.out.println("当前leader:" + exampleClient.getLeader());try{CloseableUtils.closeQuietly(exampleClient);}catch (Exception e){System.out.println(exampleClient.getId() + " -- " + e.getMessage());}}for (CuratorFramework client : clients){CloseableUtils.closeQuietly(client);}}System.out.println("OK!");}}
LeaderLatch初始化完成!当前leader:Client #1当前leader:Participant{id=‘Client #8‘, isLeader=true}输入回车退出当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}Client #1 -- Already closed or has not been started当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #8‘, isLeader=true}当前leader:Participant{id=‘Client #9‘, isLeader=true}OK!

public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)
public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable{private final String name;private final LeaderSelector leaderSelector;private final AtomicInteger leaderCount = new AtomicInteger();public ExampleClient(CuratorFramework client, String path, String name){this.name = name;leaderSelector = new LeaderSelector(client, path, this);leaderSelector.autoRequeue();}public void start() throws IOException{leaderSelector.start();}@Overridepublic void close() throws IOException{leaderSelector.close();}@Overridepublic void takeLeadership(CuratorFramework client) throws Exception{final int waitSeconds = 1;System.out.println(name + " 是当前的leader(" + leaderSelector.hasLeadership() + ") 等待" + waitSeconds + "秒...");System.out.println(name + " 之前成为leader的次数:" + leaderCount.getAndIncrement() + "次");try{Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));}catch (InterruptedException e){System.err.println(name + " 已被中断");Thread.currentThread().interrupt();}finally{System.out.println(name + " 放弃leader\n");}}}
public class LeaderSelectorExample{private static final int CLIENT_QTY = 10;private static final String PATH = "/examples/leader";public static void main(String[] args) throws Exception{List<CuratorFramework> clients = Lists.newArrayList();List<ExampleClient> examples = Lists.newArrayList();try{for (int i = 0; i < CLIENT_QTY; ++i){CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));clients.add(client);client.start();ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);examples.add(example);example.start();}System.out.println("输入回车退出:");new BufferedReader(new InputStreamReader(System.in)).readLine();}finally{for (ExampleClient exampleClient : examples){CloseableUtils.closeQuietly(exampleClient);}for (CuratorFramework client : clients){CloseableUtils.closeQuietly(client);}}System.out.println("OK!");}}
输入回车退出:Client #4 是当前的leader(true) 等待1秒...Client #4 之前成为leader的次数:0次Client #4 放弃leaderClient #5 是当前的leader(true) 等待1秒...Client #5 之前成为leader的次数:0次Client #5 已被中断Client #5 放弃leaderOK!
LeaderSelectorListener可以对领导权进行控制,在适当的时候释放领导权,这样每个节点都有可能获得领导权。而LeaderLatch一根筋到死,除非调用close方法,否则它不会释放领导权。标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4930486.html