码迷,mamicode.com
首页 > 其他好文 > 详细

文章标题

时间:2016-07-17 17:04:58      阅读:237      评论:0      收藏:0      [点我收藏+]

标签:

(可能会有很多错误,请谨慎阅读,如果本人发现会及时更新)。
最近在学习多线程编程,周末的时候用java写了一个生产者消费模型,这里做一些记录和总结。

基本模板

Producer
  while(true)
    data = generateData()
    queue.enqueue(data)

Consumer
  while(true)
    data = queue.dequeue()

main
  sharedQueue = new Queue()
  producer1, producer2 ...
  consumer1, consumer2 ...
  start producer1, producer2 ...
  start consumer1, consumer2 ...
  stop producer1, producer2 ...
  stop consumer1, consumer2 ...

各种错误版

最简单版

这个程序中工作任务很简单就是生产者不断放入一些随机数到队列中,而消费者就是不断取出这些数并打印。

public class ProducerConsumer {
    private static Queue<Integer> queue;
    public static void main(String[] args) {
        queue = new LinkedList<Integer>();

        int producerNum = 1;
        Producer[] producers = new Producer[producerNum];
        for (int i = 0; i < producerNum; i++) {
            producers[i] = new Producer(queue);
            producers[i].start();
        }

        int consumerNum = 2;
        Consumer[] consumers = new Consumer[consumerNum];
        for (int j = 0; j < consumerNum; j++) {
            consumers[j] = new Consumer(queue);
            consumers[j].start();
        }

        try {
            Thread.sleep(2 * 1000);
            System.out.println("Main thread is awakened now!");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // stopping all producers and consumers
        for (int i = 0; i < producerNum; i++) {
            producers[i].stopLoop();
        }
        for (int j = 0; j < consumerNum; j++) {
            consumers[j].stopLoop();
        }
        // make sure all threads is stopping
        for (int i = 0; i < producerNum; i++) {
            try {
                producers[i].join();
                System.out.println("Producer: " + producers[i].getId() + " is stopped!");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        for (int j = 0; j < consumerNum; j++) {
            try {
                consumers[j].join();
                System.out.println("Consumer: " + consumers[j].getId() + " is stopped!");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private boolean running = true;
    private long threadId; // 因为每次run之后,才会进入到新的thread
    private static Lock lock = new ReentrantLock();
    public Producer(Queue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        threadId = Thread.currentThread().getId();
        while (running) {
            try {
                double item = Math.random() * 100;
                lock.lock(); // 这里为什么加锁?因为queue我们使用的是LinkedList,所以offer操作不是同步的,不能让不同的生产者互相争抢
                queue.offer((int)item);
                System.out.println("Produce:" + threadId + "--" + item);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // 放在finally中,保证程序不会中途出错而导致解锁步骤不运行
            }
        }
    }
    public void stopLoop() {
        System.out.println("Stopping Producer -- " + threadId);
        running = false;
    }
}
class Consumer extends Thread {
    /**
     * 这版开始加锁,不再出现NullPointerException错误
     * 消费的过程中其实和生产的过程没有冲突,所以只要消费者之间共享一把锁就行了
     * 让queue.peek 和 poll绑定,使之检查是有效的
     */
    private Queue<Integer> queue;
    private boolean running = true;
    private static Lock lock = new ReentrantLock();
    private long threadId;
    public Consumer(Queue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        threadId = Thread.currentThread().getId();
        while (running) {
        if (queue.peek() != null) {
          try {
                lock.lock(); // 加锁第一是因为poll非同步,还有peek和poll之间非原子性,不加锁会导致peek检查很容易失效
                    if (queue.peek() != null) { // double check
                        int item = queue.poll();
                        System.out.println("Consume:" + threadId +" -- " + item);
                    } else {
                        System.out.println("Consume:" + threadId +" -- empty");
                    }
          } catch (InterruptedException e) {
            e.printStackTrace();
          } finally {
            lock.unlock();
          }
        }
        }
    }
    public void stopLoop() {
        System.out.println("Stopping Consumer: " + threadId);
        running = false;
    }
}

开始点评这个版本:
- 巨大的缺点就是queue是无界的,很容易造成queue被爆掉
- running这个变量非同步,所以在主线程中调用stopLoop虽然可能生效,但是按照java的内存模型来说,没有同步的变量在不同线程中可能不能被互相观察到,这导致consumer的线程都观察不到running已经被主线程设置为false了。从而导致程序停不下来(查看Effective java的66条)。解决这个问题有两种方法,一种是使用synchronized修饰的方法来封装running的读写,一种是把running秀事成volatile(参看深入理解Java虚拟机:JVM高级特性与最佳实践)。

queue实现有界版

自己利用LinkedList实现了一个BlockQueue,当然java中本身就有个这个而数据结构,这个造轮子只是为了理解其中的原理。
这次这个程序稍微复杂一些,存取的不在是随机数,而是一个个log。

/**
 * 1、第一版的时候我们只用了linkedlist来模拟queu,但是让出现producer特别慢,但是consumer又总是再探测浪费资源?
 * 2、还有就是producer产生速度过去快,难道就让它把内存挤爆吗?
 * 3、所以我们需要blockingQueue,第一是限制queue的大小,第二是协调两方的生产和消费速度。
 * @author xiedandan
 *
 * 疑问点:使用monitor来设计一个blocking Queue
 * 1. 不对queue进行synchronize,会爆出IllegalMonitorException
 * 2. 然后开始不断调整synchronize位置,比如在while(running)外边,在while(running)里面
 * 3. 然后就开了怎么都唤不醒consumer的bug
 */
public class ProducerConsumerBlockingQueue {
    private static SudoBlockingQueue<Log> queue;
    public static void main(String[] args) {
      // 和格版本一致
  }
}
/* mimic a bin-log restore system.
 * Producer to produce operating logs and consumer to read the logs and try to restore the record.
 * log schema: transactionId, operatingType, dataValue(before current operation)
 *
 * version 1: Suppose we only have one row record in whole table and only add operation,
 *            so the log order is not important.
 */
class ProducerLog extends Thread {
    private SudoBlockingQueue<Log> queue;
    private boolean volatile running = true;
    private long threadId;
    // private Lock lock; // 1. 为啥在这里我们不用lock呢?因为都封装在queue自身中了。
    public ProducerLog(SudoBlockingQueue<Log> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        threadId = Thread.currentThread().getId();
        while(running) {
            try {
                synchronized(queue) { // 2. 试试把这句话取掉会产生结果?
        //          1.if (queue.isFull()) { // 3. 为什么不能使用if,而要使用while?
                    while (queue.isFull()) {
                        System.out.println("Proudce:" + threadId + "--- wait");
                        queue.wait();
                    }
                    boolean e = queue.isEmpty();
                    Log log = new Log(1, 1, (int)(Math.random() * 100));
                    System.out.println("Proudce:" + threadId + "---" + log);
                    queue.offer(log);
                    if (e) {
                        System.out.println("Proudce:" + threadId + "--- notify all");
                        queue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void stopLoop() {
        System.out.println("Stopping Prouder:" + threadId);
        running = false;
        queue.syncNotifyAll(); // 4. 为啥这里还有再次notifyall一次呢?
    }
}
class ConsumerLog extends Thread {
    private SudoBlockingQueue<Log> queue;
    private boolean running = true;
    private long threadId;
    public ConsumerLog(SudoBlockingQueue<Log> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        threadId = Thread.currentThread().getId();
        while(running) {
            try {
                synchronized(queue) {
                    while (running && queue.isEmpty()) { //注意这里不仅要检查empty,还有检查running,不然程序会迅速再次陷入wait不能退出。
                        System.out.println("Consumer:" + threadId + "--wait");
                        queue.wait();
                    }
                    boolean f = queue.isFull();
                    Log log = queue.poll();
                    System.out.println("Consumer:" + threadId + "--" + log);
                    if (f) {
                        System.out.println("Consumer:" + threadId + "--notify");
                        queue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void stopLoop() {
        System.out.println("Stopping Consumer -- " + threadId);
        running = false;
        queue.syncNotifyAll();
    }
}
class Log {
    public int tranId;
    public int operaType;
    public int data;
    public Log(int tranId, int operaType, int data) {
        this.tranId = tranId;
        this.operaType = operaType;
        this.data = data;
    }
    @Override
    public String toString() {
        return "TranId:" + tranId + ",OperaType:" + operaType + ",Data" + data;
    }
}
class SudoBlockingQueue <T> extends LinkedList<T> {
    private static final long serialVersionUID = 13344L;
    private final int CAPACITY;
    public SudoBlockingQueue(int capacity) {
        CAPACITY = capacity;
    }
    boolean isFull() {
        return super.size() == CAPACITY;
    }
    int getCapacity() {
        return CAPACITY;
    }
    public synchronized void syncNotifyAll() {
        this.notifyAll();
    }
    public synchronized void syncWait() {
        try {
            this.wait();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

回答上面的问题:

  1. 为什么要对queue进行同步呢?原因之一是是前一个例子一样lock的作用,同步offer的。还有一个重要的原因是queue.wait()和queue.notify()必须在拥有当前queue的控制权时候才能进行。不然会报IllegalMonitorException。
  2. 为什么不能使用if,而要使用while?原因是为了double check,想想有3个消费者,其生产的消费速度远远大于生产速度。当一个任务被生产出来,唤醒了全部的consumers,这时候多个consumer准备结束queue.wait()。但是只有一个consumer能抢到资源,其他的consumer如果不再次做isEmpty检查就会误以为任务,继续走下去,结果发生错误。一定参看这篇文章:如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例
  3. 为啥这里还有再次notifyAll一次呢?因为当stop的时候有些线程正处于wait装填,必须唤醒才能从while(running)跳出来。因为notifyAll需要保证获得queue的控制权,所以需要synchronized。

这个版本设计的也比较糟糕,因为需要调用多次的stopLoop,可以把running设计成static,只用调用一次stopLoop即可。

写正确一个并发程序真实不容易,特别容易遇到各种死锁而结束不了程序。学会并发编程:
1. 生产者消费模型
2. 线程池
3. 进程池

refenrence

文章标题

标签:

原文地址:http://blog.csdn.net/physicsdandan/article/details/51931975

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!