技术学习笔记
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode

4.1ReentrantLock

ReentrantLock 可重入锁

  • 嗅探锁定
  • 多路分支通知

1. 实现同步功能

  • 实现互斥
MyService.java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyService {
    private Lock lock = new ReentrantLock();
    public void service(){
        lock.lock();
        System.out.println(Thread.currentThread().getName());
        for (int i = 0; i < 20; i++) {
            System.out.print(i);
        }
        System.out.println();
        lock.unlock();
    }
}
MyThread.java
public class MyThread extends Thread{
    MyService service;
    public MyThread(MyService service){
        this.service = service;
    }
    @Override
    public void run(){
        service.service();
    }
}
Run.java
public class Run {
    public static void main(String[] args) {
        MyService s = new MyService();
        for (int i = 0; i < 5; i++) {
            MyThread t = new MyThread(s);
            t.setName("Thread" + i);
            t.start();
        }
    }
}

运行结果

2. wait/notify 的实现

synchronized 与 wait() 和 notity()/notifyAll()方法的结合可以实现『等待/通知』模式。

ReentrantLock 与 Condition 也可实现同样的功能。

在一个 Lock 对象中可以创建多个 Condition 对象。线程对象可以注册在指定的 Condition 中,实现有选择性的线程通知。

调用 condition.await() 之前需要先获取锁。

线程被 await() 挂起之后,interrupt() 也会引起 InterruptedException.

await(long time, TimeUnit unit) 可以设定挂起时间。

ReentrantLock 与 condition

ReentrantLock 的 await 与 condition 的 signal

