码迷,mamicode.com
首页 > 其他好文 > 详细

ScheduledThreadPoolExecutor 源码分析

时间:2018-12-09 22:05:22      阅读:212      评论:0      收藏:0      [点我收藏+]

标签:time   nano   worker   count   ict   RKE   优先级   正数   并且   

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 是能够在给定的延时之后、或周期性执行被提交任务的线程池

创建实例

    /**
     *  线程池关闭时是否需要继续执行周期性任务
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     *  线程池关闭时是否需要执行已经存在的延时任务
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     *  执行 ScheduledFutureTask.cancel 操作时是否需要将任务从任务队列中移除
     */
    volatile boolean removeOnCancel;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private static final AtomicLong sequencer = new AtomicLong();

    /**
     *  默认的线程超时时间为 10 毫秒
     */
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
            ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
            RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue(), threadFactory, handler);
    }

    private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;

        /** 当前任务需要在指定的纳秒时间后执行 */
        private volatile long time;

        /**
         *  重复任务的执行周期,以纳秒为单位
         *  正数表示以 fixed-rate 模式执行
         *  负数表示以 fixed-delay 模式执行
         *  0 表示不需要重复执行
         */
        private final long period;

        /** 需要重新入队的周期性任务  */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         *  当前任务在延迟队列中的索引,以支持快速删除
         */
        int heapIndex;

        /**
         *  创建一个在 triggerTime 执行的一次性任务
         */
        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                long sequenceNumber) {
            super(r, result);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
        }

        /**
         *  创建一个在 triggerTime 第一次执行,并以 period 为周期的周期性任务
         */
        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                long period, long sequenceNumber) {
            super(r, result);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
        }

        /**
         * 创建一个在 triggerTime 执行的一次性任务
         */
        ScheduledFutureTask(Callable<V> callable, long triggerTime,
                long sequenceNumber) {
            super(callable);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
        }

        /**
         *  读取当前任务的延时时间
         * created by ZXD at 9 Dec 2018 T 20:40:27
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.nanoTime(), NANOSECONDS);
        }

        /**
         *  当前任务是否是周期性任务
         */
        @Override
        public boolean isPeriodic() {
            return period != 0;
        }

        /**
         * 计算周期性任务的下一次触发时间
         */
        private void setNextRunTime() {
            final long p = period;
            if (p > 0) {
                // 基于上次记录的时间进行延时,可能已经超时
                time += p;
            } else {
                // 基于 System.nanoTime() 进行延时
                time = triggerTime(-p);
            }
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        @Override
        public void run() {
            // 1)当前任务是否能在线程池中执行
            if (!canRunInCurrentRunState(this)) {
                // 不能执行,则将其取消
                cancel(false);
                // 2)如果不是周期性任务
            } else if (!isPeriodic()) {
                // 则执行该任务
                super.run();
                // 3)如果是周期性任务,运行任务并且重置状态
            } else if (super.runAndReset()) {
                // 计算周期性任务的下一次触发时间
                setNextRunTime();
                // 重新将任务加入到延时队列中
                reExecutePeriodic(outerTask);
            }
        }
    }

    /**
     *  基于二叉堆实现的延迟优先级队列,队列元素只能是 RunnableScheduledFuture 实例。
     */
    static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

        // 初始容量
        private static final int INITIAL_CAPACITY = 16;
        // 保持元素的数组
        private RunnableScheduledFuture<?>[] queue =
                new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        // 互斥锁
        private final ReentrantLock lock = new ReentrantLock();
        // 元素总数
        private int size;

        /**
         *  在队列头部阻塞等待任务的线程
         */
        private Thread leader;

        /**
         *  是否有任务可用
         */
        private final Condition available = lock.newCondition();
    }

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        // 当前任务是否能在线程池中运行
        if (canRunInCurrentRunState(task)) {
            // 将任务加入到延时队列中
            super.getQueue().add(task);
            // 又进行一次判断,如果不能运行目标任务则尝试从延时队列中删除它
            if (canRunInCurrentRunState(task) || !remove(task)) {
                ensurePrestart();
                return;
            }
        }
        // 任务不能运行,则将其取消
        task.cancel(false);
    }

