4.1ReentrantLock
ReentrantLock 可重入锁
- 嗅探锁定
- 多路分支通知
- 实现互斥
MyService.java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyService {
private Lock lock = new ReentrantLock();
public void service(){
lock.lock();
System.out.println(Thread.currentThread().getName());
for (int i = 0; i < 20; i++) {
System.out.print(i);
}
System.out.println();
lock.unlock();
}
}
MyThread.java
public class MyThread extends Thread{
MyService service;
public MyThread(MyService service){
this.service = service;
}
@Override
public void run(){
service.service();
}
}
Run.java
public class Run {
public static void main(String[] args) {
MyService s = new MyService();
for (int i = 0; i < 5; i++) {
MyThread t = new MyThread(s);
t.setName("Thread" + i);
t.start();
}
}
}
运行结果
synchronized 与 wait() 和 notity()/notifyAll()方法的结合可以实现『等待/通知』模式。
ReentrantLock 与 Condition 也可实现同样的功能。
在一个 Lock 对象中可以创建多个 Condition 对象。线程对象可以注册在指定的 Condition 中,实现有选择性的线程通知。
调用 condition.await() 之前需要先获取锁。
线程被 await() 挂起之后,interrupt() 也会引起 InterruptedException.
await(long time, TimeUnit unit) 可以设定挂起时间。
ReentrantLock 的 await 与 condition 的 signal
MyService.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyService {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void await(){
try{
lock.lock();//需要先获取锁
System.out.println("Ready");
condition.await();
System.out.println(Thread.currentThread().getName());
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void signal(){
lock.lock();
condition.signal();
lock.unlock();
}
}
MyThread.java
public class MyThread extends Thread{
MyService service;
public MyThread(MyService service){
this.service = service;
}
@Override
public void run(){
service.await();
}
}
Run.java
public class Run {
public static void main(String[] args) throws InterruptedException {
MyService s = new MyService();
MyThread t = new MyThread(s);
t.setName("Ha");
t.start();
Thread.sleep(1000);
s.signal();
}
}
运行结果
Ready
Ha
先显示 Ready,等待 1 秒之后挂起的线程被通知,打印 Ha。
MyService.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyService {
private ReentrantLock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();// Object monitor
private Condition conditionB = lock.newCondition();// Object monitor
public void awaitA(){
try {
lock.lock();
System.out.println("A start");
conditionA.await();
System.out.println("A end");
}catch (InterruptedException e){
}finally {
lock.unlock();
}
}
public void awaitB(){
try{
lock.lock();
System.out.println("B start");
conditionB.await();
System.out.println("B end");
}catch (InterruptedException e){
}finally{
lock.unlock();
}
}
public void signalA(){
lock.lock();
conditionA.signal();
lock.unlock();
}
public void signalB(){
lock.lock();
conditionB.signal();
lock.unlock();
}
}
ThreadA.java
public class ThreadA extends Thread{
MyService service;
public ThreadA(MyService service){
this.service = service;
}
@Override
public void run(){
service.awaitA();
}
}
ThreadB.java
public class ThreadB extends Thread{
MyService service;
public ThreadB(MyService service){
this.service = service;
}
@Override
public void run(){
service.awaitB();
}
}
Run.java
public class Run {
public static void main(String[] args) throws InterruptedException {
MyService s = new MyService();
ThreadA A = new ThreadA(s);
ThreadB B = new ThreadB(s);
A.start();//线程 A 将会被 ConditionA 挂起
B.start();//线程 B 将会被 ConditionB 挂起
Thread.sleep(1000);
s.signalA();
Thread.sleep(1000);
s.signalB();
}
}
运行结果
A start
B start
A end
B end
生产者消费者模式下,产品生产一个便可消费一个。当线程一对一的时候,每次生产消费都是由这两个线程执行的。当线程多对多的时候,每次生产消费的执行不会是由特定的两个线程承包,而是各个线程都有机会来履行职责。比如 P1、P2、C1、C2,P1 生产的产品,可由 C1 或 C2来消费;而 C1 消费的产品,可能来自 P1 或者 P2。
condition 作为线程监视器,可以绑定多个线程。
以下代码中,conditionP 绑定的是ThreadP, conditionC 绑定的是 ThreadC。
- conditionP.signal() 随机通知一个ThreadP,conditionC.signal() 随机通知一个ThreadC。
- conditionP.signalAll() 通知的是所有ThreadP,conditionC.signalAll() 通知的是所有ThreadC。
多生产者和多消费者中,生产一个消费一个。通知属于同一个 Condition 的所有。
当生产者生产了产品之后,会通知所有的消费者。动作较快的消费者就可以消费产品,剩下的消费者由于没有产品可消费而继续等待。
同样的,当消费者消费产品之后会通知所有生产者。动作较快的生产者可以生产产品,剩下的生产者由于不需要生产而继续等待。
P.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class P {
private ReentrantLock lock;
public Condition conditionP;
public Condition conditionC;
public P(ReentrantLock lock){
this.lock = lock;
this.conditionP = lock.newCondition();
}
public void produce(){
try{
lock.lock();
while (!Value.str.equals("")){
System.out.println(Thread.currentThread().getName() + " await");
conditionP.await();
System.out.println(Thread.currentThread().getName() + " resume");
}
Value.str = new String("Fine" + Value.i++);
System.out.println(Thread.currentThread().getName() + " running");
conditionC.signalAll();//awake all threads that bind to conditionC
lock.unlock();
}catch(InterruptedException e){
}
}
public void connect(C c){
this.conditionC = c.conditionC;
}
}
C.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class C {
private ReentrantLock lock;
public Condition conditionC;
public Condition conditionP;
public C(ReentrantLock lock){
this.lock = lock;
this.conditionC = lock.newCondition();
}
public void consume(){
try{
lock.lock();
while (Value.str.equals("")){
System.out.println(Thread.currentThread().getName() + " await");
conditionC.await();
System.out.println(Thread.currentThread().getName() + " resume");
}
System.out.println(Thread.currentThread().getName() + " consume " + Value.str);
Value.str = new String("");
conditionP.signalAll();//awake all threads that bind to conditionP
lock.unlock();
}catch (InterruptedException e){
}
}
public void connect(P p){
this.conditionP = p.conditionP;
}
}
ThreadP.java
public class ThreadP extends Thread{
private P p;
public ThreadP (P p){
this.p = p;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
p.produce();
}
}
}
ThreadC.java
public class ThreadC extends Thread{
private C c;
public ThreadC(C c){
this.c = c;
}
@Override
public void run(){
for (int i = 0; i < 50; i++) {
c.consume();
}
}
}
Value.java
public class Value {
public static String str = "";
public static int i = 0;
}
Run.java
import java.util.concurrent.locks.ReentrantLock;
public class Run {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
P p = new P(lock);
C c = new C(lock);
p.connect(c);
c.connect(p);
ThreadC[] consumer = new ThreadC[2];
ThreadP[] producer = new ThreadP[2];
for (int i = 0; i < 2; i++) {
consumer[i] = new ThreadC(c);
producer[i] = new ThreadP(p);
consumer[i].setName("consumer"+i);
producer[i].setName("producer"+i);
consumer[i].start();
producer[i].start();
}
}
}
运行结果:多生产者与多消费者交替打印。生产一个打印一个。生产者和消费者线程存在『有被唤醒,但因为没有产品可操作而再次进入等待』的情况。
多生产者和多消费者中,生产一个消费一个。生产一个产品之后只通知一人,消费一个产品之后只通知一人。
只需把以上代码中的 signalAll() 改为 signal()。
执行结果:多生产者与多消费者交替打印。生产一个打印一个。 conditionP.signal() 通知的时候在多个生产者线程中随机唤醒一个来生产。conditionC.signal() 通知的时候在多个消费者线程中随机唤醒一个来消费。
这种方式效率最好。
执行结果:一个生产者和一个消费者交替打印。
公平锁
lock = new ReentrantLock(true);
非公平锁
lock = new ReentrantLock(false);
设定一个控制变量 nextPrint ,当前线程未轮到打印时则挂起,若可以执行则执行并且通知下一位(把nextPrint的值改变)。
顺序是:A 执行,A 通知 B;B 执行,B 通知 C;C 执行,C 通知 A。
Service.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Service {
private ReentrantLock lock;
public Condition conditionA;
public Condition conditionB;
public Condition conditionC;
int nextPrint = 1;
public Service(ReentrantLock lock) {
this.lock = lock;
this.conditionA = lock.newCondition();
this.conditionB = lock.newCondition();
this.conditionC = lock.newCondition();
}
public void serviceA() {
try {
lock.lock();
while (nextPrint != 1){
conditionA.await();
}
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
conditionB.signalAll();
nextPrint = 2;
} catch (InterruptedException e){
}finally{
lock.unlock();
}
}
public void serviceB() {
try {
lock.lock();
while (nextPrint != 2){
conditionB.await();
}
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
conditionC.signalAll();
nextPrint = 3;
} catch (InterruptedException e){
}finally{
lock.unlock();
}
}
public void serviceC() {
try {
lock.lock();
while(nextPrint != 3){
conditionC.await();
}
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
conditionA.signal();
nextPrint = 1;
} catch (InterruptedException e){
}finally{
lock.unlock();
}
}
}
ThreadA.java
public class ThreadA extends Thread{
private Service service;
public ThreadA(Service service){
this.service = service;
}
@Override
public void run(){
service.serviceA();
}
}
ThreadB.java
public class ThreadB extends Thread{
private Service service;
public ThreadB(Service service){
this.service = service;
}
@Override
public void run(){
service.serviceB();
}
}
ThreadC.java
public class ThreadC extends Thread{
private Service service;
public ThreadC(Service service){
this.service = service;
}
@Override
public void run(){
service.serviceC();
}
}
Run.java
import java.util.concurrent.locks.ReentrantLock;
public class Run {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Service service = new Service(lock);
ThreadA[] threadA = new ThreadA[5];
ThreadB[] threadB = new ThreadB[5];
ThreadC[] threadC = new ThreadC[5];
for (int i = 0; i < 5; i++) {
threadA[i] = new ThreadA(service);
threadB[i] = new ThreadB(service);
threadC[i] = new ThreadC(service);
threadA[i].setName("A" + i);
threadB[i].setName("B" + i);
threadC[i].setName("C" + i);
threadA[i].start();
threadB[i].start();
threadC[i].start();
}
}
}
执行结果为,A、B、C 三类线程依次按顺序执行,严格按照ABC的顺序。
-
lock.getHoldCount() 查询当前线程保持此锁的个数。
-
lock.getQueueLength() 当前在等待锁释放的线程数。只能得到大致的数字,用于系统状态监视,不能用作同步。
-
lock.getWaitQueueLength(Condition condition) 获取与 condition 相关联且处于等待状态的线程估计数。
-
lock.hasQueueThread(Thread thread) 查询指定线程是否正在等待获取此锁定。
-
lock.hasQueueThreads() 查询是否有线程正在等待获取此锁定。
-
lock.hasQueueWaiters(Condition condition) 查询是否有线程正在等待与此锁定有关的 condition。
-
lock.isFair() 判断是否为公平锁,ReentrantLock 默认是非公平锁。
-
lock.isHeldByCurrentThread() 查询当前线程是否持有该锁定。
-
lock.isLocked()
-
lock.lockInterruptibly() 如果当前线程未被中断,则获取锁,如果当前线程已经被中断,则出现异常。
-
lock.tryLock() 仅在调用时锁定未被其他线程保持的情况下,才获取该锁。
-
lock.tryLock(long timeout, TimeUnit unit) 如果锁定在给定时间内没有被其他线程保持,且当前线程未中断,则获取该锁的锁定。
-
condition.awaitUniterruptruptibly() 进行无视中断异常的等待,在等待过程中,线程中断不会触发异常,锁的保持状态仍然继续保持,直到被通知。
-
condition.awaitUntil(Date deadline) 线程有限时间的挂起。在时间到达之前可以被其他线程唤醒,在时间到达之后自我唤醒。