Skip to content

CountDownLatch

CountDownLatch中文名称叫做闭锁,也叫计数锁,不过不是用来加锁的,而是通过计数实现条件等待的功能。 CountDownLatch的使用场景有两个:

  1. 当前线程等待其他线程都执行完成之后,再执行。
  2. 所有线程满足条件后,再一起执行。

源码解析

CountDownLatch类

java
public class CountDownLatch {

    // 只有一个Sync同步变量
    private final Sync sync;

    // Sync继承自AQS,主要逻辑都在这里面
    private static final class Sync extends AbstractQueuedSynchronizer {

        // 只有这一个构造方法,需要指定计数器数值
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

}
public class CountDownLatch {

    // 只有一个Sync同步变量
    private final Sync sync;

    // Sync继承自AQS,主要逻辑都在这里面
    private static final class Sync extends AbstractQueuedSynchronizer {

        // 只有这一个构造方法,需要指定计数器数值
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

}

ReentrantLock一样,CountDownLatch也没有直接继承AQS,也是采用组合的方式,使用Sync同步变量实现计数的功能,而Sync同步变量才是真正继承AQS的。

countDown方法

java
public void countDown() {
    // 底层调用父类AQS中的releaseShared()方法
    sync.releaseShared(1);
}
public void countDown() {
    // 底层调用父类AQS中的releaseShared()方法
    sync.releaseShared(1);
}

countDown()方法里面调用的是父类AQS中的releaseShared()方法,而releaseShared()方法又在调用子类Sync中tryReleaseShared()方法。

java
/**
 * 父类AQS
 */
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
            
    public final boolean releaseShared(int arg) {
        // tryReleaseShared()由子类实现
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // 定义抽象方法,由子类实现
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
}
/**
 * 父类AQS
 */
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
            
    public final boolean releaseShared(int arg) {
        // tryReleaseShared()由子类实现
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // 定义抽象方法,由子类实现
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
}
java
/**
 * 子类Sync
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    
    // 实现父类AQS中的tryReleaseShared()方法
    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0) {
                return false;
            }
            int nextc = c-1;
            if (compareAndSetState(c, nextc)) {
                return nextc == 0;
            }
        }
    }
}
/**
 * 子类Sync
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    
    // 实现父类AQS中的tryReleaseShared()方法
    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0) {
                return false;
            }
            int nextc = c-1;
            if (compareAndSetState(c, nextc)) {
                return nextc == 0;
            }
        }
    }
}

Sync同步类中tryReleaseShared()方法逻辑很简单,就是把同步状态state值减一。

await源码

await()方法底层也是调用父类中acquireSharedInterruptibly()方法,而父类AQS又需要调用子类Sync中的具体实现。

java
public void await() throws InterruptedException {
    // 底层调用父类AQS中的releaseShared()方法
    sync.acquireSharedInterruptibly(1);
}

/**
 * 父类AQS
 */
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // tryAcquireShared()由子类实现
        if (tryAcquireShared(arg) < 0) {
            doAcquireSharedInterruptibly(arg);
        }
    }

    // 定义抽象方法,由子类实现
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

}
public void await() throws InterruptedException {
    // 底层调用父类AQS中的releaseShared()方法
    sync.acquireSharedInterruptibly(1);
}

/**
 * 父类AQS
 */
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // tryAcquireShared()由子类实现
        if (tryAcquireShared(arg) < 0) {
            doAcquireSharedInterruptibly(arg);
        }
    }

    // 定义抽象方法,由子类实现
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

}

子类Sync只需要实现tryAcquireShared()方法即可,而tryAcquireShared()方法的作用就是判断锁是否已经完全释放,即同步状态state=0。

java
/**
 * 子类Sync
 */
private static final class Sync extends AbstractQueuedSynchronizer {

    // 实现父类AQS中的tryAcquireShared()方法
    @Override
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
}
/**
 * 子类Sync
 */
private static final class Sync extends AbstractQueuedSynchronizer {

