JDK8|AbstractQueuedSynchronizer

导论

Reference

干货:十分钟带你搞懂 Java AQS 核心设计与实现!

从ReentrantLock的实现看AQS的原理及应用

问题引入

在后端编程领域中,并发是经常需要考虑和处理的问题,一个很常见的业务场景是,在某个业务方法中,会出现多个线程同时需要操作某一个共享变量的情况。但是业务上硬性要求这个共享变量必须线程安全,一次只能由一个线程进行更新。

这种问题本质上可以抽象为 共享资源的竞争与同步关系

在这种情况下,我们必须要指定:

  • 得到执行权的线程应该如何标记这个共享变量使得别的线程不会再次抢夺
  • 没有得到执行权的线程应该何去何从。

问题实现思路

在进一步介绍 AbstractQueuedSynchronizer 之前,我们可以想象针对上述提到的两个问题,尤其是第二个:没有得到执行权的线程应该何去何从

  • 自旋等待:这种方式浪费CPU,实际上CPU还是在执行这个线程,只是没有做任何操作
  • 线程挂起,等到释放锁的时候唤醒:这种情况也不太好,如果同时挂起等待的线程较多,可能存在惊群的问题

为了高效解决这种大家都会遇到的场景,JDK设计了一个抽象的解决共享资源竞争与同步的框架,AbstractQueuedSynchronizer 应愿而生。

什么是AQS

AQS是JDK为了解决在多线程环境下,共享资源的竞争和状态同步关系的框架。

AQS集同步状态管理、线程阻塞、线程释放以及队列管理的同步框架。

通过后面的分析我们可以了解到:

AQS的核心思想是当多个线程竞争同一个共享资源的时候,未获得执行权的线程会被构造为一个Node节点丢入到一个 FIFO 的队列中。这个队列是一个双向队列,同时记录前驱和后继,在队列中的线程会保持阻塞直到被队列前一个元素唤醒。队列中只有队首节点有资格被唤醒竞争锁。

关注重点

对于 AQS 我们需要重点关注:

  • 内部的状态同步如何实现(状态管理)
  • 核心FIFO队列的设计(队列管理)
  • 竞争资源失败时线程的生命周期(线程阻塞)
  • 共享资源的释放(线程释放)

同步状态管理

1
2
3
4
5
6
public abstract class AbstractQueuedSynchronizer {
/**
* The synchronization state.
*/
private volatile int state;
}

AQS 内部使用一个 volatile 类型的变量来表示状态,这表示这个状态的更新对所有线程都可见。

AQS 使用这个变量来标识当前锁的占用情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   /**
* 尝试以独占模式获取。此方法应查询对象的状态是否允许其以独占模式获取,如果允许,则进行获取。
* 此方法总是由执行获取操作的线程调用。如果此方法报告失败,获取方法可能会将线程排队(如果尚未排队)
* 直到由其他线程释放时通知。这可用于实现 Lock.tryLock() 方法。
* 默认实现会抛出 UnsupportedOperationException。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

/**
* 尝试将状态设置为反映在独占模式下释放锁的情况
* 此方法总是由执行释放的线程调用
* 默认实现会抛出 UnsupportedOperationException。
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

队列管理

基础信息

AQS的核心设计中,FIFO的队列是实现AQS的关键设计

它采用了一个双向队列,队列的每一个节点Node定义如下

1
2
3
4
5
6
7
static final class Node {

volatile Node prev;

volatile Node next;

}

节点还具有模式,分为SHARED和EXCLUSIVE模式

1
2
static final Node SHARED = new Node(); //队列中节点以SHARED模式等待
static final Node EXCLUSIVE = null; //队列中节点以EXCLUSIVE模式等待

其中队列中每一个线程所对应的节点还有状态,AQS中统一称作 waitedStatus

1
2
3
4
5
6
7
8
// 线程被取消
static final int CANCELLED = 1;
// 后续线程在锁释放后可以被唤醒
static final int SIGNAL = -1;
// 当前线程在 condition 队列中
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会无条件被传播下去
static final int PROPAGATE = -3;

在 AQS 顶部定义中,就维护了这个队列的队尾和队首节点

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractQueuedSynchronizer {

//队首指针,懒加载。
//除了初始化之外,它只能通过 setHead 方法修改
//注意:如果头部存在,其状态保证不是 CANCELLED。
private transient volatile Node head;

//队尾指针,懒加载。
//仅通过方法 enq 添加新的等待节点来修改队尾指针。
private transient volatile Node tail;

}

队列初始化

在上述我们通过文档知道了队列是懒加载的

所以其实是在元素入队的时候才实例化队头和队尾指针的,文档也说明了入队函数enq()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //初始化队列,设置一个空Node以唤醒队列后续队首节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

资源竞争

acquire方法中定义了AQS框架获取锁的完整流程

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}

这里会调用tryAcquire(),这个方法是抽象方法,交由子类进一步实现,默认是会抛出异常的

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

尝试获取锁,但是不一定保证能获取到,如果获取到了返回 true

因此通过 acquire 源码,如果没获取到锁,会尝试调用 addWaiter,也就是将线程构造成 AQS 队列中的一个 Node 了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//为当前线程和给定模式创建并排队节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果队尾不为空:新节点的前驱指向原先队尾,原队尾的后继指向当前节点,当前节点成为新的队尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//入队
enq(node);
return node;
}

JDK8|AbstractQueuedSynchronizer
http://example.com/2025/07/26/JDK8-AbstractQueuedSynchronizer/
作者
Noctis64
发布于
2025年7月26日
许可协议