1、是什么

AQS全称为AbstractQueuedSynchronizer抽象队列同步器,是重量级基础框架及整个JUC重要的基石,主要解决锁分配给谁的问题。

整体就是一个抽象的FIFO队列来完成资源获取线程的排队问题,并一个int类变量,表示持有锁的状态

我们常见的CountDownLatch、ReentrantLock、StampedLock、ReentrantReadWriteLock都是基于AQS的实现

java.util.concurrent.locks包下的结构

2、案例

public class AQSTest {
    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();//非公平锁
        //A,B,C三个顾客,去银行办理业务。A先到此时窗口空无一人。他优先获得办理窗口的机会,办理业务
        // A 耗时严重
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("------come in A");
                try { TimeUnit.MINUTES.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
            }finally {
                reentrantLock.unlock();
            }
        },"A").start();

        //B是第二个顾客,B一看到A把受理窗口占用,只能去候客区等待,进入AQS队列
        //等待A办理完成,尝试去抢占受理窗口
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("------come in B");
            }finally {
                reentrantLock.unlock();
            }
        },"B").start();
        //C是第三个顾客,C一看到A把受理窗口占用,只能去候客区等待,进入AQS队列
        //等待A办理完成,尝试去抢占受理窗口,前面顾客B,FIFO
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("------come in C");
            }finally {
                reentrantLock.unlock();
            }
        },"C").start();
    }

}

3、原理分析

3.1 概述

  1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

  • getState - 获取 state 状态

  • setState - 设置 state 状态

  • compareAndSetState - cas 机制设置 state 状态

  1. 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

  3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

3.2 设计

AbstractQueuedSynchronizer类结构说明

子类Node成员变量说明

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

     static final class Node {
        // 共享模式
        static final Node SHARED = new Node();
        // 独占(排它)模式
        static final Node EXCLUSIVE = null;

        // 线程已被取消,当首节点释放锁后,
        // 开始查找下一个 waitStatus < 0 的节点,
        // 如果遇到已取消的线程,则移除
        static final int CANCELLED =  1;
        // 当前线程的后继线程需要被unpark(唤醒)
        // 后继节点处于等待状态,当前节点(为-1)被取消或者中断时会通知后继节点,
        // 使后继节点的线程得以运行
        static final int SIGNAL    = -1;
        // 当前节点处于等待队列,节点线程等待在Condition上,
        // 当其他线程对condition执行signall方法时,
        // 等待队列转移到同步队列,加入到对同步状态的获取
        static final int CONDITION = -2;
        // 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
        static final int PROPAGATE = -3;

        // 当前节点的状态,默认是 0 
        volatile int waitStatus;

        volatile Node prev;
        volatile Node next;

        // 等待获取锁而自旋的线程
        volatile Thread thread;

        // Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。
        // 在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
        // 在作为等待队列节点使用时,nextWaiter保存后继节点
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
}

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

isHeldExclusively

获取:

while(!tryAcquire(arg)){
//如果线程尚未排队,则将其排队;
//可能阻塞当前线程;

}

释放:

// 如果释放锁成功
if (tryRelease(arg)) {
 // 让阻塞线程恢复运行
}

要点

  • 原子维护 state 状态

  • 阻塞及恢复线程

  • 维护队列

state 设计

  • state 使用 volatile 配合 cas 保证其修改时的原子性

  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

阻塞恢复设计

  • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到

  • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没 问题

  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细 park 线程还可以通过 interrupt 打断

队列设计

  • 使用了 FIFO 先入先出队列,并不支持优先级队列

  • 设计时借鉴了 CLH 队列,它是一种单向无锁队列

CLH 好处:
  • 无锁,使用自旋

  • 快速,无阻塞

主要用到 AQS 的并发工具类

3.3 AQS原理深入

3.3.1 CLH队列变体及AQS原理

CLH(Craig,Landin and Hagersten)是三个人,共同发明了一个可扩展、高性能、公平且基于自旋锁的链表;链表中的每个线程只在本地自旋前一个节点的状态,即该节点(线程)不断自旋获取前一个节点的状态;每个节点都有一个状态(要么自旋,要么释放锁)。

在AQS中,用到的数据结构是 CLH 的变体:

      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+

上图是AQS中CLH的变体结构,该结构是:

  1. 一个 FIFO(first-in-first-out)队列;

  2. 新的等待获取锁的线程先加入队尾(tail);

  3. 如果队列是空,则第一个新加入的节点立即获得锁;

  4. 新加入的线程本地自旋前一个节点的状态(如 C 不断自旋获取 B 的状态);

  5. (当A释放锁时,B成为第一个节点),头节点并不能保证能够获得锁,只是有优先权,如果获取失败,则重新变为等待状态;

3.3.2 AQS队列变体数据结构

1)初始化队列结构,此时还没有线程来排队

2)当线程排队时候的队列变化

最终队列如下

     /**
     * The synchronization state.  当state为0时资源被抢占  为1时释放资源
     */
    private volatile int state;