ObjectMonitor不仅是重量级锁的实现,还是Object的wait/notify/notifyAll方法的底层核心实现,本篇博客就详细探讨该类的实现。
1、定义
ObjectMonitor的定义位于hotspot\src\share\vm\runtime\objectMonitor.hpp中,包含如下属性:
- volatile markOop _header; // 锁对象oop的原始对象头
- void* volatile _object; // 关联的锁对象oop
- double SharingPad [1] ; // temp to reduce false sharing
- void * volatile _owner; // 占用当前锁的线程
- volatile jlong _previous_owner_tid; // thread id of the previous owner of the monitor
- volatile intptr_t _recursions; //记录嵌套(递归)加锁的次数,最外层的锁的_recursions属性为0
- int OwnerIsThread ; // 表明当前owner原来持有轻量级锁
- ObjectWaiter * volatile _cxq ; // cxq链表头元素
- ObjectWaiter * volatile _EntryList ; // EntryList 链表头元素
- Thread * volatile _succ ; // Heir presumptive thread - used for futile wakeup throttling
- Thread * volatile _Responsible ;
- int _PromptDrain ; // rqst to drain cxq into EntryList ASAP
- volatile int _Spinner ; // 用来记录正在自旋的线程数
- volatile int _SpinFreq ; // Spin 1-out-of-N attempts: success rate
- volatile int _SpinClock ;
- volatile int _SpinDuration ; //用来控制自旋的总次数
- volatile intptr_t _SpinState ; // MCS/CLH list of spinners
- volatile intptr_t _count; // 抢占该锁的线程数
- volatile intptr_t _waiters; // 调用wait方法后等待的线程数
- ObjectWaiter * volatile _WaitSet; // 调用wait方法后等待的ObjectWaiter链表
- volatile int _WaitSetLock; // 操作WaitSet链表的锁
- int _QMix ; // Mixed prepend queue discipline
- ObjectMonitor * FreeNext ; // Free list linkage
2、TrySpin_VaryDuration
默认配置下自旋的次数是会自适应调整的,可以通过参数指定自旋固定的次数,注意在自旋的过程中会判断是否进入安全点同步,如果是则终止自旋。
int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {
//Knob_FixedSpin默认是0,表示固定自旋的次数
int ctr = Knob_FixedSpin ;
if (ctr != 0) {
//每一次while循环都是一次自旋,在指定的次数内抢占成功就是成功,否则失败
while (--ctr >= 0) {
//尝试抢占该锁,如果成功返回1
if (TryLock (Self) > 0) return 1 ;
//抢占失败,该方法直接返回0
SpinPause () ;
}
return 0 ;
}
//Knob_PreSpin的默认值是10
for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
if (TryLock(Self) > 0) {
//抢占成功
int x = _SpinDuration ;
//Knob_SpinLimit的默认值是5000
if (x < Knob_SpinLimit) {
//增加_SpinDuration
//Knob_Poverty的默认值是1000,Knob_BonusB对的默认值是100
if (x < Knob_Poverty) x = Knob_Poverty ;
//即_SpinDuration的最小值是1100,最大值是5000
_SpinDuration = x + Knob_BonusB ;
}
return 1 ;
}
SpinPause () ;
}
ctr = _SpinDuration ;
//Knob_SpinBase的默认值是10
if (ctr < Knob_SpinBase) ctr = Knob_SpinBase ;
if (ctr <= 0) return 0 ;
//Knob_SuccRestrict默认为0
if (Knob_SuccRestrict && _succ != NULL) return 0 ;
//Knob_OState默认为3,NotRunnable用于判断目标线程是否退出,如果已退出则终止自旋
if (Knob_OState && NotRunnable (Self, (Thread *) _owner)) {
TEVENT (Spin abort - notrunnable [TOP]);
return 0 ;
}
//Knob_MaxSpinners默认为-1
int MaxSpin = Knob_MaxSpinners ;
if (MaxSpin >= 0) {
if (_Spinner > MaxSpin) {
TEVENT (Spin abort -- too many spinners) ;
return 0 ;
}
//原子的将_Spinner属性加1,不断循环直到修改成功
Adjust (&_Spinner, 1) ;
}
int hits = 0 ;
int msk = 0 ;
//Knob_CASPenalty默认值是-1
int caspty = Knob_CASPenalty ;
//Knob_OXPenalty默认值是-1
int oxpty = Knob_OXPenalty ;
//Knob_SpinSetSucc默认值是1
int sss = Knob_SpinSetSucc ;
if (sss && _succ == NULL ) _succ = Self ;
Thread * prv = NULL ;
// There are three ways to exit the following loop:
// 1. A successful spin where this thread has acquired the lock.
// 2. Spin failure with prejudice
// 3. Spin failure without prejudice
while (--ctr >= 0) {
if ((ctr & 0xFF) == 0) {
//0xFF就是256,即每自旋256次就需要检查是否开启了安全点同步
if (SafepointSynchronize::do_call_back()) {
//do_call_back返回true,说明进入了安全点同步
TEVENT (Spin: safepoint) ;
//跳转到Abort
goto Abort ;
}
//Knob_UsePause默认值是1
if (Knob_UsePause & 1) SpinPause () ;
//SpinCallbackFunction默认为NULL
int (*scb)(intptr_t,int) = SpinCallbackFunction ;
if (hits > 50 && scb != NULL) {
int abend = (*scb)(SpinCallbackArgument, 0) ;
}
}
if (Knob_UsePause & 2) SpinPause() ;
if (ctr & msk) continue ;
++hits ;
if ((hits & 0xF) == 0) {
//BackOffMask默认值是0
msk = ((msk << 2)|3) & BackOffMask ;
}
Thread * ox = (Thread *) _owner ;
if (ox == NULL) {
//该锁未被占用,通过cas抢占
ox = (Thread *) Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (ox == NULL) {
//抢占成功
if (sss && _succ == Self) {
_succ = NULL ;
}
//原子的将_Spinner减1
if (MaxSpin > 0) Adjust (&_Spinner, -1) ;
//增加_SpinDuration
int x = _SpinDuration ;
if (x < Knob_SpinLimit) {
if (x < Knob_Poverty) x = Knob_Poverty ;
_SpinDuration = x + Knob_Bonus ;
}
return 1 ;
}
//CAS抢占失败,caspty默认是-1
prv = ox ;
TEVENT (Spin: cas failed) ;
if (caspty == -2) break ;
if (caspty == -1) goto Abort ;
ctr -= caspty ;
continue ;
} //if结束
//如果占有该锁的线程发生改变了,oxpty默认值是-1
if (ox != prv && prv != NULL ) {
TEVENT (spin: Owner changed)
if (oxpty == -2) break ;
if (oxpty == -1) goto Abort ;
ctr -= oxpty ;
}
//记录下当前占用锁的线程
prv = ox ;
//如果占有该锁的线程退出了,则终止自旋
if (Knob_OState && NotRunnable (Self, ox)) {
TEVENT (Spin abort - notrunnable);
goto Abort ;
}
if (sss && _succ == NULL ) _succ = Self ;
} //while循环结束
TEVENT (Spin failure) ;
{
int x = _SpinDuration ;
if (x > 0) {
//Knob_Penalty的默认值是200
x -= Knob_Penalty ;
if (x < 0) x = 0 ;
//实际就是将_SpinDuration减去Knob_Penalty
_SpinDuration = x ;
}
}
Abort:
if (MaxSpin >= 0) Adjust (&_Spinner, -1) ;
if (sss && _succ == Self) {
_succ = NULL ;
OrderAccess::fence() ;
//尝试获取锁
if (TryLock(Self) > 0) return 1 ;
}
return 0 ;
}
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
//如果不等于NULL说明某个线程依然占用该锁
if (own != NULL) return 0 ;
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
//如果交换成功,说明抢占成功
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
return 1 ;
}
//抢占失败返回-1
if (true) return -1 ;
}
}
extern "C" {
int SpinPause() {
return 0;
}
}
3、ObjectWaiter
ObjectWaiter表示一个等待获取ObjectMonitor锁的线程,其定义如下:

其中next和prev属性表示 ObjectWaiter链表中的前后节点,_thread和_event都是关联的线程属性,TState用于描述当前ObjectWaiter的状态,刚创建时的状态是TS_RUN,加入到cxq链表中状态是TS_CXQ,加入到EntryList链表后变成TS_ENTER,加入到WaitSet链表中的状态就是TS_WAIT,另外两个状态枚举没有调用。_active用于记录当前线程是否开启了线程监控,如果开启了可以通过jmm接口获取线程运行的统计数据,比如锁抢占的次数和累计耗时。_notified属性用于记录该ObjectWaiter是否被某个线程唤醒了而不是因为线程中断唤醒的,_notifier_tid用于记录执行唤醒动作的线程指针。
三个方法的实现如下:
ObjectWaiter::ObjectWaiter(Thread* thread) {
_next = NULL;
_prev = NULL;
_notified = 0;
//初始状态
TState = TS_RUN ;
//关联的线程
_thread = thread;
_event = thread->_ParkEvent ;
_active = false;
assert (_event != NULL, "invariant") ;
}
void ObjectWaiter::wait_reenter_begin(ObjectMonitor *mon) {
JavaThread *jt = (JavaThread *)this->_thread;
_active = JavaThreadBlockedOnMonitorEnterState::wait_reenter_begin(jt, mon);
}
void ObjectWaiter::wait_reenter_end(ObjectMonitor *mon) {
JavaThread *jt = (JavaThread *)this->_thread;
JavaThreadBlockedOnMonitorEnterState::wait_reenter_end(jt, _active);
}
static bool wait_reenter_begin(JavaThread *java_thread, ObjectMonitor *obj_m) {
assert((java_thread != NULL), "Java thread should not be null here");
bool active = false;
if (is_alive(java_thread) && ServiceUtil::visible_oop((oop)obj_m->object())) {
active = contended_enter_begin(java_thread);
}
return active;
}
static void wait_reenter_end(JavaThread *java_thread, bool active) {
if (active) {
java_thread->get_thread_stat()->contended_enter_end();
}
//修改线程状态
set_thread_status(java_thread, java_lang_Thread::RUNNABLE);
}
//判断目标JavaThread是否存活的
static bool is_alive(JavaThread* java_thread) {
return java_thread != NULL && java_thread->threadObj() != NULL;
}
//如果这个oop是Java代码可见的,则返回true
static inline bool visible_oop(oop o) {
//如果是已经删除的JNI引用则不可见
if (o == JNIHandles::deleted_handle()) {
return false;
}
// instance
if (o->is_instance()) {
//如果是java_lang_Class的实例,即用来保存类静态属性的oop则返回false
//否则返回true
if (o->klass() != SystemDictionary::Class_klass()) {
return true;
}
//如果是基本类型
if (java_lang_Class::is_primitive(o)) {
return true;
}
//获取o所属的klass
Klass* k = java_lang_Class::as_Klass(o);
if (k->is_klass()) {
//普通Java类
if (k->oop_is_instance()) {
return true;
}
//对象数组
if (k->oop_is_objArray()) {
return true;
}
//多维数组
if (k->oop_is_typeArray()) {
return true;
}
}
return false;
}
// object arrays are visible if they aren't system object arrays
if (o->is_objArray()) {
return true;
}
// type arrays are visible
if (o->is_typeArray()) {
return true;
}
// everything else (Method*s, ...) aren't visible
return false;
}; // end of visible_oop()
static bool contended_enter_begin(JavaThread *java_thread) {
//修改Java状态
set_thread_status(java_thread, java_lang_Thread::BLOCKED_ON_MONITOR_ENTER);
//ThreadStatistics用于统计一些静态属性
ThreadStatistics* stat = java_thread->get_thread_stat();
stat->contended_enter();
//is_thread_monitoring_contention方法返回_thread_monitoring_contention_enabled属性,该属性默认为false,可通过jmm接口开启
bool active = ThreadService::is_thread_monitoring_contention();
if (active) {
stat->contended_enter_begin();
}
return active;
}
static void set_thread_status(JavaThread* java_thread,
java_lang_Thread::ThreadStatus state) {
java_lang_Thread::set_thread_status(java_thread->threadObj(), state);
}
其中wait_reenter_begin方法将线程的状态置为BLOCKED_ON_MONITOR_ENTER,如果通过JMM接口开启了JavaThread监控则记录当前线程开始锁抢占了;wait_reenter_end方法将线程的状态置为RUNNABLE,如果开启了JavaThread监控,则记录当前线程锁抢占结束了。
其中_thread_monitoring_contention_enabled属性的调用链如下:

