实现此问题之前,有必要先了解一下 Java 描述的”锁”的原理。
等待对列和同步队列的简单概述
线程A 执行 synchronized(obj)
相当于 JVM会创建 一个 Monitor
(监视器) 对象,JVM 将 obj 对象的对象头的 MarkWord
字段 指向 Monitor,同时Monitor 的 Owner指向 线程A,而且 Owner只可以指向一个线程,这就相当于给 obj 对象加好锁了。
当前对象 obj 被上了一把锁后,那么,其他线程在执行synchronized(obj)
时,由于 obj 关联的 Monitor 的 Owner 已经 有指向了,其他线程就必须要阻塞等待了,阻塞等待就涉及到 Monitor
的等待对列和同步队列。
- 等待队列:线程A调用了对象
obj#wait()
方法,线程A就会释放obj的锁,并进入到了obj 的 Monitor 的同步队列。 - 同步队列:只有获取了对象 obj 的锁,线程才能执行对象的
synchronized
中的代码,obj的锁每次只有一个线程可以获得,其他线程只能在等待队列中等待。
注:等待队列中的线程不会被操作系统调度,处于
WAITING
状态;只有同步队列中的线程才会被操作系统根据自己的调度策略去调度,同步队列的线程处于BLOCKED
状态。
notify,notifyAll和wait的基础
Java的Object类提供了了三个final方法,用于实现多线程下资源的互斥访问,或者说是多线程间的通信。
当然,使用这些方法有一个前提,就是使用它们之前,必须先获取 “锁”,否则会抛出java.lang.IllegalMonitorStateException
异常。
例如:wait/notify/notifyAll 可以在
synchronized
同步块中使用。
调用 Object#wait() 发生了啥?
- 从同步队列中移除当前线程,封装当前线程的指针,并加入到等待队列里;
- 释放锁,并唤醒同步队列里的其他线程(唤醒就是让操作系统根据具体的调度策略去调度线程,成功被调度的就会获取锁,然后running);
- 挂起自己(就是让线程处于
WAITING
状态)。
调用 Object#notifyAll() 发生了啥?
- 将等待队列中的所有线程 移动到 同步队列;
- 然后,就是等待操作系统根据具体的调度策略去调度它们了。
简单的生产者和消费者实现
步骤
生产者:
- 判断资源是否充裕;
如果资源充裕,就没必要再生产了,等待消费者消费完资源为止。 - 如果资源不足,就必须立即生产资源;
资源生产完之后,必须通知消费者。
消费者:
- 判断资源是否充裕;
如果资源不足,就不能再消费了,等待生产者生产出资源为止 - 如果资源充足;
直接消费,之后,再通知生产者
问题
唤醒线程的问题(有可能会出现极端情况):
- 每次唤醒的都是生产者线程,消费者线程一直处于就绪状态,如果生产者不判断生产的必要性,那么,资源就会越积越多,超过仓库的容量。
- 也可能,每次唤醒的都是消费者线程,生产者生产完第一个资源,就一直处于就绪状态,如果消费者不判断是否可以消费,那么,就会出现 资源负数
解决
每次被唤醒后,都判断是否应该被唤醒,否则,就再次进入阻塞状态
- 方案一:(情况发生后,补救)
- 生产者:每次被唤醒后,都判断(生产的必要性) 再生产资源是否会超过仓库的容量
- 消费者:每次被唤醒后,都判断(是否可以消费)消费之后是否会出现 资源负数,即,我要的资源,是否都充足
- 方案二:(既然是Object#notifyAll 引起的问题,就不让这种情况发生)
- 设置两把锁,消费者锁和生产者锁(类似于 lock 和 condition)
- 所有生产者共享生产者锁,所有消费者共享消费者锁
源码
public class NotifyAndWaitTest {
public static void main(String[] args) throws Exception {
Data data = new Data();
// A 线程,生产资源 10 个
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
// B 线程,消费资源 10 个
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
// C 线程,生产资源 10 个
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
// D 线程,消费资源 10 个
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
static class Data{
// 当前资源个数
private int data = 0;
// 资源仓库的最大容量为 3
private final int MAX_SIZE = 3;
// +1
public synchronized void increment() throws InterruptedException {
// 资源充裕,等待消费者消费
// if (data >= 1){
// // 无限阻塞,直到被唤醒
// this.wait();
// }
while (data + 1 > MAX_SIZE){
// 没必要生产,无限阻塞,直到被唤醒
this.wait();
}
// 生产
data++;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
// 随机唤醒一个线程
// 这里其实应该唤醒一个消费者
// 但是,由于唤醒是随机的,所以,可能唤醒生产者
// 所以,在唤醒之后,生产者要判断是否有必要生产
// if 应该换成 while
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
// 资源不足,等待生产者生产
// if (data <= 0){
// this.wait();
// }
while (data - 1 < 0){
// 资源不足,无限等待,直到被唤醒
this.wait();
}
// 消费
data--;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
// 随机唤醒一个线程
// 这里其实应该唤醒一个生产者
// 但是,由于唤醒是随机的,所以,可能唤醒消费者
// 所以,在唤醒之后,消费者要判断是否可以消费
// if 应该换成 while
this.notifyAll();
}
}
}
附录
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/69726.html