导论
Reference
干货:十分钟带你搞懂 Java AQS 核心设计与实现!
从ReentrantLock的实现看AQS的原理及应用
问题引入
在后端编程领域中,并发是经常需要考虑和处理的问题,一个很常见的业务场景是,在某个业务方法中,会出现多个线程同时需要操作某一个共享变量的情况。但是业务上硬性要求这个共享变量必须线程安全,一次只能由一个线程进行更新。
这种问题本质上可以抽象为 共享资源的竞争与同步关系
在这种情况下,我们必须要指定:
- 得到执行权的线程应该如何标记这个共享变量使得别的线程不会再次抢夺
- 没有得到执行权的线程应该何去何从。
问题实现思路
在进一步介绍 AbstractQueuedSynchronizer 之前,我们可以想象针对上述提到的两个问题,尤其是第二个:没有得到执行权的线程应该何去何从
。
- 自旋等待:这种方式浪费CPU,实际上CPU还是在执行这个线程,只是没有做任何操作
- 线程挂起,等到释放锁的时候唤醒:这种情况也不太好,如果同时挂起等待的线程较多,可能存在惊群的问题
为了高效解决这种大家都会遇到的场景,JDK设计了一个抽象的解决共享资源竞争与同步的框架,AbstractQueuedSynchronizer 应愿而生。
什么是AQS
AQS是JDK为了解决在多线程环境下,共享资源的竞争和状态同步关系的框架。
AQS集同步状态管理、线程阻塞、线程释放以及队列管理的同步框架。
通过后面的分析我们可以了解到:
AQS的核心思想是当多个线程竞争同一个共享资源的时候,未获得执行权的线程会被构造为一个Node节点丢入到一个 FIFO 的队列中。这个队列是一个双向队列,同时记录前驱和后继,在队列中的线程会保持阻塞直到被队列前一个元素唤醒。队列中只有队首节点有资格被唤醒竞争锁。
关注重点
对于 AQS 我们需要重点关注:
- 内部的状态同步如何实现(状态管理)
- 核心FIFO队列的设计(队列管理)
- 竞争资源失败时线程的生命周期(线程阻塞)
- 共享资源的释放(线程释放)
同步状态管理
1 2 3 4 5 6
| public abstract class AbstractQueuedSynchronizer {
private volatile int state; }
|
AQS 内部使用一个 volatile 类型的变量来表示状态,这表示这个状态的更新对所有线程都可见。
AQS 使用这个变量来标识当前锁的占用情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
|
队列管理
基础信息
AQS的核心设计中,FIFO的队列是实现AQS的关键设计
它采用了一个双向队列,队列的每一个节点Node定义如下
1 2 3 4 5 6 7
| static final class Node { volatile Node prev; volatile Node next; }
|
节点还具有模式,分为SHARED和EXCLUSIVE模式
1 2
| static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
|
其中队列中每一个线程所对应的节点还有状态,AQS中统一称作 waitedStatus
1 2 3 4 5 6 7 8
| static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
|
在 AQS 顶部定义中,就维护了这个队列的队尾和队首节点
1 2 3 4 5 6 7 8 9 10 11 12
| public abstract class AbstractQueuedSynchronizer { private transient volatile Node head;
private transient volatile Node tail;
}
|
队列初始化
在上述我们通过文档知道了队列是懒加载的
所以其实是在元素入队的时候才实例化队头和队尾指针的,文档也说明了入队函数enq()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|
资源竞争
acquire方法中定义了AQS框架获取锁的完整流程
1 2 3 4 5
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
|
这里会调用tryAcquire()
,这个方法是抽象方法,交由子类进一步实现,默认是会抛出异常的
1 2 3
| protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
|
尝试获取锁,但是不一定保证能获取到,如果获取到了返回 true
因此通过 acquire 源码,如果没获取到锁,会尝试调用 addWaiter
,也就是将线程构造成 AQS 队列中的一个 Node 了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
|