标签:on() 空闲 over release 没有 ret oid sig perm
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。
使用Semaphore来简单模拟数据库连接池
public class Pool {
    //可同时访问资源的最大线程数
    private static final int MAX_AVAILABLE = 100;
    //信号量 表示:可获取的对象通行证
    private final Semaphore available = new Semaphore(MAX_AVAILABLE,true);
    //共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是连接池
    protected Object[] items = new Object[MAX_AVAILABLE];
    //共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false
    protected boolean[] used =  new boolean[MAX_AVAILABLE];
    /**
     * 获取一个空闲对象
     * 如若无空闲对象则等待,直到有空闲对象为止
     */
    public Object getItem() throws InterruptedException{
        available.acquire();
        return getNextAvailableItem();
    }
    /**
     * 获取池中的一个空闲对象,获取成功后返回Object,失败返回null
     * 成功后将对应的used[i] 设置为true
     */
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; i++) {
            if(!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }
    /**
     * 归还对象到池中
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }
    /**
     *  归还对象到池中,归还成功返回true
     *  归还失败:
     *  1、池中不存在该对象的引用,返回false
     *  2、池中含有该对象的引用,但该对象目前状态为空闲状态,也返回false
     */
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; i++) {
            if(item == items[i]) {
                if(used[i]) {
                    used[i] = false;
                    return true;
                }else {
                    return false;
                }
            }
        }
        return false;
    }
}
1、Semaphore的构造方法
/**
 *  permits:通行证的个数
 *  fair: 是否公平的获取锁
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
 *  默认是非公平锁
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
 
2、Semaphore公平锁模式下的acquire()方法
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
/**
 *  共享式地获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,
 *  在同一时刻可以有多个线程获取到同步状态,该方法可以响应中断
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //条件成立:说明当前调用acquire方法的线程 已经是 中断状态了,直接抛出异常..
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果获取同步状态失败,返回值小于0
    if (tryAcquireShared(arg) < 0)
     //可共享的中断模式下尝试获取锁
        doAcquireSharedInterruptibly(arg);
}
/**
 * 尝试获取通行证,获取成功返回 >= 0的值;
 * 获取失败 返回 < 0 值
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //  //判断当前 AQS 阻塞队列内 是否有等待者线程,如果有直接返回-1,表示当前aquire操作的线程需要进入到队列等待..
        if (hasQueuedPredecessors())
            return -1;
        //执行到这里,有哪几种情况?
        //1.调用aquire时 AQS阻塞队列内没有其它等待者
        //2.当前节点 在阻塞队列内是headNext节点
        //获取state ,state这里表示 通行证
        int available = getState();
        //remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量
        int remaining = available - acquires;
        //条件一:remaining < 0 成立,说明线程获取通行证失败..
        //条件二:前置条件,remaning >= 0, CAS更新state 成功,说明线程获取通行证成功,CAS失败,则自旋。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //将调用semaphore.await()方法的线程 包装成node加入到 AQS的阻塞队列当中。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //获取当前线程节点的前驱节点
            final Node p = node.predecessor();
            //条件成立,说明当前线程对应的节点 为 head.next节点
            if (p == head) {
                int r = tryAcquireShared(arg);
                //说明还有剩余的通行证
                if (r >= 0) {
                    //设置当前节点为 head节点,并且向后传播!(依次唤醒!)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //shouldParkAfterFailedAcquire  会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 signal(-1),返回true
            //parkAndCheckInterrupt 挂起当前节点对应的线程...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        //如果发生了中断,取消该节点竞争共享锁
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * 设置当前节点为 head节点,并且向后传播!(依次唤醒!)
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //将当前节点设置为 新的 head节点。
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        //获取当前节点的后继节点..
        Node s = node.next;
        //条件一:s == null  什么时候成立呢?  当前node节点已经是 tail了,条件一会成立。 doReleaseShared() 里面会处理这种情况..
        //条件二:前置条件,s != null , 要求s节点的模式必须是 共享模式。 latch.await() -> addWaiter(Node.SHARED)
        if (s == null || s.isShared())
            //基本上所有情况都会执行到 doReleasseShared() 方法。
            doReleaseShared();
    }
}
 /**
 * 唤醒获取资源失败的线程
 *
 * Semaphore版本
 * 都有哪几种路径会调用到doReleaseShared方法呢?
 * 1、semaphore.release() -> sync.releaseShared(1) -> tryReleaseShared() -> doReleaseShared
 * 2、被唤醒的线程 ->doAcquireSharedInterruptibly() parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared()
 */
private void doReleaseShared() {
    for (;;) {
        //获取当前AQS 内的 头结点
        Node h = head;
        //条件一:h != null 成立,说明阻塞队列不为空..
        //条件二:h != tail 成立,说明当前阻塞队列内,除了head节点以外  还有其他节点。
        if (h != null && h != tail) {
            //执行到if里面,说明当前head 一定有 后继节点!
            int ws = h.waitStatus;
            //当前head状态 为 signal 说明 后继节点并没有被唤醒过呢...
            if (ws == Node.SIGNAL) {
                //唤醒后继节点前 将head节点的状态改为 0
                //这里为什么,使用CAS呢? 回头说...
                //当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时,
                //CAS 可能会失败...                   
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
     //false 没有人执行setHead(node)方法,有可能是没有线程需要唤醒了,或者唤醒线程已经更新过了head节点,那么执行自旋,在唤醒接下来的节点
     //true  唤醒线程还没有执行