import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        // Two independent clients share the same queue path.
        CuratorFramework clientA = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        clientB.start();

        DistributedQueue<String> queueA = null;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB = null;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();

        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed

        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /** Serializer for queue messages. */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /** Queue consumer that prints every message it receives. */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("Connection state changed: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("Consumed message (" + name + "): " + message);
            }
        };
    }
}
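Consumed message (A): test-A-0
Consumed message (A): test-B-0
......
Consumed message (B): test-A-51
Consumed message (B): test-B-51
Consumed message (B): test-A-52
Consumed message (B): test-B-52
Consumed message (B): test-A-53
Consumed message (B): test-B-54
Consumed message (B): test-A-55
......
Consumed message (A): test-A-99
Consumed message (A): test-B-99
OK!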
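The serializer in createQueueSerializer() calls getBytes() and new String(bytes) without naming a charset, so the bytes written depend on the JVM's default charset. When producers and consumers run on differently configured JVMs, that could corrupt non-ASCII messages. The following is a minimal sketch of the same two methods with an explicit charset. Utf8StringCodec is a hypothetical stand-alone class used only for illustration; in a real program the two method bodies would go inside a QueueSerializer<String> implementation.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone codec; its methods have the same shape as QueueSerializer<String>. */
class Utf8StringCodec {
    /** Encode with an explicit charset so every JVM produces the same bytes. */
    static byte[] serialize(String item) {
        return item.getBytes(StandardCharsets.UTF_8);
    }

    /** Decode with the same charset that was used for encoding. */
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u6d88\u606f" is a two-character non-ASCII prefix, escaped to keep the source ASCII-only.
        String original = "\u6d88\u606f test-A-0";
        String roundTripped = deserialize(serialize(original));
        System.out.println("round trip preserved the message: " + original.equals(roundTripped));
    }
}
```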

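import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedIdQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        DistributedIdQueue<String> queue = null;
        QueueConsumer<String> consumer = createQueueConsumer("A");
        QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
        queue = builder.buildIdQueue();
        queue.start();

        for (int i = 0; i < 10; i++) {
            // Each item is stored under an ID so that it can be removed by that ID later.
            queue.put(" test-" + i, "Id" + i);
            Thread.sleep((long) (50 * Math.random()));
            // Items the consumer has not taken yet are removed here and never consumed.
            queue.remove("Id" + i);
        }
        Thread.sleep(1000 * 3);

        queue.close();
        client.close();
        System.out.println("OK!");
    }
    ......
}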
Consumed message (A): test-2
Consumed message (A): test-3
Consumed message (A): test-4
Consumed message (A): test-7
OK!
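import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedPriorityQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        DistributedPriorityQueue<String> queue = null;
        QueueConsumer<String> consumer = createQueueConsumer("A");
        QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
        queue = builder.buildPriorityQueue(0);
        queue.start();

        for (int i = 0; i < 5; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("test-" + i + " priority:" + priority);
            queue.put("test-" + i, priority);
            Thread.sleep((long) (50 * Math.random()));
        }
        Thread.sleep(1000 * 2);

        queue.close();
        client.close();
        System.out.println("OK!"); // the sample output ends with this line
    }
    ......
}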
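test-0 priority:34
test-1 priority:51
test-2 priority:63
test-3 priority:45
test-4 priority:36
Consumed message (A): test-0
Consumed message (A): test-4
Consumed message (A): test-3
Consumed message (A): test-1
Consumed message (A): test-2
OK!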
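In the run above the messages come out in ascending order of their priority value (34, 36, 45, 51, 63), which is consistent with lower numbers being delivered first. The sketch below reproduces that ordering locally with java.util.PriorityQueue. It is only an in-memory analogy, not how Curator implements the recipe, and PriorityOrderSketch, Entry and drainInPriorityOrder are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** In-memory analogy of priority ordering; does not use ZooKeeper or Curator. */
class PriorityOrderSketch {
    /** Hypothetical holder pairing a message with its priority value. */
    static final class Entry {
        final String message;
        final int priority;

        Entry(String message, int priority) {
            this.message = message;
            this.priority = priority;
        }
    }

    /** Adds all messages, then removes them with the lowest priority value first. */
    static List<String> drainInPriorityOrder(String[] messages, int[] priorities) {
        PriorityQueue<Entry> queue =
                new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.priority));
        for (int i = 0; i < messages.length; i++) {
            queue.add(new Entry(messages[i], priorities[i]));
        }
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().message);
        }
        return order;
    }

    public static void main(String[] args) {
        // Same messages and priorities as in the sample run above.
        String[] messages = {"test-0", "test-1", "test-2", "test-3", "test-4"};
        int[] priorities = {34, 51, 63, 45, 36};
        // prints [test-0, test-4, test-3, test-1, test-2]
        System.out.println(drainInPriorityOrder(messages, priorities));
    }
}
```

Unlike this sketch, the distributed queue is consumed while items are still being added, so the order is only by priority among the items present at the moment the consumer takes one.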
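import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedDelayQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        DistributedDelayQueue<String> queue = null;
        QueueConsumer<String> consumer = createQueueConsumer("A");
        QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
        queue = builder.buildDelayQueue();
        queue.start();

        for (int i = 0; i < 10; i++) {
            // The second argument is an absolute time in epoch milliseconds: deliver 3 seconds from now.
            queue.put("test-" + i, System.currentTimeMillis() + 3000);
        }
        System.out.println("put finished!");
        Thread.sleep(1000 * 5);

        queue.close();
        client.close();
        System.out.println("OK!");
    }
    ......
}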
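put finished!
Consumed message (A): test-0
Consumed message (A): test-3
Consumed message (A): test-1
Consumed message (A): test-2
Consumed message (A): test-6
Consumed message (A): test-4
Consumed message (A): test-5
Consumed message (A): test-7
Consumed message (A): test-8
Consumed message (A): test-9
OK!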
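The delay queue example passes an absolute timestamp (now + 3000 ms) with each item, and nothing is consumed until that time has passed. The same idea can be tried without ZooKeeper using java.util.concurrent.DelayQueue. This is a local analogy under that assumption, not Curator's implementation; DelayQueueSketch, DelayedMessage and blockedMillis are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** In-memory analogy of delayed delivery; does not use ZooKeeper or Curator. */
class DelayQueueSketch {
    /** A message that becomes available at an absolute time in epoch milliseconds. */
    static final class DelayedMessage implements Delayed {
        final String message;
        final long dueAtMillis;

        DelayedMessage(String message, long dueAtMillis) {
            this.message = message;
            this.dueAtMillis = dueAtMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the message is due; zero or negative means "ready".
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Puts one message due delayMillis from now and returns how long take() had to wait. */
    static long blockedMillis(long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        long start = System.currentTimeMillis();
        queue.put(new DelayedMessage("test-0", start + delayMillis));
        try {
            queue.take(); // blocks until the message's due time has passed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        System.out.println("waited at least 300 ms: " + (blockedMillis(300) >= 300));
    }
}
```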
// Constructor
public SimpleDistributedQueue(CuratorFramework client, String path)
// Add an element
public boolean offer(byte[] data) throws Exception
// Remove an element
public byte[] take() throws Exception
// Other methods are also provided
public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception
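The method names listed above follow the java.util.Queue and BlockingQueue conventions. To see how such methods typically differ on an empty queue, here is a local sketch using LinkedBlockingQueue<byte[]>. It assumes SimpleDistributedQueue follows the same conventions (peek and poll returning null, element and remove throwing NoSuchElementException, take blocking); check the Curator Javadoc before relying on that. QueueMethodSemantics and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory illustration of the java.util.Queue method conventions; not Curator code. */
class QueueMethodSemantics {
    /** Reports what each non-blocking accessor does when the queue is empty. */
    static String describeEmptyQueue() {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        StringBuilder report = new StringBuilder();
        report.append("peek=").append(queue.peek() == null ? "null" : "value");
        report.append(" poll=").append(queue.poll() == null ? "null" : "value");
        try {
            queue.element();
            report.append(" element=value");
        } catch (NoSuchElementException e) {
            report.append(" element=NoSuchElementException");
        }
        try {
            queue.remove();
            report.append(" remove=value");
        } catch (NoSuchElementException e) {
            report.append(" remove=NoSuchElementException");
        }
        return report.toString();
    }

    /** Offers one payload and takes it back; take() returns at once because an item is present. */
    static String offerThenTake(String text) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        queue.offer(text.getBytes(StandardCharsets.UTF_8));
        try {
            return new String(queue.take(), StandardCharsets.UTF_8);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeEmptyQueue());
        System.out.println(offerThenTake("test-0"));
    }
}
```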
Original article: http://www.cnblogs.com/LiZhiW/p/4951529.html