AQS 是用来构建锁和同步工具的基本框架。本文主要基于 AQS 作者 Doug Lea 的论文 和 JDK 1.8 的文档。
这篇文章也同时发布在我的中。
不过英文好的话,还是直接看论文吧。
设计要求
同步(维持变量在各个线程间状态的一致性)至少需要两种操作:
- acquire:阻塞线程直到*同步状态(synchronization state)*允许线程运行
- release:改变同步状态,并且 unblock 一个或多个阻塞的线程
同时支持两种模式:
- exclusive mode:一次只允许一个线程改变同步状态
- shared mode:多个线程同时改变同步状态可能成功;一次同时唤醒多个线程
同时框架需要一些高级功能:
- 非阻塞和阻塞地改变同步状态(如
tryLock
和lock
) - 超时功能,超时即放弃尝试
- 响应线程中断
实现
同步器的基本思想很直接简洁,用伪代码表示如下:
acquire
while (syncronization state does not allow acquire) { enqueue current thread if not already queued; possiblly block current thread;}复制代码
release
update synchronization state;if (state may premit a blocked thread acuire) { unblock one or more queued thread;}复制代码
要实现这两个操作,需要三个基本模块的配合:
- 以原子操作管理同步状态
- block 和 unblock 线程
- 维护 FIFO 队列
Synchronization state
AQS 使用一个 32 位整数(int)来代表共享资源,也就是同步状态。
该整数可以表现任何状态。比如,
Semaphore
用它来表现剩余的许可数,ReentrantLock
用它来表现拥有它的线程已经请求了多少次锁;FutureTask
用它来表现任务的状态 (尚未开始、运行、完成和取消)
Blocking
AQS 使用 JUC 包下LockSupport
中的pack()
和unpack()
方法来阻塞和唤醒进程。最终会调用Unsafe.park()
和Unsafe.unpack()
两个 native 方法,最终的阻塞线程和唤醒线程具体实现还是由操作系统来实现的。
Queue
AQS 维护一个 FIFO 的队列,来管理阻塞的线程,可以实现公平性(也可以不公平),也就是同时支持公平锁和非公平锁两种模式。内部使用 CLH Lock,但是做了很多优化,比如CLH 锁不是自旋的而是阻塞的。
AQS 中的 CLH lock 和原汁原味的 CLH lock 相比,主要有两点不同:
-
不使用自旋锁而是阻塞锁,调用
pack()
和unpack()
实现。 -
节点有显式的后继节点
next
,原来的 CLH lock 不需要显式的链表因为当前一个节点为释放锁时,后一个节点在一直轮询,所以它能够拿到锁。而 AQS 的锁是阻塞的,需要调用unpack(Thread)
来唤醒请求锁的线程,所以需要知道它的后继节点。
AQS 同时还设置了一个
signal bit
来避免不必要的pack()
和unpack()
调用。在调用pack()
之前,首先设置signal bit
为 true,然后再次检查节点状态,如果还不能拿到锁,就调用pack()
阻塞线程。
这样就可以用更加详细的伪代码来描述acquire
和release
,这里只考虑exclusive mode、不可中断的、没有超时功能的情况:
acquire:
if (!tryAcquire(arg)) { node = create and enqueue new node; pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set) park(); else compareAndSet pred's signal bit to ture pred = node's effective predecessor; } head = node;}复制代码
release:
if (tryRelease(arg) && head node's signal bit is set) { compareAndSet head's signal bit to false; unpack head's successor, if one exists}s复制代码
使用
实现一个同步器需要实现下面的方法:
tryAcquire()tryRelease()tryAcquireShared()tryReleaseShared()isHeldExclusively()复制代码
以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法.
- 支持独占 (排他) 获取锁的同步器应该实现
tryAcquire
、tryRelease
、isHeldExclusively
- 支持共享获取的同步器应该实现
tryAcquireShared
、tryReleaseShared
、isHeldExclusively
- 当然也可以同时支持 exclusive 模式和 shared 模式,比如
ReentrantReadWriteLock
实现一个同步器最好的设计模式是把功能委托给一个AQS的私有内部子类,而不是直接继承 AQS 来实现(这样会破坏同步器的简洁性,调用者可能会调用 AQS 的其他方法破坏同步状态)。
例子
import java.util.concurrent.locks.AbstractQueuedSynchronizer;/** * @author leer * Created at 4/25/19 6:24 PM * 一个不可重入的互斥锁 */public class Mutex { static final class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int ignore) { return compareAndSetState(0, 1); } @Override protected boolean tryRelease(int ignore) { setState(0); return true; } } private final Sync sync = new Sync(); public void lock() { sync.acquire(0); } public void unlock() { sync.release(0); }}复制代码
AQS 在 Synchronizers 中的具体实现
ReentrantLock
- ReentrantLock 是可重入的:所以需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于 1,也就是获得了重进入的效果,而其他线程只能被 park 住,直到这个线程重进入锁次数变成 0 而释放原子状态
- ReentrantLock 有公平锁和非公平锁两种模式:对应的, ReentrantLock 内部有两个 AQS 的子类。(the fair one disabling barging)
非公平锁的tryAcquire
实现:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }复制代码
ReentrantReadWriteLock
ReentrantReadWriteLock 使用 同步状态的 16 位来存放读锁计数,另外的 16 位存放写锁计数。
- 写锁和
ReentrantLock
类似 - 读锁使用 shared 模式的 AQS来支持多个读者同时读
Semaphore
Semaphore
使用同步状态来保存当前可用许可数量。它重写tryAcquireShared
来减少计数来模拟获取资源,如果计数小于 0 则会阻塞线程;重写tryReleaseShared
来模拟释放资源。同时它也有公平模式和非公平模式。
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }复制代码
CountDownLatch
和Semaphore
类似,同步状态保存当前的计数值。countDown()
调用releaseShared()
,await()
方法调用acquireShared()
,等待计数器到零。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }复制代码
FutureTask
Only when JDK version < 1.7
FutureTask
使用同步状态保存Future
任务的状态(initial、running、cancelled、done)。
设置和取消一个任务将调用release()
,调用Future.get()
等待结果将会调用acquire()
。
参考
-
Doug Lea 的论文:
-
《Java并发编程实战》第14章