JDK8|ThreadPoolExecutor

创建与使用

线程池创建核心参数

一般我们都会使用线程池对象的构造函数来结合不同的场景定义线程池

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

核心参数:

  • corePoolSize 核心线程池个数
  • maximumPoolSize 任务队列最大长度
  • keepAliveTime 非核心线程最长回收时间(单位通过 unit 定义)

corePoolSize 在默认情况下定义了常驻的线程个数,maximumPoolSize 为线程池最大长度(也就是任务队列最大长度)一开始队列没有满的时候,只会有 corePoolSize 个线程跑;等到任务队列满了以后,此时会有 maximumPoolSize 个线程跑

maximumPoolSize - corePoolSize 就是非核心线程的个数,他们是会被回收的,在空闲的时候等待 keepAliveTime (unit) 后会被回收

剩下的构造函数参数(例如ThreadFactoryBlockingQueueRejectedExecutionHandler 我们会在后面的部分详细介绍)

执行接口

线程池的执行接口主要有两个:executesubmit

通过源码我们可以发现,submit 底层还是调用的 execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractExecutorService implements ExecutorService {

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

那么他们之间到底有什么区别?

上面的源码可以发现无论 submit 的实际入参是哪一个,最终都会被包装为 RunnableFuture 的任务,调用 execute 方法。RunnableFuture 是一个接口,组合了 Runnable 以及 Future,表示既可以被线程执行,同时也可以获取任务的结果或异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

因此,我们可以知道,execute 会直接执行,而 submit 则是返回一个 Future,供我们统一获取执行的结果。所以如果在任务执行的时候抛出了异常,那么 execute 方式执行的线程出现异常之后会直接销毁,但是 submit 方式执行的线程出现异常之后会将异常封装在 Future 中,等到 get Future 的时候才会抛出,因此 submit 方式执行的线程在异常出现时不会直接销毁

线程池的生命周期

线程池通过一个int同时存储线程池的运行状态和工作线程数量

运行状态有五种,高三位存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 五种运行状态,在 int 的高三位存储
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

//pack操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

用一个int同时存储两个状态主要是在后续的操作中会很需要经常同时判断运行状态和工作线程数量的场景,因此从底层性能优化考虑采用位运算的方式进行存储,虽然缺乏一些可读性,但是性能优化拉满了

有pack就有unpack,下面是单独的unpack方法,用于获取线程执行状态或是工作线程数量

1
2
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }

任务管理

在上面我们知道了无论什么入口都是调用 ThreadPoolExecutor 的 execute 方法( sumbit 先是封装为 FutureTask 后通过 ThreadPoolExecutor 的父类 AbstractExecutorService 调用 Executor 接口定义的 execute 方法来执行的)

下面我们更进一步,我们想知道 execute 内部到底做了什么,流程是什么样的

这就涉及到线程池的任务管理,在实现底层,线程池采用了将 提交的任务任务的实际执行 分离解耦的思想

在使用方(也就是开发者),只需要调用 execute 或是 submit 将任务提交,具体如何执行,线程池内部有一套任务管理实现,可以分为四个部分:

  • 任务调度————execute入口,创建worker,放入队列或是拒绝
  • 任务缓冲————阻塞队列缓冲任务
  • 任务申请————worker不断消费阻塞队列任务
  • 任务拒绝————可配置的拒绝策略

execute入口任务调度

在上面我们知道了无论什么入口都是调用 ThreadPoolExecutor 的 execute 方法( sumbit 先是封装为 FutureTask 后通过 ThreadPoolExecutor 的父类调用 Executor 接口定义的 execute 方法来执行的)

下面我们来看任务管理的入口,这里就是线程池核心的任务运行机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 【1】线程池正在运行,且当前工作线程数 < corePoolSize,尝试创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 【2】核心工作线程满了,线程池仍处于运行状态 && 任务队列没满,将任务丢尽队列
if (isRunning(c) && workQueue.offer(command)) {
// double-check:防止任务入队之后线程池状态变为非RUNNING,或者线程池没有工作线程了
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 当前没有工作线程了,为了避免任务一直留在队列里没人处理,创建一个非核心线程执行
addWorker(null, false);
}
// 【3】如果不能入队(队列满了),尝试创建非核心线程(最大线程数之内)
else if (!addWorker(command, false))
// 【4】如果线程数已到最大值或线程池非RUNNING,则执行拒绝策略
reject(command);
}

源码写的比较反直觉,主要是check线程池的RUNNING状态在execute中并未显式指定,而是在 addWorker 中进行了判断(我们后面会提到)

