一、java中如何应用线程
- 实现Runnbale接口
- 继承Thread类(本质上是Runnable接口的实现)
- Callable、Future(带返回值的线程)
- ThreadPool线程池
二、线程的生命周期
-
生命周期图解
首先通过下面的流程图整体查看下线程生命周期的几个状态值:
-
状态详解:
线程有以下六个状态- NEW
A thread that has not yet started is in this state.(一个初始化但还未启动线程处于这种状态)
- RUNNABLE
A thread executing in the Java virtual machine is in this state.(一个在jvm中执行的线程处于这种状态)
- BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.(一个等待监视器锁时被阻塞的线程处于这种状态)
- WAITING
A thread that is waiting indefinitely for another thread to perform a particular action is in this state.(一个无限期等待另一个线程执行特定操作的线程处于这种状态)
- TIMED_WAITING
A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.(一个在指定时间内等待另一个线程执行特定操作的线程处于这种状态)
- TERMINATED
A thread that has exited is in this state.(一个已经退出的线程处于这种状态)
代码演示:
package com.laozhao.thread;
import java.util.concurrent.TimeUnit;
public class ThreadStatusDemo {
public static void main(String[] args) {
// RUNNABLE
new Thread(() -> {
while (true) {
System.out.println("thread runnable");
}
}, "thread_runnable").start();
// TIMED_WAITING
new Thread(() -> {
while (true) {
System.out.println("thread sleep");
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "thread_time_waiting").start();
// WAITING
new Thread(() -> {
while (true) {
synchronized (ThreadStatusDemo.class) {
try {
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "thread_waiting").start();
// TIMED_WAITING
new Thread(new BlockedDemo(), "thread_blocked_01").start();
// BLOCKED
new Thread(new BlockedDemo(), "thread_blocked_02").start();
}
static class BlockedDemo extends Thread {
@Override
public void run() {
synchronized (BlockedDemo.class) {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
通过以下几个步骤就可以查看类的栈信息(基于idea)
在ThreadStatusDemo类中右键,选择Recompile 'ThreadStatusDemo.java'
class文件夹下右键,选择open in terminal,打开终端
输入jps查看java进程
- 此处能看到ThreadStatusDemo的进程id是82457, 然后执行jstack 82457查看栈信息,主要如下:
"thread_blocked_02" #17 prio=5 os_prio=31 tid=0x00007fcf3185b000 nid=0x5503 waiting for monitor entry [0x000070000b013000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.laozhao.thread.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:64)
- waiting to lock <0x0000000740008638> (a java.lang.Class for com.laozhao.thread.ThreadStatusDemo$BlockedDemo)
at java.lang.Thread.run(Thread.java:748)
"thread_blocked_01" #15 prio=5 os_prio=31 tid=0x00007fcf3185a800 nid=0x3907 waiting on condition [0x000070000af10000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.laozhao.thread.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:64)
- locked <0x0000000740008638> (a java.lang.Class for com.laozhao.thread.ThreadStatusDemo$BlockedDemo)
at java.lang.Thread.run(Thread.java:748)
"thread_waiting" #12 prio=5 os_prio=31 tid=0x00007fcf32184000 nid=0x3c03 in Object.wait() [0x000070000ae0d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000074000a700> (a java.lang.Class for com.laozhao.thread.ThreadStatusDemo)
at java.lang.Object.wait(Object.java:502)
at com.laozhao.thread.ThreadStatusDemo.lambda$main$2(ThreadStatusDemo.java:33)
- locked <0x000000074000a700> (a java.lang.Class for com.laozhao.thread.ThreadStatusDemo)
at com.laozhao.thread.ThreadStatusDemo$$Lambda$3/1329552164.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"thread_time_waiting" #11 prio=5 os_prio=31 tid=0x00007fcf3207e800 nid=0x3d03 waiting on condition [0x000070000ad0a000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.laozhao.thread.ThreadStatusDemo.lambda$main$1(ThreadStatusDemo.java:21)
at com.laozhao.thread.ThreadStatusDemo$$Lambda$2/668386784.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"thread_runnable" #10 prio=5 os_prio=31 tid=0x00007fcf328a5800 nid=0x3703 runnable [0x000070000ac07000]
java.lang.Thread.State: RUNNABLE
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
- locked <0x0000000740067a00> (a java.io.BufferedOutputStream)
at java.io.PrintStream.write(PrintStream.java:482)
- locked <0x000000074000cfb0> (a java.io.PrintStream)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
- locked <0x000000074000a918> (a java.io.OutputStreamWriter)
at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
at java.io.PrintStream.newLine(PrintStream.java:546)
- eliminated <0x000000074000cfb0> (a java.io.PrintStream)
at java.io.PrintStream.println(PrintStream.java:807)
- locked <0x000000074000cfb0> (a java.io.PrintStream)
at com.laozhao.thread.ThreadStatusDemo.lambda$main$0(ThreadStatusDemo.java:12)
at com.laozhao.thread.ThreadStatusDemo$$Lambda$1/764977973.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
通过虚拟机栈信息,我们可以清楚的看到
- thread_blocked_02处于BLOCKED状态,因为它和thread_blocked_01争抢同一个锁失败。
- thread_blocked_01处于TIMED_WAITING状态,是因为我们在线程中调用了sleep方法
- thread_waiting处于WAITING状态
- thread_time_waiting处于TIMED_WAITING状态
- thread_runnable处于RUNNABLE状态等。
三、线程的启动、终止和复位
- 线程的启动
线程的启动调用的是start()方法而非run()方法,我们首先查看下start方法的源码如下:
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0(); // 调用native方法
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
可以看到start方法实际调用的是native的start0()来启动一个线程,start0()是在Thread的静态代码块中来注册的,代码如下:
public class Thread implements Runnable {
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
...
}
registerNatives 的本地方法的定义在文件 Thread.c ,主要代码如下:
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
代码中可以看到start0()实际调用的是JVM_StartThread方法,顾名思义这个方法是启动一个线程,而启动线程的时候一定会调用java中定义的run方法,我们继续通过jvm代码去验证,在jvm.cpp中我们看到以下代码:
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
...
native_thread = new JavaThread(&thread_entry, sz); // 创建线程
...
Thread::start(native_thread); // 启动线程
JVM_END
可以看到线程中主要做了两件事,一个是创建了一个java线程,另一个是启动了这个java线程。
- 首先我们查看创建线程的时候执行了哪些操作?在thread.cpp中我们找到创建线程的构造方法:
// entry_point 函数名称
// stack_sz 进程内已有的线程数量
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
#if INCLUDE_ALL_GCS
, _satb_mark_queue(&_satb_mark_queue_set),
_dirty_card_queue(&_dirty_card_queue_set)
#endif // INCLUDE_ALL_GCS
{
if (TraceThreadEvents) {
tty->print_cr("creating thread %p", this);
}
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
os::create_thread(this, thr_type, stack_sz); // 调用操作系统的创建线程方法来创建线程
_safepoint_visible = false;
}
从代码中可以清楚的看到,创建线程的方法中主要是根据操作系统的create_thread方法来创建java线程
- 接着查看启动线程时执行了哪些操作?在thread.cpp中我们看到如下代码:
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
// 设置线程的状态为RUNNABLE
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread); // 调用操作系统方法来启动线程
}
}
启动主要是设置线程的状态为RUNNABLE,然后调用操作系统的方法来启动线程
- 线程的终止
先来看下Thread类中提供的两个终止线程的方法stop和interrupt的源码
@Deprecated
public final void stop() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
checkAccess();
if (this != Thread.currentThread()) {
security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION);
}
}
// A zero status value corresponds to "NEW", it can't change to
// not-NEW because we hold the lock.
if (threadStatus != 0) {
resume(); // Wake up thread if it was suspended; no-op otherwise
}
// The VM can handle all thread states
stop0(new ThreadDeath());
}
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
private native void interrupt0();
从源码可以看到,stop方法已经被标识为过时。此方法结束线程时不能保证线程资源的正常释放,因此会导致程序可能出现一些不确定的状态,所以在jdk1.6.0之后提供了interrupt方法,当一个线程调用当前线程的interrupt方法,表示仅通知当前线程可以中断线程的执行了,至于什么时候中断,取决于当前线程自己。线程可以通过isInterrupted方法来判断是否被中断,简单看个例子:
package com.laozhao.thread;
import java.util.concurrent.TimeUnit;
public class InterruptDemo {
private static int count;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
// Thread.currentThread().isInterrupted() 未中断时返回false
// 调用interrupt方法之后返回true
while (!Thread.currentThread().isInterrupted()) {
count++;
}
System.out.println("count: " + count);
});
thread.start();
TimeUnit.SECONDS.sleep(1);
// 调用线程中断方法
thread.interrupt();
}
}
运行后控制台打印结果如下:
count: 525485037
Process finished with exit code 0
从上面的Thread类源码中可以看出interrupt方法最终调用的是native的interrupt0方法,在jvm的代码可以看到,实际调用的是JVM_interrupt方法,我们在jvm.cpp中到此方法查看代码如下:
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
实际是调用了线程的interrupt方法,并传入了当前线程,接着在thread.cpp中找到调用如下:
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
可以看出thread类的interrupt方法最终调用了操作系统的interrupt方法,我们以windows系统为例,在os_windows.cpp中找到调用代码如下:
void os::interrupt(Thread* thread) {
assert(!thread->is_Java_thread() || Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we post
// the interrupt event.
// 内存屏障
OrderAccess::release();
SetEvent(osthread->interrupt_event());
// For JSR166: unpark after setting status
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
至此我们看到,调用interrupt方法时的主要处理逻辑,包括设置interrupted为true,唤醒其他线程等操作。interrupted的值是维护在osThread.hpp文件中,感兴趣的可以自行查看。
- 线程的复位:
interrupted方法实际调用的是isInterrupted方法传入true,首先看下Thread类源码定义:
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
private native boolean isInterrupted(boolean ClearInterrupted);
我们既然对线程可以终止,当然也可以复位,因为终止线程的流程可能是延时的,我们可以通过其他方式来对线程进行复位.
- 调用线程的静态方法Thread.interrupted()对设置中断标识的线程复位:
package com.laozhao.thread;
import java.util.concurrent.TimeUnit;
public class InterruptedDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
// Thread.currentThread().isInterrupted() 未中断时返回false
// 调用interrupt方法之后返回true
// 调用interrupted方法之后返回false
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("线程复位前状态:" + Thread.currentThread().isInterrupted());
Thread.interrupted();
System.out.println("线程复位后状态:" + Thread.currentThread().isInterrupted());
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
// 调用线程中断方法
thread.interrupt();
}
}
从这个例子中可以看出,当调用线程的interrupt方法后,isInterrupted状态为true,当我们接着调用复位方法Thread.interrupted()以后线程的状态变为false。
- 通过InterruptedException异常对设置中断标识的线程复位:
package com.laozhao.thread;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
public class InterruptExceptionDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("收到中断请求,已复位");
e.printStackTrace();
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
System.out.println("线程的中断标识:" + thread.isInterrupted());
}
}
运行输出结果:
线程的中断标识:true
收到中断请求,已复位
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.laozhao.thread.InterruptExceptionDemo.lambda$main$0(InterruptExceptionDemo.java:12)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
上面的例子中简单的看到了线程复位的应用,我们从jvm源码中查看线程复位是怎么实现的?
首先调用的是thread.cpp中的is_interrupted方法,传入了当前线程和中断标识为true.
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}
紧接着调用了操作系统的is_interrupted方法,我们还是以windows系统的为例查看,在os_windows.cpp文件中找到代码如下:
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(!thread->is_Java_thread() || Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();
// There is no synchronization between the setting of the interrupt
// and it being cleared here. It is critical - see 6535709 - that
// we only clear the interrupt state, and reset the interrupt event,
// if we are going to report that we were indeed interrupted - else
// an interrupt can be "lost", leading to spurious wakeups or lost wakeups
// depending on the timing
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
ResetEvent(osthread->interrupt_event());
} // Otherwise leave the interrupted state alone
return interrupted;
}
可以看到先拿到系统线程,然后判断中断标识和传入的清楚标识,如果都为true,则设置中断标识为false。