导论 Reference 干货:十分钟带你搞懂 Java AQS 核心设计与实现!
从ReentrantLock的实现看AQS的原理及应用
问题引入 在后端编程领域中,并发是经常需要考虑和处理的问题,一个很常见的业务场景是,在某个业务方法中,会出现多个线程同时需要操作某一个共享变量的情况。但是业务上硬性要求这个共享变量必须线程安全,一次只能由一个线程进行更新。
这种问题本质上可以抽象为 共享资源的竞争与同步关系
在这种情况下,我们必须要指定:
得到执行权的线程应该如何标记这个共享变量使得别的线程不会再次抢夺
没有得到执行权的线程应该何去何从。
问题实现思路 在进一步介绍 AbstractQueuedSynchronizer 之前,我们可以想象针对上述提到的两个问题,尤其是第二个:没有得到执行权的线程应该何去何从
。
自旋等待:这种方式浪费CPU,实际上CPU还是在执行这个线程,只是没有做任何操作
线程挂起,等到释放锁的时候唤醒:这种情况也不太好,如果同时挂起等待的线程较多,可能存在惊群 的问题
为了高效解决这种大家都会遇到的场景,JDK设计了一个抽象的解决共享资源竞争与同步的框架,AbstractQueuedSynchronizer 应愿而生。
什么是AQS AQS是JDK为了解决在多线程环境下,共享资源的竞争和状态同步关系的框架。
AQS集同步状态管理、线程阻塞、线程释放以及队列管理的同步框架。
通过后面的分析我们可以了解到:
AQS的核心思想是当多个线程竞争同一个共享资源的时候,未获得执行权的线程会被构造为一个Node节点丢入到一个 FIFO 的队列中。这个队列是一个双向队列,同时记录前驱和后继,在队列中的线程会保持阻塞直到被队列前一个元素唤醒。队列中只有队首节点有资格被唤醒竞争锁。
关注重点 对于 AQS 我们需要重点关注:
内部的状态同步如何实现(状态管理)
核心FIFO队列的设计(队列管理)
竞争资源失败时线程的生命周期(线程阻塞)
共享资源的释放(线程释放)
同步状态管理 1 2 3 4 5 6 public abstract class AbstractQueuedSynchronizer { private volatile int state; }
AQS 内部使用一个 volatile 类型的变量来表示状态,这表示这个状态的更新对所有线程都可见。
AQS 使用这个变量来标识当前锁的占用情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException (); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException (); }
队列管理 基础信息 AQS的核心设计中,FIFO的队列是实现AQS的关键设计
它采用了一个双向队列,队列的每一个节点Node定义如下
1 2 3 4 5 6 7 static final class Node { volatile Node prev; volatile Node next; }
节点还具有模式,分为SHARED和EXCLUSIVE模式
1 2 static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ;
其中队列中每一个线程所对应的节点还有状态,AQS中统一称作 waitedStatus
1 2 3 4 5 6 7 8 static final int CANCELLED = 1 ;static final int SIGNAL = -1 ;static final int CONDITION = -2 ;static final int PROPAGATE = -3 ;
在 AQS 顶部定义中,就维护了这个队列的队尾和队首节点
1 2 3 4 5 6 7 8 9 10 11 12 public abstract class AbstractQueuedSynchronizer { private transient volatile Node head; private transient volatile Node tail; }
队列初始化 在上述我们通过文档知道了队列是懒加载的
所以其实是在元素入队的时候才实例化队头和队尾指针的,文档也说明了入队函数enq()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
资源竞争 acquire方法中定义了AQS框架获取锁的完整流程
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
这里会调用tryAcquire()
,这个方法是抽象方法,交由子类进一步实现,默认是会抛出异常的
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException (); }
尝试获取锁,但是不一定保证能获取到,如果获取到了返回 true
因此通过 acquire 源码,如果没获取到锁,会尝试调用 addWaiter
,也就是将线程构造成 AQS 队列中的一个 Node 了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
CountDownLatch JDK源码中的CountDownLatch就是基于AQS实现的一个最佳学习案例
下面我们来结合CountDownLatch进行学习
导论 CountDownLatch 允许一个或多个线程 等待 其他线程完成一系列操作后再继续执行(典型的 “等待 - 通知” 模型)
在实际业务中,我们常常用他来实现多线程环境下线程的等待机制
例如:服务启动时,异步初始化关键组件:配置文件读取、数据库连接,缓存初始化
我们希望这三件事情异步执行,主线程等三个事情执行完毕后再继续
基础api 基于上述的需求场景,直接看代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import java.util.concurrent.CountDownLatch;public class InitApplicationDemo { private static final int INIT_TASK_COUNT = 3 ; public static void main (String[] args) throws InterruptedException { CountDownLatch initLatch = new CountDownLatch (INIT_TASK_COUNT); System.out.println("主线程:开始启动初始化任务..." ); new Thread (() -> { try { System.out.println("初始化线程1:开始加载配置文件..." ); Thread.sleep(1000 ); System.out.println("初始化线程1:配置文件加载完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { initLatch.countDown(); } }, "Init-Config" ).start(); new Thread (() -> { try { System.out.println("初始化线程2:开始连接数据库..." ); Thread.sleep(1500 ); System.out.println("初始化线程2:数据库连接成功" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { initLatch.countDown(); } }, "Init-DB" ).start(); new Thread (() -> { try { System.out.println("初始化线程3:开始预热缓存..." ); Thread.sleep(800 ); System.out.println("初始化线程3:缓存预热完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { initLatch.countDown(); } }, "Init-Cache" ).start(); initLatch.await(); System.out.println("主线程:所有初始化任务完成,应用启动成功!" ); } }
可以看出,CountDownLatch的使用很简单:实例化对象时定义一个状态(计数器)
某个线程可以通过 countDown()
方法将计数器减 1(每调用一次减 1)
想要等待的线程通过 await()
方法进行等待,直到计数器减到 0 后被唤醒
因此在上面的代码中,我们实现逻辑的最终流程如下:
主线程 启动时创建 CountDownLatch
,计数器初始值为 3(对应 3 个初始化任务)
启动 3 个初始化线程,分别执行加载配置、连接数据库、预热缓存(模拟耗时操作)
每个初始化线程完成任务后,在 finally
中调用 countDown()
,确保无论任务是否异常,计数器都会减 1
主线程调用 initLatch.await()
后阻塞,等待计数器从 3 减到 0
当最后一个初始化线程调用 countDown()
后,计数器归 0,主线程被唤醒,继续执行 “应用启动” 逻辑
源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount () { return getState(); } @Override protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void countDown () { sync.releaseShared(1 ); } public long getCount () { return sync.getCount(); } }
CountDownLatch 的源码短小精悍,核心在于其中的 Sync
类,这个类继承了 AQS 框架,重写了父类暴露的两个核心方法tryAcquireShared
以及 tryReleaseShared
countDown 1 2 3 public void countDown () { sync.releaseShared(1 ); }
父类的模板中定义如下:
1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
实际上调用的是CountDownLatch重写的tryReleaseShared
1 2 3 4 5 6 7 8 9 10 11 protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
这一段逻辑就是递减state值,compareAndSetState
是原子地设置状态,如果成功将状态更新到0,那么就会进入AQS中的doReleaseShared
,唤醒同步队列中的等待线程