简述
AQS(抽象同步器) 就是 AbstractQueuedSynchronizer
抽象类的简称,可以通过继承该类快速的实现同步多线程下的同步容器。
如:ReadWriteLock、ReentrantLock,或者 CountDownLatch 与 Semaphore,甚至是线程池类 ThreadPoolExecutor 都继承了 AQS。
可以一句话概括它的作用,
AQS的实现依赖内部的同步队列,也就是FIFO
的双向链表,如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node
加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
AQS 内部的数据结构(双向链表)
AQS 的底层数据结构其实是一条双向链表、一个代表锁状态的变量 state
和一个Condition
。当加锁后,state会改变,而竞争锁的线程会被封装到节点中形成链表,并且尝试改变 state以获取锁。
这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到AQS队列中去。
public abstract class AbstractQueuedSynchronizer{
// 锁的状态,没有线程占用锁时 为 0
private volatile int state;
// Node的头帧
private transient volatile Node head;
// Node的尾帧
private transient volatile Node tail;
static final class Node{...}
}
AQS 中提供了 state变量做为锁状态,一般来说,0 被视为无锁状态,1 被视为加锁状态,如果是可重入锁,就会大于 1。
因此,AQS 中的加锁解锁实际上就是通过 CAS 改变 state的过程,即下列三个方法:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
双向链表的节点(Node内部类)
static final class Node {
/** 标记表示节点正在共享模式中等待 */
static final Node SHARED = new Node();
/** 标记表示节点正在独占模式下等待 */
static final Node EXCLUSIVE = null;
// 表示当前结点已取消等待。
// 当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化
static final int CANCELLED = 1;
// 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
static final int SIGNAL = -1;
// 表示当前线程正在等待某个条件,当某个线程在调用了 Condition#signal 方法后,当前结点将会被从Condition队列转移到同步队列中。
static final int CONDITION = -2;
// 处于该状态的线程在释放共享资源,或接收到释放共享资源的信号时需要通知后继结点,以防止通知丢失。
static final int PROPAGATE = -3;
/**
* 节点的状态:
* 1,表示节点已经取消等待;
* 0,表示新节点入队时的默认状态;
* -1,等待前面节点成功执行完之后待唤醒状态;
* -2,持续等待状态,需要满足一定条件才能被唤醒;
* -3,指示下一个节点应无条件传播的waitStatus值;
*/
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 获取同步状态的线程
volatile Thread thread;
// 存储在Condition队列中的后继节点
Node nextWaiter;
// 是否为共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//将线程构造成一个Node,添加到等待队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//这个方法会在Condition队列使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
注:和线程池中的状态一样,Node 只有小于 0 的时候才处于正常的等待状态中,因此很多地方通过判断是否小于 0 来确定节点是否处于等待状态。
AQS 独占锁的加锁过程
AQS 的同步过程其实就是同步队列节点中依次获取锁的过程; AQS 一共提供了独占和非独占两种获取资源的方法:
- acquire():以独占模式获取锁;
- release():以独占模式释放锁;
- acquireShared():以共享模式获取锁;
- releaseShared():以共享模式释放锁;
独占锁
独占锁, 即只有占有锁的线程才能操作资源,在 synchronize
底层的锁中,独占通过锁对象对象头中的指针来声明独占的线程,而在 AQS 中则通过父类 AbstractOwnableSynchronizer
提供的 exclusiveOwnerThread
变量来声明独占的线程:
private transient Thread exclusiveOwnerThread;
AQS 独占锁加锁的方法是 acquire()
,其中涉及到 tryAcquire()
方法是一个空实现,并未提供其他具体实现, 需要由子类实现并在在里面进行具体的独占判断:
public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg) &&
// 添加到等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 进入等待队列后阻塞
selfInterrupt();
}
里面还涉及到 addWaiter()
,acquireQueued()
和 selfInterrupt()
四个方法。
获取锁资源
在 AQS 中,tryAccquire()
是一个未实现的方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里我们以用可重入锁 ReentrantLock
为例,
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果 锁没有被占用
if (c == 0) {
// 如果当前同步队列中没有线程在等待
if (!hasQueuedPredecessors() &&
// 尝试CAS修改state
compareAndSetState(0, acquires)) {
// 将当前锁设为自己独占
setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁已经被自己获取过了,即重入
else if (current == getExclusiveOwnerThread()) {
// state + 1,即多获取一次锁,即可重入锁
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 没有获取锁
return false;
}
注:非公平锁与公平锁的
tryAccquire()
主要差别在于,公平锁会先看看有没有线程在等待,没有才去竞争锁,而非公平锁不会看有没有线程在等待,无论如何都会先去竞争一次锁。
公平锁和非公平锁
ReentranLock
分为公平锁和非公平锁。
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
添加节点至等待队列
addWaiter()
方法用于创建并添加等待节点。
private Node addWaiter(Node mode) {
// 以共享或者独占模式创建节点
Node node = new Node(Thread.currentThread(), mode);
// 尾插法插入节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队列为空,自旋初始化 AQS 中的头结点和尾节点
enq(node);
return node;
}
注:这里的头结点实际上是一个哨兵节点,本身并无意义,当等待队列排队获取资源的时候,会直接从 head.next 开始。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在同步队列中获取锁
acquireQueued
这个方法主要做两件事:
- 如果当前节点已经是队列第二个结点了,并且获取锁成功,就设置当前节点为新头结点,然后执行完毕后设置为中断;
- 如果当前节点不是队列第二个节点,或者获取锁不成功,就挂起当前节点,等待上一节点的唤醒。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的上一节点
final Node p = node.predecessor();
// 再次尝试获取锁,如果前驱节点已经是头节点了,并且获取锁成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱节点不是头结点,并且获取锁失败
// 如果前驱节点需要被 park 挂起
if (shouldParkAfterFailedAcquire(p, node) &&
// 挂起当前线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 旧头结点已经处理完了,直接删除
if (failed)
cancelAcquire(node);
}
}
AQS 加锁过程的总结
当线程使用 acquire()
方法获取锁的时候:
- 先执行
tryAcquire()
方法,这是个需要由子类实现的空方法,公平或者非公平在这个方法中让线程去获取锁,获得锁的线程要修改state
; - 然后,失败的线程需要执行
addWaiter()
方法,这个方法用于将线程封装到节点中,并以尾插法插入等待队列的链表,同时,如果等待队列没有初始化就会在此处先初始化; - 接着,添加完成的节点执行
acquireQueued()
方法,此时会再次试图获取锁,如果此时还是失败,就会判断当前节点的前驱节点是否失效:如果不是就直接将前驱节点状态改为SIGNAL
,然后执行parkAndCheckInterrupt()
方法挂起当前线程;如果是就一直找到一个正常等待的前驱节点为止,改前驱节点状态然后再挂起线程。
AQS 独占锁的释放过程
和 AQS 使用 acquire()
方法加锁的过程类似,AQS 也有一个 release()
的解锁方法,他们同样需要实现类自己去实现 tryRelease()
方法。
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 如果当前头节点不为空,且不为初始状态
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
释放锁
和 tryAcquire()一样,AQS 不提供 tryRelease()的具体实现,而是交由子类去实现它。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
依然以可重入锁 ReentrantLock
为例,去了解 ReentrantLock#tryRelease()
的实现。
虽然 ReentrantLock
中有公平锁和非公平锁两种实现,但是他们是释放过程都是一样的,都通过他们的父类,即继承 AQS 的内部类 Sync#tryRelease()
方法来实现释放的功能:
protected final boolean tryRelease(int releases) {
// 可重入锁,减去一次持锁次数
int c = getState() - releases;
// 如果当前线程不是持有锁的线程则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果可重入次数为0,说明确实释放锁了
if (c == 0) {
free = true;
// 独占线程设置为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
这个地方,就是让 tryRelease()
去执行释放锁的过程,换句话说,就是改变 state
。
唤醒等待队列的后继节点
unparkSuccessor()
方法的主要用途是:
- 在前驱节点(其实就是同步队列的头结点)释放锁后,去唤醒同步队列中的后继节点;
- 如果后继节点处于
CANCELLED
状态,说明该节点已经挂掉了,就从尾节点向前找到离后继节点最近的节点去唤醒,否则直接唤醒后继节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 如果头节点状态还处于等待状态,则改回初始状态
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果后继节点存在并被标记为CANCELLED状态
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点开始,找到离node最近的处于等待状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点
LockSupport.unpark(s.thread);
}
AQS 独占锁释放锁总结
当线程使用 release()
方法释放锁的时候:
- 先执行
tryRelease()
方法释放资源,改变state
以释放锁 - 再执行
unparkSuccessor()
方法唤醒后继节点,如果后继节点挂了,就找到最近的下一个处于等待状态的有效节点唤醒。
AQS 共享锁的加锁和释放过程
相对 AQS 独占锁,共享锁在 AQS 中以及提供好的相关的实现。共享锁通过 acquireShared()
方法加锁,通过releaseShared()
方法解锁。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
获取锁资源
tryAcquireShared()
也是一个空实现方法,需要由子类去实现。根据注释,我们不难理解它的作用:
- 检查是否支持共享锁,如果是才能获取锁;
- 根据后继等待节点的情况返回值:大于 0 说明有后继等待节点,执行完以后继续唤醒后继节点;等于 0 说明当前已是最后一个可以获取共享锁的节点,不再唤醒后继节点;小于 0 说明锁获取失败,需要进入等待队列。
其中,针对共享锁,比较具有代表性的是读写锁 ReentrantReadWriteLock
,它通过 state
的高 16 位记录读锁,低 16 位记录写锁;在获取锁资源的时候,如果检测存在写锁则无法获得锁,如果是读锁则获取资源并递增读锁计数器,这部分的逻辑就是在其子类中得到的实现。
唤醒后继节点
基于上面的 tryAcquireShared()
方法,doAcquireShared()
要做的事情显然很明了了:
- 如果后继节点可以以共享模式唤醒,就直接依次唤醒;
- 否则,则跟获取独占锁的流程一样,再次尝试获取资源无果后将后将节点代表的线程挂起。
private void doAcquireShared(int arg) {
// 创建共享模式的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点已经是头结点,即当前节点需要获取锁
if (p == head) {
// 尝试获取共享锁
int r = tryAcquireShared(arg);
// 唤醒后继节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断当前线程是否需要被挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里有一个 setHeadAndPropagate()
方法,根据方法名可以猜出是用来设置头结点和唤醒后继共享节点的:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置头结点
setHead(node);
// 如果后续有需要唤醒的节点,并且当前节点没有被CANCELLED
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果下一节点处于共享状态
if (s == null || s.isShared())
// 释放共享锁
doReleaseShared();
}
}
这里其实只做了一些条件判断,确保有后继节点并且后继节点是正常节点,核心逻辑其实是 doReleaseShared()
方法:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒节点
unparkSuccessor(h);
}
else if (ws == 0 &&
// 如果后续节点不需要唤醒,则设置为PROPAGATE避免影响后继节点的唤醒
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
解锁过程
解锁使用的releaseShared()
方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
这里的 tryReleaseShared()
其实跟独占锁的tryRelease()
类似,即改变状态以表示释放资源,而 doReleaseShared()
即上文唤醒后继节点的方法。
共享锁加锁和解锁的总结
共享锁和独占锁的根本区别在于,当头是共享模式时,它被唤醒后会直接尝试唤醒后继所有共享模式的节点,直到遇到第一个非共享模式的节点为止,而不是跟独占锁一样只唤醒后继节点。
AQS 总结
-
AQS 在内部为此了一个变量
state
,用于记录锁状态,线程通过 CAS 修改state
即是加锁解锁过程。 -
AQS 内存维护了一条双向链表,即同步队列,等待锁的线程被封装为
Node
节点连成链表,通过LockSuppor
工具类的park()
和unpark()
方法切换等待状态。 -
AQS 提供了独占和非独占两种锁实现方式,分别提供了
acquire()/release()
和acquireShared()/releaseShared()
两套加锁解锁方式; 同时,基于state
有衍生出可重入和非可重入锁的实现(即重入锁在state=1的情况下继续递增),解锁在state
上递减直到为 0 为止。并且,根据是否先判断等待队列中是否已存在等待线程,然后再尝试获取锁的情况,又分出了公平锁和非公平锁两种实现。
附录
compareAndSwapObject 到底比较的是什么?
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/69713.html