    // 实现父类AQS中的tryAcquireShared()方法
    @Override
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
}

示例

场景1

当前线程等待其他线程都执行完成之后,再执行。

  • 比如当前线程需要查询3个数据库,并且把查询结果汇总返回给前端。查询3个数据库的逻辑,可以分别使用3个线程加快查询速度。但是怎么判断3个线程都执行结束了呢?使用 CountDownLatch

  • 输出结果:

java
pool-1-thread-2 执行完成 
pool-1-thread-1 执行完成 
pool-1-thread-3 执行完成 
所有任务执行完成。
pool-1-thread-2 执行完成 
pool-1-thread-1 执行完成 
pool-1-thread-3 执行完成 
所有任务执行完成。
java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest1 {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建一个线程池,用来执行3个查询任务
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 2. 创建一个计数锁,数量是3
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 3. 启动3个查询任务
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                try {
                    // 4. 睡眠1秒,模拟任务执行过程
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 执行完成");
                    // 5. 任务执行完成,计数器减一
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                }
            });
        }

        // 6. 等待所有任务执行完成
        // 在3个任务没有执行完成之前,await()方法会一直阻塞,直到3个任务都执行完成。
        countDownLatch.await();
        System.out.println("所有任务执行完成。");

        // 7. 关闭线程池
        executorService.shutdown();
    }
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest1 {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建一个线程池,用来执行3个查询任务
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 2. 创建一个计数锁,数量是3
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 3. 启动3个查询任务
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                try {
                    // 4. 睡眠1秒,模拟任务执行过程
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 执行完成");
                    // 5. 任务执行完成,计数器减一
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                }
            });
        }

        // 6. 等待所有任务执行完成
        // 在3个任务没有执行完成之前,await()方法会一直阻塞,直到3个任务都执行完成。
        countDownLatch.await();
        System.out.println("所有任务执行完成。");

        // 7. 关闭线程池
        executorService.shutdown();
    }
}

场景2

所有线程满足条件后,再一起执行。

  • 如系统中多个任务线程存在先后依赖关系,必须等待其他线程启动完成后,才能一起执行。

  • 与场景1不同,这里创建CountDownLatch计数器的时候,指定的数量是1,因为3个任务需要满足同一个条件,就是都启动完成,也就是只需要调用一次countDown()方法。

  • 输出结果:

java
pool-1-thread-1 启动完成
pool-1-thread-2 启动完成
pool-1-thread-3 启动完成
所有任务启动完成,开始执行。
pool-1-thread-1 执行完成
pool-1-thread-3 执行完成
pool-1-thread-2 执行完成
pool-1-thread-1 启动完成
pool-1-thread-2 启动完成
pool-1-thread-3 启动完成
所有任务启动完成,开始执行。
pool-1-thread-1 执行完成
pool-1-thread-3 执行完成
pool-1-thread-2 执行完成
java
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建一个线程池,用来执行3个任务
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 2. 创建一个计数锁,数量是1
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // 3. 启动3个任务
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 启动完成");
                    // 4. 等待其他任务启动完成
                    countDownLatch.await();
                    // 5. 睡眠1秒,模拟任务执行过程
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 执行完成");
                } catch (InterruptedException e) {
                }
            });
        }

        // 6. 所有任务启动完成,计数器减一
        countDownLatch.countDown();
        System.out.println("所有任务启动完成,开始执行。");

        // 7. 关闭线程池
        executorService.shutdown();
    }
}
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建一个线程池,用来执行3个任务
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 2. 创建一个计数锁,数量是1
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // 3. 启动3个任务
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 启动完成");
                    // 4. 等待其他任务启动完成
                    countDownLatch.await();
                    // 5. 睡眠1秒,模拟任务执行过程
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 执行完成");
                } catch (InterruptedException e) {
                }
            });
        }

        // 6. 所有任务启动完成,计数器减一
        countDownLatch.countDown();
        System.out.println("所有任务启动完成,开始执行。");

        // 7. 关闭线程池
        executorService.shutdown();
    }
}