Java并发工具类-CyclicBarrier
LOL
加载游戏等待引发的问题
一天小开下班回家迫不及待打开电脑打开了LOL
开始玩了,游戏匹配成功便进入了加载画面。这时候小开想到了一个问题,这个场景使用代码如何实现呢?首先我们假设每个玩家都是一个线程,因为电脑或者网络原因,每个人加载游戏的资源速度并不相同,如果不做控制那么就会出现有的玩家预先加载完提前进入游戏而有的玩家则还停留在等待画面。这在实际中肯定是不可行的,那么使用程序如何来表达出来呢?
CyclicBarrier
小开去网上找了找资料,发现JDK中的并发工具包提供的CyclicBarrier
可以很轻松的实现这个功能。CyclicBarrier
是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共点。
主要的方法
-
构造方法。该类主要提供了两个构造方法,方法定义如下:
public CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties)
其中参数parties
代表的参与的线程数量,这里代表的就是我们游戏中线程的数量。barrierAction
代表的是一个指令,该指令执行的时机就是当所有线程都到达公共点
时才会执行,同时这个参数可以为空。
-
await()
。如果所有线程都还未到达公共点时,则线程将一直等待下去。
在所有玩家还没加载好资源之前,那么即使我已经到达了100%
的进度,我还是无法开始游戏只能等待其他玩家准备好。
代码实现
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
String userName = (i + 1) + "号";
new Thread(new Player(userName,cyclicBarrier)).start();
}
}
}
class Player implements Runnable {
/**
* 玩家名称
*/
private final String userName;
private final CyclicBarrier cyclicBarrier;
private final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public Player(String userName, CyclicBarrier cyclicBarrier) {
this.userName = userName;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
printLog("玩家[%s]开始加载游戏.....",userName);
try {
Random random = new Random();
TimeUnit.SECONDS.sleep(random.nextInt(10));
printLog("玩家[%s]加载游戏完毕",userName);
cyclicBarrier.await();
printLog("玩家[%s]进入游戏对战",userName);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
private void printLog(String content,String ... args){
System.out.println(sdf.format(new Date())+":"+String.format(content,args));
}
}
运行代码结果如下:
15:06:40:玩家[1号]开始加载游戏.....
15:06:40:玩家[9号]开始加载游戏.....
15:06:40:玩家[7号]开始加载游戏.....
15:06:40:玩家[4号]开始加载游戏.....
15:06:40:玩家[3号]开始加载游戏.....
15:06:40:玩家[5号]开始加载游戏.....
15:06:40:玩家[6号]开始加载游戏.....
15:06:40:玩家[8号]开始加载游戏.....
15:06:40:玩家[2号]开始加载游戏.....
15:06:40:玩家[10号]开始加载游戏.....
15:06:40:玩家[4号]加载游戏完毕
15:06:40:玩家[6号]加载游戏完毕
15:06:42:玩家[2号]加载游戏完毕
15:06:44:玩家[5号]加载游戏完毕
15:06:45:玩家[7号]加载游戏完毕
15:06:46:玩家[8号]加载游戏完毕
15:06:46:玩家[1号]加载游戏完毕
15:06:46:玩家[9号]加载游戏完毕
15:06:47:玩家[10号]加载游戏完毕
15:06:48:玩家[3号]加载游戏完毕
15:06:48:玩家[4号]进入游戏对战
15:06:48:玩家[6号]进入游戏对战
15:06:48:玩家[2号]进入游戏对战
15:06:48:玩家[7号]进入游戏对战
15:06:48:玩家[5号]进入游戏对战
15:06:48:玩家[1号]进入游戏对战
15:06:48:玩家[8号]进入游戏对战
15:06:48:玩家[9号]进入游戏对战
15:06:48:玩家[10号]进入游戏对战
15:06:48:玩家[3号]进入游戏对战
从打印结果可以看出,大家是同一时间开始游戏,然后每个玩家开始加载游戏资源,并且每个用户加载游戏资源所需的时间并不是一样的,但是最后大家都是同一时间进入对战的。
await()
方法
前面说了在所有线程到达公共点之前,先到达公共点的线程会在公共点等待直到所有线程都到达公共点。但是某些情况下可能出现所有线程都到达公共点的情况,这种情况该怎么办呢?
-
等待中的线程被中断
。如果到达公共点的线程被中断了,那么被中断的线程会抛出InterruptedException
异常,而与其一起等待在同一公共点的线程将会抛出BrokenBarrierException
。
public class CyclicBarrierDemo2 {
public static void main(String[] args) throws InterruptedException {
//设置参与者为3
CyclicBarrier cb = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
}
});
t1.setName("t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
}
});
t2.setName("t2");
t2.start();
//主线程等待一秒后中断线程t1
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
}
}
线程t1
被中断了,而线程t2
并不会还停留在等待状态,而是会抛出BrokenBarrierException
停止等待。最后的运行结果如下所示:
t1:开始执行
t2:开始执行
t1:InterruptedException
t2:BrokenBarrierException
-
等待超时导致当前线程超时停止等待
。await
方法还可以设置等待超时时间,在公共点等待了指定时间后,如果其他线程还未到达公共点那么该线程将抛出TimeoutException
异常,而其他线程将抛出BrokenBarrierException
终止等待。
public class CyclicBarrierDemo2 {
public static void main(String[] args) throws InterruptedException {
//设置参与者为3
CyclicBarrier cb = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await(1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName()+":TimeoutException");
}
});
t1.setName("t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
}
});
t2.setName("t2");
t2.start();
}
}
上面代码中,线程t1
设置1s
后等待超时,超时之后等待超时线程抛出TimeoutException
,而其他线程将抛出BrokenBarrierException
。而运行结果也可以印证这一点:
t1:开始执行
t2:开始执行
t1:TimeoutException
t2:BrokenBarrierException
-
线程还未到达公共点或者
在公共点等待其他线程到达,CyclicBarrier的reset方法被调用
。该情况将导致所有等待在同一公共点上的线程抛出BrokenBarrierException
异常。
public class CyclicBarrierDemo2 {
public static void main(String[] args) throws InterruptedException {
//设置参与者为3
CyclicBarrier cb = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
}
});
t1.setName("t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+":开始执行");
cb.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
} catch (BrokenBarrierException e){
System.out.println(Thread.currentThread().getName()+":BrokenBarrierException");
}
});
t2.setName("t2");
t2.start();
//1s后重置BrokenBarrier
TimeUnit.SECONDS.sleep(1);
cb.reset();
}
}
上面代码中线程t1
和t2
到达了公共点等待,1s
后BrokenBarrier
被重置这将导致线程t1和t2
抛出BrokenBarrierException
异常退出等待。
t1:开始执行
t2:开始执行
t1:BrokenBarrierException
t2:BrokenBarrierException
使用线程池时的坑
在使用线程池时我们特别需要注意一点,如果稍不注意很可能会导致所有线程一直停留在等待状态,永远无法结束等待。下面直接看示例代码:
public class CyclicBarrierDemo3 {
public static void main(String[] args) {
ThreadPoolExecutor service = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
int n = 5;
CyclicBarrier cb = new CyclicBarrier(n);
for (int i = 0; i < n; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+":运行开始");
cb.await();
System.out.println(Thread.currentThread().getName()+":运行结束");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}
如上面代码所示,我们构建了一个最小线程数为4
,最大线程数为8
的线程池用来执行任务。接着创建一个计数为5
的CyclicBarrier
,然后我们向线程池中提交5
个任务执行。运行代码之后会发现,最后只会有4
个任务开始运行了,而第五个任务始终没办法开始执行。这是为什么呢?
因为前面4
个线程到达了公共点之后会进入等待状态,而此时需要执行第5
个任务时,线程池中已经没有线程了。我们虽然设置了最大线程数量为8
,但此时线程池并不会创建新的线程,而是将任务放到了等待队列中。我们这里的等待队列长度为1024
,这有该队列满了之后,线程池才会触发扩容,所以这就导致了第5
个任务被放入了等待队列,而前4
个任务会一直处在等待中。
与CountDownLatch
的差异对比
之前的文章《Java并发工具类-CountDownLatch》
中有讲过CountDownLatch
,这里我们将其与CyclicBarrier
做一下对比。
-
CyclicBarrier
可以循环使用,而CountDownLatch
是一次性的。 -
CyclicBarrier
强调的是所有线程到达同一点之后线程再继续运行,而提前到达公共点的线程是无法自由活动的,它必须等待其他线程到达之后才能恢复到运行状态。 -
CountDownLatch
强调的是某一个线程或一组线程等待其他线程完成某个任务(通常这个任务由多个小任务构成)之后再次运行。
原文始发于微信公众号(一只菜鸟程序员):Java并发工具类-CyclicBarrier
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/72913.html