博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AbstractQueuedSynchronizer
阅读量:6412 次
发布时间:2019-06-23

本文共 5744 字,大约阅读时间需要 19 分钟。

AQS 是用来构建锁和同步工具的基本框架。本文主要基于 AQS 作者 Doug Lea 的论文 和 JDK 1.8 的文档。

这篇文章也同时发布在我的中。

不过英文好的话,还是直接看论文吧。

设计要求

同步(维持变量在各个线程间状态的一致性)至少需要两种操作:

  • acquire:阻塞线程直到*同步状态(synchronization state)*允许线程运行
  • release:改变同步状态,并且 unblock 一个或多个阻塞的线程

同时支持两种模式:

  • exclusive mode:一次只允许一个线程改变同步状态
  • shared mode:多个线程同时改变同步状态可能成功;一次同时唤醒多个线程

同时框架需要一些高级功能:

  • 非阻塞和阻塞地改变同步状态(如tryLocklock
  • 超时功能,超时即放弃尝试
  • 响应线程中断

实现

同步器的基本思想很直接简洁,用伪代码表示如下:

acquire

while (syncronization state does not allow acquire) {  enqueue current thread if not already queued;  possiblly block current thread;}复制代码

release

update synchronization state;if (state may premit a blocked thread acuire) {  unblock one or more queued thread;}复制代码

要实现这两个操作,需要三个基本模块的配合:

  • 以原子操作管理同步状态
  • block 和 unblock 线程
  • 维护 FIFO 队列

Synchronization state

AQS 使用一个 32 位整数(int)来代表共享资源,也就是同步状态。

该整数可以表现任何状态。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态 (尚未开始、运行、完成和取消)

Blocking

AQS 使用 JUC 包下LockSupport中的pack()unpack()方法来阻塞和唤醒进程。最终会调用Unsafe.park()Unsafe.unpack()两个 native 方法,最终的阻塞线程和唤醒线程具体实现还是由操作系统来实现的。

Queue

AQS 维护一个 FIFO 的队列,来管理阻塞的线程,可以实现公平性(也可以不公平),也就是同时支持公平锁和非公平锁两种模式。内部使用 CLH Lock,但是做了很多优化,比如CLH 锁不是自旋的而是阻塞的。

AQS 中的 CLH lock 和原汁原味的 CLH lock 相比,主要有两点不同:

  • 不使用自旋锁而是阻塞锁,调用pack()unpack()实现。

  • 节点有显式的后继节点next,原来的 CLH lock 不需要显式的链表因为当前一个节点为释放锁时,后一个节点在一直轮询,所以它能够拿到锁。而 AQS 的锁是阻塞的,需要调用unpack(Thread)来唤醒请求锁的线程,所以需要知道它的后继节点。

AQS 同时还设置了一个signal bit来避免不必要的pack()unpack()调用。在调用pack()之前,首先设置signal bit为 true,然后再次检查节点状态,如果还不能拿到锁,就调用pack()阻塞线程。

这样就可以用更加详细的伪代码来描述acquirerelease,这里只考虑exclusive mode、不可中断的、没有超时功能的情况:

acquire:

if (!tryAcquire(arg)) {  node = create and enqueue new node;  pred = node's effective predecessor;  while (pred is not head node || !tryAcquire(arg)) {    if (pred's signal bit is set)    	park();    else     	compareAndSet pred's signal bit to ture    pred = node's effective predecessor;  }  head = node;}复制代码

release:

