概述
Java中的锁,object的wait、sleep都能够堵塞线程,它们到底是如何实现的呢?
源码分析
Java中的可重入锁都是通过LockSupport调用Unsafe的park、unpark方法实现的:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
oop obj = thread->current_park_blocker();
if (event.should_commit()) {
event.set_klass(obj ? obj->klass() : (klassOop)NULL);
event.set_timeout(time);
event.set_address(obj ? (TYPE_ADDRESS) (uintptr_t) obj : 0);
event.commit();
}
UNSAFE_END
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
Parker* p = NULL;
if (jthread != NULL) {
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Grab lock if apparently null or using older version of library
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
p->unpark();
}
UNSAFE_END
可以看到底层都是通过调用thread->parker()实现的,而根据之前Thread的源码分析,Thread.sleep也是采用类似方法实现的;
Pakrer对象
在JavaThread中定义了成员变量:
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
void JavaThread::initialize() {
// Initialize fields
_parker = Parker::Allocate(this) ;
}
这里的_parker是Parker实例,继承自os::PlatformParker ,其park方法实现如下:
void os::PlatformEvent::park() {
//_Event是个int变量,如果CAS更新成功,即成功将_Event减1,则退出循环
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
//v=_Event,v=0表示无许可,则需要堵塞等待获得许可;
if (v == 0) {
int status = pthread_mutex_lock(_mutex);//获取锁
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
//等待许可,调用pthread_cond_wait进行等待
while (_Event < 0) {
//pthread_cond_wait会加入等待队列,同时释放_mutex锁,
//等待signal方法唤醒,唤醒之后需要重新获取_mutex锁,方法才能返回
status = pthread_cond_wait(_cond, _mutex);
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
//获取许可之后,设置可用许可数为0;由此可见许可数最大为1
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);//释放锁
assert_status(status == 0, status, "mutex_unlock");
OrderAccess::fence();//内存屏障语句
}
guarantee (_Event >= 0, "invariant") ;
}
void os::PlatformEvent::unpark() {
//使用原子方法xchg将1放入寄存器,与_Event所指的内容交换,即_Event=1,然后返回_Event原先的值,
//如果返回值大于等于0,表示有许可有用,方法直接返回;
if (Atomic::xchg(1, &_Event) >= 0) return;
//如果原来的_Event小于0,说明有park方法进入了pthread_cond_wait堵塞
int status = pthread_mutex_lock(_mutex);//获取锁
assert_status(status == 0, status, "mutex_lock");
int AnyWaiters = _nParked;
assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
//NPTL存在瑕疵,当pthread_cond_timedwait() 方法时间参数为过去时间,
//会导致_cond变量被破坏或者线程被hang住;
if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
AnyWaiters = 0;
//调用signal唤醒pthread_cond_wait调用线程,不判断方法执行结果
pthread_cond_signal(_cond);
}
//然后释放_mutex锁
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
if (AnyWaiters != 0) {
// //调用signal唤醒pthread_cond_wait调用线程,并判断方法执行结果
status = pthread_cond_signal(_cond);
assert_status(status == 0, status, "cond_signal");
}
}
这个地方要说明下,如果WorkAroundNPTLTimedWaitHang=true,会先调用signal,然后释放锁;如果WorkAroundNPTLTimedWaitHang=false,会先释放锁,然后调用signal;
- 先signal,后释放锁 :signal之后,等待线程可以马上运行,但由于无法获取锁,会马上进入waiting状态;
- 先释放锁,后signal:释放锁之后,如果有等待线程,可能pthread_cond_signal还没运行就发生了线程切换;在极少的情况下,由于pthread_cond_wait有spurious wakeup(伪唤醒)问题,可能导致park方法提前返回,这个时候使用者需要判断cond的状态,再次调用park;
关于WorkAroundNPTLTimedWaitHang还是有些疑问,后续继续探讨;
Atomic方法
在上面的源码中,使用到了Atomic的xchg和cmpxchg方法,这两个方法是采用汇编实现的:
汇编方法的语法如下:
asm ( assembler template
: output operands (optional)
: input operands (optional)
: clobbered registers list (optional)
);
output operands和inpupt operands指定参数,它们从左到右依次排列,用','分割,编号从0开始;
inline jint Atomic::xchg (jint exchange_value, volatile jint* dest) {
__asm__ volatile ( "xchgl (%2),%0"
: "=r" (exchange_value)
: "0" (exchange_value), "r" (dest)
: "memory");
return exchange_value;
}
将exchange_value(%0)的值放入通用寄存器,与dest(%2)所指的内容进行交换,返回dest指针原指向内容的值;
- %0为exchange_value,%1为exchange_value,%2为dest;
- "r"表示将dest的值读到一个通用寄存器;
- "0"表示和%0使用同样的通用寄存器,此处表示将exchange_value值读入通用寄存器;
- "=r"表示将结果写入到exchange_value,而且要使用通用寄存器,由于通用寄存器的内容已经被设置为dest所指向的内容,因此exchange_value等于原dest所指向的内容;
- asm指示编译器在此插入汇编语句;
- volatile告诉编译器,严禁将此处的汇编语句与其它的语句重组合优化,即原原本本按原来的样子处理这里的汇编;
- memory强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作废。cpu将不得不在需要的时候重新读取内存中的数据。
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
static inline bool is_MP() {
assert(_processor_count > 0, "invalid processor count");
return _processor_count > 1;
}
- mp表示是否属于多核cpu环境,如果是则LOCK_IF_MP会插入lock指令;
- %0为exchange_value,%1为exchange_value,%2为compare_value,%3为dest,%4为mp;
- "a" (compare_value)表示将compare_value读入eax寄存器,与%3(dest)进行比较,如果相等则将%1(exchange_value)写入dest;
- "=a" (exchange_value)表示将eax寄存器的内容写入到exchange_value;