标签:must field 共享资源 void throws 接下来 中断问题 ali 工具
LockSupport类的核心方法其实就两个:park()和unark(),其中park()方法用来阻塞当前调用线程,unpark()方法用于唤醒指定线程
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,可以把许可看成是一种(0,1)信号量(Semaphore),但与 Semaphore 不同的是,许可的累加上限是1。
初始时,permit为0,当调用unpark()方法时,线程的permit加1,当调用park()方法时,如果permit为0,则调用线程进入阻塞状态。
所以以下代码不会阻塞
// 初始信号量为0,调用unpark,信号量+1
LockSupport.unpark();
// 当前信号量为1,调用park,信号量-1
LockSupport.park();
// 以下代码可以继续执行
doSomething()
AbstractQueueSynchronizer是并发工具的核心,是一个抽象类,提供公平 / 非公平获取锁,获取可重入 / 不可重入锁,共享 / 排他等功能支持
AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:
这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。
什么是资源:
| 同步器 | 资源的定义 | 
|---|---|
| ReentrantLock | 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数 | 
| CountDownLatch | 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。 | 
| Semaphore | 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。 | 
| ReentrantReadWriteLock | 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。 | 
AQS-API
共享和排他
| 钩子方法 | 描述 | 
|---|---|
| tryAcquire | 排它获取(资源数) | 
| tryRelease | 排它释放(资源数) | 
| tryAcquireShared | 共享获取(资源数) | 
| tryReleaseShared | 共享获取(资源数) | 
| isHeldExclusively | 是否排它状态 | 
CAS操作方法
Java中CAS操作的实现都委托给一个名为UnSafe类
| 方法名 | 修饰符 | 描述 | 
|---|---|---|
| compareAndSetState | protected final | CAS修改同步状态值 | 
| compareAndSetHead | private final | CAS修改等待队列的头指针 | 
| compareAndSetTail | private final | CAS修改等待队列的尾指针 | 
| compareAndSetWaitStatus | private static final | CAS修改结点的等待状态 | 
| compareAndSetNext | private static final | CAS修改结点的next指针 | 
等待队列
| 方法名 | 修饰符 | 描述 | 
|---|---|---|
| enq | private | 入队操作 | 
| addWaiter | private | 入队操作 | 
| setHead | private | 设置头结点 | 
| unparkSuccessor | private | 唤醒后继结点 | 
| doReleaseShared | private | 释放共享结点 | 
| setHeadAndPropagate | private | 设置头结点并传播唤醒 | 
资源获取操作
| 方法名 | 修饰符 | 描述 | 
|---|---|---|
| cancelAcquire | private | 取消获取资源 | 
| shouldParkAfterFailedAcquire | private static | 判断是否阻塞当前调用线程 | 
| acquireQueued | final | 尝试获取资源,获取失败尝试阻塞线程 | 
| doAcquireInterruptibly | private | 独占地获取资源(响应中断) | 
| doAcquireNanos | private | 独占地获取资源(限时等待) | 
| doAcquireShared | private | 共享地获取资源 | 
| doAcquireSharedInterruptibly | private | 共享地获取资源(响应中断) | 
| doAcquireSharedNanos | private | 共享地获取资源(限时等待) | 
| acquire | public final | 独占地获取资源 | 
| acquireInterruptibly | public final | 独占地获取资源(响应中断) | 
| acquireInterruptibly | public final | 独占地获取资源(限时等待) | 
| acquireShared | public final | 共享地获取资源 | 
| acquireSharedInterruptibly | public final | 共享地获取资源(响应中断) | 
| tryAcquireSharedNanos | public final | 共享地获取资源(限时等待) | 
资源释放:
| 方法名 | 修饰符 | 描述 | 
|---|---|---|
| release | public final | 释放独占资源 | 
| releaseShared | public final | 释放共享资源 | 
CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
| 结点状态 | 值 | 描述 | 
|---|---|---|
| CANCELLED | 1 | 取消。表示后驱结点被中断或超时,需要移出队列 | 
| SIGNAL | -1 | 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。) | 
| CONDITION | -2 | Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了 | 
| PROPAGATE | -3 | 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。) | 
| INITIAL | 0 | 默认。新结点会处于这种状态 | 
static final class Node {
    
    // 共享模式结点
    private static final Node SHARED = new Node();
    