if (tryRelease(arg) && head node's signal bit is set) {  compareAndSet head's signal bit to false;  unpack head's successor, if one exists}s复制代码

使用

实现一个同步器需要实现下面的方法:

tryAcquire()tryRelease()tryAcquireShared()tryReleaseShared()isHeldExclusively()复制代码

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法.

  • 支持独占 (排他) 获取锁的同步器应该实现tryAcquiretryReleaseisHeldExclusively
  • 支持共享获取的同步器应该实现tryAcquireSharedtryReleaseSharedisHeldExclusively
  • 当然也可以同时支持 exclusive 模式和 shared 模式,比如ReentrantReadWriteLock

实现一个同步器最好的设计模式是把功能委托给一个AQS的私有内部子类,而不是直接继承 AQS 来实现(这样会破坏同步器的简洁性,调用者可能会调用 AQS 的其他方法破坏同步状态)。

例子

import java.util.concurrent.locks.AbstractQueuedSynchronizer;/** * @author leer * Created at 4/25/19 6:24 PM * 一个不可重入的互斥锁 */public class Mutex {  static final class Sync extends AbstractQueuedSynchronizer {    @Override    protected boolean tryAcquire(int ignore) {      return compareAndSetState(0, 1);    }    @Override    protected boolean tryRelease(int ignore) {      setState(0);      return true;    }  }  private final Sync sync = new Sync();    public void lock() {    sync.acquire(0);  }    public void unlock() {    sync.release(0);  }}复制代码

AQS 在 Synchronizers 中的具体实现

ReentrantLock

  • ReentrantLock 是可重入的:所以需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于 1,也就是获得了重进入的效果,而其他线程只能被 park 住,直到这个线程重进入锁次数变成 0 而释放原子状态
  • ReentrantLock 有公平锁和非公平锁两种模式:对应的, ReentrantLock 内部有两个 AQS 的子类。(the fair one disabling barging)

非公平锁的tryAcquire实现:

final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }复制代码

ReentrantReadWriteLock

ReentrantReadWriteLock 使用 同步状态的 16 位来存放读锁计数,另外的 16 位存放写锁计数。

  • 写锁和ReentrantLock类似
  • 读锁使用 shared 模式的 AQS来支持多个读者同时读

Semaphore

Semaphore使用同步状态来保存当前可用许可数量。它重写tryAcquireShared来减少计数来模拟获取资源,如果计数小于 0 则会阻塞线程;重写tryReleaseShared来模拟释放资源。同时它也有公平模式和非公平模式。

final int nonfairTryAcquireShared(int acquires) {            for (;;) {                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }        protected final boolean tryReleaseShared(int releases) {            for (;;) {                int current = getState();                int next = current + releases;                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                if (compareAndSetState(current, next))                    return true;            }        }复制代码

CountDownLatch

Semaphore类似,同步状态保存当前的计数值。countDown()调用releaseShared()await()方法调用acquireShared(),等待计数器到零。

protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }        protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }复制代码

FutureTask

Only when JDK version < 1.7

FutureTask使用同步状态保存Future任务的状态(initial、running、cancelled、done)。

设置和取消一个任务将调用release(),调用Future.get()等待结果将会调用acquire()

参考

  • Doug Lea 的论文:

  • 《Java并发编程实战》第14章

转载于:https://juejin.im/post/5cf63a53f265da1b6a348138

你可能感兴趣的文章
Golang性能调优入门
查看>>
sqlloader外部表
查看>>
golang笔记——数组与切片
查看>>
屏蔽可忽略的js脚本错误
查看>>
【Vue】vue.js常用指令
查看>>
NFS学习
查看>>
MySql常用命令总结
查看>>
又一年...
查看>>
文件上传框的美化+预览+ajax
查看>>
Linux VFS
查看>>
ext不能选中复制属性_如何实现Extjs的grid单元格只让选择(即可以复制单元格内容)但是不让修改?...
查看>>
python中print的作用*8、不能+8_在 Python 3.x 中语句 print(*[1,2,3]) 不能正确执行。 (1.0分)_学小易找答案...
查看>>
python 生成html代码_使用Python Markdown 生成 html
查看>>
axure如何导出原件_Axure 教程:轻松导出图标字体所有图标
查看>>
laravel input值必须不等于0_框架不提供,动手造一个:Laravel表单验证自定义用法...
查看>>
cad填充图案乱理石_太快了吧!原来大神是这样用CAD图案填充的
查看>>
activator.createinstance 需要垃圾回收么_在垃圾回收器中有哪几种判断是否需要被回收的方法...
查看>>
rocketmq 消息指定_RocketMQ入坑系列(一)角色介绍及基本使用
查看>>
redis zset转set 反序列化失败_掌握好Redis的数据类型,面试心里有底了
查看>>
p图软件pⅰc_娱乐圈最塑料的夫妻,P图永远只P自己,太精彩了吧!
查看>>