第九章 Java 中的线程池
Java 中的线程池是运用场景最多的并发框架,几乎所有异步或并发执行的程序都可以使用线程池。合理使用线程池的好处有:
- 降低资源消耗
- 通过重复利用已创建的线程降低线程创建和销毁时的消耗
- 提高响应速度
- 当任务抵达时,无需创建线程,就能立即执行
- 提高线程的可管理型
- 线程池能够统一分配、调优和监控线程
- 线程属于稀缺资源,不能无限制的创建
9.1 线程池的实现原理


ThreadPoolExecutor.execute(Runnable command) 方法如下:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
9.2 线程池的使用
9.2.1 创建线程池
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
通过上面的构造方法来创建一个线程池,其具体参数如下:
corePoolSize 线程池的基本大小
- 线程池的基本大小,当任务提交时,会创建线程进行执行,且不会销毁。
- 调用 prestartAllCoreThreads() 提前创建并启动所有基本线程
maximumPoolSize
- 线程池允许创建的最大线程数
keepAliveTime
- 线程活动保持时间
workQueue
- 任务队列,用于保存等待执行的任务的阻塞队列
- ArrayBlockingQueue
- DelayedWorkQueue
- ForwardingBlockingQueue
- SynchronousQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- 任务队列,用于保存等待执行的任务的阻塞队列
threadFactory
用于设置创建线程的工厂
如使用 guava 提供的 ThreadFactoryBuilder 可以快速给线程池中的线程设置有意义的名字,如:
new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
handler
- 饱和策略,当队列和线程池都满了,则需要一种策略来处理新提交的任务
- 默认策略为 AbortPolicy
- JDK 提供的几种策略:
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:只用调用者所在线程来运行任务
- DiscardPolicy:不处理,丢弃掉
9.2.2 提交任务至线程池
execute() 方法,用于提交不需要返回值的任务,所以无法判断任务是否执行成功。
submit() 方法,用于提交需要返回值的任务。线程池会返回一个 future 类型对象。
9.2.3 关闭线程池
通过 shutdown 或 shutdownNow 方法来关闭线程池。
原理:遍历线程池中的工作线程,逐个调用线程的 interrupt 方法来中断线程,所以无法相应的线程可能永远无法终止。
调用这两个方法后,isShutdown 会返回true;当所有任务都关闭后,isTerminated 返回true。
通常会使用 shutdown 来关闭线程池,但是如果任务不一定要执行完毕,可以调用 shutdownNow 方法。
9.2.4 合理地配置线程池
想要合理配置线程池,需要先分析任务的特性:
- 任务的性质:CPU 密集型任务、IO 密集型任务、混合型任务
- CPU 密集型任务应该配置尽可能小的线程,如 N(cpu) + 1 个线程的线程池
- IO 密集型任务并非一直执行任务,应该配置尽可能多的线程,如 2*N(cpu) 个线程的线程池
- 混合型任务:如果可以拆分,将其拆分为一个 CPU 密集型任务和一个 IO 密集型任务
- 任务的优先级:高、中、低
- 优先级不同的任务,可以使用 PriorityBlockingQueue 优先级队列来处理
- 如果一直提交高优先级任务,则低优先级任务可能永远不会执行
- 任务的执行时间:长、中、短
- 可以使用优先级队列,让执行时间短的任务先执行
- 或者不同时间的任务交给不同规模的线程池来处理
- 任务的依赖性:是否依赖其他系统资源,如数据库连接
- 如依赖数据库连接的线程,当提交SQL 给数据库后需要等待数据库返回结果,等待时间越长,CPU空闲越久。那么线程数可以设置的越大,更好的利用CPU
Runtime.getRuntime().availableProcessors() 获取当前设备的 CPU 个数。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力。
9.2.5 线程池监控
监控线程池时可以使用以下属性:
- taskCount:线程池需要执行的任务数量
- completedTaskCount:线程池在运行过程中已完成的任务数量,<= taskCount
- largestPoolSize:线程池里曾创建过的最大线程数量。
- 可以通过该数据知道线程池是否满过
- getPoolSize:线程池的线程数量
- 如果线程池不销毁的话,线程池里的线程不会自动销毁
- getActiveCount:获取活动的线程数
或者可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated 方法。