202507|技术日志

0710

线程池什么时候执行

代码示例

当调用 threadPoolExecutor.submit(() -> { … }) 时,任务就已经被提交到线程池中,并由线程池中的工作线程异步执行(前提是当前线程池有空闲线程)

因此在代码中这样的写法是完全符合多线程异步执行的

1
2
3
4
5
6
7
8
9
10
List<Future<?>> futureList= new ArrayList<>();
for( xxx ) {
Future<?> future = threadPoolExecutor.submit(() -> {
//do something
});
futureList.add(future);
}
for (Future<List<RcRiskRuleExecResult>> future : futureList) {
res.addAll(future.get());
}

只是在获取结果的时候遍历等待,但是此时线程可以确定的是已经提交开始执行了

比如 futureList 中有三个 future,性能的瓶颈只会在最慢获取结果的那一条里

最佳实践

使用 CompletableFuture 替换传统派 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<CompletableFuture<List<?>>> futureList = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : dispatchedResult.entrySet()) {
...
CompletableFuture<List<?>> future = CompletableFuture.supplyAsync(() -> {
//do something
}, threadPoolExecutor);
futureList.add(future);
}

// 合并所有结果
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allFutures.join(); // 等待全部完成

// 收集结果
List<?> res = futureList.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();

相关知识点

  • ThreadPoolExecutor 线程池
  • Future & CompletableFuture

ThreadLocalMap

我们都知道想要实现一个线程一份独立的数据的时候可以使用 ThreadLocal 来管理数据

ThreadLocal的底层实际上是基于他的静态内部类ThreadLocalMap

JDK文档:

ThreadLocalMap 是一个专门用于维护 ThreadLocal 值而定制化设计的 HashMap,所有对 ThreadLocalMap 的操作只会发生在 TheadLocal 类内部。为应对 very large and long-lived usages,HashMap 的 entry 使用弱引用作为key (?)

ThreadLocalMap 的 key 是 TheadLocal, value 是 ThreadLocal 存储的数据值

0711

线程池核心参数

一般我们都会使用线程池对象的构造函数来结合不同的场景定义线程池

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

核心参数:

  • corePoolSize 核心线程池个数
  • maximumPoolSize 任务队列最大长度
  • keepAliveTime 非核心线程最长回收时间(单位通过 unit 定义)

corePoolSize 在默认情况下定义了常驻的线程个数,maximumPoolSize 为线程池最大长度(也就是任务队列最大长度)一开始队列没有满的时候,只会有 corePoolSize 个线程跑;等到任务队列满了以后,此时会有 maximumPoolSize 个线程跑

maximumPoolSize - corePoolSize 就是非核心线程的个数,他们是会被回收的,在空闲的时候等待 keepAliveTime (unit) 后会被回收

0714

线程池的拒绝策略

针对拒绝策略的讨论,都是假设在任务队列已满、同时这个时候还有新的任务提交进来的时候

拒绝策略有下面这几种:

默认采用 AbortPolicy 如果满了直接抛出异常进行拒绝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

其他的策略:

  • CallerRunsPolicy:不会丢弃任务,而是由执行方法的调用线程(calling thread)直接运行;采用这种方法会影响程序的整体性能,但是如果对任务丢失很敏感的话可以采用这个拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
  • Discard:不抛异常,什么都不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//do nothing
}
}
  • DiscardOldest:丢弃最先入队列的未处理任务来空出位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

线程池执行流程

结合JDK源码进行学习

一般来说我们都是直接调用 threadPoolExecutor.sumbit(() -> execute())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractExecutorService implements ExecutorService {

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

可以发现无论 submit 的实际入参是哪一个,最终都会被包装为 RunnableFuture 的任务,调用 execute 方法

RunnableFuture 是一个接口,组合了 Runnable 以及 Future,表示既可以被线程执行,同时也可以获取任务的结果或异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

0722

参考

CSDN

美团技术团队

生命周期

线程池通过一个int同时存储线程池的运行状态和工作线程数量

运行状态有五种,高三位存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 五种运行状态,在 int 的高三位存储
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

//pack操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

用一个int同时存储两个状态主要是在后续的操作中会很需要经常同时判断运行状态和工作线程数量的场景,因此从底层性能优化考虑采用位运算的方式进行存储,虽然缺乏一些可读性,但是性能优化拉满了

有pack就有unpack,下面是单独的unpack方法,用于获取线程执行状态或是工作线程数量

1
2
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }

任务调度

