线程间通信方式(2)

前文了解了线程通信方式中的Object.wait/Object.notify以及Semaphore,接下来我们继续了解其他的线程减通信方式。

CountDownLatch

CountDownLatch利用一个指定的计数初始化,由于调用了countDown方法,await方法会阻塞直到当前技术为0,之后所有等待的线程都会被释放,任何后续的await都会立即返回。

CountDownLatch是一个通用的同步工具,可以用于许多目的。如果CountDownLatch初始化的计数为1,则可以用作简单的开/关锁存器或门,也就意味着所有调用await的线程都在门处等待,直到由调用countDown()的线程打开它。一个初始化为N的CountDownLatch可以用来让一个线程等待,直到N个线程完成了某个动作,或者某个动作完成了N次。

CountDownLatch核心接口说明如下:

方法名称 描述 备注
await() 使当前线程进入等待状态,直到计数减为0或者当前线程被中断,否则不会唤醒 /
await(long timeout, TimeUnit unit) 等待timeout时间后,计数还没减成0,则等待线程唤醒继续执行,不再等待 /
countDown() 对计数减1,如果减到了0,就会唤醒等待的线程 /
getCount() 获取当前计数的值 /

举个简单例子,某主任务中一共要发起5个子任务,等待所有任务完成后主任务继续,此时在主任务执行线程以计数取值为5初始化CountDownLatch,调用await等待,在每个子任务完成时,调用countDown方法使计数减1,等到5个子任务全部完成后,此时计数减为0,主任务唤醒,继续执行。

通过例子和描述可以看出CountDownLatch属于一次性操作,计数无法重置。

以主任务中发起5个子任务为例,使用CountDownLatch的代码实现如下:

public static void main(String[] args) {
    Thread mainThread = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("MainThread start working");
            CountDownLatch countDownLatch = new CountDownLatch(5);
            try {
                for (int i=0;i<5;i++) {
                    Thread subThread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println(Thread.currentThread().getName()+" start working");
                            System.out.println(Thread.currentThread().getName()+" completed");
                            countDownLatch.countDown();
                        }
                    });
                    subThread.setName("SubThread"+(i+1));
                    subThread.start();
                }
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("MainThread completed");
        }
    });
    mainThread.setName("MainThread");
    mainThread.start();
}

运行结果如下:

线程间通信方式(2)
1-4-3-20

对于CountDownLatch 而言,如果在计数为0前,有子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临,所以使用CountDownLatch一定要注意线程异常的处理。

CyclicBarrier

CyclicBarrier同样是一个多线程同步的工具类,译为循环栅栏,其允许一组线程互相等待到达一个同步的屏障点,CyclicBarrier在一组固定大小的线程中存在相互等待的场景下十分有用,之所以称为循环栅栏,是因为其在其他线程释放后可以重新使用。

CyclicBarrier使用一个指定计数和Runnable进行初始化,初始化完成后,当CyclicBarrier.await调用次数等于计数,也就是等待线程数等于计数时,则会触发初始化传入Runnable运行,该Runnable运行在最后一个进入等待的线程中,随后所有等待线程唤醒继续执行。

7位选手参加田径比赛,所有人在起点就位,准备完成后,发射信号枪起跑。用CyclicBarrier来实现比赛逻辑,则以计数7和运行发射信号枪的Runnable初始化CyclicBarrier,每一个选手起点准备完成后,都调用一次CyclicBarrier.await,当所有选手都准备完成,发射信号枪的Runnable运行,比赛开始,实现代码如下:

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(7new Runnable() {
        @Override
        public void run() {
            System.out.println("所有选手准备完成,发射信号枪,比赛开始,运行在:"+Thread.currentThread().getName());
        }
    });
    for (int i=0;i<7;i++) {
        int finalI = i;
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println((finalI +1)+"号选手准备完成,运行在:"+Thread.currentThread().getName());
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println((finalI +1)+"号选手准备达到准点,运行在:"+Thread.currentThread().getName());
            }
        });
        thread.setName("选手"+(i+1));
        thread.start();
    }
}

输出如下:

线程间通信方式(2)
1-4-3-21

对于CyclicBarrier 而言,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出。

Lock,Condition与ReentrantLock

