锁的等待与唤醒 -- ConditionObject 源码解析

2018-08-13 18:13:01   最后更新: 2018-08-13 18:13:01   访问数量:107




此前在介绍 ReentrantLock 用法时,我们介绍了 ReentrantLock 与 Condition 的用法,类似于 Object 提供的 notify、notifyAll 方法来让线程进入等待与唤醒,那么这是如何实现的呢?

在介绍 AQS 源码时,我们提到,AQS 维护了两个队列 -- 同步队列和等待队列,到现在为止,我们仅仅使用了 AQS 的同步队列,却从没有使用过 AQS 的等待队列,那么 AQS 等待队列究竟是如何实现的呢?

本篇日志我们就来介绍一下 ReentrantLock 的 Condition 的实现,他就是通过 AQS 的等待队列来实现的

 

ReentrantLock 用法详解

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

 

public Condition newCondition() { return sync.newCondition(); }

 

 

可以看到,他直接调用了内部类 Sync 的 newCondition 方法

那我们来看看 Sync 的 newCondition 方法又做了什么呢?

final ConditionObject newCondition() { return new ConditionObject(); }

 

他直接新建了一个 AbstractQueuedSynchronizer 类的内部类 ConditionObject

 

ConditionObject 中维护了一个双线链表,也就是上面提到的 AQS 等待队列,有两个成员分别指向双向链表的首尾

private transient Node firstWaiter; private transient Node lastWaiter;

 

 

而 Node 对象就是我们熟悉的 AQS 同步队列节点:

static final class Node { //标志Node的状态:独占状态。 static final Node SHARED = new Node(); //共享状态 static final Node EXCLUSIVE = null; //因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态; //同步队列中使用 static final int CANCELLED = 1; //该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。 //同步队列中使用 static final int SIGNAL = -1; //CONDITION队列中的状态,同步队列中节点没有该状态,当将一个node从CONDITION队列中transfer到同步队列中时,状态由CONDITION转换成0 //同步队列不使用,CONDITION队列中使用 static final int CONDITION = -2; //该状态表示下一次节点如果是Shared的,则无条件获取锁。 //同步队列中使用 static final int PROPAGATE = -3; //当一个新的node在同步队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态 volatile int waitStatus; //队列中的前驱节点 //CONDITION队列中使用 volatile Node prev; //同步队列中使用,指向下一个节点的引用 volatile Node next; //当前线程 volatile Thread thread; //CONDITION队列中指向下一个node的指针,同步队列中不使用 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } //返回前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

 

 

此前我们已经介绍过,在线程获取锁以后,通过 Condition 对象的 await 方法可以让线程挂起,并暂时释放锁,直到其他线程调用该 Condition 对象的 signal 方法或 signalAll 方法或线程被中断

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

 

 

下面是整个过程的流程图

 

通过流程图我们可以看到,整个流程是非常清晰的,简单的来说就是将线程对应的 AQS Node 节点从 AQS 的同步队列中移动到 Condition 对象维护的等待队列的队尾,然后释放锁,挂起线程

但是,这里存在一个问题,当线程被唤醒的时候,为什么检测到线程被中断不直接抛出 InterruptedException 而是要等到获取锁成功之后呢?因为 await 方法的原则是,怎么进来的怎么出去,他只是让线程休眠一段时间,他不会因为线程被中断就让本来持有锁的线程在退出方法时没有持有锁,这样的话,用户去释放锁的时候就会抛出意外的异常 java.lang.IllegalMonitorStateException,这显然是不合理的

 

这三个方法与 await 方法做了相同的事情,那就是让出锁的所有权,进入等待,但是他们的独特之处在于,你可以定义让出锁所有权的最长等待时间

他们三个方法的源码非常相似,与不带参数的 await 方法的区别仅仅是使用 LockSupport.parkNanos(this, nanosTimeout) 来实现线程的挂起,同时,超时的时候强制将 Condition 队列中的 Node 节点放入到 AQS 的同步队列中

 

await(long time, TimeUnit unit)

public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }

 

返回被唤醒时是否已经超时

 

awaitNanos(long nanosTimeout)

public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }

 

与上面的 await(long time, TimeUnit unit) 方法相比,几乎是一模一样的,仅仅是去掉了 TimeUnit 转纳秒数的部分,同时返回了剩余的纳秒数

 

awaitUntil

public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }

 

 

awaitUninterruptibly 是最简单的 await 方法了,他没有对 interrupted 方法的返回结果做任何处理

public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }

 

 

public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }

 

 

 

signal 的流程比较简单,主要就做了下面三件事

  1. 权限校验
  2. 将节点从 Condition 队列放入 AQS 同步队列
  3. 唤醒线程

 

public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }

 

 

 

我们可以看到,和 signal 从流程上几乎是一样的,所有节点被逐个添加到 AQS 同步队列尾部

 






技术帖      技术分享      多线程      java      condition      并发安全      reentrantlock      aqs      并发控制     


京ICP备15018585号