还有一点是double-check操作,在核心线程满了同时可以入队的情况下,入队后还需要进行一次check,因为此时线程池状态可能发生变化(比如调用了shutdown())导致此时没有可用线程了,但是队列里还是有任务,因此这种情况下需要创建一个非核心线程执行任务

在后续也会提到,addWorker() 是关键方法,它不仅创建线程,还会判断线程池状态、线程数上限等

任务缓冲

execute 函数中的 workQueue.offer() 指的就是将任务提交到任务缓冲队列中

线程池底层采用阻塞队列来实现任务缓冲,任务缓冲是线程池实现任务的提交和线程执行任务解耦的关键

我们在之前说实例化线程池的时候也需要指定一个阻塞队列来构造线程池对象,这里就是指定缓冲队列

不推荐使用JDK自带的Executors直接创建线程池也是因为自带的Executors类创建线程池的时候,默认在阻塞队列这一块给的是Integer.MAX_VALUE 的容量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

}

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
}

这就意味着队列基本上是无限大,容易出现OOM的问题

getTask任务申请

在 execute 函数中,会调用 addWorker 尝试创建worker,addWorker 函数的参数有时为 null 有时非空,这代表任务的执行分两种情况:

  • 一种是直接新建线程worker在构造函数中携带任务(Worker后面会提到),这种情况只会在第一次创建线程的时候才会触发
  • 更常见的则是线程不断从阻塞队列中获取任务进行执行

第二种情况的核心逻辑在 getTask

这一部分最重要的点在于:默认情况下,核心线程是会一直阻塞等待新的任务的,也就是调用阻塞队列 workQueue.take();而非核心线程则是会进行有限时间的超时等待,是调用 workQueue.poll(keepAliveTime, TimeUnit),一旦非核心线程超时则会进入idle空闲状态,之后就会被线程池回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 手动修改配置(令核心线程也具有keepAlive)或是为非核心线程时则会进行超时判断
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //有限时间超时获取
workQueue.take(); //阻塞获取
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

任务拒绝

针对拒绝策略的讨论,都是假设在任务队列已满、同时这个时候还有新的任务提交进来的时候

拒绝策略有下面这几种:

默认采用 AbortPolicy 如果满了直接抛出异常进行拒绝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

其他的策略:

  • CallerRunsPolicy:不会丢弃任务,而是由执行方法的调用线程(calling thread)直接运行;采用这种方法会影响程序的整体性能,但是如果对任务丢失很敏感的话可以采用这个拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
  • Discard:不抛异常,什么都不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//do nothing
}
}
  • DiscardOldest:丢弃最先入队列的未处理任务来空出位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

Worker工作线程

前面我们说完了线程池内部的任务提交、调度、获取流程,这是一个大模块

另一个大模块则是线程的执行,任务的提交和线程的执行互相解耦内部通信则是通过我们实例化线程池时指定的阻塞队列来实现的

在线程池中,Worker类是执行线程的对象,一个 Worker 就是一个线程

来看下官方的定义

1
2
3
4
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}

addWorker

在 execute 函数中我们知道核心的执行逻辑被放在了 addWorker 里,addWorker 会尝试创建 Worker 实例,根据不同的入参,实例创建完的行为也不同

addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//附带初始任务 实例化Worker
w = new Worker(firstTask);
//一个Worker就是一个线程,获取Worker线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//执行worker中的run方法,启动Worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

由于 worker 实现了 runnable 接口,因此在 addWorker 的时候,实例化 Worker 对象完成后, worker 就通过线程池构造函数中 ThreadFactory 创建好了 thread 属性

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

后续只要调用 worker.thread.start(),执行的都会是 Worker 重写的 run 方法

1
2
3
4
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}

因此经过一系列 CAS 检查后,在 addWorker 实例化完 Worker 对象后,通过调用 w.thread.start() 启动了这个 Worker 线程

(CAS这里我们后面再关注,先挖一个坑)

因此我们关注点需要放在 Worker 的 runWorker 方法上

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//默认先检查是否有构造Worker实例时携带的初始任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环从阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

到这一步就很明了了,Worker启动后会不断调用 getTask 直到 getTask 返回 null,一旦返回 null 就会被回收

而在之前我们也知道,当默认配置下,核心线程不会进入空闲态,非核心线程在有限阻塞时间获取不到任务后,就会进入空闲态返回 null 值

Worker回收

Worker 的回收是基于 JVM 的垃圾回收自动回收的

在 ThreadPoolExecutor 中有一个 HashSet<Worker> 来记录所有非回收 Worker 的引用防止被GC

1
2
3
4
5
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

JDK8|ThreadPoolExecutor
http://example.com/2025/07/25/JDK8-ThreadPoolExecutor/
作者
Noctis64
发布于
2025年7月25日
许可协议