jmm_SetBoolAttribute方法的实现如下:

该方法就是sun_management_ThreadImpl类的相关set方法的底层实现,如下:

即Java代码中可以通过此方法开启Java线程的运行情况的监控。
4、EnterI
EnterI方法会初始化线程自旋相关配置,然后自旋尝试获取锁,获取失败后将当前线程加入到ObjectWaiter队列中,然后借助底层操作系统的互斥量让当前线程处于休眠状态,如果持有锁的线程释放了锁就会唤醒该线程,被唤醒后该线程会尝试获取锁,获取失败再自旋,依然获取失败再次进入休眠状态,如此循环直到获取锁为止,获取成功后将当前线程对应的ObjectWaiter从队列中移除。
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
//校验线程状态已经处于阻塞中
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
if (TryLock (Self) > 0) {
//尝试获取锁,获取成功则返回
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
//初始化自旋相关配置参数
DeferredInitialize () ;
if (TrySpin (Self) > 0) {
//再次尝试自旋,获取锁成功则返回
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
//自旋获取锁失败,将当前线程加入到等待队列中并且park
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
//创建一个ObjectWaiter并初始化
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
//原子的修改_cxq为node,如果修改成功则终止循环,表示已经成功加入到链表中
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
//修改失败,某个线程改变了cxq
if (TryLock (Self) > 0) {
//再次尝试获取锁,获取成功则返回
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
//SyncFlags对的默认值是0
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
//nxt或者_EntryList为NULL,说明当前线程是第一个阻塞的线程,将_Responsible原子的修改为当前线程
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
//尝试获取锁
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
//原子的将_Responsible置为Self
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
//将目标线程park掉,底层通过操作系统的互斥量实现,让当前线程休眠
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
//增加等待时间,最大不超过1s
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
//线程被唤醒了,即某个占用锁的线程释放了锁,尝试抢占该锁
if (TryLock(Self) > 0) break ;
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
//增加计数
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
//增加计数
++ nWakeups ;
//Knob_SpinAfterFutile默认值是1,此时会再次尝试自旋获取锁
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
//Knob_ResetEvent默认值是0
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;
//强制所有修改立即生效
OrderAccess::fence() ;
}
//for循环结束,当前线程已经获取了锁
assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
//将其从EntryList或者cxq链表中移除
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
assert (_succ != Self, "invariant") ;
if (_Responsible == Self) {
//将_Responsible置为NULL
_Responsible = NULL ;
OrderAccess::fence(); // Dekker pivot-point
}
if (SyncFlags & 8) {
OrderAccess::fence() ;
}
return ;
}
void ObjectMonitor::DeferredInitialize () {
//初始化完成时会将InitDone置为1,即只初始化第一次即可
if (InitDone > 0) return ;
if (Atomic::cmpxchg (-1, &InitDone, 0) != 0) {
//将其原子的修改为-1,如果修改失败说明有一个线程已经完成了修改
//自旋等待该线程完成初始化
while (InitDone != 1) ;
return ;
}
// SyncKnobs是一个配置项,用来配置跟自旋等待相关的属性
if (SyncKnobs == NULL) SyncKnobs = "" ;
//获取其字符长度
size_t sz = strlen (SyncKnobs) ;
//分配一个字符数组
char * knobs = (char *) malloc (sz + 2) ;
if (knobs == NULL) {
//分配失败抛出异常
vm_exit_out_of_memory (sz + 2, OOM_MALLOC_ERROR, "Parse SyncKnobs") ;
guarantee (0, "invariant") ;
}
//复制到knobs
strcpy (knobs, SyncKnobs) ;
//加1的字符置为0,表示字符串结束
knobs[sz+1] = 0 ;
for (char * p = knobs ; *p ; p++) {
if (*p == ':') *p = 0 ;
}
//初始化各项配置,kvGetInt负责查找配置项的值
#define SETKNOB(x) { Knob_##x = kvGetInt (knobs, #x, Knob_##x); }
SETKNOB(ReportSettings) ;
SETKNOB(Verbose) ;
SETKNOB(FixedSpin) ;
SETKNOB(SpinLimit) ;
SETKNOB(SpinBase) ;
SETKNOB(SpinBackOff);
SETKNOB(CASPenalty) ;
SETKNOB(OXPenalty) ;
SETKNOB(LogSpins) ;
SETKNOB(SpinSetSucc) ;
SETKNOB(SuccEnabled) ;
SETKNOB(SuccRestrict) ;
SETKNOB(Penalty) ;
SETKNOB(Bonus) ;
SETKNOB(BonusB) ;
SETKNOB(Poverty) ;
SETKNOB(SpinAfterFutile) ;
SETKNOB(UsePause) ;
SETKNOB(SpinEarly) ;
SETKNOB(OState) ;
SETKNOB(MaxSpinners) ;
SETKNOB(PreSpin) ;
SETKNOB(ExitPolicy) ;
SETKNOB(QMode);
SETKNOB(ResetEvent) ;
SETKNOB(MoveNotifyee) ;
SETKNOB(FastHSSEC) ;
#undef SETKNOB
if (Knob_Verbose) {
//检查配置的合法性
sanity_checks();
}
if (os::is_MP()) {
BackOffMask = (1 << Knob_SpinBackOff) - 1 ;
if (Knob_ReportSettings) ::printf ("BackOffMask=%X\n", BackOffMask) ;
} else {
Knob_SpinLimit = 0 ;
Knob_SpinBase = 0 ;
Knob_PreSpin = 0 ;
Knob_FixedSpin = -1 ;
}
if (Knob_LogSpins == 0) {
ObjectMonitor::_sync_FailedSpins = NULL ;
}
//释放knobs的内存
free (knobs) ;
//让修改立即生效
OrderAccess::fence() ;
//标识初始化完成
InitDone = 1 ;
}
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
assert (_owner == Self, "invariant") ;
assert (SelfNode->_thread == Self, "invariant") ;
if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
//正常情况走此分支,将SelfNode从_EntryList中移除
//默认配置下,cxq链表中的节点会被转移到EntryList链表中,状态就置为TS_ENTER
ObjectWaiter * nxt = SelfNode->_next ;
ObjectWaiter * prv = SelfNode->_prev ;
if (nxt != NULL) nxt->_prev = prv ;
if (prv != NULL) prv->_next = nxt ;
if (SelfNode == _EntryList ) _EntryList = nxt ;
assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
TEVENT (Unlink from EntryList) ;
} else {
guarantee (SelfNode->TState == ObjectWaiter::TS_CXQ, "invariant") ;
ObjectWaiter * v = _cxq ;
assert (v != NULL, "invariant") ;
//如果v不等于SelfNode直接进入下面的分支,如果等于执行后面的CAS逻辑,将_cxq修改为next,如果修改失败会进入if分支
if (v != SelfNode || Atomic::cmpxchg_ptr (SelfNode->_next, &_cxq, v) != v) {
if (v == SelfNode) {
assert (_cxq != v, "invariant") ;
//修改失败,说明有其他线程修改了cxq,这里重新获取cxq
v = _cxq ; // CAS above failed - start scan at head of list
}
ObjectWaiter * p ;
ObjectWaiter * q = NULL ;
//遍历找到SelfNode,将其移除
for (p = v ; p != NULL && p != SelfNode; p = p->_next) {
q = p ;
assert (p->TState == ObjectWaiter::TS_CXQ, "invariant") ;
}
assert (v != SelfNode, "invariant") ;
assert (p == SelfNode, "Node not found on cxq") ;
assert (p != _cxq, "invariant") ;
assert (q != NULL, "invariant") ;
assert (q->_next == p, "invariant") ;
q->_next = p->_next ;
}
TEVENT (Unlink from cxq) ;
}
//prev和next属性置为null
SelfNode->_prev = (ObjectWaiter *) 0xBAD ;
SelfNode->_next = (ObjectWaiter *) 0xBAD ;
SelfNode->TState = ObjectWaiter::TS_RUN ;
}
5、JavaThreadBlockedOnMonitorEnterState / OSThreadContendState / ThreadBlockInVM
JavaThreadBlockedOnMonitorEnterState继承自JavaThreadStatusChanger,该类在构造函数中保存线程原来的状态,并在析构函数中恢复线程的原来的运行状态,其实现如下:
JavaThreadStatusChanger(JavaThread* java_thread,
java_lang_Thread::ThreadStatus state) : _old_state(java_lang_Thread::NEW) {
save_old_state(java_thread);
set_thread_status(state);
}
JavaThreadStatusChanger(JavaThread* java_thread) : _old_state(java_lang_Thread::NEW) {
save_old_state(java_thread);
}
~JavaThreadStatusChanger() {
set_thread_status(_old_state);
}
void save_old_state(JavaThread* java_thread) {
_java_thread = java_thread;
_is_alive = is_alive(java_thread);
if (is_alive()) {
//获取线程状态
_old_state = java_lang_Thread::get_thread_status(_java_thread->threadObj());
}
}
void set_thread_status(java_lang_Thread::ThreadStatus state) {
if (is_alive()) {
//如果线程是存活的则设置线程状态
set_thread_status(_java_thread, state);
}
}
static void set_thread_status(JavaThread* java_thread,
java_lang_Thread::ThreadStatus state) {
java_lang_Thread::set_thread_status(java_thread->threadObj(), state);
}
bool is_alive() {
return _is_alive;
}
JavaThreadBlockedOnMonitorEnterState同样是借助构造和析构函数完成状态变更的,其实现如下:
JavaThreadBlockedOnMonitorEnterState(JavaThread *java_thread, ObjectMonitor *obj_m) :
_stat(NULL), _active(false), JavaThreadStatusChanger(java_thread) {
assert((java_thread != NULL), "Java thread should not be null here");
_active = false;
if (is_alive() && ServiceUtil::visible_oop((oop)obj_m->object()) && obj_m->contentions() > 0) {
_stat = java_thread->get_thread_stat();
//contended_enter_begin方法会修改线程状态为BLOCKED_ON_MONITOR_ENTER
_active = contended_enter_begin(java_thread);
}
}
~JavaThreadBlockedOnMonitorEnterState() {
//会调用父类的析构函数将线程状态还原回去
if (_active) {
_stat->contended_enter_end();
}
}
//返回抢占这个锁的线程数
inline intptr_t ObjectMonitor::contentions() const {
return _count;
}
其中涉及的线程状态的定义如下:

OSThreadContendState的实现类似,用于修改原生的本地线程的状态,其实现如下:

其中原生线程的状态的定义如下:
ThreadBlockInVM用于修改java线程的状态,此状态是JVM内部使用的,用于实现让当前线程进入安全点,其实现如下:

上述调用中设置external_suspend的JVM_SuspendThread方法就是Thread类suspend0本地方法的实现,是Thread的suspend方法的底层实现,该方法是一个已经被废弃的方法,因为该方法容易导致死锁,如下:

- AddWaiter / DequeueWaiter /DequeueSpecificWaiter
AddWaiter方法用于将目标ObjectWaiter加入到双向循环链表中,DequeueWaiter用于移除链表头_WaitSet对应的节点,该节点是最早加入到链表的,即按照加入链表的先后顺序依次从链表中移除,DequeueSpecificWaiter用于移除指定节点,不一定是_WaitSet对应的节点。其实现如下:
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
//将目标节点放入一个双向的循环链表中
if (_WaitSet == NULL) {
//如果_WaitSet还是空的,当前节点就是第一个
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
//如果_WaitSet不是空的,将其插入到head的prev节点上
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
//注意tail在初始状态下就是head,所以插入第二个节点时修改next属性,实际是修改head的next属性
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
//如果_WaitSet为不空
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
//从_WaitSet中取出一个ObjectWaiter,实际就是取出_WaitSet对应的head节点,该
//节点是最早加入到链表中的
ObjectWaiter* next = node->_next;
if (next == node) {
//_WaitSet只有一个节点
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
//将node从链表中移除
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
//如果移除的就是_WaitSet,将next置为_WaitSet
_WaitSet = next;
}
}
//相关属性置为null
node->_next = NULL;
node->_prev = NULL;
}
上述逻辑可以结合以下用例来理解,如下:
//依次添加node,node2,node3,node4,node5 5个节点时各节点的引用关系
prev next
----------------
node node node
=================
node2 node node2
node node2 node
=================
node node2 node3
node3 node node2
node2 node3 node
=================
node2 node3 node4
node4 node node2
node3 node4 node
node node2 node3 //node2节点的引用关系不变
=================
node3 node4 node5
node5 node node2
node4 node5 node
node node2 node3 //node2和node3节点的引用关系不变
node2 node3 node4
删除node节点后node2作为_WaitSet
================
node4 node5 node2
node5 node2 node3
node node2 node3 //node2和node3节点的引用关系不变
node2 node3 node4
2、wait
wait方法是Object的wait方法的底层实现,该方法会创建一个ObjectWaiter并加入到链表中,然后释放占有的锁,让当前线程休眠,当当前线程因为等待超时,被中断或者被其他线程唤醒时就再次抢占锁,抢占逻辑就是之前的enter方法,抢占成功后wait方法退出。
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
//获取当前线程
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
//初始化配置,如果已经初始化则返回
DeferredInitialize () ;
//检查当前线程是否获取了锁,如果没有则抛出异常
CHECK_OWNER();
EventJavaMonitorWait event;
//如果线程被中断了且不是因为未处理异常导致的
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
//发布JVMTI事件
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, false);
}
if (event.should_commit()) {
post_monitor_wait_event(&event, 0, millis, false);
}
TEVENT (Wait - Throw IEX) ;
//抛出异常
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
//设置属性,记录当前线程等待的ObjectMonitor
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
//创建ObjectWaiter,将其状态置为TS_WAIT
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
//获取操作ObjectWaiter链表的锁_WaitSetLock
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
//将当前节点插入到ObjectWaiter链表中
AddWaiter (&node) ;
//释放锁
Thread::SpinRelease (&_WaitSetLock) ;
//SyncFlags默认为0
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // record the old recursion count
//等待的线程数加1
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
//释放该锁
exit (true, Self) ; // exit the monitor
guarantee (_owner != Self, "invariant") ;
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
//修改线程状态为OBJECT_WAIT
OSThreadWaitState osts(osthread, true);
{
//修改线程状态从_thread_in_vm到_thread_blocked
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) { //_notified为0表示没有其他线程唤醒
//将当前线程park,让其处于休眠状态
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
//当前线程从park状态被唤醒了
//ExitSuspendEquivalent默认返回false
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} //退出代码块时会切换线程状态 _thread_blocked -> _thread_in_vm
//如果是线程被中断或者等待超时则状态是TS_WAIT,如果是被nofity唤醒的则应该是TS_RUN
if (node.TState == ObjectWaiter::TS_WAIT) {
//获取锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
//如果是TS_WAIT,则将其从链表中移除
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
//将状态置为TS_RUN
node.TState = ObjectWaiter::TS_RUN ;
}
//释放锁
Thread::SpinRelease (&_WaitSetLock) ;
}
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
//让修改立即生效
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
}
if (event.should_commit()) {
post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
//重新获取该锁
enter (Self) ;
} else {
//该ObjectWaiter已经被唤醒了,但是等待获取锁的时候线程被中断了
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
//如果不是因为notify被唤醒
if (!WasNotified) {
// 可能因为等待超时或者Thread.interrupt()被唤醒
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
//如果线程中断则抛出异常
THROW(vmSymbols::java_lang_InterruptedException());
}
}
}
#define CHECK_OWNER() \
do { \
if (THREAD != _owner) { \
//如果owner属性不是当前线程
if (THREAD->is_lock_owned((address) _owner)) { \
//如果owner属性位于当前线程栈帧中,说明该锁是由轻量级锁膨胀来的
//修改owner属性为当前线程
_owner = THREAD ; /* Convert from basiclock addr to Thread addr */ \
_recursions = 0; \
OwnerIsThread = 1 ; \
} else { \
//当前线程没有获取锁,则抛出异常
TEVENT (Throw IMSX) ; \
THROW(vmSymbols::java_lang_IllegalMonitorStateException()); \
} \
} \
} while (false)
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// 判断其是否被中断,如果是且clear_interrupted为true,则将其中断标识清除掉
return os::is_interrupted(thread, clear_interrupted);
}
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//获取关联的原生线程
OSThread* osthread = thread->osthread();
//获取其是否被中断
bool interrupted = osthread->interrupted();
if (interrupted && clear_interrupted) {
//清除被中断标识
osthread->set_interrupted(false);
}
return interrupted;
}
//ReenterI和EnterI的逻辑基本相同,用于获取对象锁
void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
assert (Self != NULL , "invariant") ;
assert (SelfNode != NULL , "invariant") ;
assert (SelfNode->_thread == Self , "invariant") ;
assert (_waiters > 0 , "invariant") ;
//校验目标对象的对象头就是当前ObjectMonitor的指针
assert (((oop)(object()))->mark() == markOopDesc::encode(this) , "invariant") ;
assert (((JavaThread *)Self)->thread_state() != _thread_blocked, "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
int nWakeups = 0 ;
for (;;) {
ObjectWaiter::TStates v = SelfNode->TState ;
//校验状态
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
assert (_owner != Self, "invariant") ;
//尝试获取锁
if (TryLock (Self) > 0) break ;
//尝试自旋获取锁
if (TrySpin (Self) > 0) break ;
TEVENT (Wait Reentry - parking) ;
{
//修改线程状态
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
jt->set_suspend_equivalent();
//SyncFlags默认是0
if (SyncFlags & 1) {
Self->_ParkEvent->park ((jlong)1000) ;
} else {
Self->_ParkEvent->park () ;
}
// were we externally suspended while we were waiting?
for (;;) {
//ExitSuspendEquivalent默认返回false
if (!ExitSuspendEquivalent (jt)) break ;
if (_succ == Self) { _succ = NULL; OrderAccess::fence(); }
jt->java_suspend_self();
jt->set_suspend_equivalent();
}
}
//尝试获取锁
if (TryLock(Self) > 0) break ;
TEVENT (Wait Reentry - futile wakeup) ;
++ nWakeups ;
// Assuming this is not a spurious wakeup we'll normally
// find that _succ == Self.
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a contending thread
// *must* retry _owner before parking.
OrderAccess::fence() ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
}//for循环结束
//for循环结束,已经获取了锁
assert (_owner == Self, "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
//从链表中移除
UnlinkAfterAcquire (Self, SelfNode) ;
if (_succ == Self) _succ = NULL ;
assert (_succ != Self, "invariant") ;
//修改状态为TS_RUN
SelfNode->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ; // see comments at the end of EnterI()
}
3、notify
notify方法时Object的notify方法的底层实现,用于“唤醒”WaitSet链表头对应的线程,即最早加入到该链表的等待线程,注意在默认配置下(默认的处理策略是2,不同策略的处理逻辑不同),并不会直接unpark该线程,而是将其加入到cxq链表的前面,相当于调用了一次EnterI方法。加入到cxq链表后,当关联的锁被释放了就会unpark该线程,注意只是唤醒,然后该线程调用enter方法抢占锁,因此此时可能有其他线程在同时调用enter方法抢占锁。
void ObjectMonitor::notify(TRAPS) {
//检查当前线程是否占用该锁,如果没有抛出异常
CHECK_OWNER();
if (_WaitSet == NULL) {
//如果没有等待的线程则退出
TEVENT (Empty-Notify) ;
return ;
}
//Knob_MoveNotifyee属性默认是2
int Policy = Knob_MoveNotifyee ;
//获取锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
//将链表头元素移除并返回
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
//将状态置为TS_ENTER
iterator->TState = ObjectWaiter::TS_ENTER ;
}
//_notified置为1表示该ObjectWaiter被唤醒了
iterator->_notified = 1 ;
Thread * Self = THREAD;
//记录当前线程ID
iterator->_notifier_tid = Self->osthread()->thread_id();
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
//根据不同的策略执行不同的处理
if (Policy == 0) { //将iterator插入到_EntryList头元素的前面
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { //将iterator插入到_EntryList链表的末尾
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
ObjectWaiter * Tail ;
//不断遍历找到链表最后一个元素
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { //将iterator插入到_cxq头元素的前面
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { //将iterator插入到_cxq链表末尾的后面
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
//往后遍历找到最后一个元素
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
//将等待的线程直接unpark唤醒
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
//修改线程状态,记录锁竞争开始
iterator->wait_reenter_begin(this);
}
} //if结束
//释放锁
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
//增加计数
ObjectMonitor::_sync_Notifications->inc() ;
}
}
4、notifyAll
notifyAll方法就是Object的notifyAll方法的底层实现,对单个ObjectWaiter其处理逻辑跟notify是一致的,相比notify的实现就是增加了一个for循环,会不断的从_WaitSet链表中移除头元素,然后执行notify的处理逻辑,直到_WaitSet链表为空退出循环。
void ObjectMonitor::notifyAll(TRAPS) {
//检查当前线程是否占用该锁,如果没有抛出异常
CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {
//如果没有等待的线程则退出
TEVENT (Empty-NotifyAll) ;
return ;
}
//Knob_MoveNotifyee属性默认是2
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
//获取锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
//if变成for循环
for (;;) {
//获取头部元素,头部节点为最早加入到链表中的节点
iterator = DequeueWaiter () ;
//如果为空则终止循环
if (iterator == NULL) break ;
TEVENT (NotifyAll - Transfer1) ;
//增加计数
++Tally ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
//_notified置为1表示该ObjectWaiter被唤醒了
iterator->_notified = 1 ;
Thread * Self = THREAD;
//记录当前线程ID
iterator->_notifier_tid = Self->osthread()->thread_id();
if (Policy != 4) {
//将状态置为TS_ENTER
iterator->TState = ObjectWaiter::TS_ENTER ;
}
//根据不同的策略执行不同的处理
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { //将iterator插入到_EntryList头元素的前面
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { //将iterator插入到_EntryList链表的末尾
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { //将iterator插入到_cxq头元素的前面
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
if (Policy == 3) { //将iterator插入到_cxq链表末尾的后面
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
//将等待的线程直接unpark唤醒
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
//修改线程状态,记录锁竞争开始
iterator->wait_reenter_begin(this);
}
}//for循环结束
//释放锁
Thread::SpinRelease (&_WaitSetLock) ;
if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
//增加计数
ObjectMonitor::_sync_Notifications->inc(Tally) ;
}
}
5、exit
exit用于释放锁,即将owner属性置为NULL,默认配置下会通过unpark唤醒_EntryList链表头部节点对应的等待线程,如果EntryList链表为空,则将cxq链表中的元素加入到EntryList链表中且顺序保持不变,即优先唤醒最近等待的线程。注意exit方法并不会因为安全点同步而阻塞,exit方法退出后继续执行,无论解释执行或者编译执行则会都被阻塞;exit方式释放锁后,被唤醒的线程占用了该锁,在enter方法获取锁准备切换线程状态时会被阻塞。
//第一个参数not_suspended用于debug的,可以忽略
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
//如果owner位于当前线程调用栈帧,说明该锁是轻量级锁膨胀来的
assert (_recursions == 0, "invariant") ;
//修改owner属性
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
//其他线程占用该锁,直接返回
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
if (_recursions != 0) {
//不等于0说明是嵌套加锁,将_recursions减1即可返回
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
// SyncFlags默认值是0
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
for (;;) {
assert (THREAD == _owner, "invariant") ;
//Knob_ExitPolicy默认值是0
if (Knob_ExitPolicy == 0) {
//将_owner属性置为NULL,释放锁,如果某个线程正在自旋抢占该锁,则会抢占成功
//即这种策略会优先保证通过自旋抢占锁的线程获取锁,而其他处于等待队列中的线程则靠后
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
//让修改立即生效
OrderAccess::storeload() ; // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
//如果_EntryList或者cxq链表都是空的,则直接返回
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
//如果_EntryList或者cxq链表不是空的,则原子的设置owner属性为当前线程,尝试抢占锁
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
//抢占失败则返回,等占用该锁的线程释放后再处理链表中的等待线程
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
//有可能cxq插入了一个新节点,导致上面的if不成立,需要重新获取锁
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
//如果_EntryList或者cxq链表不是空的则不释放锁,避免二次抢占锁,即优先处理等待队列中的线程
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
//Knob_QMode的默认值是0
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
//通过unpark唤醒cxq对应的线程,唤醒后会将cxq从链表中移除
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
//将cxq链表中的元素插入到_EntryList链表的末尾
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
//将_cxq原子的置为NULL,如果失败则更新w,重新尝试直到成功为止
//置为NULL后,如果有新的节点插入进来就形成了一个新的cxq链表
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
//遍历cxq中的所有节点,将其置为TS_ENTER
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
ObjectWaiter * Tail ;
//遍历_EntryList找到末尾元素,将w插入到后面
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
}
if (QMode == 4 && _cxq != NULL) {
//将cxq链表中的元素插入到_EntryList链表的头部
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
//将_cxq原子的置为NULL,如果失败则更新w,重新尝试直到成功为止
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
//遍历cxq中的所有节点,将其置为TS_ENTER
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
//插入到_EntryList的头部
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
}
w = _EntryList ;
if (w != NULL) {
//通过unpark唤醒w对应的线程,唤醒后会该线程会负责将w从EntryList链表中移除
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
//如果_EntryList为空
w = _cxq ;
if (w == NULL) continue ;//如果cxq为空则重新循环,不会进入此分支
//cxq不为NULL
for (;;) {
assert (w != NULL, "Invariant") ;
//将cxq原子的修改为NULL
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
if (QMode == 1) {
//遍历cxq中的元素将其加入到_EntryList中,注意顺序跟cxq中是返的
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
//遍历cxq中的元素将其加入到_EntryList中,注意此时cxq链表的头元素被赋值给EntryList
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
//cxq中的元素是通过next属性串联起来的,prev属性没有,此处遍历加上prev属性
//当EntryList头元素被移除了是取next属性作为EntryList
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
//唤醒w对应的线程
ExitEpilog (Self, w) ;
return ;
}
}
}
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
//Knob_SuccEnabled默认是1,succ表示很有可能占用该锁的线程
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
//将owner属性置为NULL
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
//唤醒目标线程
Trigger->unpark() ;
if (ObjectMonitor::_sync_Parks != NULL) {
//增加计数
ObjectMonitor::_sync_Parks->inc() ;
}
}
6、try_enter / complete_exit
try_enter用于实现Unsafe类的tryMonitorEnter方法,会尝试获取锁,如果获取失败则直接返回false;complete_exit用于释放目标锁,在嵌套加锁的情形下只需要调用complete_exit一次即可,如果是exit则需要调用多次。
bool ObjectMonitor::try_enter(Thread* THREAD) {
if (THREAD != _owner) {
if (THREAD->is_lock_owned ((address)_owner)) {
//如果该线程已经占有了该锁,该锁由轻量级锁膨胀而来
assert(_recursions == 0, "internal state error");
//修改owner等属性
_owner = THREAD ;
_recursions = 1 ;
OwnerIsThread = 1 ;
return true;
}
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
//原子的设置owner属性,修改失败
return false;
}
//修改成功
return true;
} else {
//当前线程已经占有该锁,将记录嵌套加锁的计数器加1
_recursions++;
return true;
}
}
intptr_t ObjectMonitor::complete_exit(TRAPS) {
Thread * const Self = THREAD;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize();
if (THREAD != _owner) {
if (THREAD->is_lock_owned ((address)_owner)) {
//如果是轻量级锁膨胀来的
assert(_recursions == 0, "internal state error");
_owner = THREAD ; /* Convert from basiclock addr to Thread addr */
_recursions = 0 ;
OwnerIsThread = 1 ;
}
}
guarantee(Self == _owner, "complete_exit not owner");
intptr_t save = _recursions; // record the old recursion count
//_recursions置为0,即嵌套加锁的情形下不需要多次调用exit了
_recursions = 0; // set the recursion level to be 0
//释放该锁
exit (true, Self) ; // exit the monitor
guarantee (_owner != Self, "invariant");
return save;
}
7、总结
ObjectMonitor维护了三个ObjectWaiter链表,分别是cxq链表、EntryList链表和WaitSet链表,对应链表中ObjectWaiter的状态分别是TS_CXQ,TS_ENTER和TS_WAIT。调用enter方法时,如果自旋获取锁失败就会创建一个ObjectWaiter并加入到cxq链表中,某个已经获取锁的线程调用wait方法会创建一个ObjectWaiter并加入到WaitSet链表中,当某个线程调用notify/notifyAll方法“唤醒”该线程时,会将该ObjectWaiter从WaitSet链表中移除然后加入到cxq链表头。当某个获取锁的线程释放锁时,就会唤醒EntryList链表头对应的线程,如果EntryList链表为空,则将此时的cxq链表中的元素整体转移到EntryList链表中,然后同样的唤醒EntryList链表头对应的线程,被唤醒后该线程一样调用enter方法抢占锁。