在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步了,再配合Object的wait()、notify()系列方法可以实现等待/通知模式,可参考java并发02-细说等待通知机制 (wait/notify)。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。

一、背景

synchronized+wait/notify

synchronized+wait/notify实现等待通知模式

(1)被通知的线程是JVM随机选择的。(注:JDK注释为随机选择,实际不是,具体请参考java并发02-细说等待通知机制 (wait/notify)

(2)使用同一个锁对象调用wait()方法的线程都会被注册在该对象上

(3)伪代码:

线程1{
    synchronized(对象){
        while(条件满足){
            对象.wait();
        }
        ...
    }
}

线程2{
    synchronized(对象){
        对象.notifyAll()/对象.notify();
        ...
    }
}

condition+lock

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

(1)支持选择性通知,调度线程上更加灵活

(2)Lock对象里面可以创建多个Condition实例(对象监视器)

(3)支持单独唤醒部分指定线程

(4)伪代码

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
线程1{
    lock.lock();
    try{
        condition.await();
        ...
    }finally{
        lock.unlock();
    }
}

线程2{
    lock.lock();
    try{
        condition.signal()//condition.signalAll();
        ...
    }finally{
        lock.unlock();
    }
}

在Condition中,用await()替换wait(),用signal()替换 notify(),用signalAll()替换notifyAll(),对于我们以前使用传统的Object方法,Condition都能够给予实现。

二、代码实例

实现一个生产者消费者模型:生产者往仓库里添加商品,若仓库容量到达上限,生产者等待消费者消费,否则往里添加商品,并通知消费者消费;消费者从仓库获取商品,若仓库没有库存,消费者等待生产者生产,否则往外获取商品,并通知生产者生产

public class ConditionDemo {
    public static void main(String[] args) {
        Depot depot = new Depot();

        Producer producer = new Producer(depot);
        Customer customer = new Customer(depot);

        producer.produce(10);
        customer.consume(5);
        producer.produce(15);
        customer.consume(10);
        customer.consume(15);
        producer.produce(10);

    }

}

/**
 * 仓库
 *
 */
class Depot {
    /**
     * 仓库大小
     */
    private int depotSize;

    /**
     * 独占锁
     */
    private Lock lock;

    /**
     * 仓库容量
     */
    private int capacity;

    private Condition fullCondition;
    private Condition emptyCondition;

    public Depot(){
        this.depotSize = 0;
        this.lock = new ReentrantLock();
        this.capacity = 15;
        this.fullCondition = lock.newCondition();
        this.emptyCondition = lock.newCondition();
    }

    /**
     * 商品入库
     * @param value 入库商品数量
     */
    public void put(int value){
        lock.lock();
        try {
            //当前需要入库的商品数量
            int left = value;
            while (left>0){
                //库存已满时,“生产者”等待“消费者”消费
                while (depotSize >= capacity){
                    fullCondition.await();
                }
                //获取当前仓库可以入库的商品数量
                int inc = depotSize + left > capacity ? capacity - depotSize : left;
                depotSize += inc;
                left -= inc;
                System.out.println(Thread.currentThread().getName() + "----要入库数量: " + value +";;实际入库数量:" + inc + ";;仓库货物数量:" + depotSize + ";;没有入库数量:" + left);

                //入库新的商品了之后,通知消费者可以消费了
                emptyCondition.signal();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            lock.unlock();
        }
    }

    /**
     * 商品出库
     * @param value 需要出库的商品数量
     */
    public void get(int value){
        lock.lock();

        try {
            int left = value;
            while (left>0){
                //仓库里没有商品了,消费者等待生产者生产
                while (depotSize<=0){
                    emptyCondition.await();
                }
                //获取可以出库的商品数量
                int dec = depotSize < left? depotSize : left;
                depotSize -= dec;
                left -= dec;
                System.out.println(Thread.currentThread().getName() + "----要消费的数量:" + value +";;实际消费的数量: " + dec + ";;仓库现存数量:" + depotSize + ";;有多少件商品没有消费:" + left);

                //告诉生产者可以生产了
                fullCondition.signal();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            lock.unlock();
        }
    }
}

/**
 * 生产者
 */
class Producer {
    private Depot depot;

    public Producer(Depot depot){
        this.depot = depot;
    }

    public void produce(final int value){
        new Thread(){
            @Override
            public void run(){
                depot.put(value);
            }
        }.start();
    }
}

/**
 * 消费者
 */
class Customer {
    private Depot depot;

    public Customer(Depot depot){
        this.depot = depot;
    }

    public void consume(final int value){
        new Thread(){
            public void run(){
                depot.get(value);
            }
        }.start();
    }
}

三、方法介绍

1.void await() throws InterruptedException;

  • 让当前线程进入等待状态,直到它被唤醒或被中断。
  • 与当前Condition对象相关联的Lock锁会自动释放。
  • 为了线程调度目的,当前线程会变为不可用的,直到出现以下四种情形之一:

    • 其他线程调用了当前Condition对象的signal()方法,并且当前线程恰好被选为被唤醒线程。
    • 其他线程调用了当前Condition对象的signalAll()方法
    • 其他线程调用了当前线程的interrupt()方法,而当前线程支持线程中断。
    • 产生虚假唤醒。
  • 以上所有的情形中,在awit()方法返回之前,当前线程必须重新获取与当前Condition对象相关联的Lock锁。
  • 在执行awit()方法过程中,如果当前线程被中断(interrupt())了,则会抛出一个InterruptedException异常,并将中断状态重置。

2.void awaitUninterruptibly();

  • 让当前线程进入等待状态,直到它被唤醒。
  • 与当前Condition对象相关联的Lock锁会自动释放。
  • 为了线程调度目的,当前线程会变为不可用的,直到出现以下三种情形之一:

    • 其他线程调用了当前Condition对象的signal()方法,并且当前线程恰好被选为被唤醒线程。
    • 其他线程调用了当前Condition对象的signalAll()方法
    • 产生虚假唤醒。
  • 以上所有的情形中,在awit()方法返回之前,当前线程必须重新获取与当前Condition对象相关联的Lock锁。
  • 该等待不可中断。

3.long awaitNanos(long nanosTimeout) throws InterruptedException;

  • 让当前线程进入等待状态,直到它被唤醒、被中断或限定时间超时。
  • 与当前Condition对象相关联的Lock锁会自动释放。
  • 为了线程调度目的,当前线程会变为不可用的,直到出现以下五种情形之一:

    • 其他线程调用了当前Condition对象的signal()方法,并且当前线程恰好被选为被唤醒线程。
    • 其他线程调用了当前Condition对象的signalAll()方法
    • 其他线程调用了当前线程的interrupt()方法,而当前线程支持线程中断。
    • 限定的等待时间超时。
    • 产生虚假唤醒。
  • 以上所有的情形中,在awit()方法返回之前,当前线程必须重新获取与当前Condition对象相关联的Lock锁。
  • 在执行awit()方法过程中,如果当前线程被中断(interrupt())了,则会抛出一个InterruptedException异常,并将中断状态重置。
  • 这个方法会返回剩余的等待时间;如果已经超时,则返回值可能是0或负值。这个值可以用来确定是否还要继续等待。使用纳秒是为了进行精度控制。通常这个方法的用法如下:
boolean aMethod(long timeout, TimeUnit unit) {
  //获取纳秒
  long nanos = unit.toNanos(timeout);
  //加锁
  lock.lock();
  try {
    //如果继续等待
    while (!conditionBeingWaitedFor()) {
      //如果已经超时,则...
      if (nanos <= 0L)
        return false;
      //获取剩余等待时间
      nanos = theCondition.awaitNanos(nanos);
    }
    // ...
  } finally {
    //解锁
    lock.unlock();
  }
}1234567891011121314151617181920

4.boolean await(long time, TimeUnit unit) throws InterruptedException;

  • 让当前线程进入等待状态,直到它被唤醒、被中断或限定时间超时。
  • 这个方法等同于awaitNanos(unit.toNanos(time)) > 0,不再赘述。

5.boolean awaitUntil(Date deadline) throws InterruptedException;

  • 让当前线程进入等待状态,直到它被唤醒、被中断或截止时间过去。
  • 与当前Condition对象相关联的Lock锁会自动释放。
  • 为了线程调度目的,当前线程会变为不可用的,直到出现以下五种情形之一:

    • 其他线程调用了当前Condition对象的signal()方法,并且当前线程恰好被选为被唤醒线程。
    • 其他线程调用了当前Condition对象的signalAll()方法
    • 其他线程调用了当前线程的interrupt()方法,而当前线程支持线程中断。
    • 指定的截止时间已经过去。
    • 产生虚假唤醒。
  • 以上所有的情形中,在awit()方法返回之前,当前线程必须重新获取与当前Condition对象相关联的Lock锁。
  • 在执行awit()方法过程中,如果当前线程被中断(interrupt())了,则会抛出一个InterruptedException异常,并将中断状态重置。
  • 这个方法的返回值表示截止时间是否已经过去。
  • 通常这个方法的用法如下:
boolean aMethod(Date deadline) {
  boolean stillWaiting = true;
  //加锁
  lock.lock();
  try {
    //如果继续等待
    while (!conditionBeingWaitedFor()) {
      if (!stillWaiting)
        return false;
      //获取指定的截止时间已经过去
      stillWaiting = theCondition.awaitUntil(deadline);
    }
    // ...
  } finally {
    //解锁
    lock.unlock();
  }
}123456789101112131415161718

6.void signal();

  • 唤醒一个线程。
  • 如果当前的Condition对象上有等待的线程,则唤醒其中的一个线程。
  • 这个被唤醒的线程必须在从awit()方法返回之前重新获取锁。

7.void signalAll();

  • 唤醒所有线程。
  • 如果当前的Condition对象上有等待的线程,则唤醒全部的线程。
  • 每个被唤醒的线程必须在从awit()方法返回之前重新获取锁。

四、实现原理

参考资料

java并发系列之五-锁

[Lock系列-Condition](