并发编程之循环屏障CyclicBarrier

得意时要看淡,失意时要看开。不论得意失意,切莫大意;不论成功失败,切莫止步。志得意满时,需要的是淡然,给自己留一条退路;失意落魄时,需要的是泰然,给自己觅一条出路并发编程之循环屏障CyclicBarrier,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

前言

前面我们分享了线程等待其他线程执行同步类CountDownLatch,CountDownLatch是一个或多个线程等待其他线程执行完成才执行,比如主线程等待其他多个子线程执行完成再执行主线程逻辑。但是,在实际生产开发过程中有些场景需要多个线程相互等待,比如合并计算结果、比赛计时等等,这个时候我们就需要用到循环屏障Cyclicbarrier。

什么是CyclicBarrier

CyclicBarrier翻译为循环屏障,正如其名它是一个可以循环使用,可以让一组线程相互等待,线程只有全部到达屏障才能使屏障消失的一个同步辅助类。

CyclicBarrier原理

CyclicBarrier内部使用可重入ReentrantLock保证线程同步(并发编程之可重入锁ReentrantLock),使用Condition来保证线程阻塞隔离和唤醒,使用组parties来保存可重用副本个数,使用–count来计算还没有到达屏障的线程数目。在实际的运行过程中,CyclicBarrier会使用count值模拟一个屏障,count > 0则表示还有没有到达屏障的线程,此时当前线程需要加入阻塞;count == 0则表示所有线程都已经到达屏障点,此时会重置CyclicBarrier为下次重用做准备并唤醒所有阻塞线程。

CyclicBarrier VS CountDownLatch

CountDownLatch图示:

在这里插入图片描述

CyclicBarrier图示:

在这里插入图片描述

两者的异同:

1、CyclicBarrier 与 CountDownLatch 都是同步辅助类,都达到了线程同步的效果;
2、CyclicBarrier 是多个线程相互等待,CountDownLatch则是一个或多个等待其他线程;
3、CyclicBarrier 内部使用可重入锁ReentrantLock保证同步,CountDownLatch 基于抽象同步队列AQS,由于ReentrantLock 也是基于AQS实现,故两者的同步逻辑都差距不大。

CyclicBarrier核心源码

我们进入JUC CyclicBarrier 核心源码:

//重置cyclicbarrier
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

/**
 * 将当前屏障设置为已经打破并唤醒所有阻塞
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

/**
 * 核心方法
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    //可重入锁保证同步           
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        //是否已经打破屏障验证
        if (g.broken)
            throw new BrokenBarrierException();
        //线程打断验证
        if (Thread.interrupted()) {
            //打破屏障
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //重置屏障并唤醒所有阻塞线程
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 线程自旋加入阻塞并增加超时逻辑
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

如上源码所示,我们对主要的步骤做了相对的注释。
CyclicBarrier的核心部分就是dowait()方法,我们外部调用的await()方法实际就是调用的这个方法。
dowait()方法内部使用可重入锁ReentrantLock保证线程同步,该方法的主要逻辑是:
1、首先会验证当前屏障是否已经打破,当前线程是否已经被打断,如果已经被打破或者打断则直接会释放所有阻塞线程;
2、然后会将计数器–count,如果count == 0 则表示所有线程到达屏障点,此时会先执行构造方法传入的线程,后调用nextGeneration()方法重置CyclicBarrier并唤醒所有阻塞线程;如果count > 0 则表示还有线程没有到达屏障,则当前线程自旋加入阻塞。

实战演示

之前参加过的一道面试机试题目:请模拟出赛马比赛中各个赛马到达终点的时间。

这个题目的场景其实就是要让每个赛马都准备好了发令开始计时,然后各个赛马根据自身的情况完成比赛给出结束时间。这里我们可以用CyclicBarrier 模拟赛马开跑的时候,然后我们随机让线程睡眠模拟赛马跑步时间,最后打印出各个赛马到达的时间即可。

1、创建测试demo

/**
 * CyclicBarrier模拟赛马比赛
 * @author senfel
 * @version 1.0
 * @date 2023/5/8 11:27
 */
