Overriding ThreadFactory and the Rejection Policy

Date: 2020-01-28 21:13:07

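A recent project needed multithreaded task processing, so I naturally reached for ThreadPoolTaskExecutor, Spring's wrapper around the ThreadPoolExecutor class from Java's java.util.concurrent package. When a task cannot be accepted because the wait queue is full (and the pool has already grown to its maximum size), it is handed to a RejectedExecutionHandler. The default handler is AbortPolicy, which immediately throws a RejectedExecutionException, and that does not fit our business scenario.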
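To see the default behaviour in isolation, here is a minimal sketch that uses a plain ThreadPoolExecutor rather than Spring's wrapper. The class name DefaultRejectionDemo and the tiny pool (one thread, one queue slot) are made up for illustration and are not part of the project code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
final class DefaultRejectionDemo {

    /** Submits three tasks to a 1-thread pool with a 1-slot queue; returns true if the third is rejected. */
    static boolean thirdTaskIsRejected() {
        // No handler is passed, so the executor falls back to its default, AbortPolicy.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the task busy until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // runs on the single worker thread
            executor.execute(blocker); // waits in the queue
            try {
                executor.execute(blocker); // no free thread and no queue space left
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```

The first task occupies the only worker and the second fills the only queue slot, so the third call to execute has nowhere to go and the default policy throws.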

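Requirement: when more tasks arrive than the pool can accept, the main (submitting) thread should block until a thread becomes available. A simple implementation follows.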

package com.quant.dev.modules.dev.enetity;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: dev
 * @description:
 * @author: Mr.EternityZhang
 * @create: 2019-07-08 17:41
 */
@Slf4j
public class TestThread {


    static class ThreadFactoryCustom implements ThreadFactory{
        private final AtomicInteger threadNum=new AtomicInteger(1);
        private final String namePrefix;

        private ThreadFactoryCustom(String namePrefix){
            this.namePrefix=namePrefix+"-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
            // a new thread inherits daemon status and priority from its creator; normalize both
            if(t.isDaemon()){
                t.setDaemon(false);
            }
            if(t.getPriority()!=Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

        private final String threadName;

        private final URL url;

        public AbortPolicyWithReport(String threadName, URL url) {
            this.threadName = threadName;
            this.url = url;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("Provider-side thread pool is exhausted!" +
                            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
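            // note: if the executor is already shut down, the task is dropped without an exception
            if (!e.isShutdown()) {
                try {
                    // put() blocks the submitting thread until the queue has room again
                    log.info("waiting for queue space");
                    e.getQueue().put(r);
                    log.info("task queued");
                } catch (InterruptedException ee) {
                    log.error(ee.toString(), ee);
                    // restore the interrupt flag so the caller can react to it
                    Thread.currentThread().interrupt();
                }
            }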
        }

    }

    public static ThreadFactory getThreadFactoryCustom(String name){
        return new ThreadFactoryCustom(name);
    }

    public static void main(String[] args) {
        String poolName="eternity";
        ThreadFactory factory=getThreadFactoryCustom(poolName);
        log.info("cores={}",Runtime.getRuntime().availableProcessors());
        ThreadPoolExecutor executor=
                new ThreadPoolExecutor(100,400,5,
                        TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
        long begin=System.currentTimeMillis();
        CountDownLatch count=new CountDownLatch(2000);
        AtomicInteger integer=new AtomicInteger(1);
        for(int i=0;i<2000;i++){
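            executor.execute(()->{
                try {
                    // read and increment in one atomic step so the logged value is not racy
                    log.info("thread={}, value={}",Thread.currentThread().getName(),integer.getAndIncrement());
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // always count down, otherwise await() in main could wait forever
                    count.countDown();
                }
            });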
        }
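        try {
            count.await();
            log.info("latch count={}",count.getCount());
            log.info("active threads={}",executor.getActiveCount());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // shut down unconditionally: getActiveCount() is only approximate, and the
            // non-daemon core threads would otherwise keep the JVM alive
            executor.shutdown();
        }
        log.info("elapsed={} ms, result={}",System.currentTimeMillis()-begin,integer);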
    }
}
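For a quicker experiment, the same idea can be reduced to a few lines. The sketch below is a simplified variant, not the class above: BlockingSubmitDemo, BlockWhenFullPolicy and runAll are hypothetical names, it implements RejectedExecutionHandler directly instead of extending AbortPolicy, and it assumes that throwing is the right reaction when the pool is shut down or the submitter is interrupted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
final class BlockingSubmitDemo {

    /** Waits for queue space instead of throwing while the pool is still running. */
    static final class BlockWhenFullPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                executor.getQueue().put(task); // blocks the submitting thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Submits taskCount short tasks to a 1-thread pool with a 1-slot queue; returns how many ran. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new BlockWhenFullPolicy());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(20); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown(); // tasks already queued still run after shutdown()
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(5));
    }
}
```

With the default policy most of these submissions would be rejected; here the loop simply slows down to the pace of the single worker and every task runs.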

How the blocking works:

The blocking relies on the put method of BlockingQueue: when the queue is full, put waits until space becomes available.
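The difference between put and the non-blocking offer can be checked without any thread pool. The following is a small sketch; PutBlocksDemo, its method name and the 200 ms delay are assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
final class PutBlocksDemo {

    /** Returns roughly how long put() waited on a full queue before a consumer freed a slot. */
    static long millisBlockedInPut(long consumerDelayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
        try {
            queue.put("first"); // the single slot is now taken

            // offer() never waits: on a full queue it returns false immediately
            if (queue.offer("not accepted")) {
                throw new IllegalStateException("offer should fail on a full queue");
            }

            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(consumerDelayMillis);
                    queue.take(); // frees the slot and wakes up the blocked put()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            long start = System.nanoTime();
            queue.put("second"); // blocks until the consumer has taken "first"
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
            return elapsed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("put() waited about " + millisBlockedInPut(200) + " ms");
    }
}
```

ThreadPoolExecutor itself hands tasks to the queue with offer, which is why a full queue leads to rejection there; calling put from the rejection handler is what turns that rejection into waiting.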

Reference

https://www.jianshu.com/p/3cfd943996a1

Original article: https://www.cnblogs.com/eternityz/p/12238755.html
