(拉钩)Android工程师进阶34讲-10:深入理解AQS和CAS原理

0. 前言

AQS 全称 Abstract Queued Synchronized,一般翻译为同步器。它是一套实现多线程同步功能的框架,由 Doug Lea 操刀设计并开发实现的。AQS 在源码中被广泛使用,尤其是在JUC(Java Util Concurrent)中,比如ReentrantLockSemaphoreCountDownLatchThreadPoolExecutor。理解 AQS 对理解 JUC 以及其他组件很有帮助,并且在实际开发中也可以通过自定义 AQS 来实现各种需求场景。

注意:理解 AQS 需要一定额数据结构基础,尤其是双端队列,并对Unsafe有一定了解。

1. ReentrantLock 和 AQS 的关系

这里主要通过ReentrantLock来理解 AQS 内部的工作机制。首先从ReentrantLocklock()方法开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}

在里面只调用了Synclock()方法,这个Sync是什么呢?

1
2
3
4
5
6
7
8
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;

......

}

SyncReentrantLock的一个内部类。ReentrantLock并没有直接继承 AQS,而是通过内部Sync来扩展AQS的功能,然后ReentrantLock中存有Sync的全局变量引用。

SyncReentrantLock有两种实现:NonfairSyncFairSync,分别对应非公平锁和公平锁。以非公平锁为例,实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1)) // 通过cas操作来修改state状态,表示争抢锁的操作
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前获得锁状态的线程
else
acquire(1); // 修改状态失败,尝试获取锁
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

可以看出,在非公平锁的lock()方法中,主要做了如下操作:

  • 如果通过CAS设置变量state(同步状态)成功,表示当前线程获取锁成功,则将当前线程设置为独占线程。
  • 如果通过CAS设置变量state(同步状态)失败,表示当前做正在被其他线程持有,则进入Acquire方法进行后续处理。

acquire()方法定义在AQS中,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire()主要做了三件事:

  • 1、tryAcquire():尝试获取锁;
  • 2、addWaiter():如果tryAcquire()尝试获取锁失败,则调用addWaiter()将当前线程添加到一个等待队列中;
  • 3、acquireQueued():处理加入到队列中的节点,通过自旋去尝试获取锁,根据情况将线程挂起或取消。

以上三个方法都被定义在AQS中,其中tryAcquire()有点特殊,其实现如下:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

默认情况下,直接抛出异常,因此它需要在子类中重写,真正的获取锁的逻辑由子类同步器自己实现。

ReentrantLocktryAcquire()的实现(非公平锁)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前执行的线程
final Thread current = Thread.currentThread();
int c = getState(); // 获取 state 值
if (c == 0) { // c == 0,说明当前是无锁状态
// 通过cas操作来替换state的值为1
if (compareAndSetState(0, acquires)) {
// 设置当前线程持有独占锁
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

说明:

  • 获得当前线程,判断当前的锁的状态。
  • 如果state == 0,表示当前是无锁状态,通过cas更新state状态的值,返回true
  • 如果当前线程属于重入,则增加重入次数,返回true
  • 上述情况都不满足,则获取锁失败,返回false

一张图表示ReentrantLock.lock()过程:

图中可以看出,在ReentrantLock执行lock()过程中,大部分同步机制的核心逻辑都已经在AQS中实现,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫模板模式。比如Android中,Activity的生命周期的执行流程都已经在framework中定义好了,子类Activity只要在相应的onCreate()onPause()等生命周期方法中提供相应的实现即可。

注意:不止ReentrantLock,JUC包中的其他组件如CountDownLatchSemaphor等都是通过一个内部类Sync来继承AQS,然后在内部中通过操作Sync来实现同步。这种做法的好处是将线程控制的逻辑控制在Sync内部,而对外面向用户提供的接口是自定义锁,这种聚合关系能够很好的解耦两者所关注的逻辑。

2. AQS核心功能原理分析

首先看看AQS中几个关键的属性:

1
2
3
4
5
6
7
8
9
static final class Node{
...
}

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

代码中展示了AQS中两个比较重要的属性Nodestate

2.1 state锁状态

state表示当前锁状态。当state = 0时表示无锁状态;当state > 0时,表示已经有线程获得锁,也就是state = 1,如果同一个线程多次获得同步锁时,state会递增,比如重入5次,那么state = 5。而在释放锁时,同样需要释放5次直到state = 0,其他线程才有资格获得锁。

state还有一个功能是实现锁的独占模式或共享模式。

  • 独占模式:只有一个线程能够持有同步锁。

在独占模式下,可以把state的初始值设置成0,当某个线程申请锁对象时,需要判断state的值是不是0,如果不是0,表示其他线程已经持有该锁,则本线程需要阻塞等待。

  • 共享模式:可以有多个线程持有同步锁。

共享模式下,比如某项操作允许10个线程同时进行,超过这个数量的线程就需要阻塞等待。那么只需要在线程申请对象时判断state的值是否小于10。如果小于10,就将state加1后继续同步语句的执行;如果等于10,说明已经有10个线程同时执行该操作,本线程需要阻塞等待。

2.2 Node双端队列节点

Node是一个先进先出的双端队列,并且是等待队列,当多线程争用资源被阻塞时会进入此队列。这个队列是AQS实现多线程同步的核心。

从之前ReentrantLock图中可以看出,在AQS中有两个Node的指针,分别指向队列的headtail

Node的主要结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class Node {
// 该等待同步的节点处于共享模式
static final Node SHARED = new Node();
// 该等待同步的节点处于独占模式
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// Node中的线程状态,这个和state是不一样的:有1,0,-1,-2,-3五个值
volatile int waitStatus;
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 等待锁的线程

...
}

默认情况下,AQS中的链表结构如下图所示:

2.3 获取锁失败后续流程分析

锁的意义就是使竞争到锁对象的线程执行同步代码,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock是如何实现让线程等待并唤醒的呢?

前面提到在ReentrantLock.lock()阶段,在acquire()方法中会先调用tryAcquireaddWaiteracquireQueued这三个方法来处理。tryAcquireReentrantLock中被复写并实现,如果返回true,说明成功获取锁,就继续执行同步代码语句。可是如果tryAcquire返回false,也就是当前锁对象被其他线程所持有,那么当前线程会被AQS如何处理呢?

2.3.1 addWaiter

首先当前获取锁失败的线程会被添加到一个等待队列的末端,具体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 将线程以Node的方式添加到队列中
*/
private Node addWaiter(Node mode) {
// 把当前线程封装到一个新的Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 将node插入到队列
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS替换当前尾部,成功则返回
pred.next = node;
return node;
}
}
enq(node); // 插入队列失败,进入enq自旋重试入队
return node;
}

