《Java并发编程的艺术》第九章 Java中的线程池

第九章 Java 中的线程池

Java 中的线程池是运用场景最多的并发框架,几乎所有异步或并发执行的程序都可以使用线程池。合理使用线程池的好处有:

  1. 降低资源消耗
    1. 通过重复利用已创建的线程降低线程创建和销毁时的消耗
  2. 提高响应速度
    1. 当任务抵达时,无需创建线程,就能立即执行
  3. 提高线程的可管理型
    1. 线程池能够统一分配、调优和监控线程
    2. 线程属于稀缺资源,不能无限制的创建

9.1 线程池的实现原理

线程池的主要处理流程

ThreadPoolExecutor 执行示意图

ThreadPoolExecutor.execute(Runnable command) 方法如下:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

9.2 线程池的使用

9.2.1 创建线程池

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

通过上面的构造方法来创建一个线程池,其具体参数如下:

  1. corePoolSize 线程池的基本大小

    1. 线程池的基本大小,当任务提交时,会创建线程进行执行,且不会销毁。
    2. 调用 prestartAllCoreThreads() 提前创建并启动所有基本线程
  2. maximumPoolSize

    1. 线程池允许创建的最大线程数
  3. keepAliveTime

    1. 线程活动保持时间
  4. workQueue

    1. 任务队列,用于保存等待执行的任务的阻塞队列
      1. ArrayBlockingQueue
      2. DelayedWorkQueue
      3. ForwardingBlockingQueue
      4. SynchronousQueue
      5. DelayQueue
      6. LinkedBlockingQueue
      7. PriorityBlockingQueue
  5. threadFactory

    1. 用于设置创建线程的工厂

    2. 如使用 guava 提供的 ThreadFactoryBuilder 可以快速给线程池中的线程设置有意义的名字,如:

    3. new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();

  6. handler

    1. 饱和策略,当队列和线程池都满了,则需要一种策略来处理新提交的任务
    2. 默认策略为 AbortPolicy
    3. JDK 提供的几种策略:
      1. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
      2. AbortPolicy:直接抛出异常
      3. CallerRunsPolicy:只用调用者所在线程来运行任务
      4. DiscardPolicy:不处理,丢弃掉

9.2.2 提交任务至线程池

execute() 方法,用于提交不需要返回值的任务,所以无法判断任务是否执行成功。

submit() 方法,用于提交需要返回值的任务。线程池会返回一个 future 类型对象。

9.2.3 关闭线程池

通过 shutdown 或 shutdownNow 方法来关闭线程池。

原理:遍历线程池中的工作线程,逐个调用线程的 interrupt 方法来中断线程,所以无法相应的线程可能永远无法终止。

调用这两个方法后,isShutdown 会返回true;当所有任务都关闭后,isTerminated 返回true。

通常会使用 shutdown 来关闭线程池,但是如果任务不一定要执行完毕,可以调用 shutdownNow 方法。

9.2.4 合理地配置线程池

想要合理配置线程池,需要先分析任务的特性:

  • 任务的性质:CPU 密集型任务、IO 密集型任务、混合型任务
    • CPU 密集型任务应该配置尽可能小的线程,如 N(cpu) + 1 个线程的线程池
    • IO 密集型任务并非一直执行任务,应该配置尽可能多的线程,如 2*N(cpu) 个线程的线程池
    • 混合型任务:如果可以拆分,将其拆分为一个 CPU 密集型任务和一个 IO 密集型任务
  • 任务的优先级:高、中、低
    • 优先级不同的任务,可以使用 PriorityBlockingQueue 优先级队列来处理
    • 如果一直提交高优先级任务,则低优先级任务可能永远不会执行
  • 任务的执行时间:长、中、短
    • 可以使用优先级队列,让执行时间短的任务先执行
    • 或者不同时间的任务交给不同规模的线程池来处理
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接
    • 如依赖数据库连接的线程,当提交SQL 给数据库后需要等待数据库返回结果,等待时间越长,CPU空闲越久。那么线程数可以设置的越大,更好的利用CPU

Runtime.getRuntime().availableProcessors() 获取当前设备的 CPU 个数。

建议使用有界队列 。有界队列能增加系统的稳定性和预警能力。

9.2.5 线程池监控

监控线程池时可以使用以下属性:

  • taskCount:线程池需要执行的任务数量
  • completedTaskCount:线程池在运行过程中已完成的任务数量,<= taskCount
  • largestPoolSize:线程池里曾创建过的最大线程数量。
    • 可以通过该数据知道线程池是否满过
  • getPoolSize:线程池的线程数量
    • 如果线程池不销毁的话,线程池里的线程不会自动销毁
  • getActiveCount:获取活动的线程数

或者可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated 方法。

文章作者: koral
文章链接: http://luokaiii.github.io/2019/06/06/读书笔记/《Java并发编程的艺术》/9.Java中的线程池/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自