标签:mamicode 最大数 row epo 线程中断 有序 wait EDA boolean
BlockingQueue<Runnable> workQueue;    // 任务队列。使用workQueue.isEmpty()判断队列是否为空,而非workQueue.poll()==null判断,这样的判空方式容纳特殊队列,如DelayQueue
ReentrantLock mainLock;     
HashSet<Worker> workers;    // 线程池中的所有工作线程,仅能在mainLock下访问
Condition termination;      // 用于支持awaitTermination
// 所有的控制参数都被定义为volatile
ThreadFactory threadFactory;
RejectedExecutionHandler handler;
long keepAliveTime;
boolean allowCoreThreadTimeOut;
int corePoolSize;
int maximumPoolSize;            
int largestPoolSize;        // 线程池中工作线程的历史最大数量:largestPoolSize = workers.size() > largestPoolSize ? workers.size() : largestPoolSize;
long completedTaskCount;    // 完成任务的计数器,仅在工作线程终止时更新,仅在mainLock下访问/* 核心字段,打包了2种含义:worker线程数、线程池状态。
workerCount(低29位):已经被允许start并且不被允许stop的worker的数量。该值可能与活动线程的实际数量会出现短暂性不同
runState(高3位):状态的数值顺序是重要的,以允许有序的比较,状态流转如下
    > RUNNING -> SHUTDOWN,调用shutdown()方法,可能隐含在finalize()方法中
    > RUNNING or SHUTDOWN -> STOP,调用shutdownNow()
    > STOP -> TIDYING,当线程池是empty时
    > TIDYING -> TERMINATED,当terminate()钩子方法执行完成
    当状态为TERMINAED时,线程在awaitTermination()方法上的等待将会返回。
运行状态描述 已知:RUNNING=111, SHUTDOWN=0, STOP=001, TIDYING=010, TERMINATED=011
    RUNNING     // 接收的新任务,并且处理排队中的任务
    SHUTDOWN    // 不接受新任务,但处理排队中的任务
    STOP        // 不接受新任务,不处理排队中的任务,并且中断正在处理的任务
    TIDYING     // 所有任务已经终止,workerCount=0,将运行terminate()钩子方法
    TERMINATED  // terminate()钩子方法执行完毕
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));ReentrantLock mainLock;
HashSet<Worker> workers;    // 线程池中的所有工作线程,仅能在mainLock下访问mainLock字段主要是为了保证工作线程池字段workers(非安全集合HashMap)在多线程并发情况下的访问。至于workers为何使用HashMap而非使用安全的ConcurrentHashMap,原因如下所示:
使用
mainLock加锁操作会让操作按照顺序一个一个执行。这样保证了interruptIdleWorkers()方法在执行期间避免中断风暴,尤其是在shutdown期间。注:
interruptIdleWorkers()方法只会被shutdown()方法调用
从类图结构来看:Worker类既是锁,又是线程。
核心代码如下所示,共分为3部分:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1);   // 抑制中断操作,直到runWorker()方法开始运行,runWorker()方法运行时会先执行unlock()方法,将state设置为0
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    // 多线程核心部分:实现run()方法
    public void run() {
        runWorker(this);
    }
    
    // AQS核心实现部分
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // 中断worker线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}worker线程自己提供了中断worker的方法interruptIfStarted()。即创建Worker对象之后,只有在worker线程运行后,才可以执行worker线程的中断操作。
抑制中断的手段:
interruptIfStarted()tryAcquire()方法中,CAS只有在state=0的时候才能操作成功,即获取到锁在中断操作的方法中:
interruptWorkers():该方法会调用interruptIfStarted()方法中断worker线程,而该方法只有在state>=0的情况下才会执行中断操作interruptIdleWorkers():该方法只有在worker.tryLock()获取到锁时,才能执行中断操作而上述情况下,state=-1,均不能执行成功。只有在runWorker()方法执行后,才会将state=0操作执行
该方法作为整体方法入口,会根据线程池的线程数量、线程池状态,来决定针对添加的task执行以下哪3种操作:
任务队列添加新task成功,且线程池仍在运行状态,但此时线程池worker线程数量为0任务队列添加新task成功,且线程池状态已经不在运行态,但从任务队列中移除新任务失败,但此时线程池worker线程数量为0任务队列(前提条件:线程池为运行态)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}该方法一共干了3件事:
workers属性中源码省略
该方法有个非常重要的关键点,即:worker线程是否会正常执行结束。即:getTask()方法一旦返回null,表明该worker线程无task可以处理,此时正常情况下该worker线程将会执行结束,线程退出。而getTask()方法中,会会根据timed来决定在BlockingQueue获取task时,是否阻塞等待。
总结:该方法一旦返回NULL,即表明该worker线程将执行结束;否则,worker线程将在BlockingQueue队列中阻塞等待
private Runnable getTask() {
    boolean timedOut = false; // 在workQueue.poll()是否超时
    for (;;) {
        /* 如下操作判断线程池的状态,是否结束该线程:
            1. 线程池状态SHUTDOWN
            2. 线程池状态STOP,且任务队列为空(即没有任何可执行任务)*/
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        int wc = workerCountOf(c);
        /* timed:用于判断worker线程调用BlockingQueue获取task时,阻塞是否为有限期的阻塞。
            timed=true,表示是有超时时间的阻塞
            timed=false,表示无限期阻塞
           allowCoreThreadTimeOut=true,则表示允许核心线程数超时 */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
        // 线程池数量大于maximumPoolSize,该线程获取任务直接返回null。
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}顾名思义,该方法时worker线程退出前的处理工作:
将当前线程完成的任务数量,累加到completedTaskCount中
尝试进行线程池终止
如果线程池仍在RUNNING状态,且worker线程没有异常退出,由于getTask()==null,即任务等待队列已经为空,此时判断coreThread是否允许超时,来限制空闲的workers的线程数量
注:是否将线程池的worker线程数量维护在一个稳定范围,仍然需要考虑。该方法就是做如此处理:worker线程在结束前,会判断线程池是否需要新增一个新的worker线程,新增worker线程的情况如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 试着看看线程池是否可以终止
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        /* 新增worker线程的情况如下:
        1. worker线程为异常终止
        2. 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)*/
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())  // 如果min == 0,且任务队列中又增加新任务,则将min=1,workers中保留min个worker线程。
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 相当于创建一个新的worker线程,但没有马上为该worker线程分配task。该worker将会从队列中getTask()获取任务。
        addWorker(null, false);
    }
}核心操作如下:
SHUTDOWNinterruptIdleWorkers()onShutdown()tryTerminate()public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}中断风暴的产生,主要来自于如下方法:
空闲的worker线程假如多线程并发调用shutdown()方法,此时若没有mainLock锁,让线程有序的进行调用,则可能引发大规模的空闲worker线程中断
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}两个方法如下所示,其中主要区别为:
shutdown()的状态是SHUTDOWN;而shutdownNow()的状态则是STOPshutdown()中断空闲worker线程;而shutdownNow()则是中断所有worker线程shutdown()有钩子方法
中断worker的线程总共有两类:
中断空闲worker:若未中断,且不可重入锁加锁成功,再中断。非重入锁加锁的时机有两个:
runWorker()方法中,getTask()获取任务时,会加锁。即,运行可执行任务和中断线程的操作,是不可同时发生!!
中断所有worker:遍历直接中断

1、runWorker()中有前置、后置方法
2、shutdown()中有onShutdown()方法
标签:mamicode 最大数 row epo 线程中断 有序 wait EDA boolean
原文地址:https://www.cnblogs.com/wolf-w/p/12498161.html