码迷,mamicode.com
首页 > 编程语言 > 详细

线程池的实现原理

时间:2017-08-05 20:27:50      阅读:146      评论:0      收藏:0      [点我收藏+]

标签:try   post   了解   示意图   exception   complete   adf   运行时间   nano   

当向线程池提交一个任务之后,线程池是怎样处理这个任务的呢?本节来看一下线程池
的主要处理流程,处理流程图所看到的。
从图中能够看出,当提交一个新任务到线程池时,线程池的处理流程例如以下。


1)线程池推断核心线程池里的线程是否都在运行任务。

假设不是。则创建一个新的工作
线程来运行任务。假设核心线程池里的线程都在运行任务,则进入下个流程。


2)线程池推断工作队列是否已经满。

假设工作队列没有满。则将新提交的任务存储在这
个工作队列里。假设工作队列满了,则进入下个流程。


3)线程池推断线程池的线程是否都处于工作状态。假设没有,则创建一个新的工作线程
来运行任务。假设已经满了。则交给饱和策略来处理这个任务。


ThreadPoolExecutor运行execute()方法的示意图,如图所看到的。


技术分享

技术分享

ThreadPoolExecutor运行execute方法分下面4种情况。
1)假设当前运行的线程少于corePoolSize,则创建新线程来运行任务(注意,运行这一步骤
须要获取全局锁)。
2)假设运行的线程等于或多于corePoolSize,则将任务增加BlockingQueue。
3)假设无法将任务增加BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执
行这一步骤须要获取全局锁)。


4)假设创建新线程将使当前运行的线程超出maximumPoolSize。任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor採取上述步骤的整体设计思路,是为了在运行execute()方法时。尽可能
地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完毕预热之后
(当前运行的线程数大于等于corePoolSize),差点儿全部的execute()方法调用都是运行步骤2。而
步骤2不须要获取全局锁。

源代码分析:上面的流程分析让我们非常直观地了解了线程池的工作原理。让我们再通过源代
码来看看是怎样实现的。线程池运行任务的方法例如以下。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 假设线程数小于基本线程数,则创建线程并运行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如线程数大于等于基本线程数或线程创建失败。则将当前任务放到工作队列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
} // 假设线程池不处于运行中或任务无法放入队列,而且当前线程数量小于最大同意的线程数量,
// 则创建一个线程运行任务。

else if (!addIfUnderMaximumPoolSize(command)) // 抛出RejectedExecutionException异常 reject(command); // is shutdown or saturated } }

工作线程:线程池创建线程时。会将线程封装成工作线程Worker,Worker在运行完任务
后。还会循环获取工作队列里的任务来运行。我们能够从Worker类的run()方法里看到这点。

public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

线程池的创建

我们能够通过ThreadPoolExecutor来创建一个线程池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
milliseconds,runnableTaskQueue, handler);

创建一个线程池时须要输入几个參数。例如以下。
1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时。线程池会创建一个线
程来运行任务,即使其它空暇的基本线程能够运行新任务也会创建线程,等到须要运行的任
务数大于线程池基本大小时就不再创建。假设调用了线程池的prestartAllCoreThreads()方法,
线程池会提前创建并启动全部基本线程。
2)runnableTaskQueue(任务队列):用于保存等待运行的任务的堵塞队列。能够选择下面几
个堵塞队列。


·ArrayBlockingQueue:是一个基于数组结构的有界堵塞队列,此队列按FIFO(先进先出)原
则对元素进行排序。
·LinkedBlockingQueue:一个基于链表结构的堵塞队列。此队列按FIFO排序元素。吞吐量通
常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
·SynchronousQueue:一个不存储元素的堵塞队列。每一个插入操作必须等到还有一个线程调用
移除操作。否则插入操作一直处于堵塞状态。吞吐量通常要高于Linked-BlockingQueue,静态工
厂方法Executors.newCachedThreadPool使用了这个队列。
·PriorityBlockingQueue:一个具有优先级的无限堵塞队列。


3)maximumPoolSize(线程池最大数量):线程池同意创建的最大线程数。

假设队列满了,并
且已创建的线程数小于最大线程数。则线程池会再创建新的线程运行任务。值得注意的是,如
果使用了无界的任务队列这个參数就没什么效果。
4)ThreadFactory:用于设置创建线程的工厂,能够通过线程工厂给每一个创建出来的线程设
置更有意义的名字。

