1、是什么
AQS全称为AbstractQueuedSynchronizer抽象队列同步器,是重量级基础框架及整个JUC重要的基石,主要解决锁分配给谁的问题。
整体就是一个抽象的FIFO队列来完成资源获取线程的排队问题,并一个int类变量,表示持有锁的状态
我们常见的CountDownLatch、ReentrantLock、StampedLock、ReentrantReadWriteLock都是基于AQS的实现
java.util.concurrent.locks包下的结构
2、案例
public class AQSTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();//非公平锁
//A,B,C三个顾客,去银行办理业务。A先到此时窗口空无一人。他优先获得办理窗口的机会,办理业务
// A 耗时严重
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("------come in A");
try { TimeUnit.MINUTES.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
}finally {
reentrantLock.unlock();
}
},"A").start();
//B是第二个顾客,B一看到A把受理窗口占用,只能去候客区等待,进入AQS队列
//等待A办理完成,尝试去抢占受理窗口
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("------come in B");
}finally {
reentrantLock.unlock();
}
},"B").start();
//C是第三个顾客,C一看到A把受理窗口占用,只能去候客区等待,进入AQS队列
//等待A办理完成,尝试去抢占受理窗口,前面顾客B,FIFO
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("------come in C");
}finally {
reentrantLock.unlock();
}
},"C").start();
}
}
3、原理分析
3.1 概述
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
3.2 设计
AbstractQueuedSynchronizer类结构说明
子类Node成员变量说明
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占(排它)模式
static final Node EXCLUSIVE = null;
// 线程已被取消,当首节点释放锁后,
// 开始查找下一个 waitStatus < 0 的节点,
// 如果遇到已取消的线程,则移除
static final int CANCELLED = 1;
// 当前线程的后继线程需要被unpark(唤醒)
// 后继节点处于等待状态,当前节点(为-1)被取消或者中断时会通知后继节点,
// 使后继节点的线程得以运行
static final int SIGNAL = -1;
// 当前节点处于等待队列,节点线程等待在Condition上,
// 当其他线程对condition执行signall方法时,
// 等待队列转移到同步队列,加入到对同步状态的获取
static final int CONDITION = -2;
// 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
static final int PROPAGATE = -3;
// 当前节点的状态,默认是 0
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 等待获取锁而自旋的线程
volatile Thread thread;
// Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。
// 在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
// 在作为等待队列节点使用时,nextWaiter保存后继节点
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
获取:
while(!tryAcquire(arg)){
//如果线程尚未排队,则将其排队;
//可能阻塞当前线程;
}
释放:
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
要点
原子维护 state 状态
阻塞及恢复线程
维护队列
state 设计
state 使用 volatile 配合 cas 保证其修改时的原子性
state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
阻塞恢复设计
早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到
解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没 问题
park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细 park 线程还可以通过 interrupt 打断
队列设计
使用了 FIFO 先入先出队列,并不支持优先级队列
设计时借鉴了 CLH 队列,它是一种单向无锁队列
CLH 好处:
无锁,使用自旋
快速,无阻塞
主要用到 AQS 的并发工具类
3.3 AQS原理深入
3.3.1 CLH队列变体及AQS原理
CLH(Craig,Landin and Hagersten)是三个人,共同发明了一个可扩展、高性能、公平且基于自旋锁的链表;链表中的每个线程只在本地自旋前一个节点的状态,即该节点(线程)不断自旋获取前一个节点的状态;每个节点都有一个状态(要么自旋,要么释放锁)。
在AQS中,用到的数据结构是 CLH 的变体:
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
上图是AQS中CLH的变体结构,该结构是:
一个 FIFO(first-in-first-out)队列;
新的等待获取锁的线程先加入队尾(tail);
如果队列是空,则第一个新加入的节点立即获得锁;
新加入的线程本地自旋前一个节点的状态(如 C 不断自旋获取 B 的状态);
(当A释放锁时,B成为第一个节点),头节点并不能保证能够获得锁,只是有优先权,如果获取失败,则重新变为等待状态;
3.3.2 AQS队列变体数据结构
1)初始化队列结构,此时还没有线程来排队
2)当线程排队时候的队列变化
最终队列如下
/**
* The synchronization state. 当state为0时资源被抢占 为1时释放资源
*/
private volatile int state;
评论