延时执行一次性任务

    /**
     *  在指定的延时后执行目标任务
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command,
            long delay,
            TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        // 创建一个单次延时任务
        final RunnableScheduledFuture<Void> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                        triggerTime(delay, unit),
                        sequencer.getAndIncrement()));
        // 加入延时队列中执行
        delayedExecute(t);
        return t;
    }

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        // 线程池处于 SHUTDOWN 及以上状态
        if (isShutdown()) {
            // 拒绝执行任务
            reject(task);
        } else {
            // 将任务加入延时优先级队列
            super.getQueue().add(task);
            /**
             * 当前任务不能执行 && 将其从延时队列中移除成功
             */
            if (!canRunInCurrentRunState(task) && remove(task)) {
                // 则取消该任务
                task.cancel(false);
            } else {
                /**
                 * 尝试启动一个工作者线程来处理延时任务
                 * 1)当前工作者线程 < 核心线程数
                 * 2)当前工作者线程 == 0
                 */
                ensurePrestart();
            }
        }
    }

    /**
     *  是否能在当前线程池状态下运行
     */
    boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
        // 线程池处于 RUNNING 状态,则允许运行
        if (!isShutdown()) {
            return true;
        }
        // 线程池已经停止,则不允许运行
        if (isStopped()) {
            return false;
        }
        /**
         * 线程池处于 SHUTDOWN 状态正在停止
         * 1)当前任务是周期任务,continueExistingPeriodicTasksAfterShutdown 默认为 false
         * 2)当前任务是一次性任务,executeExistingDelayedTasksAfterShutdown 默认为 false
         *  如果任务已经超时,则执行它。
         */
        return task.isPeriodic()
                ? continueExistingPeriodicTasksAfterShutdown
                        : executeExistingDelayedTasksAfterShutdown
                        || task.getDelay(NANOSECONDS) <= 0;
    }

    /**
     *  尝试启动一个工作者线程来处理延时任务
     */
    void ensurePrestart() {
        final int wc = ThreadPoolExecutor.workerCountOf(ctl.get());
        if (wc < corePoolSize) {
            addWorker(null, true);
        } else if (wc == 0) {
            addWorker(null, false);
        }
    }

延时执行周期性任务

  • 在以 unit 为单位的 initialDelay 延时后执行第一次任务,
    并在 initialDelay + period,initialDelay + 2 * period 等时间点周期性执行。
    如果任务执行时间超出 period,则下次任务会立即开始执行。
    /**
     *  在以 unit 为单位的 initialDelay 延时后执行第一次任务,并在
     *  initialDelay + period,initialDelay + 2 * period 等时间点周期性执行
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
            long initialDelay,
            long period,
            TimeUnit unit) {
        // 任务和时间单位不能为 null
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        // 执行周期必须 > 0
        if (period <= 0L) {
            throw new IllegalArgumentException();
        }
        final ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<>(command,
                        null,
                        triggerTime(initialDelay, unit),
                        unit.toNanos(period),
                        sequencer.getAndIncrement());
        final RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 在以 unit 为单位的 initialDelay 延时后执行第一次任务,
    并在当次任务执行完成之后在 delay 延时之后再次执行。
    /**
     *  在以 unit 为单位的 initialDelay 延时后执行第一次任务,并在当次任务执行完成之后
     *  在 delay 延时之后再次执行。
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
            long initialDelay,
            long delay,
            TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (delay <= 0L) {
            throw new IllegalArgumentException();
        }
        final ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<>(command,
                        null,
                        triggerTime(initialDelay, unit),
                        -unit.toNanos(delay),
                        sequencer.getAndIncrement());
        final RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

ScheduledThreadPoolExecutor 源码分析

标签:time   nano   worker   count   ict   RKE   优先级   正数   并且   

原文地址:https://www.cnblogs.com/zhuxudong/p/10093436.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!