码迷,mamicode.com
首页 > 其他好文 > 详细

BlockingQueue

时间:2018-02-25 00:06:26      阅读:228      评论:0      收藏:0      [点我收藏+]

标签:配置   trace   nbsp   array   await   唤醒   设计   cat   link   

BlockingQueue

 

一、阻塞队列基本方法介绍

谈到线程池,不得不谈到生产者-消费者模式,谈到生产者-消费者,就不得不谈到对应的数据结构,谈到对应的数据结构不得不言BlockingQueue。

顾名思义,BlockingQueue翻译为阻塞队列。队列无非两种操作:入队和出队。而针对于入队出队的边界值的不同,分为几个方法:

 

 

抛出异常

特殊值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除

remove()

poll()

take()

poll(time, unit)

检查

element()

peek()

不可用

不可用

 

 

测试代码:

public class QueueTest {

    public static void main(String args[]) {

        final BlockingQueue queue = new ArrayBlockingQueue(5);

        init(queue);

        System.out.println("queue.size=" + queue.size() + ",    top element:" + queue.element());

        // queue.add("f");  //1. add方法:队满加入抛异常

        /*boolean bool = queue.offer("f");  //2. offer方法,队满加入会返回:false

        System.out.println("queue.size=" + queue.size() + ",   入队结果:" + bool);*/

 /*       try {

            queue.put("f");  //3.put方法,队满put会阻塞,下边的systemout方法不会执行,直至有消费者take出去

        } catch (InterruptedException e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

        System.out.println("queue.size=" + queue.size() + ",   put结束:");*/

        Thread thread1 = new Thread(new Runnable() {

            public void run() {

                boolean bool = false;

                try {

                    bool = queue.offer("f", 5, TimeUnit.SECONDS); //5.offer带时间参数:当队满时,如果等待一定时间内还是满的就返回false,如果在这个期间队有空间了就可以放入一个元素,返回

                    System.out.println("queue.size=" + queue.size() + ",   入队结果:" + bool);

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        });

        thread1.start();

        //我们开一个消费者线程做出队操作,以便上述的offer可以正常加入

        Thread thread = new Thread(new Runnable() {

            public void run() {

                while (queue.size() > 0) {

                    try {

                        String str = (String) queue.take();

                        System.out.println("queue.size=" + queue.size() + ",   出队结果:" + str);

                        Thread.sleep(500);

                    } catch (InterruptedException e) {

                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                    }

                }

 

            }

        });

        thread.start();

 

 

    }

 

    private static void init(BlockingQueue queue) {

        queue.add("a");

        queue.add("b");

        queue.add("c");

        queue.add("d");

        queue.add("e");

    }

 

}

 

add:入队,如果队列满会抛出异常。

Exception in thread "main" java.lang.IllegalStateException: Queue full

at java.util.AbstractQueue.add(AbstractQueue.java:71)

at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)

源码:

    public boolean add(E e) {

        if (offer(e)) 

            return true; //如果加入成功,返回true

        else

            throw new IllegalStateException("Queue full"); //如果添加失败,抛异常

    }

 

Offer:入队,如果队满会返回false

boolean bool = queue.offer("f");  //offer方法会返回:false

源码:

    public boolean offer(E e) {

        if (e == null) throw new NullPointerException();

        final ReentrantLock lock = this.lock;

        lock.lock();

        try {

            if (count == items.length)

                return false; //返回失败

            else {

                insert(e);

                return true; //返回成功

            }

        } finally {

            lock.unlock();

        }

    }

 

Put:入队,如果队满会阻塞

        try {

            queue.put("f");

        } catch (InterruptedException e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

         System.out.println("queue.size=" + queue.size() + ",   put结束:" );

源码:

    public void put(E e) throws InterruptedException {

        if (e == null) throw new NullPointerException();

        final E[] items = this.items;

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            try {

                while (count == items.length)

                    notFull.await(); //阻塞在这儿

            } catch (InterruptedException ie) {

                notFull.signal(); // propagate to non-interrupted thread

                throw ie;

            }

            insert(e);

        } finally {

            lock.unlock();

        }

    }

 

Offer:带时间的,会有一个缓冲时间,若超过此期间插入失败则失败。

queue.size=5,    top element:a

queue.size=4,   出队结果:a

queue.size=5,   入队结果:true

queue.size=4,   出队结果:b

queue.size=3,   出队结果:c

queue.size=2,   出队结果:d

queue.size=1,   出队结果:e

queue.size=0,   出队结果:f

源码:

  public boolean offer(E e, long timeout, TimeUnit unit)

        throws InterruptedException {

 

        if (e == null) throw new NullPointerException();

long nanos = unit.toNanos(timeout);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            for (;;) { //轮询

                if (count != items.length) {

//如果有其他线程消费元素,队列不满了,才可以插入元素

                    insert(e);

                    return true;

                }

                if (nanos <= 0)//如果到了约定时间没有插入成功,返回false

                    return false;

                try {

                    nanos = notFull.awaitNanos(nanos); //轮询减时间

                } catch (InterruptedException ie) {

                    notFull.signal(); // propagate to non-interrupted thread

                    throw ie;

                }

            }

        } finally {

            lock.unlock();

        }

    }

 

出队的几种形式与入队对应,实例略。

从上述源码也可以看出,BlockingQueque还有几个特点:

1. 不接受null值。

2. 线程安全,方法都用了Lock

3. 最最精华的部分:生产者-消费者模型。

二、生产者-消费者模型

实例:

public class BlockingQueueTest {

 

    private static AtomicInteger count = new AtomicInteger(0);

    private static AtomicInteger countCreate = new AtomicInteger(0);

 

public static void main(String args[]) {

    //定义一个阻塞队列,存放文件信息

        BlockingQueue fileQueue = new ArrayBlockingQueue(5);

        String path = "F:\\Song";

        File root = new File(path);

        //生产者线程去遍历文件,放入队列

        FileCrawler fileCrawler = new FileCrawler(fileQueue, root);

        //消费者线程去遍历队列,取出文件

        Indexer indexer = new Indexer(fileQueue);

 

        //开启几个生产者线程开始遍历文件

        for (File file : root.listFiles()) {

            new Thread(new FileCrawler(fileQueue, file)).start();

        }

         //开启7个消费者者线程开始取出文件

        for (int i = 0; i < 7; i++) {

            new Thread(new Indexer(fileQueue)).start();

        }

 

        try {

            Thread.sleep(2000);

        } catch (InterruptedException e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

 

        System.out.println("生产者生产:" + countCreate.get());

        System.out.println("消费者取到:" + count.get());

    }

 

 

    static class FileCrawler implements Runnable {

        private final BlockingQueue fileQueue;

        private final File root;

 

        FileCrawler(BlockingQueue fileQueue, File root) {

            this.fileQueue = fileQueue;

            this.root = root;

        }

 

        public void run() {

 

            try {

                System.out.println("生产者开始生产:" + fileQueue.size());

                crawl(root);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

 

        private void crawl(File root) throws InterruptedException {

            File[] files = root.listFiles();

            if (files != null) {

                for (File file : files) {

                    if (file.isDirectory()) {

                        crawl(file);

                    } else {

                        fileQueue.put(file); //put

                        countCreate.incrementAndGet();

                    }

                }

            }

        }

    }

 

 

    static class Indexer implements Runnable {

 

        private final BlockingQueue fileQueue;

 

        Indexer(BlockingQueue fileQueue) {

            this.fileQueue = fileQueue;

        }

 

        public void run() {

            while (true) {

                try {

                    System.out.println("消费者开始消费:" + fileQueue.size());

                    File file = (File) fileQueue.take(); //take

                    count.incrementAndGet();

                    System.out.println(file.getName());

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        }

    }

 

}

 

三、阻塞队列的实现类

基本的任务排队方式有三种:

有界:ArrayBlockingQueue: 一个由数组支持的有界阻塞队列。

无界:LinkedBlockingQueue: 一个基于已链接节点的、范围任意的blocking queue。

同步移交:SynchronousQueue: 同步队列,put和take串行执行。生产者对其的插入操作必须等待消费者的移除操作,反之亦然。同步队列类似于信道,它非常适合传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

synchronousQueue的思想:

参考:http://ifeve.com/java-synchronousqueue/

实例:

public class SynchroNousQueueTest {

    public static void main(String args[]) {

//        final SynchronousQueue synchronousQueue = new SynchronousQueue();

        SynchroNousQueueTest synchroNousQueueTest = new SynchroNousQueueTest();

        final MyShnchronouseQueue<String> synchronousQueue = synchroNousQueueTest.new MyShnchronouseQueue<String>();

        //1。开启一个生产者线程

        Thread threadPut = new Thread(new Runnable() {

            public void run() {

                try {

                    for (int i = 0; i < 10; i++) {

                        synchronousQueue.put(i + "");

                        System.out.println("synchronousQueue,insert element:" + i);

                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        });

 

        //2。开启一个消费者线程

        Thread threadTask = new Thread(new Runnable() {

            public void run() {

                try {

                    for (int i = 0; i < 10; i++) {

                        synchronousQueue.take();

                        System.out.println("synchronousQueue,output element:" + i);

                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        });

        threadPut.start();

        threadTask.start();

    }

 

    class MyShnchronouseQueue<E> {

        Lock lock = new ReentrantLock();

        Condition isFull = lock.newCondition();

        Condition isEmpty = lock.newCondition();

        boolean flag = false;   //同步开关

        E item = null;     //只有一个元素

 

        public void put(E e) throws InterruptedException {

            lock.lock();

            try {

                while (flag) {    // 当开关为true时,put阻塞,一直await

                    isEmpty.await();

                }

                //当开关为false之后,改为true,item设值,唤醒消费者消费

                flag = true;

                item = e;

                isFull.signalAll();

            } catch (Exception e1) {

                e1.printStackTrace();

            } finally {

                lock.unlock();

            }

 

        }

 

        public synchronized E take() throws InterruptedException {

            lock.lock();

            try {

                while (!flag) {      // 当开关为false时,take阻塞,一直await

                    isFull.await();

                }

                //当开关为true之后,改为false,获取item的值,唤醒生产者生产

                flag = false;

                E e = item;

                item = null;

                isEmpty.signalAll();

                return e;

            } catch (Exception e1) {

                e1.printStackTrace();

            } finally {

                lock.unlock();

            }

            return null;

        }

    }

}

 

结果:

synchronousQueue,insert element:0

synchronousQueue,output element:0

synchronousQueue,output element:1

synchronousQueue,insert element:1

synchronousQueue,insert element:2

synchronousQueue,output element:2

synchronousQueue,insert element:3

synchronousQueue,output element:3

synchronousQueue,insert element:4

synchronousQueue,output element:4

synchronousQueue,insert element:5

synchronousQueue,output element:5

synchronousQueue,insert element:6

synchronousQueue,output element:6

synchronousQueue,insert element:7

synchronousQueue,output element:7

synchronousQueue,insert element:8

synchronousQueue,output element:8

synchronousQueue,insert element:9

synchronousQueue,output element:9

 

四、线程池的选择:

 

根据这些队列的不同特性,我们的线程池也定义了不同的类别:

 

单一线程池:可以看到corePoolSize=1

    public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService

            (new ThreadPoolExecutor(1, 1,

                                    0L, TimeUnit.MILLISECONDS,

                                    new LinkedBlockingQueue<Runnable>()));

    }

 

固定大小线程池:corePoolSize和maximumPoolSize固定,

    public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>());

    }

 

无界线程池:maximumPoolSize为Integer.MAX_VALUE

    public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }

 

前两者默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理他们的速度,那么队列将无限制地增加。

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(这就需要一些饱和策略)在使用有界队列工作时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时可以减少上下文切换,但付出的代价是可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。

对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择。它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么狠容易发生过载问题。

 

从另一个维度来看:cpu密集型任务,由于cpu使用率一直很高,这时的线程不宜过多,建议配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务由于线程并不是一直在执行任务,IO比较频繁,所以可以配置较多的线程,如2*Ncpu。

 

BlockingQueue

标签:配置   trace   nbsp   array   await   唤醒   设计   cat   link   

原文地址:https://www.cnblogs.com/qi-dian-ao/p/8467963.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!