码迷,mamicode.com
首页 > 其他好文 > 详细

并发编程—5AQS

时间:2019-05-17 23:10:27      阅读:208      评论:0      收藏:0      [点我收藏+]

标签:一个   模板模式   允许   lse   排它锁   nfa   nec   线程并发   策略   

5. AQS(AbstractQueuedSynchronizer)抽象的队列式的同步器

AQS为依赖FIFO的等待队列的阻塞锁和相关的同步器(信号量,事件等)实现提供了一个框架。设计的目标是为大部分的依赖一个原子int值表示状态的同步器提供一个有用的基础。子类必须实现一个受保护的来改变状态值,并且定义哪种状态值表示资源的获取或者释放。通过这些,类里面的其他的方法就可以实现排队和阻塞的机制。子类也可以维护其他的状态值,但是为了获取同步一般只追踪使用getState、setState、compareAndSetState修改的int值。

5.1 AbstractQueuedSynchronizer里面的设计模式--模板模式

模板模式:父类定义好了算法的框架,第一步做什么第二步做什么,同时把某些步骤的实现延迟到子类去实现。

5.1.1 模板方法,jdk已经实现好的方法,子类直接使用即可

独占式获取

  • acquire(int arg)
  • acquireInterruptibly(int arg)
  • tryAcquireNanos(int arg, long nanosTimeout)

    共享式获取

  • acquireShared(int arg)
  • acquireSharedInterruptibly(int arg)
  • tryAcquireSharedNanos(int arg, long nanosTimeout)

    独占式释放

  • release

    共享式释放

  • releaseShared(int arg)

5.1.2 子类需要实现的方法

  • tryAcquire(int arg) 独占式获取
  • tryRelease(int arg) 独占式释放
  • tryAcquireShared(int arg) 共享式获取
  • tryReleaseShared(int arg) 共享式释放
  • isHeldExclusively() 这个同步器是否处于同步模式

5.2 Node

5.2.1 waitStatus的值

  • CANCELLED = 1
    表示如果线程中断或者等待超时,会被移出同步队列。
  • SIGNAL -1
    表示后续的节点处于等待状态,当前节点通知后续节点运行。
  • CONDITION -2
    表示节点处于condition等待队列里面
  • PROPAGATE -3
    共享模式,表示状态要往后面的节点传播

5.2.2 设置和获取state的值

//同步状态被设置成了volatile,确保了state在不同线程的可见性。  
private volatile int state;
  • getState()
  • setState()
  • compareAndSetState() 原子操作设置

5.3 Condition

5.4 ReentrantLock为例子分析AQS

5.4.1 acquire(int) 独占模式的顶层获取资源的顶层方法

//1。lock方法,先通过原子操作设置state的值为1。
final void lock() {
    //不可重入。如果当前的state是0,则替换为1。同时把当前的线程设置为占有资源的线程
    if (compareAndSetState(0, 1))
    //1.1如果设置成功,当前线程设置为占有资源的线程。
        setExclusiveOwnerThread(Thread.currentThread());
    else
    //1.2否则调用acquire获取
        acquire(1);
}
//2.acquire(1) 获取资源,即使等待,直到获取成功为止。
public final void acquire(int arg) {
//如果tryAcquire返回true直接短路.该方法有子类实现。
//否则在addWaiter把当前线程打包成一个Node节点放到同步队列队尾
//通过
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//2.1 tryAcquire(arg)
//子类需要自己实现tryAcquire逻辑。主要通过(getState/setState/compareAndSetState),是否可重入,是否阻塞

//非公平锁资源获取
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//2.2 addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
    //先通过原子操作compareAndSetTail,尝试把当前node加到同步队列的尾部。        
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //上一步失败则通过enq入队。       
    enq(node);
    return node;
}
//2.2。1 enq方法   
private Node enq(final Node node) {
//使用自旋的方式
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            //队列不为空。则把当前node加入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
//2.3 acquireQueued(Node, int) 线程已经加入队尾,处于等待状态。直到其他线程释放线程唤醒自己。自己获取资源后,再处理自己的逻辑。
final boolean acquireQueued(final Node node, int arg) {
    //是否获得资源,true表示未获得
    boolean failed = true;
    try {
        //是否被中断过
        boolean interrupted = false;
        for (;;) {//自旋的方式
            //获取前驱节点
            final Node p = node.predecessor();
            //如果前驱节点是head节点,那么当前节点是第二个节点。
            //并且尝试获取资源成功,则设置当前节点为head
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果获取失败。将前驱节点的waitStatus改为Node.SIGNAL,就是告诉前驱节点,如果你释放了资源通知我一下。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//2.3.1 shouldParkAfterFailedAcquire(Node,Node) 获取资源失败后,判断前驱节点的waitStatus的值,1)如果前驱节点是取消状态舍弃;2)如果是正常状态设置前驱节点的状态是Node.SIGNAL并且返回true;3)如果前驱节点状态时Node.SIGNAL,直接返回true。
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果waitStatus = Node.SIGNAL,返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果waitStatus = Node.CANCELLED,则舍弃这个前驱节点。继续判断下一个前驱节点的waitStatus
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
              * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//2.3.2 parkAndCheckInterrupt() 如果当前节点的前驱节点是Node.SIGNAL,则把当前线程挂起,处于等待状态。
private final boolean parkAndCheckInterrupt() {
    //park的县城会处于waiting状态Thread。State。WAITING,通过unpark方法或者interrupt()方法唤醒
    LockSupport.park(this);
    return Thread.interrupted();
}

5.4.2 release(int) 独占式模式下释放资源

//1.独占模式下释放资源 一般都回成功,返回true
public final boolean release(int arg) {
    //当state=0时,返回true
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//2. tryRelease(int) 子类自己实现的方法,用于释放资源。
//一般会释放成功。因为只有获取了资源的线程才会释放资源,直接减去相应量的资源。不用考虑线程安全问题。不需要原则操作。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
//3. unparkSuccessor(Node) 唤醒下一个节点。如果下一个节点null或者cancelled,从队尾往前取节点依次唤醒。
 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }    

5.4.3 acquireShared(int) 共享模式下获取资源

//tryAcquireShared方法尝试获取资源,成功则直接返回。
//aqs里面已经定义好,获取失败小于0。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

//2.doAcquireShared 如果调用tryAcquireShared获取失败,这里则调用doAcquireShared把节点放入同步队列尾部,并且通过park()让其休息,直到有线程唤醒它。
 privateprivate void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    } void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
// 3。  setHeadAndPropagate(node, r); 将head指向自己,如果还有剩余量,则继续花唤醒下一个邻居线程。

5.4.4 releaseShared() 共享模式下释放资源

//1.releaseShared 先调用tryReleaseShared方法尝试释放
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
//2。唤醒后继节点
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}     

独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

5.4.5 如何实现可重入

//以公平所为例子
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //关键是这里如果当前线程就是目前占有所的线程。则把state增加对应的acquires。因为当前线程持有锁,所以这里使用setState(nextc)来直接设置state的值
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//可以猜测在释放资源的时候会扣减对应的releases。最终state=0,唤醒等待队列里面的下一个节点(如果存在的话)。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

5.4.6 如何实现公平锁和非公平锁

//1。公平锁的lock vs 非公平锁的lock
//1.1 fairSync
final void lock() {
    acquire(1);
}
//1.2 NonfairSync
final void lock() {
    //一上来完全不管同步队列里面的节点,直接判断一把,如果同步器的state=0,立刻用原子操作修改stata值。返回true表示抢占资源成功。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
    //上一步失败了,才尝试获取
        acquire(1);
}
//2. 公平锁的lock vs 非公平锁的acquire(1)
//2.1 NonfairSync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    //和fairSync不同,不需要判断当前线程是否同步队列的第一个节点。直接进入if里面的逻辑。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
。。。。
}
//2.2 fairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    //hasQueuedPredecessors()方法会判断当前节点是否有强制节点,不存在前置节点才走if里面的逻辑。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
。。。。
}