/**
* 插入节点到队列中,如果队列未初始化则初始化,然后再插入
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果队列从未被初始化,需要初始化一个空的Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

有两种情况会致使插入队列失败:

  • 1、tail为空:说明队列从未初始化,因此需要调用enq()方法在队列中插入一个空的Node
  • 2、compareAndSetTail()失败:说明插入过程中有线程修改了此队列,因此需要说明enq()将当前node重新插入到队列末端。

经过addWaiter方法之后,此时线程以Node的方式被加入到队列的末端,但是线程还没有被执行阻塞操作,真正的阻塞操作是在下面的acquireQueued()方法中判断执行。

2.3.2 acquireQueued

acquireQueued()方法中不会立即挂起该节点中的线程,因此在插入节点的过程中,之前持有锁的线程可能已经执行完毕并释放锁,所以这里使用自旋再次去尝试获取锁(不放过任何优化细节)。如果自旋操作还没没有获取到锁,那么就将该线程挂起(阻塞),该方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 在队列中的节点通过此方法获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 检测当前节点前驱是否head,这是试获取锁的资格。
// 如果是,则调用tryAcquire尝试获取锁,
// 成功,则将head置为当前节点。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果未成功获取锁则根据前驱节点判断是否需要阻塞。
// 如果阻塞过程中被中断,则置interrupted标志位为true。
// shouldParkAfterFailedAcquire方法在前驱状态不为SINGAL的情况下都会循环重试获取锁。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看出在shouldParkAfterFailedAcquire()方法中会判断当前线程是否应该被挂起,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果是SINGAL状态,返回true,将当前线程挂起
return true;
if (ws > 0) {
/*
* 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前一个非取消节点。
* 当前线程之后会再次回到循环并尝试获取锁。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 等待状态为0或PROPAGATE(-3),设置前驱的等待状态为SINGAL,
* 并且之后会回到循环再次重试获取锁。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

首先获取前驱节点的waitStatus值,Node中的waitStatus一共有5种取值,分别代表的意义如下:
|waitStatus值|意义|
|:-:|:–|
|CANCELLED(1)|当前线程因为超时或者中断被取消。这是一个终结状态,也就是状态到此为止|
|SIGNAL(-1)|当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的|
|CONDITION(-2)|当前线程在condition队列中|
|PROPAGATE(-3)|用于将唤醒后继线程继续传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的|
|0|表示无锁状态|

接下来根据waitStatus不同的值进行不同的操作,主要有以下几种情况:

  • 如果waitStatus = SIGNAL,返回true,将当前线程挂起,等待后续唤醒操作即可。
  • 如果waitStatus = CANCELLED,会将此前驱节点从队列中删除,并在循环中逐步寻找下一个不是CANCELLED状态的节点作为当前节点的前驱节点。
  • 如果waitStatus既不是SIGNAL,也不是CANCELLED,则将当前节点的前驱节点状态设置为SIGNAL,这样做的好处是下一次执行shouldParkAfterFaildAcquire()时可以直接返回true,挂起线程。

代码再回到acquireQueued()中,如果shouldParkAfterFaileAcquire()返回true,表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt()方法执行真正的阻塞线程代码,具体如下:

1
2
3
4
5
6
7
8
9
10
11
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

在里面调用了LockSupport.park()。在LockSupport.park()中,调用了Unsafe API来执行底层native方法将线程挂起,代码到这里已经到了操作系统的层面。

至此,获取锁的大体流程已经分析完毕,总结:

  • AQS的模板方法acquire()通过调用子类自定义实现的tryAcquire()获取锁;
  • 如果获取锁失败,通过addWaiter()方法将线程构造成Node节点插入到同步队列队尾;
  • acquireQueued()方法中以自旋的方式尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport(Unsafe)中的native API来实现阻塞线程。

2.4 释放锁流程分析

在上面加锁阶段被阻塞的线程需要被唤醒之后才能重新执行。那具体AQS是何时尝试唤醒等待队列中被阻塞的线程呢?

和加锁过程一样,释放锁需要从ReentrantLock.unlock()方法开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

首先调用tryRelease()来尝试释放锁,如果成功,最终会调用AQS中的unparkSuccessor()方法来实现释放锁的操作。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {
// 获取头节点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点(实际是head节点)的下一个节点
Node s = node.next;
// 如果下一个节点是null或者下个节点是CANCEL状态,就找到队列最开始的非CALCEL的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部节点开始找,到队首,找到队列第一个waitStatus < 0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态 <= 0,那就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}

解释说明:

  • 首先获取当前节点(实际上传入的是head节点)的状态,如果head节点的下一个节点是null,或者下一个节点的状态为CANCEL,则从等待队列的尾部开始遍历,直到寻找到第一个waitStatus < 0的节点。
  • 如果最终遍历的节点不为空,再调用LockSupport.unpark()方法,调用底层方法唤醒线程。至此,线程被唤醒的时机分析完毕。

2.5 不得不说的CAS

不管在加锁还是释放锁阶段,都提到了通用的操作:compareAndSetXXX。这种操作最终会调用Unsafe中的API进行CAS操作。

CAS全称Compare And Swap,译为比较和替换,是一种通过硬件实现并发安全的常用技术,底层通过利用CPU的CAS指令对缓存加锁或总线加锁的方式实现多处理器之间的原子操作。

它的实现过程主要有3个操作数:内存值V、旧的预期值E、要修改的新值U,当且仅当预期值E和内存值V相同时,才将内存值V修改为U,否则什么都不做。

3. 自定义AQS

通过自定义AQS来实现同步机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyLock {
private Sync sync = new Sync();

/**
* 加锁
*/
public void lock() {
sync.acquire(1);
}

