
Disruptor: single producer, multiple consumers

Date: 2019-06-04 09:43:29


Demo 1: setting up a single producer with multiple consumers.

Maven dependency

<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

 

1 Event object - Message2

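import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data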
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message2 {
    private String id;
    private String name;
    private double price;
}

2 Create the disruptor in the main method

Disruptor<Message2> disruptor = new Disruptor<>(
                new EventFactory<Message2>() {
                    @Override
                    public Message2 newInstance() {
                        return new Message2();
                    }
                },
                1 << 10, // ring buffer size (1024); must be a power of 2
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new BusySpinWaitStrategy()
        );
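
The ring buffer size passed above, 1 << 10, has to be a power of two. One common reason for that requirement is that an ever-increasing sequence number can then be mapped to a slot with a cheap bit mask instead of a modulo. The following is a minimal sketch of that idea in plain Java; RingIndexDemo and slotFor are illustrative names, not part of the Disruptor API.

```java
// Sketch only: shows how a power-of-two ring size turns "sequence -> slot" into a bit mask.
// RingIndexDemo and slotFor are hypothetical names, not Disruptor classes.
final class RingIndexDemo {

    /** Maps an ever-increasing sequence number onto a slot of a power-of-two ring. */
    static int slotFor(long sequence, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For a power of two, (bufferSize - 1) has all low bits set, so the
        // mask gives the same result as sequence % bufferSize.
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 1 << 10; // 1024, the same value used in the constructor call
        System.out.println(slotFor(0, bufferSize));
        System.out.println(slotFor(1023, bufferSize));
        System.out.println(slotFor(1024, bufferSize)); // wraps around to the first slot
    }
}
```

Because slots are reused once the sequence wraps, a producer can only advance as far as the slowest consumer allows.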

 

3 Bind consumers to the disruptor

// Bind the consumer to the disruptor (must be done before disruptor.start())
disruptor.handleEventsWith(new MessageHandler1());


// Consumer class
@Slf4j
public class MessageHandler1 implements EventHandler<Message2> {
    @Override
    public void onEvent(Message2 event, long sequence, boolean endOfBatch) throws Exception {
        event.setId(UUID.randomUUID().toString());
        log.info("【handler1,set id】 id: {}, name: {}, price: {}", event.getId(), event.getName(), event.getPrice());
    }
}
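
The snippet above registers only MessageHandler1. To get several consumers, more than one handler can be passed to handleEventsWith, in which case every handler receives every event. The sketch below illustrates this with a plain-Java stand-in event and a counting handler; Event, CountingHandler and MultiConsumerSketch are hypothetical names introduced only for this example, and it assumes the disruptor 3.4.2 jar from the Maven dependency is on the classpath.

```java
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: all class names here are hypothetical stand-ins for this example.
final class MultiConsumerSketch {

    /** Plain-Java stand-in for the event type (Message2 in the article). */
    static final class Event {
        long value;
    }

    /** Handler that only counts the events it sees. */
    static final class CountingHandler implements EventHandler<Event> {
        final AtomicInteger seen = new AtomicInteger();

        @Override
        public void onEvent(Event event, long sequence, boolean endOfBatch) {
            seen.incrementAndGet();
        }
    }

    /** Publishes `count` events to two handlers and returns how many each one saw. */
    static int[] run(int count) {
        Disruptor<Event> disruptor = new Disruptor<>(
                Event::new,
                1 << 10,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        CountingHandler first = new CountingHandler();
        CountingHandler second = new CountingHandler();
        // Both handlers run on their own threads and each sees every event.
        disruptor.handleEventsWith(first, second);
        disruptor.start();

        for (int i = 0; i < count; i++) {
            final long v = i;
            disruptor.publishEvent((event, sequence) -> event.value = v);
        }

        // Waits until all published events have been processed, then halts the handlers.
        disruptor.shutdown();
        return new int[] {first.seen.get(), second.seen.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(3);
        System.out.println(counts[0] + " / " + counts[1]);
    }
}
```

If each event should instead be processed by only one of several consumers, version 3.4.2 also offers handleEventsWithWorkerPool, which takes WorkHandler instances rather than EventHandler instances.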

 

4 Start the disruptor

RingBuffer<Message2> ringBuffer = disruptor.start();

 

5 Bind the producer to the disruptor

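// Bind the producer. Only one publishing task is submitted,
// because ProducerType.SINGLE assumes a single publishing thread.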
CountDownLatch latch = new CountDownLatch(1);
ExecutorService es = Executors.newFixedThreadPool(4);
es.submit(new MessagePublish2(disruptor, latch));

// Producer class
public class MessagePublish2 implements Runnable {
    private Disruptor<Message2> disruptor;
    private CountDownLatch latch;

    public MessagePublish2(Disruptor<Message2> disruptor, CountDownLatch latch) {
        this.disruptor = disruptor;
        this.latch = latch;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            disruptor.publishEvent(new MessageEventTranslator());
        }
        latch.countDown();
    }
}
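
The producer above publishes through a MessageEventTranslator, which the article does not show. A translator does not create a new event; it fills in the pre-allocated slot that the ring buffer hands to it. The following is one possible sketch, not the author's actual class: the field values are made up, and the Message2 shown here is a plain-Java stand-in for the Lombok version so that the block compiles on its own (it assumes the disruptor jar is on the classpath).

```java
import com.lmax.disruptor.EventTranslator;

/** Plain-Java stand-in for the Lombok-annotated Message2 (same three fields). */
class Message2 {
    private String id;
    private String name;
    private double price;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
}

/**
 * Hypothetical translator: writes data into the slot claimed for this sequence.
 * The id is left unset here because MessageHandler1 assigns it on the consumer side.
 */
class MessageEventTranslator implements EventTranslator<Message2> {
    @Override
    public void translateTo(Message2 event, long sequence) {
        // Illustrative values only; a real producer would copy in its own data.
        event.setName("message-" + sequence);
        event.setPrice(10.0 + sequence);
    }
}
```

Since ring buffer slots are reused, a translator should overwrite every field that consumers read, otherwise stale values from an earlier event may remain visible.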

 

6 Block until done & shut down

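        // Block until the producer has published all of its events
        latch.await();

        // Shut down the producer pool, then the disruptor.
        // disruptor.shutdown() waits until already-published events have been processed.
        es.shutdown();
        disruptor.shutdown();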

 


Original article: https://www.cnblogs.com/zhaopengcheng/p/10971516.html
