Java-并发.06a.JUC-执行器-线程池

线程池相关类和方法

  • ExecutorService: Java线程池的接口, 提供了如下方法:
    • void execute(Runnable command) 执行 Ruannable 类型的任务
    • Future<?> submit(Runnable task) 可用来提交 Callable 或 Runnable 任务,并返回代表此任务的 Future 对象
    • Future<T> submit(Callable<T> task): 同上
    • void shutdown() : 关闭线程池,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。最终调用了每个线程的 interrupt()
    • void shutdownNow() : 关闭线程池, 中断正在处理任务的线程,也不处理阻塞队列中已保存的任务。最终调用了每个线程的 interrupt()
    • boolean isShutdown()

几种常见的线程池实现类:

  • ThreadPoolExecutor: 实现了ExecutorService接口, 通用线程池
  • ScheduledExecutorService: 也实现了 ExecutorService 接口, 它的 schedule() 方法用来执行定时任务

Executors 是线程池的工厂类, 用于创建线程池:

  • ExecutorService newCachedThreadPool(): 创建一个可缓存线程池,队列容量固定是1(可以认为没有队列),线程数会一直增长(如果没有空闲线程),如果线程空闲超过60s会被回收;
  • ExecutorService newFixedThreadPool(int nThreads): 创建一个定长线程池,超出的线程会进入等待队列,队列是无限大的;
  • ExecutorService newSingleThreadExecutor(): 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个定长线程池,支持定时及周期性任务执行。

示例代码:

public static void tpoolTest() {
ExecutorService cachedPool = Executors.newCachedThreadPool();
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(1);

// execute()无返回值
cachedPool.execute(new Runnable() {
@Override
public void run() { /*doSomething*/ }
});
cachedPool.shutdown();

// submit()有返回值
// labmda写法
Futrue<String> futrue = fixedPool.submit(() -> {
return "hello world";
});
String ret = futrue.get();
fixedPool.shutdownNow();

// schedule()增加定时任务
schedulePool.schedule(() -> {
System.out.print("scheduled task");
}, 5, TimeUnit.SECONDS
);
schedulePool.shutdown();
}

线程池的实现

构造方法

工厂类 Executors 包装了对 ThreadPoolExecutor 构造方法的调用, 隐藏了很多创建线程池的细节, 所以在并发严格的情况下, 最好的方式还是直接调用 ThreadPoolExecutor 构造方法创建线程池.

ThreadPoolExecutor的构造函数:

public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
}

构造器中各个参数的含义:

  • corePoolSize: (线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到线程数大于 corePoolSize 时就不再创建。如果调用了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有基本线程。

  • workQueue: 一个阻塞队列,用来存储等待执行的任务。当线程数已经大于 corePoolSize 时, 再向线程池添加任务,会把任务放入该队列中。阻塞队列有以下几种选择:

    • ArrayBlockingQueue:基于数组结构的 有界阻塞队列,此队列按 FIFO(先进先出)排序元素。因为入队/出队操作的同步都用同一个 lock 对象,所以生产者和消费者无法同时进行,所以吞吐量低;
    • LinkedBlockingQueue:一个基于链表结构的 有界阻塞队列,如果队列不指定 size,默认长度是 Integer.MAX,此队列按 FIFO (先进先出)排序元素,吞吐量通常要高于 ArrayBlockingQueue(因为使用了两个 lock,读写不冲突)。 Executors.newFixedThreadPool() 使用了这个队列。
    • SynchronousQueue:一个不存储元素的 有界阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态(可以这样来理解:生产者和消费者互相等待对方,握手然后一起离开)。Executors.newCachedThreadPool() 使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的 无限阻塞队列,此队列按优先级排序元素。
  • maximumPoolSize: 线程池最大容量(池+队列里的线程数)。

    • 如果是 LinkedBlockingQueue 这种 近似无界的队列maximumPoolSize 没有效果;
    • 如果是 ArrayBlockingQueue 这种 有界阻塞队列,如果队列满了,并且已创建的线程数小于 maximumPoolSize,则线程池会再创建新的线程执行任务,直到总线程数超过 maximumPoolSize
  • keepAliveTime: 工作线程空闲后,保持存活的时间。线程池会一直终止空闲超过 keepAliveTime 的线程,直到线程池中的线程数不超过 corePoolSize

  • unit: keepAliveTime 的单位

  • handler: 当队列和线程池都满了(maximumPoolSize),说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。

➤ 再回过来看 Executors 提供的几种工厂方法:

  • newCachedThreadPool(): corePoolSize 为0, maximumPoolSize 为 INT.Max, 队列使用 SynchronousQueue 不存储线程, 所以有新任务提交时, 如果没有空闲的线程, 则继续创建新的线程, 直到线程数达到 INT.Max. 空闲时间超过 60s 的线程会被回收;
  • newFixedThreadPool(int nThreads): corePoolSize 和 maximumPoolSize 都是 nThreads, 意味着线程池大小从 0 会增长到 coreSize, 队列是近似无界队列 LinkedBlockingQueue, 可以一直接收新任务, keepAliveTime=0 意味着不会回收空闲线程
  • newSingleThreadExecutor(): 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

上面提到的几种线程池,都有不足:

  • 使用 newCachedThreadPool() 的问题在于, 如果没有控制好任务大小(所有线程一直在忙) 线程数会一直增长(maxPoolSizeInteger.MAX_VALUE). 只有线程空闲的时候才有机会减少线程数.
  • 使用 newFixedThreadPool() 的问题在于, 虽然工作线程数是固定的, 但是等待队列大小是 Integer.MAX_VALUE,

这两种线程池都有可能因为创建大量线程导致 OOM. 所以不建议使用 Executors 提供的方法直接创建线程池

提交任务

当提交一个新任务到线程池时(execute or submit),线程池的处理流程如下:

  1. 首先线程池判断基本线程池(corePoolSize) 是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列(workQueue) 是否已满?这一步尝试队列能否 offer 进新任务,如果 offer 失败(队列满),则进入下个流程。
  3. 最后线程池判断整个线程池(maximumPoolSize) 是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
public void execute(Runnable command) {  
if (command == null)
throw new NullPointerException();

// ctl 是 Atomic类型
// 32位,高3位=线程池状态 ,后29位=当前运行worker的数量
int c = ctl.get();

// (1) corePoolSize
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 创建新线程
return;
c = ctl.get();
}
// (2) taskQueue.offer
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// (3) 大于coresize 且 queue满,再尝试增加线程数量到maximumPoolSize
else if (!addWorker(command, false))
reject(command); // 拒绝
}