/**
* 释放锁
*/
public void unlock() {
sync.release(1);
}

static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}

@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
}
}

MyLock就是一个最简单的独占锁,通过使用MyLock也能实现同synchronizedReentrantLock相同的功能。比如下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Test {
static int count = 0;
static MyLock myLock = new MyLock();

public static void main(String[] args) throws InterruptedException {
testMyLock();
}

public static void testMyLock() throws InterruptedException {
Runnable runnable = () -> {
try {
myLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myLock.unlock();
}
};

Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}

最终打印的count值为20000,说明两个线程之间是线程安全的同步操作。

4. 总结

AQS是一套框架,在框架内部已经封装好了大部分同步需要的逻辑,在AQS内部维护了一个状态指示器state和一个等待队列Node,通过state的操作分为两种模式:独占模式和共享模式。导致AQS有两种不同的实现:独占锁(ReentrantLock等)和分享锁(CountDownLatch、读写锁等)。这里主要从独占锁的角度分析了AQS的加锁和释放锁的流程。

理解AQS的原理对理解JUC包中其他组件实现的基础有帮助,并且理解其原理才能更好的扩展其功能。上层开发人员可以基于此框架基础上进行扩展实现适合不同场景、不同功能的锁。其中几个有可能需要子类同步器实现的方法如下。

  • lock()
  • tryAcquire():独占方式。尝试获取资源,成功则返回true,失败则返回false
  • tryRelease():独占方式。尝试释放资源,成功则返回true,失败则返回false
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false
  • Copyrights © 2019-2020 Tyler Liu

请我喝杯咖啡吧~

支付宝
微信