标签:
package com.dubbo.analyzer.executor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 任务执行者<br/>
* 当线程不够且线程队列爆满时,会租塞生产者<br/>
* 此类为单例,必须先调用 init 方法进行初始化.<br/>
* <a href="http://www.importnew.com/10790.html" target="_blank">http://www.importnew.com/10790.html</a>
* @author BennyTian
* @date 2015/01/06
*/
public class Executor {
private Executor() { }
private static ThreadPoolExecutor executor = null;
private static TimeUnit unit = TimeUnit.MINUTES ;
private static long keepAliveTime = 1;
/**
* 初始化
* @param threadSize 线程数
* @param poolSize 线程租塞队列的容量
*/
public static void init(Integer threadSize,Integer poolSize){
executor = new ThreadPoolExecutor(threadSize, threadSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>(threadSize));
//异常捕获处理:当线程池达到处理最大极限的时候,调用 queue.put 对生产者进行租塞. execute方法默认使用的是异步的 queue.offer
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 执行任务
* @param task
*/
public static void execute(Runnable task){
verify();
executor.execute(task);
}
/**
* 线程池的任务是否全部执行完成
* @return
*/
public static Boolean isCompleted(){
return executor.getActiveCount() == 0;
}
/**
* 停止任务
*/
public static void shutdown(){
verify();
executor.shutdown();
}
private static void verify(){
if(executor==null){
throw new RuntimeException("please invoke [ init ] method");
}
}
}
JAVA 生产者租塞模式的线程池 ThreadPoolExecutor
标签:
原文地址:http://www.cnblogs.com/BennyTian/p/4207778.html