一个线程修改了一个对象的值,另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行有时另一个线程。前者是生产者,后者是消费者,这种模式隔离了“做什么”(what)和“怎么做”(how),在功能层面上实现了解耦,体系结构上具备良好的伸缩性,在Java语言中是如何实现类似的等待通知机制的呢? java中Object对象包含三个final方法,它们允许线程就资源的锁状态进行通信,这三个方法分别是wait()、notify()、notifyAll(),今天我们就来学习一下这三个方法。

一、等待通知机制

java中实现通知等待机制最简单的方式就是让消费者不断地循环检查变量是否符合预期,如下面代码所示,在while循环中设置 不满足的条件,如果条件满足就退出循环,从而完成消费者工作。

while (value != desire) {
    try {
        Thread.sleep(1000);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
doSomething();

该段代码在不满足条件时就睡眠一段时间,其目的是减少过多的无效尝试,降低对处理器资源的浪费,上述方式存在以下问题:

1、难以确保及时性,在睡眠时,基本不消耗处理器资源,但是如果睡眠过久,就不能及时发现条件的变化,也就是及时性难以保证。

2、难以降低开销,如果睡眠时间降低为1毫秒,这样消费者就能很迅速地发现条件的变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

以上两个问题,看似矛盾难以调和,但是通过Java的wait()/notify()实现的等待/通知机制就能够很好地解决这个矛盾并实现所需的功能。

等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上,Object作为java中所有对象的基类,其存在的价值不言而喻,其中wait()和notify()方法的实现为多线程协作提供了保证。

二、java中的等待通知机制

2.1 方法介绍

wait():

wait()方法会让当前线程进入等待状态,同时也会释放当前线程所持有的锁,直到其它线程调用了该对象的notify()或notifyAll()方法,该对象才会被唤醒进入就绪状态,wati有三个重载方法,分别是:

#一直等待,直到被唤醒
public final void wait() throws InterruptedException
#一直等待,直到被唤醒或超时(毫秒)
public final native void wait(long timeout) throws InterruptedException;
#一直等待,直到被唤醒或超时(纳秒)
public final void wait(long timeout, int nanos) throws InterruptedException

notify()

唤醒等待该对象同步锁的其中一个线程,并放入该对象的锁池中。对象的锁池中线程可以去竞争得到对象锁,然后开始执行。注意调用对象的该方法也需要先获得该对象的锁。

notifyAll()

同上,但是是唤醒等待当前对象同步锁的所有线程

2.2 代码实例

需求详情:实现一个生产者-消费者模型,其中消费者线程正在等待队列中的消息对象,生产者线程将消息对象放入队列,并通知等待的线程。

public class WaitNotifyTest {
    public static void main(String[] args) {
        Message msg = new Message("Process it");
        Waiter waiter1 = new Waiter(msg);
        new Thread(waiter1, "1号线程").start();

        Waiter waiter2 = new Waiter(msg);
        new Thread(waiter2, "2号线程").start();

        Notifier notifier = new Notifier(msg);
        new Thread(notifier, "唤醒线程").start();
    }
}
//消息对象
class Message {
    private String msg;

    public Message(String str){
        this.msg=str;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String str) {
        this.msg=str;
    }
}

//消费者
class Waiter implements Runnable {
    private Message msg;

    public Waiter(Message m){
        this.msg=m;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        synchronized (msg) {
            try{
                System.out.println(name+" 获得锁:"+msg);
                System.out.println(name+" 即将执行wait方法,释放锁,进入等待被唤醒状态");
                msg.wait();
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println(name+" 已被唤醒:"+System.currentTimeMillis());
            //process the message now
            System.out.println(name+" 消息处理完成: "+msg.getMsg());
        }
    }
}
//生产者
class Notifier implements Runnable {

    private Message msg;

    public Notifier(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name+" 已经启动: "+System.currentTimeMillis());
        try {
            Thread.sleep(1000);
            synchronized (msg) {
                msg.setMsg(name+" 我是一条消息");
                msg.notify();
                // msg.notifyAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

当我们执行测试程序,会输出以下结果,但是程序执行并没有完成,因为我们调用了notify()方法,只是唤醒了其中的一个线程,另外一个线程在一直等待

1号线程 获得锁:com.wangst.study.bingfa.Message@b20b63
1号线程 即将执行wait方法,释放锁,进入等待被唤醒状态
2号线程 获得锁:com.wangst.study.bingfa.Message@b20b63
2号线程 即将执行wait方法,释放锁,进入等待被唤醒状态
唤醒线程 已经启动
1号线程 已被唤醒:1606703340787
1号线程 消息处理完成: 唤醒线程 我是一条消息

当我们调用notifyAll()方法时,会唤醒所有等待该对象的线程,会输出以下结果:

1号线程 获得锁:com.wangst.study.bingfa.Message@283323cb
1号线程 即将执行wait方法,释放锁,进入等待被唤醒状态
2号线程 获得锁:com.wangst.study.bingfa.Message@283323cb
2号线程 即将执行wait方法,释放锁,进入等待被唤醒状态
唤醒线程 已经启动: 1606703566657
2号线程 已被唤醒:1606703567657
2号线程 消息处理完成: 唤醒线程 我是一条消息
1号线程 已被唤醒:1606703567657
1号线程 消息处理完成: 唤醒线程 我是一条消息

2.3 总结:

  1. 一定要在synchronized中使用wait()/notify()/notifyAll(),先获取对象锁,否则jvm会抛出IllegalMonitorStateException异常。
  2. 使用wait()时,判断线程是否进入wait状态的条件一定要使用while而不要使用if,因为等待线程可能会被错误地唤醒,所以应该使用while循环在等待前等待后都检查唤醒条件是否被满足,保证安全性。
  3. notify()或notifyAll()方法调用后,线程不会立即释放锁,只会将wait中的线程从等待队列移到同步队列,也就是线程状态从waitting变为blocked;处于blocked状态的线程可以去抢占锁。
  4. wait()方法返回的前提是线程重新获得了对象的锁

三、wait()方法实现

Object类中的wait()方法源码为

public final void wait() throws InterruptedException {
    wait(0);
}

wait()方法调用了它的重载方法wait(long),其声明如下

public final native void wait(long timeout) throws InterruptedException;

可以看到这是一个native方法,方法的具体实现,我们可以通过OpenJdk的源码(Object.c)来找到

static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

其中,JVM_MonitorWait和JVM_MonitorNotify分别对应于wait()和notify()方法,JVM_MonitorWait方法声明是在jvm.h中,如下所示

JNIEXPORT void JNICALL
JVM_MonitorWait(JNIEnv *env, jobject obj, jlong ms);

方法实现为:

JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
  JVMWrapper("JVM_MonitorWait");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  assert(obj->is_instance() || obj->is_array(), "JVM_MonitorWait must apply to an object");
  JavaThreadInObjectWaitState jtiows(thread, ms != 0);
  if (JvmtiExport::should_post_monitor_wait()) {
    JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
  }
  ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END

可以看到JVM_MonitorWait方法最终调用了ObjectSynchronizer的wait方法。

void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  if (UseBiasedLocking) {
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }
  if (millis < 0) {
    TEVENT (wait - throw IAX) ;
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
  DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  monitor->wait(millis, true, THREAD);
 
  /* This dummy call is in place to get around dtrace bug 6254741.  Once
     that's fixed we can uncomment the following line and remove the call */
  // DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
  dtrace_waited_probe(monitor, obj, THREAD);
}

该方法首先判断了参数的合法性,然后调用ObjectSynchronizer::inflate()方法返回了一个ObjectMonitor对象,ObjectSynchronizer类中的方法大部分都是通过ObjectMonitor对象来实现的,inflate()方法声明为

// Inflate light weight monitor to heavy weight monitor
static ObjectMonitor* inflate(Thread * Self, oop obj);

可以看到,inflate()方法是将轻量级锁膨胀为重量级锁,关于轻量级锁、重量级锁以及ObjectMonitor的介绍可以看这一篇细说synchronized,最终,是通过调用ObjectMonitor的wait()方法来实现等待的,其主要代码如下

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   ...
   // create a node to be put into the queue
   // Critically, after we reset() the event but prior to park(), we must check
   // for a pending interrupt.
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag
 
   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.
 
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
 
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; // record the old recursion count
   _waiters++;                  // increment the number of waiters
   _recursions = 0;             // set the recursion level to be 1
   exit (Self) ;                    // exit the monitor
   guarantee (_owner != Self, "invariant") ;
   ...
 
   if (node._notified != 0 && _succ == Self) {
      node._event->unpark();
   }
 
   // The thread is on the WaitSet list - now park() it.
   ...
}

ObjectMonitor的wait()方法的实现主要分为以下几个步骤

  1. 将调用等待线程封装为ObjectWaiter类的对象node
  2. 通过ObjectMonitor::AddWaiter方法将node添加到_WaitSet列表中
  3. 通过ObjectMonitor::exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象
  4. 最终通过底层的park()方法挂起线程

1、将调用等待线程封装为ObjectWaiter类的对象node

ObjectWaiter类声明如下:

class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
  enum Sorted  { PREPEND, APPEND, SORTED } ;
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  ParkEvent *   _event;
  volatile int  _notified ;
  volatile TStates TState ;
  Sorted        _Sorted ;           // List placement disposition
  bool          _active ;           // Contention monitoring is enabled
 public:
  ObjectWaiter(Thread* thread);
 
  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

ObjectWaiter对象是双向链表结构,保存了_thread(当前线程)以及当前的状态TState等数据,每个等待锁的线程都会被封装成ObjectWaiter对象。

2、通过ObjectMonitor::AddWaiter方法将node添加到_WaitSet列表中

调用此方法前后需要获取和释放_WaitSet列表的_WaitSetLock锁。从注释中可以看到,_WaitSet列表其实是一个双向循环链表。

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

3、通过ObjectMonitor::exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象

void ATTR ObjectMonitor::exit(TRAPS) {
   Thread * Self = THREAD ;
   if (THREAD != _owner) {
     if (THREAD->is_lock_owned((address) _owner)) {
       // Transmute _owner from a BasicLock pointer to a Thread address.
       // We don't need to hold _mutex for this transition.
       // Non-null to Non-null is safe as long as all readers can
       // tolerate either flavor.
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       // NOTE: we need to handle unbalanced monitor enter/exit
       // in native code by throwing an exception.
       // TODO: Throw an IllegalMonitorStateException ?
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }
 
   ...
}

4、最终通过底层的park()方法挂起线程

四、notify()方法实现

notify()方法最终通过ObjectMonitor的void notify(TRAPS)实现。

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
 
  int Policy = Knob_MoveNotifyee ;
 
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
  if (iterator != NULL) {
     TEVENT (Notify1 - Transfer) ;
     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     if (Policy != 4) {
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }
     iterator->_notified = 1 ;
 
     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }
 
     if (Policy == 0) {       // prepend to EntryList
         
     } else if (Policy == 1) {      // append to EntryList
         
     } else if (Policy == 2) {      // prepend to cxq
         
     }
     ...
}

ObjectMonitor的notify()方法的实现主要分为以下几个步骤:

  1. 若_WaitSet为NULL,即没有需要唤醒的线程,则直接退出。 _
  2. 通过ObjectMonitor::DequeueWaiter方法,获取_WaitSet列表中的第一个ObjectWaiter节点。_
  3. 根据不同的策略,将取出来的ObjectWaiter节点,加入到_EntryList或则通过Atomic::cmpxchg_ptr指令进行自旋操作cxq
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

这里需要注意的是,在jdk的notify()方法注释中说明的是随机唤醒一个线程,这里其实是第一个ObjectWaiter节点。

五、notifyAll()方法实现

lock.notifyAll()方法最终通过ObjectMonitor的void notifyAll(TRAPS)实现。该方法和notify()方法比较类似,不同的是,notifyAll()通过for循环取出_WaitSetObjectWaiter节点,并根据不同策略,加入到_EntryList或则进行自旋操作。

void ObjectMonitor::notifyAll(TRAPS) {
  ...
 
  for (;;) {
     iterator = DequeueWaiter () ;
     if (iterator == NULL) break ;
     TEVENT (NotifyAll - Transfer1) ;
     ++Tally ;
 
     ...
 
     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }
 
     if (Policy == 0) {       // prepend to EntryList
         
     } else if (Policy == 1) {      // append to EntryList
         
     } else if (Policy == 2) {      // prepend to cxq
         
     }
  }
}

六、总结

通过上述的分析,可以发现,wait()方法会释放所占有的ObjectMonitor对象,而notify()和notifyAll()并不会释放所占有的ObjectMonitor对象,它们的主要工作是将相应的线程从_WaitSet转移到 _EntryList中,然后等待竞争获取锁。

其实真正释放ObjectMonitor对象的时间点是在执行monitorexit指令,一旦释放ObjectMonitor对象后,_EntryList中ObjectWaiter节点所保存的线程就可以竞争ObjectMonitor对象进行加锁操作了。

参考资料

https://www.cnblogs.com/liukaifeng/p/10052662.html

https://blog.csdn.net/qq_38293564/article/details/80432875