使用开源框架guava提供的ThreadFactoryBuilder能够高速给线程池里的线
程设置有意义的名字。代码例如以下。

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状
态。那么必须採取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法
处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了下面4种策略。
·AbortPolicy:直接抛出异常。
·CallerRunsPolicy:仅仅用调用者所在线程来运行任务。
·DiscardOldestPolicy:丢弃队列里近期的一个任务,并运行当前任务。
·DiscardPolicy:不处理,丢弃掉。
当然。也能够依据应用场景须要来实现RejectedExecutionHandler接口自己定义策略。如记录
日志或持久化存储不能处理的任务。


·keepAliveTime(线程活动保持时间):线程池的工作线程空暇后,保持存活的时间。所以,
假设任务非常多。而且每一个任务运行的时间比較短,能够调大时间,提高线程的利用率。
·TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟
(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之中的一个毫秒)和纳秒
(NANOSECONDS。千分之中的一个微秒)。

向线程池提交任务

能够使用两个方法向线程池提交任务,分别为execute()和submit()方法。


execute()方法用于提交不须要返回值的任务,所以无法推断任务是否被线程池运行成功。
通过下面代码可知execute()方法输入的任务是一个Runnable类的实例。

threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});

submit()方法用于提交须要返回值的任务。线程池会返回一个future类型的对象。通过这个
future对象能够推断任务是否运行成功,而且能够通过future的get()方法来获取返回值,get()方
法会堵塞当前线程直到任务完毕。而使用get(long timeout,TimeUnit unit)方法则会堵塞当前线
程一段时间后马上返回。这时候有可能任务没有运行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法运行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性。能够从下面几个角度来分析。
·任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
·任务的优先级:高、中和低。
·任务的运行时间:长、中和短。
·任务的依赖性:是否依赖其它系统资源,如数据库连接。
性质不同的任务能够用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的
线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并非一直在运行任务,则应配
置尽可能多的线程,如2*Ncpu。混合型的任务,假设能够拆分,将其拆分成一个CPU密集型任务
和一个IO密集型任务,仅仅要这两个任务运行的时间相差不是太大,那么分解后运行的吞吐量
将高于串行运行的吞吐量。假设这两个任务运行时间相差太大。则不是必需进行分解。能够通过
Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务能够使用优先级队列PriorityBlockingQueue来处理。它能够让优先级高
的任务先运行。
注意 假设一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能
运行。
运行时间不同的任务能够交给不同规模的线程池来处理,或者能够使用优先级队列,让
运行时间短的任务先运行。
依赖数据库连接池的任务,由于线程提交SQL后须要等待数据库返回结果,等待的时间越
长。则CPU空暇时间就越长,那么线程数应该设置得越大,这样才干更好地利用CPU。

建议使用有界队列。

有界队列能增加系统的稳定性和预警能力,能够依据须要设大一点
儿,比方几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任
务的异常,通过排查发现是数据库出现了问题。导致运行SQL变得非常缓慢。由于后台任务线
程池里的任务全是须要向数据库查询和插入数据的。所以导致线程池里的工作线程全部阻
塞,任务积压在线程池里。

假设当时我们设置成无界队列。那么线程池的队列就会越来越多。
有可能会撑满内存,导致整个系统不可用,而不仅仅是后台任务出现故障。

当然,我们的系统所
有的任务是用单独的server部署的。我们使用不同规模的线程池完毕不同类型的任务,可是
出现这样问题时也会影响到其它任务。

线程池的监控

假设在系统中大量使用线程池。则有必要对线程池进行监控。方便在出现故障时。能够根
据线程池的使用状况高速定位问题。能够通过线程池提供的參数进行监控,在监控线程池的
时候能够使用下面属性。
·taskCount:线程池须要运行的任务数量。


·completedTaskCount:线程池在运行过程中已完毕的任务数量,小于或等于taskCount。


·largestPoolSize:线程池里以前创建过的最大线程数量。通过这个数据能够知道线程池是
否以前满过。

如该数值等于线程池的最大大小。则表示线程池以前满过。


·getPoolSize:线程池的线程数量。假设线程池不销毁的话。线程池里的线程不会自己主动销
毁。所以这个大小仅仅增不减。


·getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。能够通过继承线程池来自己定义线程池,重写线程池的
beforeExecute、afterExecute和terminated方法。也能够在任务运行前、运行后和线程池关闭前执
行一些代码来进行监控。比如,监控任务的平均运行时间、最大运行时间和最小运行时间等。


这几个方法在线程池里是空方法。


protected void beforeExecute(Thread t, Runnable r) { }

摘自【Java并发编程的艺术】

线程池的实现原理

标签:try   post   了解   示意图   exception   complete   adf   运行时间   nano   

原文地址:http://www.cnblogs.com/slgkaifa/p/7291169.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!