202507|技术日志

0710

线程池什么时候执行

代码示例

当调用 threadPoolExecutor.submit(() -> { … }) 时,任务就已经被提交到线程池中,并由线程池中的工作线程异步执行(前提是当前线程池有空闲线程)

因此在代码中这样的写法是完全符合多线程异步执行的

1
2
3
4
5
6
7
8
9
10
List<Future<?>> futureList= new ArrayList<>();
for( xxx ) {
Future<?> future = threadPoolExecutor.submit(() -> {
//do something
});
futureList.add(future);
}
for (Future<List<RcRiskRuleExecResult>> future : futureList) {
res.addAll(future.get());
}

只是在获取结果的时候遍历等待,但是此时线程可以确定的是已经提交开始执行了

比如 futureList 中有三个 future,性能的瓶颈只会在最慢获取结果的那一条里

最佳实践

使用 CompletableFuture 替换传统派 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<CompletableFuture<List<?>>> futureList = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : dispatchedResult.entrySet()) {
...
CompletableFuture<List<?>> future = CompletableFuture.supplyAsync(() -> {
//do something
}, threadPoolExecutor);
futureList.add(future);
}

// 合并所有结果
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allFutures.join(); // 等待全部完成

// 收集结果
List<?> res = futureList.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();

相关知识点

  • ThreadPoolExecutor 线程池
  • Future & CompletableFuture

ThreadLocalMap

我们都知道想要实现一个线程一份独立的数据的时候可以使用 ThreadLocal 来管理数据

ThreadLocal的底层实际上是基于他的静态内部类ThreadLocalMap

JDK文档:

ThreadLocalMap 是一个专门用于维护 ThreadLocal 值而定制化设计的 HashMap,所有对 ThreadLocalMap 的操作只会发生在 TheadLocal 类内部。为应对 very large and long-lived usages,HashMap 的 entry 使用弱引用作为key (?)

ThreadLocalMap 的 key 是 TheadLocal, value 是 ThreadLocal 存储的数据值

0711

线程池核心参数

一般我们都会使用线程池对象的构造函数来结合不同的场景定义线程池

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

核心参数:

  • corePoolSize 核心线程池个数
  • maximumPoolSize 任务队列最大长度
  • keepAliveTime 非核心线程最长回收时间(单位通过 unit 定义)

corePoolSize 在默认情况下定义了常驻的线程个数,maximumPoolSize 为线程池最大长度(也就是任务队列最大长度)一开始队列没有满的时候,只会有 corePoolSize 个线程跑;等到任务队列满了以后,此时会有 maximumPoolSize 个线程跑

maximumPoolSize - corePoolSize 就是非核心线程的个数,他们是会被回收的,在空闲的时候等待 keepAliveTime (unit) 后会被回收

0714

线程池的拒绝策略

针对拒绝策略的讨论,都是假设在任务队列已满、同时这个时候还有新的任务提交进来的时候

拒绝策略有下面这几种:

默认采用 AbortPolicy 如果满了直接抛出异常进行拒绝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

其他的策略:

  • CallerRunsPolicy:不会丢弃任务,而是由执行方法的调用线程(calling thread)直接运行;采用这种方法会影响程序的整体性能,但是如果对任务丢失很敏感的话可以采用这个拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
  • Discard:不抛异常,什么都不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//do nothing
}
}
  • DiscardOldest:丢弃最先入队列的未处理任务来空出位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

线程池执行流程

结合JDK源码进行学习

一般来说我们都是直接调用 threadPoolExecutor.sumbit(() -> execute())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractExecutorService implements ExecutorService {

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

可以发现无论 submit 的实际入参是哪一个,最终都会被包装为 RunnableFuture 的任务,调用 execute 方法

RunnableFuture 是一个接口,组合了 Runnable 以及 Future,表示既可以被线程执行,同时也可以获取任务的结果或异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

202507|技术日志
http://example.com/2025/07/10/202507-技术日志/
作者
Noctis64
发布于
2025年7月10日
许可协议