如果阅读过之前并发系列的文章,相信大家都知道在多线程竞争的情况下,我们可以使用volatile
,synchronized
,ReentrantLock
和ReentrantReadWriteLock
等工具来保证共享资源的线程安全性,今天聊聊,除了竞争,多线程是如何实现彼此之间的协同处理的。
wait/notify
在Java中,Object
对象提供了wait()/notify()/notifyAll()
方法,可以用来实现多个线程之间的协同处理,也就是控制线程之间的等待与唤醒。
-
wait():使当前线程进入阻塞状态,并且释放其持有的锁; -
notify(): 唤醒处于一个阻塞状态的线程; -
notifyAll():唤醒所有处于阻塞状态的线程。
还可以使用interrupt()方法唤醒一个被wait()方法阻塞的线程,被阻塞的线程会抛出InterruptException异常,在《如何优雅地关闭一个线程》中有详细说明。
wait()
和notify()
方法需要针对同一共享资源才能触发阻塞和唤醒,也就是说,一个线程调用了某个资源对象的wait()
方法,其他线程需要调用同一个对象的notify()
或者notifyAll()
方法才能够唤醒阻塞的线程。下面我们可以通过一个生产者/消费者模型案例来看一下wait()/notify()
的使用。
public class Producer implements Runnable {
private Queue<String> queue;
private int capacity;
public Producer(Queue<String> queue, int capacity) {
this.queue = queue;
this.capacity = capacity;
}
public void run() {
int i = 0;
for (; ; ) {
synchronized (this.queue) {
if (capacity == queue.size()) {
try {
System.out.println("-----producer.wait()-----");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String content = "HelloWorld" + i;
i++;
queue.add(content);
System.out.println("producer -- > " + content);
queue.notify();
}
}
}
}
Producer表示一个生产者,实现了Runnable
接口,在run()
方法中通过synchronized
对共享资源queue
加锁,当队列容量到达阈值时,会让当前线程等待,否则会一直向队列中添加数据,并且使用queue.notify()
唤醒处于阻塞状态的消费者线程。
public class Consumer implements Runnable {
private Queue<String> queue;
public Consumer(Queue<String> queue) {
this.queue = queue;
}
public void run() {
for (; ; ) {
synchronized (queue) {
if (queue.isEmpty()) {
try {
System.out.println("-----consumer.wait()-----");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String poll = queue.poll();
System.out.println("consumer --> " + poll);
queue.notify();
}
}
}
}
Consumer表示一个消费者,同样实现了Runnable
接口,在run()
方法中通过synchronized
对共享资源queue
加锁,当队列为空时,会当当前消费者线程等待,否则会一直从队列中取出数据,并且使用queue.notify()
唤醒处于等待状态的生产者线程。
测试代码:
public static void main(String[] args) {
Queue<String> queue = new LinkedList<String>();
Producer producer = new Producer(queue, 10);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
运行结果如下:
producer -- > HelloWorld17
producer -- > HelloWorld18
producer -- > HelloWorld19
-----producer.wait()-----
consumer --> HelloWorld10
consumer --> HelloWorld11
consumer --> HelloWorld12
consumer --> HelloWorld13
consumer --> HelloWorld14
从上面的例子可以看到wait()/notify()
方法都是结合synchronized
使用的,否则会抛出IllegaMonitorStateException
的异常,之所以要加同步代码块,有以下两个原因。
-
wait()/notify()方法是基于一个共享对象来实现线程间通信的,这意味着存在多个线程对该共享对象的竞争,为了保存原子性,需要加锁。 -
wait()/notify()方法需要实现线程的阻塞和唤醒,而 synchronized
本身实现了同步队列的机制,正好为wait()/notify()
方法提供了很好的协同机制。
如果你看过前面ReentrantLock
一问,想必对同步队列有一定的了解,此处的同步队列机制与AQS的设计是类似的,当争抢到锁的线程调用了wait()
方法时,会先释放锁,然后把当前线程加入到等待队列中,由于此时已经释放了锁,所以原本因为争抢不到锁而被放在同步队列中的线程会被唤醒竞争到锁,当有竞争到锁的线程调用了notify()
方法时,会将处于等待队列中的线程移到同步队列,等待被唤醒,然后重新竞争锁资源。示例图如下:
Thread.join()
除了wait/notify
可以实现线程之间的协同处理,还可以使用Thread.join()
方法来获取线程的执行结果。可以先看看如下例子:
public class JoinDemo {
static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0 ; i < 10; i++){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
}
});
t1.start();
System.out.println("before join: " + count);
t1.join();
System.out.println("after join: " + count);
}
}
运行结果:
before join: 0
after join: 10
在例子中,在main方法中使用t1线程
对全局变量count
进行自增,然后在main线程
中,再调用t1.join()
分别打印变量count
,可以看到,打印出来的值是不同的。t1.join()
方法会阻塞main线程
,准确的说应该是阻塞调用了join()
方法的线程,它会等到t1线程
执行完成后,重新唤醒阻塞的主线程,从而可以获得t1线程
的执行结果。执行流程如下:
Thread.join()源码分析
从上面的执行流程图中可以看到,join()
是通过阻塞唤醒的方式来实现的,现在我们通过源码,了解join()
是如何实现的。
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
// 如果当前线程是存活状态,调用wait进入阻塞状态
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
// 设置到阻塞等待时间
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
public final native void wait(long timeout) throws InterruptedException;
public final native boolean isAlive();
可以看到,join()
本质上也是用wait()/notify()
方法实现的,它使用synchronized
修饰了join()
方法,将当前线程示例作为对象锁,学过synchronized
的同学应该知道,当线程执行完成时,会唤醒对象锁上所有等待的线程(这里可以理解调用自身的notifyAll()
方法),然后不断通过while(isAlive())
判断当前线程示例是否存活,如果线程已经执行完成,则会返回false
,此时便会跳出循环,继续执行main线程
,这样便得到了异步线程的执行结果了。
总结
本文主要介绍了Java线程中的条件等待机制,如何通过wait/notify
实现线程之间的协同处理,并且简单介绍了join()
方法的实现原理。
示例源码:https://github.com/gd1228595104/Java-Concurrent-Programming-Series
公众号目前不能留言,点击阅读原文,进入我们的网站即可评论,并且后续关于本文的更新也会在网站进行更新。
原文始发于微信公众号(DevUnion):线程的条件等待机制
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35435.html