AbstractQueuedSynchronizer(AQS) 的原理简述

导读:本篇文章讲解 AbstractQueuedSynchronizer(AQS) 的原理简述,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

简述

AQS(抽象同步器) 就是 AbstractQueuedSynchronizer 抽象类的简称,可以通过继承该类快速的实现同步多线程下的同步容器。

如:ReadWriteLock、ReentrantLock,或者 CountDownLatch 与 Semaphore,甚至是线程池类 ThreadPoolExecutor 都继承了 AQS。

可以一句话概括它的作用,

AQS的实现依赖内部的同步队列,也就是FIFO的双向链表,如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

AQS 内部的数据结构(双向链表)

AQS 的底层数据结构其实是一条双向链表、一个代表锁状态的变量 state和一个Condition。当加锁后,state会改变,而竞争锁的线程会被封装到节点中形成链表,并且尝试改变 state以获取锁。

https://oscimg.oschina.net/oscnet/20ac0ddf-4c81-457f-8970-9299d6190657.png

这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到AQS队列中去

public abstract class AbstractQueuedSynchronizer{
    // 锁的状态,没有线程占用锁时 为 0
    private volatile int state;		
    // Node的头帧
	private transient volatile Node head;	
    // Node的尾帧
    private transient volatile Node tail;	

    static final class Node{...}
}

AQS 中提供了 state变量做为锁状态,一般来说,0 被视为无锁状态,1 被视为加锁状态,如果是可重入锁,就会大于 1。

因此,AQS 中的加锁解锁实际上就是通过 CAS 改变 state的过程,即下列三个方法:

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

双向链表的节点(Node内部类)

static final class Node {
    /** 标记表示节点正在共享模式中等待 */
    static final Node SHARED = new Node();
    /** 标记表示节点正在独占模式下等待 */
    static final Node EXCLUSIVE = null;

    // 表示当前结点已取消等待。
    // 当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化
    static final int CANCELLED =  1;
    // 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
    static final int SIGNAL    = -1;
    // 表示当前线程正在等待某个条件,当某个线程在调用了 Condition#signal 方法后,当前结点将会被从Condition队列转移到同步队列中。
    static final int CONDITION = -2;
    // 处于该状态的线程在释放共享资源,或接收到释放共享资源的信号时需要通知后继结点,以防止通知丢失。
    static final int PROPAGATE = -3;
    /**
    * 节点的状态:
    * 	 1,表示节点已经取消等待;
    * 	 0,表示新节点入队时的默认状态;
    * 	-1,等待前面节点成功执行完之后待唤醒状态;
    * 	-2,持续等待状态,需要满足一定条件才能被唤醒;
    * 	-3,指示下一个节点应无条件传播的waitStatus值;
    */
    volatile int waitStatus;

    // 前驱节点
    volatile Node prev; 
    // 后继节点
    volatile Node next; 
    // 获取同步状态的线程
    volatile Thread thread;

    // 存储在Condition队列中的后继节点
    Node nextWaiter; 
    // 是否为共享锁
    final boolean isShared() { 
        return nextWaiter == SHARED;
    }
    // 获取前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    //将线程构造成一个Node,添加到等待队列
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    //这个方法会在Condition队列使用
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

注:和线程池中的状态一样,Node 只有小于 0 的时候才处于正常的等待状态中,因此很多地方通过判断是否小于 0 来确定节点是否处于等待状态。

AQS 独占锁的加锁过程

AQS 的同步过程其实就是同步队列节点中依次获取锁的过程; AQS 一共提供了独占和非独占两种获取资源的方法:

  • acquire():以独占模式获取锁;
  • release():以独占模式释放锁;
  • acquireShared():以共享模式获取锁;
  • releaseShared():以共享模式释放锁;

独占锁

独占锁, 即只有占有锁的线程才能操作资源,在 synchronize 底层的锁中,独占通过锁对象对象头中的指针来声明独占的线程,而在 AQS 中则通过父类 AbstractOwnableSynchronizer 提供的 exclusiveOwnerThread 变量来声明独占的线程:

private transient Thread exclusiveOwnerThread;

AQS 独占锁加锁的方法是 acquire(),其中涉及到 tryAcquire() 方法是一个空实现,并未提供其他具体实现, 需要由子类实现并在在里面进行具体的独占判断:

public final void acquire(int arg) {
    // 尝试获取锁
    if (!tryAcquire(arg) &&
        // 添加到等待队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 进入等待队列后阻塞
        selfInterrupt();
}

里面还涉及到 addWaiter(),acquireQueued()selfInterrupt()四个方法。

获取锁资源

在 AQS 中,tryAccquire() 是一个未实现的方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

这里我们以用可重入锁 ReentrantLock 为例,

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 如果 锁没有被占用
    if (c == 0) {
        // 如果当前同步队列中没有线程在等待
        if (!hasQueuedPredecessors() &&
            // 尝试CAS修改state
            compareAndSetState(0, acquires)) {
            // 将当前锁设为自己独占
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果锁已经被自己获取过了,即重入
    else if (current == getExclusiveOwnerThread()) {
        // state + 1,即多获取一次锁,即可重入锁
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 没有获取锁
    return false;
}

注:非公平锁与公平锁的 tryAccquire() 主要差别在于,公平锁会先看看有没有线程在等待,没有才去竞争锁,而非公平锁不会看有没有线程在等待,无论如何都会先去竞争一次锁。

公平锁和非公平锁

ReentranLock 分为公平锁和非公平锁。

公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。

非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。

添加节点至等待队列

addWaiter() 方法用于创建并添加等待节点。

private Node addWaiter(Node mode) {
    // 以共享或者独占模式创建节点
    Node node = new Node(Thread.currentThread(), mode);
    // 尾插法插入节点
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果队列为空,自旋初始化 AQS 中的头结点和尾节点
    enq(node);
    return node;
}

注:这里的头结点实际上是一个哨兵节点,本身并无意义,当等待队列排队获取资源的时候,会直接从 head.next 开始。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在同步队列中获取锁

acquireQueued

这个方法主要做两件事:

  1. 如果当前节点已经是队列第二个结点了,并且获取锁成功,就设置当前节点为新头结点,然后执行完毕后设置为中断;
  2. 如果当前节点不是队列第二个节点,或者获取锁不成功,就挂起当前节点,等待上一节点的唤醒。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的上一节点
            final Node p = node.predecessor();
            // 再次尝试获取锁,如果前驱节点已经是头节点了,并且获取锁成功
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为头结点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            // 前驱节点不是头结点,并且获取锁失败
            // 如果前驱节点需要被 park 挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 挂起当前线程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 旧头结点已经处理完了,直接删除
        if (failed)
            cancelAcquire(node);
    }
}

AQS 加锁过程的总结

当线程使用 acquire() 方法获取锁的时候:

  1. 先执行tryAcquire()方法,这是个需要由子类实现的空方法,公平或者非公平在这个方法中让线程去获取锁,获得锁的线程要修改 state
  2. 然后,失败的线程需要执行addWaiter()方法,这个方法用于将线程封装到节点中,并以尾插法插入等待队列的链表,同时,如果等待队列没有初始化就会在此处先初始化;
  3. 接着,添加完成的节点执行acquireQueued()方法,此时会再次试图获取锁,如果此时还是失败,就会判断当前节点的前驱节点是否失效:如果不是就直接将前驱节点状态改为 SIGNAL ,然后执行 parkAndCheckInterrupt() 方法挂起当前线程;如果是就一直找到一个正常等待的前驱节点为止,改前驱节点状态然后再挂起线程。

AQS 独占锁的释放过程

和 AQS 使用 acquire() 方法加锁的过程类似,AQS 也有一个 release() 的解锁方法,他们同样需要实现类自己去实现 tryRelease()方法。

public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果当前头节点不为空,且不为初始状态
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁

和 tryAcquire()一样,AQS 不提供 tryRelease()的具体实现,而是交由子类去实现它。

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

依然以可重入锁 ReentrantLock 为例,去了解 ReentrantLock#tryRelease()的实现。

虽然 ReentrantLock 中有公平锁和非公平锁两种实现,但是他们是释放过程都是一样的,都通过他们的父类,即继承 AQS 的内部类 Sync#tryRelease()方法来实现释放的功能:

protected final boolean tryRelease(int releases) {
    // 可重入锁,减去一次持锁次数
    int c = getState() - releases;
    // 如果当前线程不是持有锁的线程则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果可重入次数为0,说明确实释放锁了
    if (c == 0) {
        free = true;
        // 独占线程设置为null
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

这个地方,就是让 tryRelease() 去执行释放锁的过程,换句话说,就是改变 state

唤醒等待队列的后继节点

unparkSuccessor() 方法的主要用途是:

  1. 在前驱节点(其实就是同步队列的头结点)释放锁后,去唤醒同步队列中的后继节点;
  2. 如果后继节点处于 CANCELLED 状态,说明该节点已经挂掉了,就从尾节点向前找到离后继节点最近的节点去唤醒,否则直接唤醒后继节点。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        // 如果头节点状态还处于等待状态,则改回初始状态
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    // 如果后继节点存在并被标记为CANCELLED状态
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点开始,找到离node最近的处于等待状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒节点
        LockSupport.unpark(s.thread);
}

AQS 独占锁释放锁总结

当线程使用 release()方法释放锁的时候:

  1. 先执行tryRelease()方法释放资源,改变 state以释放锁
  2. 再执行unparkSuccessor()方法唤醒后继节点,如果后继节点挂了,就找到最近的下一个处于等待状态的有效节点唤醒。

AQS 共享锁的加锁和释放过程

相对 AQS 独占锁,共享锁在 AQS 中以及提供好的相关的实现。共享锁通过 acquireShared()方法加锁,通过releaseShared()方法解锁。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

获取锁资源

tryAcquireShared()也是一个空实现方法,需要由子类去实现。根据注释,我们不难理解它的作用:

  1. 检查是否支持共享锁,如果是才能获取锁;
  2. 根据后继等待节点的情况返回值:大于 0 说明有后继等待节点,执行完以后继续唤醒后继节点;等于 0 说明当前已是最后一个可以获取共享锁的节点,不再唤醒后继节点;小于 0 说明锁获取失败,需要进入等待队列。

其中,针对共享锁,比较具有代表性的是读写锁 ReentrantReadWriteLock,它通过 state高 16 位记录读锁,低 16 位记录写锁;在获取锁资源的时候,如果检测存在写锁则无法获得锁,如果是读锁则获取资源并递增读锁计数器,这部分的逻辑就是在其子类中得到的实现。

唤醒后继节点

基于上面的 tryAcquireShared()方法,doAcquireShared()要做的事情显然很明了了:

  1. 如果后继节点可以以共享模式唤醒,就直接依次唤醒;
  2. 否则,则跟获取独占锁的流程一样,再次尝试获取资源无果后将后将节点代表的线程挂起。
private void doAcquireShared(int arg) {
    // 创建共享模式的节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点已经是头结点,即当前节点需要获取锁
            if (p == head) {
                // 尝试获取共享锁
                int r = tryAcquireShared(arg);
                // 唤醒后继节点
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            // 判断当前线程是否需要被挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里有一个 setHeadAndPropagate()方法,根据方法名可以猜出是用来设置头结点和唤醒后继共享节点的:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 设置头结点
    setHead(node);
    
    // 如果后续有需要唤醒的节点,并且当前节点没有被CANCELLED
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果下一节点处于共享状态
        if (s == null || s.isShared())
            // 释放共享锁
            doReleaseShared();
    }
}

这里其实只做了一些条件判断,确保有后继节点并且后继节点是正常节点,核心逻辑其实是 doReleaseShared()方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 如果后续节点不需要唤醒,则设置为PROPAGATE避免影响后继节点的唤醒
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

解锁过程

解锁使用的releaseShared()方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

这里的 tryReleaseShared()其实跟独占锁的tryRelease()类似,即改变状态以表示释放资源,而 doReleaseShared()即上文唤醒后继节点的方法。

共享锁加锁和解锁的总结

共享锁和独占锁的根本区别在于,当头是共享模式时,它被唤醒后会直接尝试唤醒后继所有共享模式的节点,直到遇到第一个非共享模式的节点为止,而不是跟独占锁一样只唤醒后继节点。


AQS 总结

  1. AQS 在内部为此了一个变量 state,用于记录锁状态,线程通过 CAS 修改 state即是加锁解锁过程。

  2. AQS 内存维护了一条双向链表,即同步队列,等待锁的线程被封装为 Node 节点连成链表,通过 LockSuppor 工具类的 park()unpark()方法切换等待状态。

  3. AQS 提供了独占和非独占两种锁实现方式,分别提供了 acquire()/release()acquireShared()/releaseShared()两套加锁解锁方式; 同时,基于 state有衍生出可重入和非可重入锁的实现(即重入锁在state=1的情况下继续递增),解锁在 state上递减直到为 0 为止。并且,根据是否先判断等待队列中是否已存在等待线程,然后再尝试获取锁的情况,又分出了公平锁和非公平锁两种实现。

附录

AQS源码分析

compareAndSwapObject 到底比较的是什么?

CAS、AQS讲解

AbstractQueuedSynchronizer超详细原理解析

深入理解 JUC:AQS 队列同步器

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/69713.html

(0)
小半的头像小半

相关推荐

极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!