AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

2018-08-08 16:15:07   最后更新: 2018-08-08 16:19:22   访问数量:144




此前的日志中,我们介绍了 CAS

CAS 思想与 java 原子操作的实现

 

本篇日志中,我们来介绍 java 中线程同步的另一个基础工具类 -- AQS

AQS (Abstract Queued Synchronizer) 是 JDK 提供的一套基于 FIFO 同步队列的阻塞锁和相关同步器的一个同步框架,通过 AQS 我们可以很容易地实现我们自己需要的独占锁或共享锁

java 中,我们曾经介绍过的信号量、ReentrantLock、CountDownLatch 等工具都是通过 AQS 来实现的

java 线程同步工具类

 

AQS 又称为队列同步器,java 是通过 AbstractQueuedSynchronizer 类来实现其思想的

他通过一个 int 类型的成员变量 state 来控制同步状态,state = 0 时,则说明没有任何线程占用锁,当 state = 1 时,则说明有一个线程目前正在占用锁

它支持实现共享锁与独占锁,下面我们就从源码来剖析,分析一下 AQS 的实现原理

 

 

AbstractQueuedSynchronizer 继承自抽象类 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 这个类定义了存储独占当前锁的线程和获取的方法

AbstractQueuedSynchronizer 类通过内部类 Node 构成的 FIFO 同步队列来完成线程获取锁的排队工作

同时,AbstractQueuedSynchronizer 通过 ConditionObject 来构建等待队列

 

static final class Node { // 共享模式 static final Node SHARED = new Node(); // 独占模式 static final Node EXCLUSIVE = null; // 标识线程已处于结束状态 static final int CANCELLED = 1; // 等待被唤醒状态 static final int SIGNAL = -1; // 条件状态 static final int CONDITION = -2; // 在共享模式中使用表示获得的同步状态会被传播 static final int PROPAGATE = -3; // 等待状态, 存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种 volatile int waitStatus; // 同步队列中前驱结点 volatile Node prev; // 同步队列中后继结点 volatile Node next; // 请求锁的线程 volatile Thread thread; // 等待队列中的后继结点 Node nextWaiter; //判断是否为共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取前驱结点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //..... }

 

Node 类是一个典型的双向链表元素结构,拥有前驱和后继的引用

其中 SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式,所谓共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore 采用的就是基于 AQS 的共享模式实现的

而独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock

 

AbstractQueuedSynchronizer waitStatus 取值
常量意义
CANCELLED1同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,节点终极状态
SIGNAL-1等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行
CONDITION-2该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
PROPAGATE-3在共享模式中,该状态标识结点的线程处于可运行状态
00初始状态

 

AbstractQueuedSynchronizer 类作为抽象的基础框架类,通过定义模板方法的方式提供了一套实现锁的模板,其最基本的锁实现方式需要子类复写模板:

protected boolean tryAcquire(int arg); // 获取独占锁 protected boolean tryRelease(int arg); // 释放独占锁 protected int tryAcquireShared(int arg); // 获取共享锁 protected boolean tryReleaseShared(int arg); // 释放共享锁 protected boolean isHeldExclusively(); // 判断是否持有独占锁

 

ReentrantLock、Semaphore、CountDownLatch 等工具都是通过复写上述若干方法实现的

 

AbstractQueuedSynchronizer 通过 acquire 方法获取锁

下图是整个加锁过程的流程图

 

 

通过整个流程,我们看到,独占模式获取锁的原则就是:

  • 获取锁失败的节点插入队尾
  • 队首不存储任何信息
  • 队首的后继非 CANCELLED 状态节点是唯一有权利获取锁的节点

 

acquire 方法

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

 

acquire 方法调用了我们上面已经提到的需要子类复写的获取独占锁方法 tryAcquire,框架只定义了获取失败后如何处理 -- addWaiter

 

获取锁失败后入队操作 -- addWaiter

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

 

这是一个典型的双向队列节点入队操作,毋庸细说,他通过我们已经讲到过的 CAS 操作实现了尾节点的判断和更新

private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }

 

而兜底方法 enq 有两种可能被调用

  1. 当前队列为空,尚没有任何元素存在
  2. 并发环境下,原子操作 compareAndSetTail 失败

 

因此,enq 方法中承担了两部分工作的处理:

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

 

可以看到,enq 方法中通过原子操作和循环的方式实现了并发环境下尾节点的添加,之所以这部分逻辑从 addWaiter 中抽离,是因为大部分情况下是不会出现上述两种可能的,将正常业务逻辑与特殊且不常用业务逻辑分离,是值得学习的一种代码组织方式

 

重新获取锁及更新节点状态 -- acquireQueued

如上所述,并发环境下有可能在插入列表之前尚需要等待锁,但在插入列表后,马上又可以获取到锁,因此此时再次获取锁就可以减少不必要的等待

