Skip to content

Condition

java
public interface Condition {
    
  	//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒
    //感觉就是和普通对象的wait和notify是对应的 同时,等待状态下是可以响应中断的
 	void await() throws InterruptedException;
    
  	//同上,但不响应中断(看名字都能猜到)
  	void awaitUninterruptibly();
    
  	//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
  	long awaitNanos(long nanosTimeout) throws InterruptedException;
    
  	//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
  	boolean await(long time, TimeUnit unit) throws InterruptedException;
    
  	//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
  	boolean awaitUntil(Date deadline) throws InterruptedException;
    
  	//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
  	void signal();
    
  	//同上,但是是唤醒所有等待线程
  	void signalAll();
}
public interface Condition {
    
  	//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒
    //感觉就是和普通对象的wait和notify是对应的 同时,等待状态下是可以响应中断的
 	void await() throws InterruptedException;
    
  	//同上,但不响应中断(看名字都能猜到)
  	void awaitUninterruptibly();
    
  	//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
  	long awaitNanos(long nanosTimeout) throws InterruptedException;
    
  	//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
  	boolean await(long time, TimeUnit unit) throws InterruptedException;
    
  	//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
  	boolean awaitUntil(Date deadline) throws InterruptedException;
    
  	//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
  	void signal();
    
  	//同上,但是是唤醒所有等待线程
  	void signalAll();
}

在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:

java
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** 条件队列的头结点 */
    private transient Node firstWaiter;
    /** 条件队列的尾结点 */
    private transient Node lastWaiter;
  
  	//...
}
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** 条件队列的头结点 */
    private transient Node firstWaiter;
    /** 条件队列的尾结点 */
    private transient Node lastWaiter;
  
  	//...
}

这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:

image-20220307115850295

我们知道,当一个线程调用await()方法时,会进入等待状态,直到其他线程调用signal()方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。

我们先来看看最关键的await()方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:

  • 只有已经持有锁的线程才可以使用此方法
  • 当调用此方法后,会直接释放锁,无论加了多少次锁
  • 只有其他线程调用signal()或是被中断时才会唤醒等待中的线程
  • 被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)

源码:

java
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();   //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常
    Node node = addConditionWaiter();    //为当前线程创建一个新的节点,并将其加入到条件队列中
    int savedState = fullyRelease(node);    //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复
    int interruptMode = 0;     //用于保存中断状态
    while (!isOnSyncQueue(node)) {   //循环判断是否位于同步队列中,如果等待状态下的线程被其他线程唤醒,那么会正常进入到AQS的等待队列中(之后我们会讲)
        LockSupport.park(this);   //如果依然处于等待状态,那么继续挂起
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)   //看看等待的时候是不是被中断了
            break;
    }
  	//出了循环之后,那线程肯定是已经醒了,这时就差拿到锁就可以恢复运行了
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //直接开始acquireQueued尝试拿锁(之前已经讲过了)从这里开始基本就和一个线程去抢锁是一样的了
        interruptMode = REINTERRUPT;
  	//已经拿到锁了,基本可以开始继续运行了,这里再进行一下后期清理工作
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();  //将等待队列中,不是Node.CONDITION状态的节点移除
    if (interruptMode != 0)   //依然是响应中断
        reportInterruptAfterWait(interruptMode);
  	//OK,接着该干嘛干嘛
}
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();   //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常
    Node node = addConditionWaiter();    //为当前线程创建一个新的节点,并将其加入到条件队列中
    int savedState = fullyRelease(node);    //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复
    int interruptMode = 0;     //用于保存中断状态
    while (!isOnSyncQueue(node)) {   //循环判断是否位于同步队列中,如果等待状态下的线程被其他线程唤醒,那么会正常进入到AQS的等待队列中(之后我们会讲)
        LockSupport.park(this);   //如果依然处于等待状态,那么继续挂起
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)   //看看等待的时候是不是被中断了
            break;
    }
  	//出了循环之后,那线程肯定是已经醒了,这时就差拿到锁就可以恢复运行了
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //直接开始acquireQueued尝试拿锁(之前已经讲过了)从这里开始基本就和一个线程去抢锁是一样的了
        interruptMode = REINTERRUPT;
  	//已经拿到锁了,基本可以开始继续运行了,这里再进行一下后期清理工作
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();  //将等待队列中,不是Node.CONDITION状态的节点移除
    if (interruptMode != 0)   //依然是响应中断
        reportInterruptAfterWait(interruptMode);
  	//OK,接着该干嘛干嘛
}