MyService.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyService {
    private Lock lock = new ReentrantLock();
    private  Condition condition = lock.newCondition();
    public void await(){
        try{
            lock.lock();//需要先获取锁
            System.out.println("Ready");
            condition.await();
            System.out.println(Thread.currentThread().getName());
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void signal(){
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}
MyThread.java
public class MyThread extends Thread{
    MyService service;
    public MyThread(MyService service){
        this.service = service;
    }
    @Override
    public void run(){
        service.await();
    }
}

Run.java
public class Run {
    public static void main(String[] args) throws InterruptedException {
        MyService s = new MyService();
        MyThread t = new MyThread(s);
        t.setName("Ha");
        t.start();
        Thread.sleep(1000);
        s.signal();
    }
}

运行结果

Ready
Ha

先显示 Ready,等待 1 秒之后挂起的线程被通知,打印 Ha。

多个 Condition 通知部分线程

MyService.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyService {
    private ReentrantLock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();// Object monitor
    private Condition conditionB = lock.newCondition();// Object monitor
    public void awaitA(){
        try {
            lock.lock();
            System.out.println("A start");
            conditionA.await();
            System.out.println("A end");
        }catch (InterruptedException e){

        }finally {
            lock.unlock();
        }
    }
    public void awaitB(){
        try{
            lock.lock();
            System.out.println("B start");
            conditionB.await();
            System.out.println("B end");
        }catch (InterruptedException e){

        }finally{
            lock.unlock();
        }
    }

    public void signalA(){
        lock.lock();
        conditionA.signal();
        lock.unlock();
    }

    public void signalB(){
        lock.lock();
        conditionB.signal();
        lock.unlock();
    }
}
ThreadA.java
public class ThreadA extends Thread{

    MyService service;
    public ThreadA(MyService service){
        this.service = service;
    }

    @Override
    public void run(){
        service.awaitA();
    }
}

ThreadB.java
public class ThreadB extends Thread{
    MyService service;
    public ThreadB(MyService service){
        this.service = service;
    }
    @Override
    public void run(){
        service.awaitB();
    }
}

Run.java
public class Run {
    public static void main(String[] args) throws InterruptedException {
        MyService s = new MyService();
        ThreadA A = new ThreadA(s);
        ThreadB B = new ThreadB(s);
        A.start();//线程 A 将会被 ConditionA 挂起
        B.start();//线程 B 将会被 ConditionB 挂起
        Thread.sleep(1000);
        s.signalA();
        Thread.sleep(1000);
        s.signalB();
    }
}

运行结果

A start
B start
A end
B end

3. 生产者消费者模式

生产者消费者模式下,产品生产一个便可消费一个。当线程一对一的时候,每次生产消费都是由这两个线程执行的。当线程多对多的时候,每次生产消费的执行不会是由特定的两个线程承包,而是各个线程都有机会来履行职责。比如 P1、P2、C1、C2,P1 生产的产品,可由 C1 或 C2来消费;而 C1 消费的产品,可能来自 P1 或者 P2。

condition 作为线程监视器,可以绑定多个线程。

以下代码中,conditionP 绑定的是ThreadP, conditionC 绑定的是 ThreadC。

  • conditionP.signal() 随机通知一个ThreadP,conditionC.signal() 随机通知一个ThreadC。
  • conditionP.signalAll() 通知的是所有ThreadP,conditionC.signalAll() 通知的是所有ThreadC。

多生产者多消费者,若使用两个 Condition 和 signalAll()

多生产者和多消费者中,生产一个消费一个。通知属于同一个 Condition 的所有。

当生产者生产了产品之后,会通知所有的消费者。动作较快的消费者就可以消费产品,剩下的消费者由于没有产品可消费而继续等待。

同样的,当消费者消费产品之后会通知所有生产者。动作较快的生产者可以生产产品,剩下的生产者由于不需要生产而继续等待。

P.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class P {
    private ReentrantLock lock;
    public Condition conditionP;
    public Condition conditionC;
    public P(ReentrantLock lock){
        this.lock = lock;
        this.conditionP = lock.newCondition();
    }
    public void produce(){
        try{
            lock.lock();
            while (!Value.str.equals("")){
                System.out.println(Thread.currentThread().getName() + " await");
                conditionP.await();
                System.out.println(Thread.currentThread().getName() + " resume");
            }
            Value.str = new String("Fine" + Value.i++);
            System.out.println(Thread.currentThread().getName() + " running");
            conditionC.signalAll();//awake all threads that bind to conditionC
            lock.unlock();
        }catch(InterruptedException e){

        }
    }
    public void connect(C c){
        this.conditionC = c.conditionC;
    }

}
C.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class C {
    private ReentrantLock lock;
    public Condition conditionC;
    public Condition conditionP;
    public C(ReentrantLock lock){
        this.lock = lock;
        this.conditionC = lock.newCondition();
    }
    public void consume(){
        try{
            lock.lock();
            while (Value.str.equals("")){
                System.out.println(Thread.currentThread().getName() + " await");
                conditionC.await();
                System.out.println(Thread.currentThread().getName() + " resume");
            }
            System.out.println(Thread.currentThread().getName() + " consume " + Value.str);
            Value.str = new String("");
            conditionP.signalAll();//awake all threads that bind to conditionP
            lock.unlock();
        }catch (InterruptedException e){

        }
    }

    public void connect(P p){
        this.conditionP = p.conditionP;
    }
}
ThreadP.java
public class ThreadP extends Thread{
    private P p;
    public ThreadP (P p){
        this.p = p;
    }
    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            p.produce();
        }
    }
}
ThreadC.java
public class ThreadC extends Thread{
    private C c;
    public ThreadC(C c){
        this.c = c;
    }
    @Override
    public void run(){
        for (int i = 0; i < 50; i++) {
            c.consume();
        }
    }
}

Value.java
public class Value {
    public static String str = "";
    public static int i = 0;
}

Run.java
import java.util.concurrent.locks.ReentrantLock;

public class Run {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        P p = new P(lock);
        C c = new C(lock);

        p.connect(c);
        c.connect(p);

        ThreadC[] consumer = new ThreadC[2];
        ThreadP[] producer = new ThreadP[2];
        for (int i = 0; i < 2; i++) {
            consumer[i] = new ThreadC(c);
            producer[i] = new ThreadP(p);
            consumer[i].setName("consumer"+i);
            producer[i].setName("producer"+i);

            consumer[i].start();
            producer[i].start();
        }
    }
}

运行结果:多生产者与多消费者交替打印。生产一个打印一个。生产者和消费者线程存在『有被唤醒,但因为没有产品可操作而再次进入等待』的情况。

多生产者多消费者,若使用两个 Condition 和 signal()

多生产者和多消费者中,生产一个消费一个。生产一个产品之后只通知一人,消费一个产品之后只通知一人。

只需把以上代码中的 signalAll() 改为 signal()。

执行结果:多生产者与多消费者交替打印。生产一个打印一个。 conditionP.signal() 通知的时候在多个生产者线程中随机唤醒一个来生产。conditionC.signal() 通知的时候在多个消费者线程中随机唤醒一个来消费。

这种方式效率最好。

一个生产者一个消费者,若使用一个 Condition,signal()

执行结果:一个生产者和一个消费者交替打印。

4.公平锁和非公平锁

公平锁

lock = new ReentrantLock(true);

非公平锁

lock = new ReentrantLock(false);

5.使用 Condition 加一个控制变量,可以同步多个类别的线程的执行顺序