    // 独占模式结点
    private?static final Node EXCLUSIVE = null;
    private?static final int CANCELLED =  1;
    private?static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    // 等待状态
    volatile int waitStatus;
    // 前驱指针
    volatile Node prev;
    // 后驱指针
    volatile Node next;
    // 结点所包装的线程
    volatile Thread thread;
    // Condition队列使用,存储condition队列中的后继节点
    Node nextWaiter;
    Node() {
    }
    Node(Thread thread, Node mode) { 
        this.nextWaiter = mode;
        this.thread = thread;
    }
}
这里以ReentrantLock为例,看看 FairSync 和 NonfairSync 的源码
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897090466540L;
    
    // 加锁
    final void lock() {
        acquire(1);
    }
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 如果头和尾指向了同一个对象(null)或者头节点下一个为当前节点时,说明队列没有节点,或仅有一个当前节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
可以发现他们两个都继承自 Sync,Sync继承自AbstractQueuedSynchronizer,在ReentrantLock中的实现如下
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
public void lock() {
    sync.lock();
}
ReentrantLock的加锁直接委托了Sync的 lock,在Sync中,lock是个抽象方法,依次查看NonfairSync 和 FairSync 的实现,如上文源码注释
非公平锁:
1、sync 修改 state (compareAndSetState),尝试直接获取锁,获取成功,则设置排他属性。获取失败,则执行AQS获取锁逻辑
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
2、执行获取锁,tryAcquire (NonfairSync 和 FairSync都有各自的实现),获取失败,获取等待队列,将线程放入等待队列中
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 将当前线程包装为独占节点加入队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
3、NonfairSync 对 tryAcquire 的实现
// 委托给父类
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// 父类实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取state状态
    int c = getState();
    // 如果当前线程持有锁,设置nextc, 添加acquires,并设置给state,可见Reentrant支持可重入锁
    // 如果当前线程没有持有锁,CAS尝试获取锁,获取成功,设置排他性,否则获取锁失败,返回false
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
3、获取锁失败的处理:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 将当前节点加入到队列尾部(算是优化吧)
        node.prev = pred;
        // 设置尾节点为当前节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果尾为null, 先初始化队列
    enq(node);
    return node;
}
// node构造函数
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}
// 自旋将节点加入尾部,包含队列初始化
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // Must initialize
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
为已经在队列中的线程以独占不间断模式获取。 由条件等待方法使用以及获取
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点,当前线程再次获取锁,如果成功,进入if
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为头节点,返回中断状态
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 尝试阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 退出等待
        if (failed)
            cancelAcquire(node);
    }
}
// 判断是否能阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 判断前驱节点状态
    if (ws == Node.SIGNAL)
        // SIGNAL:前驱节点释放锁时,会唤醒当前节点,可以阻塞
        return true;
    if (ws > 0) {
        // CANCELED:前驱节点已中断/取消,需要从队列中移除
        // 循环检查,剔除队列前面无效的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 再将当前节点放入队列
        pred.next = node;
    } else {
        // 将前驱节点修改为 SIGNAL, 自旋再次执行此方法时,将走第一条分支
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
acquireQueued方法抛出异常时会执行 cancelAcquire
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    // 再次剔除无效的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
         node.prev = pred = pred.prev;
    // 前驱节点的原后继节点,用于后续CAS操作
    Node predNext = pred.next;
    // 当前节点设置为打断
    node.waitStatus = Node.CANCELLED;
    // 如果当前节点为尾节点,设置最后一个有效节点为尾节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 有效的前驱节点属性next设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    // 尝试解锁
    if (tryRelease(arg)) {
        Node h = head;
        // 释放锁成功唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
释放锁,本质为维护state变量,此处支持可重入锁,如果state值为0,说明释放,取消线程的独占,并更新state
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
唤醒后继节点
private void unparkSuccessor(Node node) {
    
    // 状态置为0,初始化
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 如果 s 为 null, 从后向前迭代,找到最前的未被CANCALLED的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}
后继节点被唤醒后,回到之前的逻辑,开始争夺锁,并将头节点设置为当前节点
公平锁和非公平锁:
公平锁尝试加锁时,先判断队列中是否有等待线程,如果有,直接进队列
非公平锁直接获取锁,获取失败才进队列
中断特性:
如下代码中,使用了一个bool标记返回标识线程的中断状态,而中断锁会直接抛出异常
private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 中断问题,抛出异常;非中断方法中,返回中断的 bool 变量   
                throw new InterruptedException();
        }
    } finally {
        if (failed)
          cancelAcquire(node);
    }
}
限时等待:
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
具体实现:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);// 加入队列
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 自旋时更新剩余等待时间
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超时直接返回获取失败
                return false;
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                // 阻塞指定时长,超时则线程自动被唤醒,自旋时,将在上一个if块退出
                // 底层通过 unsafe 实现
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())// 当前线程中断状态
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
当线程在指定Condition对象上等待的时候,是将线程包装成结点,加入了条件队列,然后阻塞。当线程被通知唤醒时,则是将条件队列中的结点转换成等待队列中的结点,之后的处理就和独占功能完全一样。
J.U.C包提供了Conditon接口,用以对原生的Object.wait()、Object.notify()进行增强。
public Condition newCondition() {
        return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}
