自顶向下解读CountDownLatch

自顶向下解读CountDownLatch

CountDownLatch类的作用

CountDownLatch是JDK提供的并发流程控制的工具类,直译是“倒数门闩”,比喻为“拼多多10人拼团,人满了就发货”
典型例子:长途大巴车,人满发车。

顾名思义,它有一个计数器,用来倒数,这里用“门栓”来比喻这个类的工作时机:它会在倒数结束之前(也就是门栓被打开之前),一直处于等待状态,直到倒计时结束了(大门被打开了),此线程才继续工作。

CountDownLatch在java.util.concurrent包下,是在JDK1.5以后加入的。

CountDownLatch的两个典型用法

主要方法介绍

  • 仅有一个构造函数:public CountDownLatch(int count) { }; //参数count为需要倒数的数值;
  • await()//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  • await(long timeout, TimeUnit unit)//和await()类似,但是这里可以设置超时时间,如果超时就不再等待;
  • countDown()//将count值减1,直到为0时,等待的线程会被唤起。

用法一:一个线程等待其他多个线程都执行完毕,再继续自己的工作。

场景

去医院做全面体检,至少会有血常规化验、X光拍片,等这两项检验结果都出来之后,医院会汇总然后给咱们一个完整的体检报告,但是这两项实际上是并行的。
应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

代码

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch c = new CountDownLatch(5);
        // 5名工人
        final ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int index = 0; index < 5; index++) {
            final int NO = index + 1;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + NO + "完成了工作。");
                    } catch (InterruptedException e) {
                    } finally {
                        // 每个人完成了工作,计数器就减一
                        c.countDown();
                    }
                }
            };
            exec.submit(run);
        }
        System.out.println("等待其他人完成工作...");
        c.await();
        System.out.println("所有人都已完成了工作。");
    }
}

用法二:多个线程等待某一个线程的信号,同时开始执行

场景

  • 运动会上,多个运动员同时进行100米的比赛。运动员需要等待裁判员的发令枪响后,才能统一起跑。
  • 实现最大并行:有时我们希望同时启动多个线程以实现最大并行度。例如,我们想测试一个类是否为单例。可以利用CountDownLatch同时让所有等待中的线程一起恢复。
  • 对服务器进行并发模拟测试的时候,需要首先创建很多个线程并且初始化,然后所有线程等待统一的信号,同时向服务器发送请求,这样就可以最高程度的模拟真实双11的并发场景,提前预演。

代码

public class CountDownLatchDemo {
    // 模拟100米跑步比赛,5名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。
    public static void main(String[] args) throws InterruptedException {

        // 运动员等待起跑信号
        final CountDownLatch begin = new CountDownLatch(1);

        // 5名选手
        final ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int index = 0; index < 5; index++) {
            final int NO = index + 1;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        System.out.println("No." + NO + "准备完毕,等待发令枪信号");
                        // 等待
                        begin.await();
                        System.out.println("No." + NO + "开始跑步了。");
                    } catch (InterruptedException e) {
                    }
                }
            };
            exec.submit(run);
        }

        Thread.sleep(5000);//模拟裁判员检查发令枪、检查场地等动作。

        System.out.println("***发令枪响,比赛开始!***");
        // begin减一,开始游戏
        begin.countDown();
    }
}

综合用法一和用法二:运动员跑步

场景

运动会上,多个运动员同时进行100米的比赛。运动员需要等待裁判员的发令枪响后,才能统一起跑;而裁判员需要等最后一名运动员到达终点后,再宣布本轮比赛结束、进行后续工作(比如统计成绩、安排下轮比赛)。
这里有两个等待的动作,第一个是多个运动员等待发令枪,另一个是裁判员等待所有运动员到达终点。

代码

public class CountDownLatchDemo2 {

    // 模拟100米跑步比赛,5名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。
    public static void main(String[] args) throws InterruptedException {

        // 运动员等待起跑信号
        final CountDownLatch begin = new CountDownLatch(1);

        // 裁判员等待所有运动员到达终点
        final CountDownLatch end = new CountDownLatch(5);

        // 5名选手
        final ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int index = 0; index < 5; index++) {
            final int NO = index + 1;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        System.out.println("No." + NO + "准备完毕,等待发令枪信号");
                        // 等待
                        begin.await();
                        System.out.println("No." + NO + "开始跑步了。");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + NO + "到达终点。");
                    } catch (InterruptedException e) {
                    } finally {
                        // 每个选手到达终点时,end就减一
                        end.countDown();
                    }
                }
            };
            exec.submit(run);
        }

        Thread.sleep(5000);//模拟裁判员检查发令枪、检查场地等动作。

        System.out.println("***发令枪响,比赛开始!***");
        // begin减一,开始游戏
        begin.countDown();
        // 等待end变为0,即所有选手到达终点
        end.await();
        System.out.println("***所有人到达终点,比赛结束!***");
        exec.shutdown();
    }
}

CountDownLatch的注意点

本文介绍的几种用法,仅是常见的用法,CountDownLatch有扩展用法:多个线程等多个线程完成执行后,再同时执行。只要运用好初始化和countDown(),剩下的逻辑可以由程序自由控制。

CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例。

CountDownLatch原理、源码分析

原理图

CountDownLatch源码分析

这里对CountDownLatch类里面最重要的3个方法进行分析:

构造方法

CountDownLatch仅提供了一个构造方法,接收的参数就是需要计数的数量,直到countDown()方法被调用到了规定的次数,之前在等待的线程才会继续工作。

源码

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

这里的Sync是CountDownLatch的内部类,继承自AQS类,并重写了tryAcquireShared(int acquires)和tryReleaseShared(int releases)方法;sync是Sync的实例。

await()

调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可以看出,这里调用了acquireSharedInterruptibly(1)方法,最终会实际调用:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()

public void countDown() {
    sync.releaseShared(1);
}

最终回到用到重写过的

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
            }

总结

CountDownLatch类在创建实例的时候,需要传递倒数次数。
而每一次线程调用了await()方法,state变量会减一,直到为0。
state为0的时候,之前等待的线程会继续运行。
CountDownLatch不能回滚重置。


发表评论

电子邮件地址不会被公开。