并发编程学习笔记 之 工具类CountDownLatch、CyclicBarrier详解

导读:本篇文章讲解 并发编程学习笔记 之 工具类CountDownLatch、CyclicBarrier详解,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、CountDownLatch用法详解

概念

  CountDownLatch类允许一个或者多个线程去等待其他线程完成操作。CountDownLatch构建时接收一个int型参数,表示要等待的工作线程的个数,每个子任务执行完成后使用countDown方法进行减1操作,等待线程使用await方法进行等待操作,直到所有子任务完成,等待线程就会被唤起执行。

示例

  创建子线程,模拟执行任务,当子任务执行完成后,主线程被唤醒继续执行,具体实现如下:

public static void main(String[] args) throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(5);
    for(int i=0;i<5;i++){
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //模拟任务耗时执行
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + "线程执行了任务!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //计数器 count down,线程执行完成
                    latch.countDown();
                }

            }
        });
        t.start();
    }
    //使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
    latch.await();
    System.out.println("所有子任务已经执行完成了!");
}

注意:
1、构建CountDownLatch对象时的整数和子任务一一对应,如果构建时数值大于latch.countDown()执行次数,主线程将会一直等待,如果小于latch.countDown()执行此次,主线程会提前被唤醒。
2、为保证latch.countDown()方法的执行,一般放到finally 代码块中,保证在异常的情况下也可以被执行,避免主线程无限等待。

CountDownLatch常用方法

1、Sync 内部类
  CountDownLatch通过内部类Sync来实现同步语义,而Sync继承AQS(AbstractQueuedSynchronizer)。具体实现如下:

//定义Sync变量,供CountDownLatch中的方法使用
private final Sync sync;

private static final class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 4982264981922014374L;

     Sync(int count) {
         setState(count);
     }

     int getCount() {
         return getState();
     }

     protected int tryAcquireShared(int acquires) {
         return (getState() == 0) ? 1 : -1;
     }

     protected boolean tryReleaseShared(int releases) {
         // Decrement count; signal when transition to zero
         for (;;) {
             int c = getState();
             if (c == 0)
                 return false;
             int nextc = c-1;
             if (compareAndSetState(c, nextc))
                 return nextc == 0;
         }
     }
 }

2、构造函数
  参数为不能小于0的正整型,标识计数器。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

3、await()方法
  CountDownLatch有两个构造函数,一个无参,一个带有过期时间参数。具体实现本别基于AQS的acquireSharedInterruptibly()和tryAcquireSharedNanos()方法完成。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
//AbstractQueuedSynchronizer类中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 如果被中断,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取同步状态
    if (tryAcquireShared(arg) < 0)
        // 获取同步状态失败,自旋
        doAcquireSharedInterruptibly(arg);
}
//在CountDownLatch的Sync中重写,表示当count==0时,获取成功
protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}
public boolean await(long timeout, TimeUnit unit)
  throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
      throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

4、countDown()方法
  使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。

public void countDown() {
    sync.releaseShared(1);
}

5、getCount()方法
  获得latch的数值。

 public long getCount() {
   return sync.getCount();
}

2、CyclicBarrier用法详解

  CyclicBarrier(循环屏障),和CountDownLatch类似,也是一个同步助手工具,它允许多个线程在执行完相应的操作之后彼此等待共同到达一个障点(barrier point)。CyclicBarrier也非常适合用于某个串行化任务被分拆成若干个并行执行的子任务,当所有的子任务都执行结束之后再继续接下来的工作。

  CyclicBarrier也接收一个int型的参数,但是和CountDownLatch的参数意义不一样,这里表示屏障拦截的线程数量(分片)。和CountDownLatch相比,CyclicBarrier可以被重复使用,而CountDownLatch当计数器为0的时候就无法再次利用。

  同时CyclicBarrier还提供了另外一个构造函数CyclicBarrier(int parties, Runnable barrierAction) ,其中barrierAction可用于当所有线程到达屏障时,优先执行的操作,然后再唤醒所有线程。

示例

  按照上述CountDownLatch例子,我们使用CyclicBarrier也实现一下,如下:

public class CyclicBarrierTest {

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(5);
        List<Thread> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //模拟任务耗时执行
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + "线程执行了任务!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        try {
                            //在此等待其他子线程到达barrier point
                            barrier.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }

                }
            });
            list.add(t);
            t.start();
        }
        list.forEach(t ->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println("所有子任务已经执行完成了!");
    }
}

  和CountDownLatch相比,构造工具对象类似,但是在子线程执行完成后,这里调用的是barrier.await()方法,而主线程是借用了List集合和子线程的join()方法实现,用起来似乎不如CountDownLatch直观好用。其实,这里就是因为两个工具类设计思想不一样:CountDownLatch使用计数器,当计数器为0时,唤醒其他线程;而CyclicBarrier则是使用屏障,当所有线程均到屏障后,就开始继续工作。因此实现上述例子,更加优雅的方式,我们把主线程也作为其中一个线程,这样构造CyclicBarrier对象时,参数+1,然后主线程也是用await()方法即可,实现如下:

public class CyclicBarrierTest {

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(6);
        for(int i=0;i<5;i++){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //模拟任务耗时执行
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + "线程执行了任务!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        try {
                            //在此等待其他子线程到达barrier point
                            barrier.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }

                }
            });
            t.start();
        }
        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("所有子任务已经执行完成了!");
    }
}

  除了上述区别,CyclicBarrier还可以被重复使用,我们这里模拟学生上课的场景,来了解CyclicBarrier可重用用法。

  • 每节课开始前,开始点名,模拟五名学生
  • 每个线程代表一名学生,准备就绪,准备上课
  • 点名完成后,老师开始上课
  • 课程结束后,课间活动
  • 继续下节课
public class CyclicBarrier2Test {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        final CyclicBarrier barrier = new CyclicBarrier(6, new Runnable() {
            @Override
            public void run() {
                System.out.println("点名完成,开始上课!");
            }
        });
        dianMing("语文",barrier);
        barrier.await();
        System.out.println("语文课已经结束,学生课间活动!");

        dianMing("数学",barrier);
        barrier.await();
        System.out.println("数学课已经结束,学生课间活动!");
    }

    /**
     * 模拟上课点名
     * @param type
     * @param barrier
     */
    public static void dianMing(String type, CyclicBarrier barrier){
        System.out.println("当前课程:" + type + ",准备上课,开始点名:");
        int count = 5;//5个学生
        for(int i=0;i<count;i++){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //模拟任务耗时执行
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + "童鞋,已经准备就绪,等待上课!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        try {
                            //在此等待其他子线程到达barrier point
                            barrier.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }

                }
            });
            t.start();
        }
    }
}

输出结果如下:

当前课程:语文,准备上课,开始点名:
Thread-3童鞋,已经准备就绪,等待上课!
Thread-1童鞋,已经准备就绪,等待上课!
Thread-4童鞋,已经准备就绪,等待上课!
Thread-2童鞋,已经准备就绪,等待上课!
Thread-0童鞋,已经准备就绪,等待上课!
点名完成,开始上课!
语文课已经结束,学生课间活动!
当前课程:数学,准备上课,开始点名:
Thread-6童鞋,已经准备就绪,等待上课!
Thread-7童鞋,已经准备就绪,等待上课!
Thread-5童鞋,已经准备就绪,等待上课!
Thread-8童鞋,已经准备就绪,等待上课!
Thread-9童鞋,已经准备就绪,等待上课!
点名完成,开始上课!
数学课已经结束,学生课间活动!
CyclicBarrier常用方法

1、构造函数
  CyclicBarrier包括了两个构造函数,在前面例子中,我们都使用到了。

  • CyclicBarrier(int parties)构造器:构造CyclicBarrier并且传入parties,其中parties表示需要到达屏障的线程数,到达屏障的线程数量等于parties时,就会继续执行后续代码。
  • CyclicBarrier(int parties, Runnable barrierAction)构造器:构造CyclicBarrier不仅传入parties,而且指定一个Runnable接口,当所有的线程到达barrier point的时候,该Runnable接口会被调用,有时我们需要在所有任务执行结束之后执行某个动作,这时就可以使用这种CyclicBarrier的构造方式了。

2、int getParties()方法
  获取CyclicBarrier在构造时的parties,该值一经CyclicBarrier创建将不会被改变。

3、await()方法
  CyclicBarrier中有两个await重载方法,如下:

  • await()方法我们使用最多的一个方法,调用该方法之后,当前线程将会进入阻塞状态,等待其他线程执行await()方法进入barrier point,进而全部退出阻塞状态,当CyclicBarrier内部的count为0时,调用await()方法将会直接返回而不会进入阻塞状态。

  • await(long timeout, TimeUnit unit)方法:该方法与无参的await方法类似,只不过增加了超时的功能,当其他线程在设定的时间内没有到达barrier point时,当前线程也会退出阻塞状态。

4、isBroken()
  返回barrier的broken状态,某个线程由于执行await方法而进入阻塞状态,如果该线程被执行了中断操作,那么isBroken()方法将会返回true。

  • 当一个线程由于在执行CyclicBarrier的await方法而进入阻塞状态时,这个时候对该线程执行中断操作会导致CyclicBarrier被broken。
  • 被broken的CyclicBarrier此时已经不能再直接使用了,如果想要使用就必须使用reset方法对其重置。
  • 如果有其他线程此时也由于执行了await方法而进入阻塞状态,那么该线程会被唤醒并且抛出BrokenBarrierException异常。

5、getNumberWaiting()方法
  该方法返回当前barrier有多少个线程执行了await方法而不是还有多少个线程未到达barrier point。
6、reset()方法
  主要作用是中断当前barrier,并且重新生成一个generation,还有将barrier内部的计数器count设置为parties值,但是需要注意的是,如果还有未到达barrier point的线程,则所有的线程将会被中断并且退出阻塞,此时isBroken()方法将返回false而不是true。

3、CyclicBarrier 与 CountDownLatch 不同

  • CoundDownLatch的await方法会等待计数器被count down到0,而执行CyclicBarrier的await方法的线程将会等待其他线程到达barrier point。
  • CyclicBarrier内部的计数器count是可被重置的,进而使得CyclicBarrier也可被重复使用,而CoundDownLatch则不能。
  • CyclicBarrier是由Lock和Condition实现的,而CountDownLatch则是由同步控制器AQS(AbstractQueuedSynchronizer)来实现的。
  • 在构造CyclicBarrier时不允许parties为0,而CountDownLatch则允许count为0。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68707.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!