在ReentrantLock中,通过内部ConditionObject实现了Condition接口,提供对条件队列的支持
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /**
         * First node of condition queue.
     */
    private transient Node firstWaiter;
    /**
     * Last node of condition queue.
     */
    private transient Node lastWaiter;
    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() {
    }
    ...
}
条件队列操作:
1、加入条件队列等待,条件队列入口
public final void await() throws InterruptedException {
    // 如果当前线程被中断则直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 把当前节点加入条件队列
    Node node = addConditionWaiter();
    // 释放掉已经获取的独占锁资源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果不在同步队列中则不断挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 这里被唤醒可能是正常的signal操作也可能是中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    /**
     * 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
     * 和独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
     * 在处理中断之前首先要做的是从同步队列中成功获取锁资源
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
    // 删除条件队列中被取消的节点
    // clean up if cancelled
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 根据不同模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
addConditionWaiter:将当前线程包装为节点加入条件队列
/**
 * 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
 * 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果最后一个节点被取消,则删除队列中被取消的节点
    //至于为啥是最后一个节点后面会分析
    if (t != null && t.waitStatus != Node.CONDITION) {
        //删除所有被取消的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
删除条件队列当中被取消的节点
/**
 * 删除条件队列当中被取消的节点
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 判断中间变量
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        } else
            // 保存的是最靠前的有效条件节点
            trail = t;
        t = next;
    }
}
释放所有资源
/**
 * 入参就是新创建的节点,即当前节点
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //如果这里释放失败,则抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        /**
         * 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
         * 只需要判断最后一个节点是否被取消就可以了
         */
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
判断节点是否在同步队列中
/**
 * 判断节点是否在同步队列中
 */
final boolean isOnSyncQueue(Node node) {
    //快速判断1:节点状态或者节点没有前置节点
    //注:同步队列是有头节点的,而条件队列没有
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //上面如果无法判断则进入复杂判断
    return findNodeFromTail(node);
}
2、唤醒等待队列
/**
 * 通知条件队列当中节点到同步队列当中去排队
 */
public final void signal() {
    // 节点不能已经持有独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    /**
     * 发信号通知条件队列的节点准备到同步队列当中去排队
     */
            doSignal(first);
}
排队过程
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal方法会将CONDITON结点转换为初始结点,并插入【等待队列】
final boolean transferForSignal(Node node) {
    // 尝试转化为初始节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
        
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
AQS的共享功能,通过钩子方法tryAcquireShared暴露,与独占功能最主要的区别就是:
共享功能的结点,一旦被唤醒,会向队列后部传播(Propagate)状态,以实现共享结点的连续唤醒。这也是共享的含义,当锁被释放时,所有持有该锁的共享线程都会被唤醒,并从等待队列移除。
以CountDownLatch为例:CountDownLatch内部继承了AQS,覆盖了共享获取和释放锁的方法
构造:
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
    protected int tryAcquireShared(int acquires) 
        // 只要 state == 0 就获取锁成功
        return (getState() == 0) ? 1 : -1;
    }
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
private final Sync sync;
await():
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取共享锁,取决于state变量是否为0(1:获取成功;-1:获取失败)
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 以共享节点形式加入等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点为头节点
            if (p == head) {
                // 再次获取锁
                int r = tryAcquireShared(arg);
                // 获取成功设置当前节点为头节点
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 判断阻塞条件,响应中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 移除当前节点
        if (failed)
            cancelAcquire(node);
    }
}
setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    // 判断头节点等待状态
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
doReleaseShared:
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果头节点等待状态为 SIGN, 设置头节点状态归0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 传递唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 退出循环的条件是head并未改变
        if (h == head)                   // loop if head changed
            break;
    }
}
countDown():
public void countDown() {
    // 释放资源
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放资源成功后执行
        doReleaseShared();
        return true;
    }
    return false;
}标签:must field 共享资源 void throws 接下来 中断问题 ali 工具
原文地址:https://www.cnblogs.com/zuier/p/11388795.html