设定一个控制变量 nextPrint ,当前线程未轮到打印时则挂起,若可以执行则执行并且通知下一位(把nextPrint的值改变)。

顺序是:A 执行,A 通知 B;B 执行,B 通知 C;C 执行,C 通知 A。

Service.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Service {
    private ReentrantLock lock;
    public Condition conditionA;
    public Condition conditionB;
    public Condition conditionC;
    int nextPrint = 1;
    public Service(ReentrantLock lock) {
        this.lock = lock;
        this.conditionA = lock.newCondition();
        this.conditionB = lock.newCondition();
        this.conditionC = lock.newCondition();
    }

    public void serviceA() {
        try {
            lock.lock();
            while (nextPrint != 1){
                conditionA.await();
            }
            for (int i = 0; i < 3; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
            }
            conditionB.signalAll();
            nextPrint = 2;

        } catch (InterruptedException e){

        }finally{
            lock.unlock();
        }
    }

    public void serviceB() {
        try {
            lock.lock();
            while (nextPrint != 2){
                conditionB.await();
            }
            for (int i = 0; i < 3; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
            }
            conditionC.signalAll();
            nextPrint = 3;

        } catch (InterruptedException e){

        }finally{
            lock.unlock();
        }
    }

    public void serviceC() {
        try {
            lock.lock();
            while(nextPrint != 3){
                conditionC.await();
            }
            for (int i = 0; i < 3; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
            }
            conditionA.signal();
            nextPrint = 1;
        } catch (InterruptedException e){

        }finally{
            lock.unlock();
        }

    }
}
ThreadA.java
public class ThreadA extends Thread{
    private Service service;
    public ThreadA(Service service){
        this.service = service;
    }
    @Override
    public void run(){
        service.serviceA();
    }
}
ThreadB.java
public class ThreadB extends Thread{
    private Service service;
    public ThreadB(Service service){
        this.service = service;
    }
    @Override
    public void run(){
        service.serviceB();
    }
}

ThreadC.java
public class ThreadC extends Thread{
    private Service service;
    public ThreadC(Service service){
        this.service = service;
    }
    @Override
    public void run(){
        service.serviceC();
    }
}

Run.java
import java.util.concurrent.locks.ReentrantLock;
public class Run {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Service service = new Service(lock);
        ThreadA[] threadA = new ThreadA[5];
        ThreadB[] threadB = new ThreadB[5];
        ThreadC[] threadC = new ThreadC[5];
        for (int i = 0; i < 5; i++) {
            threadA[i] = new ThreadA(service);
            threadB[i] = new ThreadB(service);
            threadC[i] = new ThreadC(service);
            threadA[i].setName("A" + i);
            threadB[i].setName("B" + i);
            threadC[i].setName("C" + i);
            threadA[i].start();
            threadB[i].start();
            threadC[i].start();
        }
    }
}

执行结果为,A、B、C 三类线程依次按顺序执行,严格按照ABC的顺序。

其他方法

  • lock.getHoldCount() 查询当前线程保持此锁的个数。

  • lock.getQueueLength() 当前在等待锁释放的线程数。只能得到大致的数字,用于系统状态监视,不能用作同步。

  • lock.getWaitQueueLength(Condition condition) 获取与 condition 相关联且处于等待状态的线程估计数。

  • lock.hasQueueThread(Thread thread) 查询指定线程是否正在等待获取此锁定。

  • lock.hasQueueThreads() 查询是否有线程正在等待获取此锁定。

  • lock.hasQueueWaiters(Condition condition) 查询是否有线程正在等待与此锁定有关的 condition。

  • lock.isFair() 判断是否为公平锁,ReentrantLock 默认是非公平锁。

  • lock.isHeldByCurrentThread() 查询当前线程是否持有该锁定。

  • lock.isLocked()

  • lock.lockInterruptibly() 如果当前线程未被中断,则获取锁,如果当前线程已经被中断,则出现异常。

  • lock.tryLock() 仅在调用时锁定未被其他线程保持的情况下,才获取该锁。

  • lock.tryLock(long timeout, TimeUnit unit) 如果锁定在给定时间内没有被其他线程保持,且当前线程未中断,则获取该锁的锁定。

  • condition.awaitUniterruptruptibly() 进行无视中断异常的等待,在等待过程中,线程中断不会触发异常,锁的保持状态仍然继续保持,直到被通知。

  • condition.awaitUntil(Date deadline) 线程有限时间的挂起。在时间到达之前可以被其他线程唤醒,在时间到达之后自我唤醒。