标签:系统 optional exception iso head 另一个 编程 current 构建
一、状态依赖性管理
1 acquire lock on object state 2 while (precondition does not hold) 3 { 4 release lock 5 wait until precondition might hold 6 optionally fail if interrupted or timeout expires 7 reacquire lock 8 } 9 perform action 10 release lock
1 //有界缓存实现的基类 2 public abstract class BaseBoundedBuffer<V> { 3 private final V[] buf; 4 private int tail; 5 private int head; 6 private int count; 7 8 protected BaseBoundedBuffer(int capacity){ 9 this.buf = (V[]) new Object[capacity]; 10 } 11 12 protected synchronized final void doPut(V v){ 13 buf[tail] = v; 14 if (++tail == buf.length){ 15 tail = 0; 16 } 17 ++count; 18 } 19 20 protected synchronized final V doTake(){ 21 V v = buf[head]; 22 buf[head] = null; //let gc collect 23 if (++head == buf.length){ 24 head = 0; 25 } 26 --count; 27 return v; 28 } 29 30 public synchronized final boolean isFull(){ 31 return count == buf.length; 32 } 33 34 public synchronized final boolean isEmpty(){ 35 return count == 0; 36 } 37 }
1、示例:将前提条件的失败传递给调用者
1 public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> { 2 public GrumyBoundedBuffer(int size){ 3 super(size); 4 } 5 6 public synchronized void put(V v){ 7 if (isFull()){ 8 throw new BufferFullException(); 9 } 10 doPut(v); 11 } 12 13 public synchronized V take(){ 14 if (isEmpty()) 15 throw new BufferEmptyExeption(); 16 return doTake(); 17 } 18 }
缺点:已满情况不应为异常;调用者自行处理失败;sleep:降低响应性;自旋等待:浪费CPU;yield让出CPU
2、示例:通过轮询与休眠来实现简单的阻塞
1 public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> { 2 private static long SLEEP_TIME; 3 public SleepyBounedBuffer(int size) { 4 super(size); 5 } 6 7 public void put(V v) throws InterruptedException{ 8 while (true){ 9 synchronized(this){ 10 if (!isFull()){ 11 doPut(v); 12 return; 13 } 14 } 15 Thread.sleep(SLEEP_TIME); 16 } 17 } 18 19 public V take() throws InterruptedException{ 20 while (true){ 21 synchronized(this){ 22 if (!isEmpty()){ 23 return doTake(); 24 } 25 } 26 Thread.sleep(SLEEP_TIME); 27 } 28 } 29 }
优点:对于调用者,无需处理失败与异常,操作可阻塞,可中断(休眠时候不要持有锁)
缺点:对于休眠时间设置的权衡(响应性与CPU资源)
3、条件队列——使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真(元素是一个个正在等待相关条件的线程)
1 public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { 2 3 public BoundedBuffer(int capacity) { 4 super(capacity); 5 } 6 7 public synchronized void put(V v) throws InterruptedException{ 8 while (isFull()){ 9 wait(); 10 } 11 doPut(v); 12 notifyAll(); 13 } 14 15 public synchronized V take() throws InterruptedException{ 16 while (isEmpty()){ 17 wait(); 18 } 19 V v = doTake(); 20 notifyAll(); 21 return v; 22 } 23 }
二、使用条件队列
1、条件谓词
2、过早唤醒——一个条件队列与多个条件谓词相关时,wait方法返回不一定线程所等待的条件谓词就变为真了
1 void stateDependentMethod() throws InterruptedException 2 { 3 synchronized(lock) // 必须通过一个锁来保护条件谓词 4 { 5 while(!condietionPredicate()) 6 lock.wait(); 7 } 8 }
当使用条件等待时(如Object.wait(), 或Condition.await()):
3、丢失信号量——线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词
如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它(导致活跃性下降)
4、通知——确保在条件谓词变为真时通过某种方式发出通知挂起的线程
使用notifyAll有时是低效的:唤醒的所有线程都需要竞争锁,并重新检验,而有时最终只有一个线程能执行
优化:条件通知
1 public synchronized void put(V v) throws InterruptedException 2 { 3 while(isFull()) 4 wait(); 5 boolean wasEmpty = isEmpty(); 6 doPut(v); 7 if(wasEmpty) 8 notifyAll(); 9 }
5、示例:阀门类
1 public class ThreadGate { 2 private boolean isOpen; 3 private int generation; 4 5 public synchronized void close() { 6 isOpen = false; 7 } 8 9 public synchronized void open() { 10 ++generation; 11 isOpen = true; 12 notifyAll(); 13 } 14 15 public synchronized void await() throws InterruptedException { 16 int arrivalGeneration = generation; 17 while (!isOpen && arrivalGeneration == generation) 18 wait(); 19 } 20 }
arrivalGeneration == generation为了保证在阀门打开时又立即关闭时,在打开时通知的线程都可以通过阀门
6、子类的安全问题
7、封装条件队列
8、入口协议和出口协议
三、显示的Condition对象
内置条件队列的缺点:每个内置锁都只能有一个相关联的条件队列,而多个线程可能在同一条件队列上等待不同的条件谓词,调用notifyAll通知的线程非等待同意谓词
Condition <-> Lock,内置条件队列 <-> 内置锁
1 public class ConditionBoundedBuffer<T> { 2 protected final Lock lock = new ReentrantLock(); 3 private final Condition notFull = lock.newCondition();//条件:count < items.length 4 private final Condition notEmpty = lock.newCondition();//条件:count > 0 5 private final T[] items = (T[]) new Object[100]; 6 private int tail, head, count; 7 8 public void put(T x) throws InterruptedException { 9 lock.lock(); 10 try { 11 while (count == items.length) 12 notFull.await();//等到条件count < items.length满足 13 items[tail] = x; 14 if (++tail == items.length) 15 tail = 0; 16 ++count; 17 notEmpty.signal();//通知读取等待线程 18 } finally { 19 lock.unlock(); 20 } 21 } 22 23 public T take() throws InterruptedException { 24 lock.lock(); 25 try { 26 while (count == 0) 27 notEmpty.await();//等到条件count > 0满足 28 T x = items[head]; 29 items[head] = null; 30 if (++head == items.length) 31 head = 0; 32 --count; 33 notFull.signal();//通知写入等待线程 34 return x; 35 } finally { 36 lock.unlock(); 37 } 38 } 39 }
四、Synchronizer解析
在ReentrantLock和Semaphore这两个接口之间存在许多共同点。两个类都可以用作一个”阀门“,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假“,表示在指定的时间内锁是不可用的或者无法获取许可)。而且,这两个接口都支持中断、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。
原因:都实现了同一个基类AbstractQueuedSynchronizer(AQS)
1 public class SemaphoreOnLock {//基于Lock的Semaphore实现 2 private final Lock lock = new ReentrantLock(); 3 //条件:permits > 0 4 private final Condition permitsAvailable = lock.newCondition(); 5 private int permits;//许可数 6 7 SemaphoreOnLock(int initialPermits) { 8 lock.lock(); 9 try { 10 permits = initialPermits; 11 } finally { 12 lock.unlock(); 13 } 14 } 15 16 //颁发许可,条件是:permits > 0 17 public void acquire() throws InterruptedException { 18 lock.lock(); 19 try { 20 while (permits <= 0)//如果没有许可,则等待 21 permitsAvailable.await(); 22 --permits;//用一个少一个 23 } finally { 24 lock.unlock(); 25 } 26 } 27 28 //归还许可 29 public void release() { 30 lock.lock(); 31 try { 32 ++permits; 33 permitsAvailable.signal(); 34 } finally { 35 lock.unlock(); 36 } 37 } 38 }
1 public class LockOnSemaphore {//基于Semaphore的Lock实现 2 //具有一个信号量的Semaphore就相当于Lock 3 private final Semaphore s = new Semaphore(1); 4 5 //获取锁 6 public void lock() throws InterruptedException { 7 s.acquire(); 8 } 9 10 //释放锁 11 public void unLock() { 12 s.release(); 13 } 14 }
五、AbstractQueuedSynchronizer
最基本的操作:
状态管理(一个整数状态):
六、java.util.concurrent 同步器类中的AQS
1、ReentrantLock
ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively
ReentrantLock将同步状态用于保存锁获取操作的次数,或者正要释放锁的时候,才会修改这个变量
Semaphore将AQS的同步状态用于保存当前可用许可的数量;CountDownLatch使用AQS的方式与Semaphore很相似,在同步状态中保存的是当前的计数值
3、FutureTask
在FutureTask中,AQS同步状态被用来保存任务的状态
FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常
4、ReentrantReadWriteLock
标签:系统 optional exception iso head 另一个 编程 current 构建
原文地址:http://www.cnblogs.com/HectorHou/p/6050443.html