实际上await()方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:

  • 只有持有锁的线程才能唤醒锁所属的Condition等待的线程
  • 优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
  • 唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
  • 拿到锁之后,线程才能恢复运行

image-20220307120449303

源码:

java
public final void signal() {
    if (!isHeldExclusively())    //先看看当前线程是不是持有锁的状态
        throw new IllegalMonitorStateException();   //不是?那你不配唤醒别人
    Node first = firstWaiter;    //获取条件队列的第一个结点
    if (first != null)    //如果队列不为空,获取到了,那么就可以开始唤醒操作
        doSignal(first);
}
public final void signal() {
    if (!isHeldExclusively())    //先看看当前线程是不是持有锁的状态
        throw new IllegalMonitorStateException();   //不是?那你不配唤醒别人
    Node first = firstWaiter;    //获取条件队列的第一个结点
    if (first != null)    //如果队列不为空,获取到了,那么就可以开始唤醒操作
        doSignal(first);
}
java
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)   //如果当前节点在本轮循环没有后继节点了,条件队列就为空了
            lastWaiter = null;   //所以这里相当于是直接清空
        first.nextWaiter = null;   //将给定节点的下一个结点设置为null,因为当前结点马上就会离开条件队列了
    } while (!transferForSignal(first) &&   //接着往下看
             (first = firstWaiter) != null);   //能走到这里只能说明给定节点被设定为了取消状态,那就继续看下一个结点
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)   //如果当前节点在本轮循环没有后继节点了,条件队列就为空了
            lastWaiter = null;   //所以这里相当于是直接清空
        first.nextWaiter = null;   //将给定节点的下一个结点设置为null,因为当前结点马上就会离开条件队列了
    } while (!transferForSignal(first) &&   //接着往下看
             (first = firstWaiter) != null);   //能走到这里只能说明给定节点被设定为了取消状态,那就继续看下一个结点
}
java
final boolean transferForSignal(Node node) {
    /*
     * 如果这里CAS失败,那有可能此节点被设定为了取消状态
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //CAS成功之后,结点的等待状态就变成了默认值0,接着通过enq方法直接将节点丢进AQS的等待队列中,相当于唤醒并且可以等待获取锁了
  	//这里enq方法返回的是加入之后等待队列队尾的前驱节点,就是原来的tail
    Node p = enq(node);
    int ws = p.waitStatus;   //保存前驱结点的等待状态
  	//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败(可能是在ws>0判断完之后马上变成了取消状态,导致CAS失败)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);  //直接唤醒线程
    return true;
}
final boolean transferForSignal(Node node) {
    /*
     * 如果这里CAS失败,那有可能此节点被设定为了取消状态
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //CAS成功之后,结点的等待状态就变成了默认值0,接着通过enq方法直接将节点丢进AQS的等待队列中,相当于唤醒并且可以等待获取锁了
  	//这里enq方法返回的是加入之后等待队列队尾的前驱节点,就是原来的tail
    Node p = enq(node);
    int ws = p.waitStatus;   //保存前驱结点的等待状态
  	//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败(可能是在ws>0判断完之后马上变成了取消状态,导致CAS失败)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);  //直接唤醒线程
    return true;
}

其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?

这里其实是为了进行优化而编写,直接unpark会有两种情况:

  • 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
  • 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws > 0之后!compareAndSetWaitStatus(p, ws, Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws, Node.SIGNAL) == false

如果这里被提前unpark,那么在await()方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。所以,大致流程下:

image-20220307131536020