于是,在上述 addWaiter 方法执行完,又执行了 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋操作 for (;;) { // 获取当前插入节点的前置节点 final Node p = node.predecessor(); // 如果当前节点的前置节点是 head 则重新获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果需要线程进入等待,则等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

 

最终,通过 shouldParkAfterFailedAcquire 方法判断是否需要让线程挂起,如果需要挂起,则调用 parkAndCheckInterrupt 方法挂起线程

线程的挂起最终是通过 Unsafe 的 static native 方法 park 实现的

 

节点出队 -- cancelAcquire

最终,无论是线程获取锁过程中发生异常还是超时唤醒,都需要将当前 node 出队,这就是 cancelAcquire 方法做的事

private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }

 

 

可以看到,上面这段代码处理了三种情况:

  1. node 是队尾节点
  2. node 的前驱不是队首节点,node 也不是队尾节点
  3. node 的前驱是队首节点

 

其中,对于情况 1 和情况 2,都进行了 node 的出队,但是第三种情况却没有执行出队方法,这是为什么呢?

原因是当前节点已经被标记为 CANCELLED 状态,那么,在后继节点执行 shouldParkAfterFailedAcquire 方法时,会先让其前驱节点,也就是我们当前的 node 节点出队

 

唤醒后继节点 -- unparkSuccessor

由上所述,当 node 是队首节点的后继时,执行了 unparkSuccessor 方法

当前节点是队列中唯一等待锁的节点,所以必须让出获取锁的权限,让他的后继去获取锁,这就是 unparkSuccessor 方法做的事情

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

 

 

代码通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒了该节点对应的线程

LockSupport.unpark 最终调用了 UNSAFE 类的 unpark 方法

public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }

 

 

那么,被唤醒的线程执行了什么呢?我们要找到该线程是什么时候被挂起的,那就是在上述的 acquireQueued 方法中,我们回看 acquireQueued 方法,被唤醒的线程在执行完 Thread.interrupted() 方法后继续循环,尝试获取锁,从而保证了锁的独占与竞争

parkAndCheckInterrupt 方法并没有对 Thread.interrupted() 的返回值做任何处理,由此可见,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的

 

上面提到,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的,AQS 还提供了另一个方法 acquireInterruptibly 加锁,从而让获取锁失败等待的线程可以被中断

 

public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }

 

 

可中断独占锁加锁失败处理 -- doAcquireInterruptibly

可以看到,这里获取锁失败后调用了 doAcquireInterruptibly 方法,doAcquireInterruptibly 方法与 acquire 的结构非常像:

private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

 

可以看到,这里仍然是通过独占模式获取锁,但是在 parkAndCheckInterrupt 返回后,与此前 acquire 方法中继续循环的方式不同,取而代之的,抛出了 InterruptedException 异常,中断线程执行

 

独占锁的解锁较为简单,因为加锁成功后,该线程对应的节点已经从同步队列中移除,此处如果解锁成功,只需要唤醒下一个节点去竞争锁即可

 

 

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

 

这里仍然调用了 unparkSuccessor 方法,通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒线程

 

 

 

从图上可以看到,共享锁的加锁主要做了下面三项工作:

  1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
  2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
  3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

 

获取锁传递与唤醒 --

整个流程与独占锁的加锁流程非常类似,最大的区别就是 setHeadAndPropagate 方法,这个方法做的事情就是我们上面提到的:

  • 如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待

 

//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //记录当前头节点 //设置新的头节点,即把当前获取到锁的节点设置为头节点 //注:这里是获取到锁之后的操作,不需要并发控制 setHead(node); //这里意思有两种情况是需要执行唤醒操作 //1.propagate > 0 表示调用方指明了后继节点需要被唤醒 //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒 //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒 if (s == null || s.isShared()) doReleaseShared(); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }

 

 

唤醒后继节点 -- doReleaseShared

doReleaseShared 方法进行了后续节点的唤醒

private void doReleaseShared() { for (;;) { //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了 //其实就是唤醒上面新获取到共享锁的节点的后继节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //表示后继节点需要被唤醒 if (ws == Node.SIGNAL) { //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //执行唤醒操作 unparkSuccessor(h); } //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } //如果头结点没有发生变化,表示设置完成,退出循环 //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试 if (h == head) break; } }

 

 

我们再来看共享锁的解锁操作

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

 

结合上面我们已经介绍过的 doReleaseShared 方法源码,共享锁的解锁非常简单,与独占模式的解锁一样,他在解锁成功以后进行了后续节点的唤醒操作,从而保证锁的竞争

 






技术帖      技术分享            源码      java      加锁      解锁      aqs      共享锁abstract queued synchronizer     


京ICP备15018585号