在上面我们知道了无论什么入口都是调用 ThreadPoolExecutor 的 execute 方法( sumbit 先是封装为 FutureTask 后通过 ThreadPoolExecutor 的父类调用 Executor 接口定义的 execute 方法来执行的)

下面我们来看任务管理的入口,这里就是线程池核心的任务运行机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 【1】线程池正在运行,且当前工作线程数 < corePoolSize,尝试创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 【2】核心工作线程满了,线程池仍处于运行状态 && 任务队列没满,将任务丢尽队列
if (isRunning(c) && workQueue.offer(command)) {
// double-check:防止任务入队之后线程池状态变为非RUNNING,或者线程池没有工作线程了
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 当前没有工作线程了,为了避免任务一直留在队列里没人处理,创建一个非核心线程执行
addWorker(null, false);
}
// 【3】如果不能入队(队列满了),尝试创建非核心线程(最大线程数之内)
else if (!addWorker(command, false))
// 【4】如果线程数已到最大值或线程池非RUNNING,则执行拒绝策略
reject(command);
}

源码写的比较反直觉,主要是check线程池的RUNNING状态在execute中并未显式指定,而是在 addWorker 中进行了判断(我们后面会提到)

还有一点是double-check操作,在核心线程满了同时可以入队的情况下,入队后还需要进行一次check,因为此时线程池状态可能发生变化(比如调用了shutdown())导致此时没有可用线程了,但是队列里还是有任务,因此这种情况下需要创建一个非核心线程执行任务

在后续也会提到,addWorker() 是关键方法,它不仅创建线程,还会判断线程池状态、线程数上限等。

任务缓冲

线程池底层采用阻塞队列来实现任务缓冲,任务缓冲是线程池实现任务的提交和线程执行任务解耦的关键

我们在实例化线程池的时候也需要指定一个阻塞队列来构造线程池对象,这里就是指定缓冲队列

不推荐使用JDK自带的Executors直接创建线程池也是因为自带的Executors类创建线程池的时候,默认在阻塞队列这一块给的是Integer.MAX_VALUE 的容量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

}

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
}

这就意味着队列基本上是无限大,容易出现OOM的问题

算法:刷了一题20.有效的括号

核心实现思路很简单,一次遍历string,遇到左括号的,push,遇到右括号的,pop,比较是否是一对,不是一对说明不是有效的括号。在遍历完成后,栈一定是空的,才说明是有效的括号

0723

算法:手写二分查找,基本功 704.二分查找

任务申请

我们都知道任务的执行分两种情况:

  • 一种是直接新建线程worker在构造函数中携带任务(Worker后面会提到),这种情况只会在第一次创建线程的时候才会触发
  • 更常见的则是线程不断从阻塞队列中获取任务进行执行

第二种情况的核心逻辑在 getTask

这一部分最重要的点在于:默认情况下,核心线程是会一直阻塞等待新的任务的,也就是调用阻塞队列 workQueue.take();而非核心线程则是会进行有限时间的超时等待,是调用 workQueue.poll(keepAliveTime, TimeUnit),一旦非核心线程超时则会进入idle空闲状态,之后就会被线程池回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 手动修改配置(令核心线程也具有keepAlive)或是为非核心线程时则会进行超时判断
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //有限时间超时获取
workQueue.take(); //阻塞获取
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

0724

工作线程

前面我们说完了线程池内部的任务提交、调度、获取流程,这是一个大模块

另一个大模块则是线程的执行,任务的提交和线程的执行互相解耦内部通信则是通过我们实例化线程池时指定的阻塞队列来实现的

在线程池中,Worker类是执行线程的对象

来看下官方的定义

1
2
3
4
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}

Worker 类主要负责维护执行任务的线程的中断控制状态,以及其他一些次要的记账工作。

这个类通过机会性地扩展 AbstractQueuedSynchronizer 来简化围绕每个任务执行的锁的获取和释放,从而防止打算唤醒等待任务的 Worker 线程的中断,反而中断正在运行的任务。

我们实现了一个简单的非重入互斥锁,而不是使用 ReentrantLock,因为我们不希望 Worker 任务在调用如 setCorePoolSize 之类的池控制方法时能够重新获取锁。

此外,为了在线程实际开始运行任务之前抑制中断,我们将锁状态初始化为负值,并在启动时清除它(在 runWorker 中)。


202507|技术日志
http://example.com/2025/07/10/202507-技术日志/
作者
Noctis64
发布于
2025年7月10日
许可协议