public class CyclicBarrierDemo {


    /**
     * 比赛方法
     * @author senfel
     * @date 2023/5/8 11:29
     * @return void
     */
    public static void matchFun() throws Exception{
        //线程池模拟赛马池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //cyclicBarrier 屏障模拟发令
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        //时间格式化
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        //模拟比赛结束
        CountDownLatch countDownLatch = new CountDownLatch(5);
        //模拟比赛,每场比赛5匹赛马
        for(int i=0;i<5;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    //赛马等待发令
                    System.err.println("赛马:"+Thread.currentThread().getName()+"等待发令");
                    try {
                        cyclicBarrier.await();
                        LocalDateTime startTime = LocalDateTime.now();
                        System.err.println("赛马:"+Thread.currentThread().getName()+"开始起跑,起跑时间为:"+ startTime.format(dateTimeFormatter));
                        //线程随机睡眠模拟赛马赛跑时间
                        int time = new Random().nextInt(20)  * 1000;
                        Thread.sleep(time);
                        LocalDateTime endTime = LocalDateTime.now();
                        Duration between = Duration.between(startTime, endTime);
                        long millis = between.toMillis();
                        System.err.println("赛马:"+Thread.currentThread().getName()+"完成比赛," +
                                "起跑时间为:"+startTime.format(dateTimeFormatter)+"," +
                                "到达时间为:"+endTime.format(dateTimeFormatter)+"," +
                                "耗时:"+millis+"毫秒。");
                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        countDownLatch.await();
        System.err.println("比赛完成");
        executorService.shutdown();
    }
}

2、创建测试用例

@SpringBootTest
class DemoApplicationTests {

/**
 * 模拟赛马测试用例
 * @author senfel
 * @date 2023/5/8 13:42
 * @return void
 */
public void matchTest() throws Exception{
    CyclicBarrierDemo.matchFun();
}

}

3、查看测试结果

赛马:pool-1-thread-2等待发令
赛马:pool-1-thread-1等待发令
赛马:pool-1-thread-3等待发令
赛马:pool-1-thread-5等待发令
赛马:pool-1-thread-4等待发令

赛马:pool-1-thread-3开始起跑,起跑时间为:12:33:51
赛马:pool-1-thread-1开始起跑,起跑时间为:12:33:51
赛马:pool-1-thread-2开始起跑,起跑时间为:12:33:51
赛马:pool-1-thread-4开始起跑,起跑时间为:12:33:51
赛马:pool-1-thread-5开始起跑,起跑时间为:12:33:51

赛马:pool-1-thread-2完成比赛,起跑时间为:12:33:51,到达时间为:12:33:55,耗时:4002毫秒。
赛马:pool-1-thread-1完成比赛,起跑时间为:12:33:51,到达时间为:12:33:57,耗时:6002毫秒。
赛马:pool-1-thread-3完成比赛,起跑时间为:12:33:51,到达时间为:12:33:58,耗时:7002毫秒。
赛马:pool-1-thread-4完成比赛,起跑时间为:12:33:51,到达时间为:12:34:03,耗时:12002毫秒。
赛马:pool-1-thread-5完成比赛,起跑时间为:12:33:51,到达时间为:12:34:10,耗时:19002毫秒。

比赛完成

写在最后

CyclicBarrier循环屏障是多线程并发编程中的常用同步类,我们可以用它来实现多个线程相互等待,并且可重用。其中的dowait()方式是核心方法,用可重入锁ReentrantLock保证了方法逻辑同步功能,使用Condition的await()、signalAll()方法来阻塞和唤醒线程。

⭐️路漫漫其修远兮,吾将上下而求索 🔍

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/154631.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!