提交任务时,使用的是 TaskQueue 的 offer() 方法,不会阻塞调用线程;

submit 方法在 AbstractExecutorService 中的实现:

public Future<?> submit(Runnable task) { 
if (task == null) throw new NullPointerException();

// 通过 submit 方法提交的 Callable 任务会被封装成了一个 FutureTask 对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

工作线程

流程: execute –> addWorker() –>runWorker()

向线程池提交 Runnable, 会调用到线程池的 addWorker(),这个方法里会将工作线程封装成 Worker 类,
在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。
从 Woker 类的构造方法实现可以发现: 线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。

Worker 在执行完任务(firstTask)后,还会通过 runWorker() 无限循环获取工作队列里的任务来执行:

public void runWorker() {

try {
// 执行当前task或从队列里取出新的task
while (task != null || (task = getTask()) != null) {
w.lock();
beforeExecute(wt, task);
task.run(); // 执行Runnable.run()
afterExecute(task, thrown);
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}

关闭线程池

线程池的 shutdown 或者 shutdownNow 方法来关闭线程池。

  • shutdown 方法将执行平缓的关闭过程:线程池状态设置为 SHUTDOWN,不接收新的任务,同时等待已提交的任务执行完成,包括哪些在队列中的任务;
  • shutdownNow 方法将执行粗暴的关闭过程:线程池状态设置为 STOP,不接收新的任务,它将尝试取消(Interrupt)所有运行中的任务,并且清空等待队列(未开始的任务也不会再执行);

优化线程池

线程池参数的解析和使用建议:

  • 线程池大小:
    • 如果是 计算密集 任务,一般设置为 cpu 核心数 (ForkJoin 的 common 线程池)
    • 如果是 IO 密集 任务一般设置为核心数2~3倍(Tomcat 的 Poller 线程池 & Netty 的 sub-Reactor 线程池)
    • 业务线程池:视业务耗时和吞吐量而定(Tomcat 默认工作线程池 size = 200)
  • 预热线程池:默认情况下,核心工作线程值在初始的时候被创建,当新任务来到的时候被启动,但是我们可以通过重写 prestartCoreThreadprestartCoreThreads 方法来改变这种行为。通常场景我们可以在应用启动的时候来 WarmUp 核心线程,从而达到任务过来能够立马执行的结果,使得初始任务处理的时间得到一定优化。

  • 队列的选择:

    • 无界队列:使用无界队列如LinkedBlockingQueue没有指定最大容量的时候,将会引起当核心线程都在忙的时候,新的任务被放在队列上。
      • 因此,永远不会有大于 corePoolSize 的线程被创建,因此 maximumPoolSize 参数将失效。
      • 这种策略比较适合所有的任务都不相互依赖,独立执行。如 Web 服务器中,每个线程独立处理请求。
      • 但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。
      • 先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
    • 有界队列:有界队列如 ArrayBlockingQueue 帮助限制资源的消耗,但是不容易控制。队列长度和 maximumPoolSize 这两个值会相互影响,
      • 使用 大的队列 和小 maximumPoolSize 会降低 CPU 占用、操作系统资源、上下文切换的消耗,但是会降低吞吐量,如果任务被频繁的阻塞如 IO 线程,系统其实可以调度更多的线程。
      • 使用 小的队列 通常需要大 maximumPoolSize,从而使得 CPU 更忙一些,但是又会增加线程调度的消耗。
      • 总结一下:是IO密集型可以考虑 多些线程+小的队列 来平衡CPU的使用,CPU密集型可以考虑 少些线程+大的队列 减少线程调度的消耗。
  • 合理的拒绝策略: @todo

监控线程池

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+ getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecuteafterExecuteterminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

@ref: