技术学习笔记
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode

3.2生产者消费者模式

问题:

threadP 在 setValue 时,lock.wait 失去了锁, 线程还会继续存在吗? 『解答』还会继续存在, 但是处于 waiting 状态。没有能力去获取同步锁。

锁是可以重入的,是否意味着在 while 循环的作用下,会多次调用 setValue,而有一部分的调用被阻塞了得不到响应? 『解答』不会多次调用 setValue, 因为执行了lock。wait() 之后线程被挂起了。

锁可以重入,在上一个 lock.wait 未响应的情况下,是否可以再进入同步块进行对应操作? 『解答』执行了 lock.wait 线程就被挂起,进入 waiting 状态。

一个生产者、一个消费者

基于 wait/notify 机制

若数据域有内容,则生产者不生产,线程挂起;反之,生产者生产产品并且发出通知。

若数据域没有内容,则消费者不消费,线程挂起;反之,消费者消费产品并且发出通知。

P.java
public class P {
    String lock;
    public P(String lock){
        this.lock = lock;
    }
    public void setValue(){
        try{
            synchronized(lock){
                //使用 if 是不严谨的做法
                if (!ValueObject.value.equals("")){
                    lock.wait();//lock will be release, but thread exist always(waiting), and the thread loss the ability to get lock.
                }
                String str = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("Set:" + str);
                //Produce and notify
                ValueObject.value = str;
                lock.notify();
            }
        }catch (Exception e){

        }

    }
}
C.java
public class C 
    String lock;
    public C(String lock){
        this.lock = lock;
    }
    public void getValue(){
        try{
            synchronized(lock){
                //使用 if 是不严谨的做法
                if (ValueObject.value.equals("")){
                    lock.wait();
                }
                String str = ValueObject.value;
                System.out.println("Get:" + str);
                //Consume and notify
                ValueObject.value = "";
                lock.notify();
            }
        }catch (Exception e){

        }

    }
}
ThreadP.java
public class ThreadP extends Thread{
    public P p;
    public ThreadP(P p){
        super();
        this.p = p;
    }
    @Override
    public void run(){
        while(true){
            p.setValue();
        }
    }
}
ThreadC.java
public class ThreadC extends Thread{
    public C c;
    public ThreadC(C c){
        super();
        this.c = c;
    }

    @Override
    public void run(){
        while(true){
            c.getValue();
        }
    }
}
Run.java
public class Run {
    public static void main(String[] args){
        String lock = "s";
        Thread producer = new ThreadP(new P(lock));
        Thread consumer = new ThreadC(new C(lock));

        consumer.start();
        producer.start();
    }
}

多生产者多消费者

多个生产者和多个消费者共同竞争一把锁。 多个生产者若使用 notify 进行通知,则可能出现『生产者通知消费者、消费者通知生产者、生产者通知生产者、消费者通知消费者』的情况,若通知出错的情况持续累积,就会造成程序假死。

P.java
public class P {
    String lock;
    public P(String lock){
        this.lock = lock;
    }
    public void setValue(){
        try{
            synchronized(lock){
                while (!ValueObject.value.equals("")){
                    lock.wait();
                }
                String str = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println(Thread.currentThread().getName() + " Set:" + str);
                //Produce and notify
                ValueObject.value = str;
                lock.notify();
            }
        }catch (Exception e){

        }

    }
}
C.java
public class C {
    String lock;
    public C(String lock){
        this.lock = lock;
    }
    public void getValue(){
        try{
            synchronized(lock){
                while (ValueObject.value.equals("")){
                    lock.wait();
                }
                String str = ValueObject.value;
                System.out.println(Thread.currentThread().getName() + " Get:" + str);
                //Consume and notify
                ValueObject.value = "";
                lock.notify();
            }
        }catch (Exception e){

        }

    }
}
ThreadP.java
public class ThreadP extends Thread{
    public P p;
    public ThreadP(P p){
        super();
        this.p = p;
    }
    @Override
    public void run(){
        while(true){
            p.setValue();
        }
    }
}
ThreadC.java
public class ThreadC extends Thread{
    public C c;
    public ThreadC(C c){
        super();
        this.c = c;
    }

    @Override
    public void run(){
        while(true){
            c.getValue();
        }
    }
}
run.java
public class Run {
    public static void main(String[] args){
        String lock = "s";
        Thread[] producer = new ThreadP[2];
        Thread[] consumer = new ThreadC[2];

        for (int i = 0; i < 2; i++) {
            producer[i] = new ThreadP(new P(lock));
            producer[i].setName("生产者" + (i+1));
            consumer[i] = new ThreadC(new C(lock));
            consumer[i].setName("消费者" + (i+1));
            producer[i].start();
            consumer[i].start();
        }
    }
}

以上代码只随机通知一个线程,即是说不能确定通知的对象是消费者还是生产者,如下图,当生产者生产产品之后若通知的是其他生产者线程,就会出现假死。

如何解决 notify 通知不到其他类(假死)?

使用 notifyAll()

通知(notify)的速度比直接代码执行(比如 while 语句)慢

因为通知涉及到线程调度,所以 notifyAll 之后,同步块结束释放锁, while会直接进行下一次的 setValue 或者 getValue.

关于 while 和 if 的问题:

错误的使用(等待的条件发生了改变)

多个线程先后进入 if 块的的时候,条件还是满足的,于是多个线程进入waiting 状态,并且释放锁。当其中一个线程离开等待状态,并且通知消费者来修改进入等待的条件( value 被清空),那么其他处于等待的线程还是会被唤醒,也做出一样的动作。

运行过程中造成了『生产者1生产之后,通知所有。生产者2被通知到则继续生产,通知所有。生产者1被通知到则继续生产,通知所有。循环往复』、『消费者1消费了产品,通知所有。消费者2被通知到则继续消费,通知所有。消费者1被通知到则继续消费,通知所有。循环往复』的问题的偶然出现。

 synchronized(lock){
                //使用 if,处于 waiting 状态的线程被通知,若 ValueObject.value 为空,则在条件不满足的情况下也会执行业务代码(if外面的部分)
                if (!ValueObject.value.equals("")){
                    System.out.println(Thread.currentThread().getName() +" waiting");
                    lock.wait();
                }
                String str = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println(Thread.currentThread().getName() +" running" + " Set:" + str + " : "+ ValueObject.num++);
                //Produce and notify
                ValueObject.value = str;
                lock.notify();
            }

正确的使用:

waiting 状态的线程被通知之后,要再次判断是否应该继续等待(对等待条件再次判断)。

 synchronized(lock){
                //此处使用的是 while。处于 waiting 状态的线程被通知时,若ValueObject.value 为空,则线程应该继续等待。
                while (!ValueObject.value.equals("")){
                    System.out.println(Thread.currentThread().getName() +" waiting");
                    lock.wait();
                }
                String str = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println(Thread.currentThread().getName() +" running" + " Set:" + str + " : "+ ValueObject.num++);
                //Produce and notify
                ValueObject.value = str;
                lock.notify();
            }
            

这样的多个生产者对多个消费者的模式类似于点对点通信。

对于同一块数据域,自己操作了数据域就得通知其他的所有的端,在收到通知的时候需要再次确认数据域是否可操作,不可操作则再次进入等待。

生产者消费者模式的总结

  • 首先需要确保使用同一把锁。
  • 一个生产者一个消费者的情况下,使用 if 与 notify 不会影响线程的执行逻辑。
  • 『一个生产者多个消费者、多个生产者一个消费者、多个生产者多个消费者』的情况下,必须使用 while 和 notifyAll 来保证程序运行准确。
  • lock.wait()使得当前线程进入 waiting 状态,并且释放锁。