5.4.7 可重入的读写锁ReentrantReadWriteLock如何实现读写锁

排它锁一个时刻只允许一个线程访问。而读写锁一个时刻允许多个读线程访问,当写锁被其他线程占有未释放时,所有的读线程和其他读线程均被阻塞。在ReentrantReadWriteLock内部分别维护了一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大的提升。同步器的同步状态通过把状态int值分成高16位和低16位分别表示读锁和写锁的状态。

//1.ReentrantReadWriteLock内部分别定义了一个writerLock和readerLock
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
//2。1 读锁lock 共享模式获取资源
public final void acquireShared(int arg) {
    //先调用子类自己实现的tryAcquireShared尝试获取资源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
//2.1.1 tryAcquireShared 尝试获取
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            //c是锁状态为:高位16位表示共享锁的数量,低位16位表示独占锁的数量
            int c = getState();
            //exclusiveCount(c) 取低16位的值,也就是写锁状态位:不等于0表示写锁被占用
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                //写锁被其他线程占用,获取读锁失败
                return -1;
            //取高16位的值,读锁状态位    
            int r = sharedCount(c);
            //readerShouldBlock()根据读锁获取策略,返回是否阻塞当前读锁获取操作。后面会详细说明此方法
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                //cas修改高16位的读锁状态,即获取读锁
                compareAndSetState(c, c + SHARED_UNIT)) {
                //首次获取读锁
                if (r == 0) {
                //缓存首次获取读锁的线程,及其读锁重入次数
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {//线程重入
                    firstReaderHoldCount++;
                } else {
                    //cachedHoldCounter是最后获取锁的线程的读锁重入次数
                    HoldCounter rh = cachedHoldCounter;
                    /readHolds是缓存了当前线程的读锁重入次数信息的ThreadLocal
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                     //缓存当前线程的holdCounter
                     //fullTryAcquireShared()方法中,
                     //获取读锁失败的线程会执行:readHolds.remove(),故此时需要重新设置 
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //首次获取读锁失败后,重试获取
            return fullTryAcquireShared(current);
        }
//2.1.2。1 fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
    //rh表示当前线程的锁计数器
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
         //写锁被占用
        if (exclusiveCount(c) != 0) {
        //如果其他线程占用,读锁获取失败。如果当前读线程占用,表示“锁降级”。
        //如果这里不支持锁降级,这里线程持有写锁(exclusiveCount(c) != 0),这里会出现死锁。
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}       
//2.1.2 doAcquireShared
//AQS里面doAcquireShared方法,如果当前节点是head.next时,才会尝试获取资源。
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //tryAcquireShared(arg);如果获取成功则返回还剩余多少个资源。
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //如果还有剩余量,继续唤醒下一个邻居线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

//FairSyn
 //如果当前线程不是同步队列头结点的next节点(head.next)则阻塞当前线程 
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

//NonFairSyn
//为了防止写线程饥饿,如果同步队列中的第一个线程是以独占模式获取锁(写锁),那么当前获取读锁的线程需要阻塞,让队列中的第一个线程先执行
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
//2.2 写锁lock

 跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。
那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,
他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。
独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。
当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

并发编程—5AQS

标签:一个   模板模式   允许   lse   排它锁   nfa   nec   线程并发   策略   

原文地址:https://www.cnblogs.com/codetree/p/10884170.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!