Lock通常译为锁,其是一种控制多个线程对共享资源访问的工具,与同步方法和语句相比,Lock提供了更多广泛的操作,其允许的结构更灵活,比如不同的属性,多个关联的Condition对象等。通常情况下,可以通过锁控制一次只有一个线程可以访问共享资源,所有线程在访问共享资源时都需要获取锁。部分锁也支持多共享资源进行并发访问,比如ReadWriteLock。

Lock核心函数如下表所示:

函数名称 描述 备注
lock() 获取对象锁,如果锁不可用,则当前线程被阻塞,在获取锁前处于休眠状态 /
unlock() 释放锁,锁释放操作应处于finally块内,确保在任何情况下都能释放,以免造成死锁 /
tryLock() 锁是否可用,true-可用,false-不可用 /

Condition通常译为条件,其将Object的wait,notify,notifyAll提取到不同的对象中,通过将这些对象与任一锁对象的使用结合起来,从而达到单个对象具有多个等待集的效果,在Lock+Condition的模式中,Lock代替了前文中Object.wait+synchronized中的synchronized,Condition代替了Object.wait/notify/notifyAll。

Condition核心函数如下表所示:

函数名称 描述 备注
await() 当前线程进入等待状态,直到被通知或中断,线程从await方法返回进入运行状态的情况包括:1.其他线程调用了该Condition的signal或signalAll方法。2.其他线程中断当前线程。 /
signal() 唤醒一个等待在该Condition上的线程 /
signalAll() 唤醒所有等待在该Condition上的线程 /

在实现上Lock和Condition均为接口,所以我们一般使用Lock的实现类ReentrantLock来实现锁机制,借助ReentrantLock对象内部的newCondition获得Condition的实现对象,进而搭配完成线程间通信,接下来我们使用ReentrantLock实现消费者-生产者模式,代码如下:

private static int count = 0;
public static void main(String[] args) {
    // 创建Lock对象
    ReentrantLock lock = new ReentrantLock();
    // 创建Condition对象
    Condition condition = lock.newCondition();
    // 创建盘子
    Counter counter = new Counter();
    Thread incThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (count < 4) {
                lock.lock();
                try {
                    if (counter.getCount() != 0) {
                        condition.await();
                    }
                    counter.incCount();
                    count++;
                    System.out.println("Inc Thread ++,current count is:" + counter.getCount());
                    condition.signalAll();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    incThread.setName("Inc Thread");
    incThread.start();

    Thread decThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (count < 4) {
                lock.lock();
                try {
                    if (counter.getCount() == 0) {
                        condition.await();
                    }
                    counter.decCount();
                    System.out.println("Dec Thread --,current count is:" + counter.getCount());
                    condition.signalAll();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    decThread.setName("Dec Thread");
    decThread.start();
}

运行结果如下:

线程间通信方式(2)
1-4-3-23

前面提到过,Condition将Object的wait,notify,notifyAll提取到不同的对象中,也就意味着我们可以更灵活的控制锁获取,我们新建两个Condition对象,一个控制生产者,一个控制消费者,代码如下:

private static int count = 0;
public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    // 控制生产者
    Condition incCondition = lock.newCondition();
    // 控制消费者
    Condition decCondition = lock.newCondition();
    Counter counter = new Counter();
    Thread incThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (count < 4) {
                lock.lock();
                try {
                    if (counter.getCount() != 0) {
                        // 阻塞当前生产线程
                        incCondition.await();
                    }
                    counter.incCount();
                    count++;
                    System.out.println("Inc Thread ++,current count is:" + counter.getCount());
                    // 唤醒消费者
                    decCondition.signalAll();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    incThread.setName("Inc Thread");
    incThread.start();

    Thread decThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (count < 4) {
                lock.lock();
                try {
                    if (counter.getCount() == 0) {
                        // 阻塞当前消费线程
                        decCondition.await();
                    }
                    counter.decCount();
                    System.out.println("Dec Thread --,current count is:" + counter.getCount());
                    // 唤醒生产线程
                    incCondition.signalAll();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    decThread.setName("Dec Thread");
    decThread.start();
}

运行结果如下:

线程间通信方式(2)
1-4-3-22


原文始发于微信公众号(小海编码日记):线程间通信方式(2)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/67801.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!