3.2生产者消费者模式
问题:
threadP 在 setValue 时,lock.wait 失去了锁, 线程还会继续存在吗? 『解答』还会继续存在, 但是处于 waiting 状态。没有能力去获取同步锁。
锁是可以重入的,是否意味着在 while 循环的作用下,会多次调用 setValue,而有一部分的调用被阻塞了得不到响应? 『解答』不会多次调用 setValue, 因为执行了lock。wait() 之后线程被挂起了。
锁可以重入,在上一个 lock.wait 未响应的情况下,是否可以再进入同步块进行对应操作? 『解答』执行了 lock.wait 线程就被挂起,进入 waiting 状态。
基于 wait/notify 机制
若数据域有内容,则生产者不生产,线程挂起;反之,生产者生产产品并且发出通知。
若数据域没有内容,则消费者不消费,线程挂起;反之,消费者消费产品并且发出通知。
P.java
public class P {
String lock;
public P(String lock){
this.lock = lock;
}
public void setValue(){
try{
synchronized(lock){
//使用 if 是不严谨的做法
if (!ValueObject.value.equals("")){
lock.wait();//lock will be release, but thread exist always(waiting), and the thread loss the ability to get lock.
}
String str = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("Set:" + str);
//Produce and notify
ValueObject.value = str;
lock.notify();
}
}catch (Exception e){
}
}
}
C.java
public class C
String lock;
public C(String lock){
this.lock = lock;
}
public void getValue(){
try{
synchronized(lock){
//使用 if 是不严谨的做法
if (ValueObject.value.equals("")){
lock.wait();
}
String str = ValueObject.value;
System.out.println("Get:" + str);
//Consume and notify
ValueObject.value = "";
lock.notify();
}
}catch (Exception e){
}
}
}
ThreadP.java
public class ThreadP extends Thread{
public P p;
public ThreadP(P p){
super();
this.p = p;
}
@Override
public void run(){
while(true){
p.setValue();
}
}
}
ThreadC.java
public class ThreadC extends Thread{
public C c;
public ThreadC(C c){
super();
this.c = c;
}
@Override
public void run(){
while(true){
c.getValue();
}
}
}
Run.java
public class Run {
public static void main(String[] args){
String lock = "s";
Thread producer = new ThreadP(new P(lock));
Thread consumer = new ThreadC(new C(lock));
consumer.start();
producer.start();
}
}
多个生产者和多个消费者共同竞争一把锁。 多个生产者若使用 notify 进行通知,则可能出现『生产者通知消费者、消费者通知生产者、生产者通知生产者、消费者通知消费者』的情况,若通知出错的情况持续累积,就会造成程序假死。
P.java
public class P {
String lock;
public P(String lock){
this.lock = lock;
}
public void setValue(){
try{
synchronized(lock){
while (!ValueObject.value.equals("")){
lock.wait();
}
String str = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println(Thread.currentThread().getName() + " Set:" + str);
//Produce and notify
ValueObject.value = str;
lock.notify();
}
}catch (Exception e){
}
}
}
C.java
public class C {
String lock;
public C(String lock){
this.lock = lock;
}
public void getValue(){
try{
synchronized(lock){
while (ValueObject.value.equals("")){
lock.wait();
}
String str = ValueObject.value;
System.out.println(Thread.currentThread().getName() + " Get:" + str);
//Consume and notify
ValueObject.value = "";
lock.notify();
}
}catch (Exception e){
}
}
}
ThreadP.java
public class ThreadP extends Thread{
public P p;
public ThreadP(P p){
super();
this.p = p;
}
@Override
public void run(){
while(true){
p.setValue();
}
}
}
ThreadC.java
public class ThreadC extends Thread{
public C c;
public ThreadC(C c){
super();
this.c = c;
}
@Override
public void run(){
while(true){
c.getValue();
}
}
}
run.java
public class Run {
public static void main(String[] args){
String lock = "s";
Thread[] producer = new ThreadP[2];
Thread[] consumer = new ThreadC[2];
for (int i = 0; i < 2; i++) {
producer[i] = new ThreadP(new P(lock));
producer[i].setName("生产者" + (i+1));
consumer[i] = new ThreadC(new C(lock));
consumer[i].setName("消费者" + (i+1));
producer[i].start();
consumer[i].start();
}
}
}
以上代码只随机通知一个线程,即是说不能确定通知的对象是消费者还是生产者,如下图,当生产者生产产品之后若通知的是其他生产者线程,就会出现假死。
如何解决 notify 通知不到其他类(假死)?
使用 notifyAll()
通知(notify)的速度比直接代码执行(比如 while 语句)慢。
因为通知涉及到线程调度,所以 notifyAll 之后,同步块结束释放锁, while会直接进行下一次的 setValue 或者 getValue.
错误的使用(等待的条件发生了改变)
多个线程先后进入 if 块的的时候,条件还是满足的,于是多个线程进入waiting 状态,并且释放锁。当其中一个线程离开等待状态,并且通知消费者来修改进入等待的条件( value 被清空),那么其他处于等待的线程还是会被唤醒,也做出一样的动作。
运行过程中造成了『生产者1生产之后,通知所有。生产者2被通知到则继续生产,通知所有。生产者1被通知到则继续生产,通知所有。循环往复』、『消费者1消费了产品,通知所有。消费者2被通知到则继续消费,通知所有。消费者1被通知到则继续消费,通知所有。循环往复』的问题的偶然出现。
synchronized(lock){
//使用 if,处于 waiting 状态的线程被通知,若 ValueObject.value 为空,则在条件不满足的情况下也会执行业务代码(if外面的部分)
if (!ValueObject.value.equals("")){
System.out.println(Thread.currentThread().getName() +" waiting");
lock.wait();
}
String str = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println(Thread.currentThread().getName() +" running" + " Set:" + str + " : "+ ValueObject.num++);
//Produce and notify
ValueObject.value = str;
lock.notify();
}
正确的使用:
waiting 状态的线程被通知之后,要再次判断是否应该继续等待(对等待条件再次判断)。
synchronized(lock){
//此处使用的是 while。处于 waiting 状态的线程被通知时,若ValueObject.value 为空,则线程应该继续等待。
while (!ValueObject.value.equals("")){
System.out.println(Thread.currentThread().getName() +" waiting");
lock.wait();
}
String str = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println(Thread.currentThread().getName() +" running" + " Set:" + str + " : "+ ValueObject.num++);
//Produce and notify
ValueObject.value = str;
lock.notify();
}
这样的多个生产者对多个消费者的模式类似于点对点通信。
对于同一块数据域,自己操作了数据域就得通知其他的所有的端,在收到通知的时候需要再次确认数据域是否可操作,不可操作则再次进入等待。
- 首先需要确保使用同一把锁。
- 一个生产者一个消费者的情况下,使用 if 与 notify 不会影响线程的执行逻辑。
- 『一个生产者多个消费者、多个生产者一个消费者、多个生产者多个消费者』的情况下,必须使用 while 和 notifyAll 来保证程序运行准确。
- lock.wait()使得当前线程进入 waiting 状态,并且释放锁。