Attach机制是JVM提供一种JVM进程间通信的能力,能让一个进程传命令给另外一个进程,并让它执行内部的一些操作。
比如:为了让另外一个JVM进程把线程dump出来,那么首先跑了一个jstack的进程,然后传了个pid的参数,告诉它要哪个进程进行线程dump,既然是两个进程,那肯定涉及到进程间通信,以及传输协议的定义,比如:要执行什么操作,传了什么参数等。
有时当我们感觉线程一直卡在某个地方,想知道卡在哪里,首先想到的是进行 线程dump,而常用的命令是jstack,我们就可以看到如下线程栈:
"Attach Listener" daemon prio=5 tid=0x00007fb0c6800800 nid=0x440b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Service Thread" daemon prio=5 tid=0x00007fb0c584d800 nid=0x5303 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" daemon prio=5 tid=0x00007fb0c482e000 nid=0x5103 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" daemon prio=5 tid=0x00007fb0c482c800 nid=0x4f03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=5 tid=0x00007fb0c4815800 nid=0x4d03 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=5 tid=0x00007fb0c4813800 nid=0x3903 in Object.wait() [0x00000001187d2000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007aaa85568> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
- locked <0x00000007aaa85568> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189)
"Reference Handler" daemon prio=5 tid=0x00007fb0c4800000 nid=0x3703 in Object.wait() [0x00000001186cf000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007aaa850f0> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:503)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
- locked <0x00000007aaa850f0> (a java.lang.ref.Reference$Lock)
"main" prio=5 tid=0x00007fb0c5800800 nid=0x1903 waiting on condition [0x0000000107962000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at Test.main(Test.java:5)
"VM Thread" prio=5 tid=0x00007fb0c583d800 nid=0x3503 runnable
"GC task thread#0 (ParallelGC)" prio=5 tid=0x00007fb0c401e000 nid=0x2503 runnable
"GC task thread#1 (ParallelGC)" prio=5 tid=0x00007fb0c401e800 nid=0x2703 runnable
"GC task thread#2 (ParallelGC)" prio=5 tid=0x00007fb0c401f800 nid=0x2903 runnable
...
"VM Periodic Task Thread" prio=5 tid=0x00007fb0c5845000 nid=0x5503 waiting on condition
在上面的Thread Dump日志中,出现了两个线程:“Attach Listener” 和 “Signal Dispatcher”,这两个线程便是Attach机制的关键。
那么JVM是如何启动这两个线程呢?JVM有很多线程主要在thread.cpp里的create_vm方法体里实现:
JvmtiExport::enter_live_phase();
// 1. Signal Dispatcher 需要在发布VMInit事件之前启动
os::signal_init();
// 2. Start Attach Listener 如果配置 +StartAttachListener; 否则会延迟启动
if (!DisableAttachMechanism) {
if (StartAttachListener || AttachListener::init_at_startup()) {
AttachListener::init();
}
}
其中JVM相关参数:DisableAttachMechanism,StartAttachListener ,ReduceSignalUsage 均默认是 false:
如上面create_vm源码所示,在启动的时候有可能不会创建AttachListener线程,那么 在上面Thread Stack日志中看到的AttachListener线程是怎么创建的呢,这个就要关注另外一个线程“Signal Dispatcher”了,顾名思义是处理信号的,这个线程是在JVM启动的时候肯定会创建的。
Signal Dispatcher 线程
在os.cpp中的 signal_init() 函数中,启动了signal dispatcher 线程,对signal dispather线程主要是用于处理信号,等待信号并且分发处理,可以详细看 signal_thread_entry 的方法
// 该方法用于Signal Dispatcher线程处理接受到的信号
static void signal_thread_entry(JavaThread* thread, TRAPS) {
os::set_priority(thread, NearMaxPriority);
while (true) {
int sig;
{
// FIXME : Currently we have not decieded what should be the status
// for this java thread blocked here. Once we decide about
// that we should fix this. 等待信号
sig = os::signal_wait();
}
if (sig == os::sigexitnum_pd()) {
// Terminate the signal thread
return;
}
switch (sig) {
case SIGBREAK: {
// Check if the signal is a trigger to start the Attach Listener - in that
// case don't print stack traces.
if (!DisableAttachMechanism && AttachListener::is_init_trigger()) {
continue;
}
// Print stack traces
// Any SIGBREAK operations added here should make sure to flush
// the output stream (e.g. tty->flush()) after output. See 4803766.
// Each module also prints an extra carriage return after its output.
VM_PrintThreads op;
VMThread::execute(&op);
VM_PrintJNI jni_op;
VMThread::execute(&jni_op);
VM_FindDeadlocks op1(tty);
VMThread::execute(&op1);
Universe::print_heap_at_SIGBREAK();
if (PrintClassHistogram) {
VM_GC_HeapInspection op1(gclog_or_tty, true /* force full GC before heap inspection */,
true /* need_prologue */);
VMThread::execute(&op1);
}
if (JvmtiExport::should_post_data_dump()) {
JvmtiExport::post_data_dump();
}
break;
}
default: {
// Dispatch the signal to java
HandleMark hm(THREAD);
klassOop k = SystemDictionary::resolve_or_null(vmSymbolHandles::sun_misc_Signal(), THREAD);
KlassHandle klass (THREAD, k);
if (klass.not_null()) {
JavaValue result(T_VOID);
JavaCallArguments args;
args.push_int(sig);
JavaCalls::call_static(
&result,
klass,
vmSymbolHandles::dispatch_name(),
vmSymbolHandles::int_void_signature(),
&args,
THREAD
);
}
if (HAS_PENDING_EXCEPTION) {
// tty is initialized early so we don't expect it to be null, but
// if it is we can't risk doing an initialization that might
// trigger additional out-of-memory conditions
if (tty != NULL) {
char klass_name[256];
char tmp_sig_name[16];
const char* sig_name = "UNKNOWN";
instanceKlass::cast(PENDING_EXCEPTION->klass())->
name()->as_klass_external_name(klass_name, 256);
if (os::exception_name(sig, tmp_sig_name, 16) != NULL)
sig_name = tmp_sig_name;
warning("Exception %s occurred dispatching signal %s to handler"
"- the VM may need to be forcibly terminated",
klass_name, sig_name );
}
CLEAR_PENDING_EXCEPTION;
}
}
}
}
}
可以看到通过 os::signal_wait(); 等待信号,而在Linux里是通过 sem_wait() 来实现,当接受到信号是SIGBREAK(在JVM里做了#define,其实就是SIGQUIT)的时候,就会触发 AttachListener::is_init_trigger()的执行初始化attach listener线程。
- 第一次收到信号,会开始初始化,当初始化成功,将会直接返回,而且 不返回任何线程stack的信息(通过socket file的操作返回),并且第二次将不在需要初始化。如果初始化不成功,将直接在控制台的outputstream中打印线程栈信息;
- 第二次收到信号,如果已经初始化过,将直接在控制台中打印线程的栈信息。如果没有初始化,继续初始化,走和第一次相同的流程;
比如:我们经常会 使用 kill -3 pid的操作打印出线程栈信息,可以看到具体的实现是在Signal Dispatcher 线程中完成的,因为kill -3 pid 并不会创建.attach_pid#pid文件,所以一直初始化不成功,从而线程的栈信息被打印到控制台中
Attach Listener 线程
Attach Listener 线程是负责接收到外部的命令,而对该命令进行执行的并且把结果返回给发送者。在JVM启动的时候,如果没有指定 +StartAttachListener,该Attach Listener线程是不会启动的。
在接受到 quit 信号之后,会调用 AttachListener::is_init_trigger() 方法, AttachListener::is_init_trigger() 内会调用AttachListener::init() 启动了Attach Listener 线程,在不同的操作系统下初始化实现是不同的,在linux中是在attachListener_Linux.cpp文件中实现的。
AttachListener::is_init_trigger() 代码如下:
bool AttachListener::is_init_trigger() {
if (init_at_startup() || is_initialized()) {
return false; // initialized at startup or already initialized
}
char fn[PATH_MAX+1];
sprintf(fn, ".Attach_pid%d", os::current_process_id());
int ret;
struct stat64 st;
RESTARTABLE(::stat64(fn, &st), ret);
if (ret == -1) {
snprintf(fn, sizeof(fn), "%s/.Attach_pid%d",
os::get_temp_directory(), os::current_process_id());
RESTARTABLE(::stat64(fn, &st), ret);
}
if (ret == 0) {
// simple check to avoid starting the Attach mechanism when
// a bogus user creates the file
if (st.st_uid == geteuid()) {
// 创建AttachListener线程
init();
return true;
}
}
return false;
}
一开始会 判断当前进程目录下是否有个.Attach_pid文件,如果没有就会在/tmp下创建一个/tmp/.Attach_pid,当那个文件的uid和自己的uid是一致的情况下(为了安全)再调用init方法。
// Starts the Attach Listener thread
void AttachListener::init() {
EXCEPTION_MARK;
klassOop k = SystemDictionary::resolve_or_fail(vmSymbols::java_lang_Thread(), true, CHECK);
instanceKlassHandle klass (THREAD, k);
instanceHandle thread_oop = klass->allocate_instance_handle(CHECK);
const char thread_name[] = "Attach Listener";
Handle string = java_lang_String::create_from_str(thread_name, CHECK);
// Initialize thread_oop to put it into the system threadGroup
Handle thread_group (THREAD, Universe::system_thread_group());
JavaValue result(T_VOID);
JavaCalls::call_special(&result, thread_oop,
klass,
vmSymbols::object_initializer_name(),
vmSymbols::threadgroup_string_void_signature(),
thread_group,
string,
CHECK);
KlassHandle group(THREAD, SystemDictionary::ThreadGroup_klass());
JavaCalls::call_special(&result,
thread_group,
group,
vmSymbols::add_method_name(),
vmSymbols::thread_void_signature(),
thread_oop, // ARG 1
CHECK);
{ MutexLocker mu(Threads_lock);
JavaThread* listener_thread = new JavaThread(&Attach_listener_thread_entry);
// Check that thread and osthread were created
if (listener_thread == NULL || listener_thread->osthread() == NULL) {
vm_exit_during_initialization("java.lang.OutOfMemoryError",
"unable to create new native thread");
}
java_lang_Thread::set_thread(thread_oop(), listener_thread);
java_lang_Thread::set_daemon(thread_oop());
listener_thread->set_threadObj(thread_oop());
Threads::add(listener_thread);
Thread::start(listener_thread);
}
}
此时水落石出了,看到创建了一个线程,并且取名为Attach Listener。再看看Linux系统下其子类LinuxAttachListener的init方法:
int LinuxAttachListener::init() {
char path[UNIX_PATH_MAX]; // socket file
char initial_path[UNIX_PATH_MAX]; // socket file during setup
int listener; // listener socket (file descriptor)
// register function to cleanup
::atexit(listener_cleanup);
int n = snprintf(path, UNIX_PATH_MAX, "%s/.java_pid%d",
os::get_temp_directory(), os::current_process_id());
if (n < (int)UNIX_PATH_MAX) {
n = snprintf(initial_path, UNIX_PATH_MAX, "%s.tmp", path);
}
if (n >= (int)UNIX_PATH_MAX) {
return -1;
}
// create the listener socket
listener = ::socket(PF_UNIX, SOCK_STREAM, 0);
if (listener == -1) {
return -1;
}
// bind socket
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, initial_path);
::unlink(initial_path);
int res = ::bind(listener, (struct sockaddr*)&addr, sizeof(addr));
if (res == -1) {
RESTARTABLE(::close(listener), res);
return -1;
}
// put in listen mode, set permissions, and rename into place
res = ::listen(listener, 5);
if (res == 0) {
RESTARTABLE(::chmod(initial_path, S_IREAD|S_IWRITE), res);
if (res == 0) {
res = ::rename(initial_path, path);
}
}
if (res == -1) {
RESTARTABLE(::close(listener), res);
::unlink(initial_path);
return -1;
}
set_path(path);
set_listener(listener);
return 0;
}
看到其创建了一个监听套接字,并创建了一个文件/tmp/.java_pid,这个文件就是客户端之前一直在轮询等待的文件,随着这个文件的生成,意味着Attach的创建过程圆满结束了。
Attach Listener线程接收到请求时,具体的请求处理在 attach_listener_thread_entry 方法体中实现:
static void attach_listener_thread_entry(JavaThread* thread, TRAPS) {
os::set_priority(thread, NearMaxPriority);
if (AttachListener::pd_init() != 0) {
return;
}
AttachListener::set_initialized();
for (;;) {
AttachOperation* op = AttachListener::dequeue();
if (op == NULL) {
return; // dequeue failed or shutdown
}
ResourceMark rm;
bufferedStream st;
jint res = JNI_OK;
// handle special detachall operation
if (strcmp(op->name(), AttachOperation::detachall_operation_name()) == 0) {
AttachListener::detachall();
} else {
// find the function to dispatch too
AttachOperationFunctionInfo* info = NULL;
for (int i=0; funcs[i].name != NULL; i++) {
const char* name = funcs[i].name;
assert(strlen(name) <= AttachOperation::name_length_max, "operation <= name_length_max");
if (strcmp(op->name(), name) == 0) {
info = &(funcs[i]);
break;
}
}
// check for platform dependent attach operation
if (info == NULL) {
info = AttachListener::pd_find_operation(op->name());
}
if (info != NULL) {
// dispatch to the function that implements this operation
res = (info->func)(op, &st);
} else {
st.print("Operation %s not recognized!", op->name());
res = JNI_ERR;
}
}
// operation complete - send result and output to client
op->complete(res, &st);
}
}
从代码来看就是 从队列里不断取AttachOperation,然后找到请求命令对应的方法进行执行,比如一开始说的jstack命令,找到 { “threaddump”, thread_dump }的映射关系,然后执行thread_dump方法。
AttachOperation有很多种类,比如:内存dump,线程dump,类信息统计(比如加载的类及大小以及实例个数等),动态加载agent,动态设置vm flag(但是并不是所有的flag都可以设置的,因为有些flag是在jvm启动过程中使用的,是一次性的),打印vm flag,获取系统属性等,这些对应的源码(AttachListener.cpp)如下:
static AttachOperationFunctionInfo funcs[] = {
// 第二个参数是命令对应的处理函数
{ "agentProperties", get_agent_properties },
{ "datadump", data_dump },
{ "dumpheap", dump_heap },
{ "load", JvmtiExport::load_agent_library },
{ "properties", get_system_properties },
{ "threaddump", thread_dump },
{ "inspectheap", heap_inspection },
{ "setflag", set_flag },
{ "printflag", print_flag },
{ "jcmd", jcmd },
{ NULL, NULL }
};
再来看看其要调用的 AttachListener::dequeue();:
AttachOperation* AttachListener::dequeue() {
JavaThread* thread = JavaThread::current();
ThreadBlockInVM tbivm(thread);
thread->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or
// java_suspend_self() via check_and_wait_while_suspended()
AttachOperation* op = LinuxAttachListener::dequeue();
// were we externally suspended while we were waiting?
thread->check_and_wait_while_suspended();
return op;
}
最终会调用的是 LinuxAttachListener::dequeue():
LinuxAttachOperation* LinuxAttachListener::dequeue() {
for (;;) {
int s;
// wait for client to connect
struct sockaddr addr;
socklen_t len = sizeof(addr);
// 如果没有请求的话,会一直accept在那里
RESTARTABLE(::accept(listener(), &addr, &len), s);
if (s == -1) {
return NULL; // log a warning?
}
// get the credentials of the peer and check the effective uid/guid
// - check with jeff on this.
struct ucred cred_info;
socklen_t optlen = sizeof(cred_info);
if (::getsockopt(s, SOL_SOCKET, SO_PEERCRED, (void*)&cred_info, &optlen) == -1) {
int res;
RESTARTABLE(::close(s), res);
continue;
}
uid_t euid = geteuid();
gid_t egid = getegid();
if (cred_info.uid != euid || cred_info.gid != egid) {
int res;
RESTARTABLE(::close(s), res);
continue;
}
// peer credential look okay so we read the request
LinuxAttachOperation* op = read_request(s);
if (op == NULL) {
int res;
RESTARTABLE(::close(s), res);
continue;
} else {
return op;
}
}
}
如上代码中可以看到,如果没有请求的话,会一直accept在那里,当来了请求,然后就会创建一个套接字,并读取数据,构建出LinuxAttachOperation返回,找到请求对应的操作,调用操作得到结果并把结果写到这个socket的文件,如果你把socket的文件删除,jstack/jmap会出现错误信息 unable to open socket file:........
jstack/jmap命令流程图
以jstack的实现来说明触发Attach这一机制进行的过程,jstack命令的实现其实是一个叫做JStack.java的类,jstack命令首先会attach到目标JVM进程,产生VirtualMachine类;Linux系统下,其实现类为LinuxVirtualMachine,调用其remoteDataDump方法,打印堆栈信息;查看JStack.java代码后会走到下面的方法里:
private static void runThreadDump(String pid, String args[]) throws Exception {
VirtualMachine vm = null;
try {
// jstack命令首先会attach到目标JVM进程
vm = VirtualMachine.Attach(pid);
} catch (Exception x) {
String msg = x.getMessage();
if (msg != null) {
System.err.println(pid + ": " + msg);
} else {
x.printStackTrace();
}
if ((x instanceof AttachNotSupportedException) &&
(loadSAClass() != null)) {
System.err.println("The -F option can be used when the target " +
"process is not responding");
}
System.exit(1);
}
// Cast to HotSpotVirtualMachine as this is implementation specific
// method.
// 输出堆栈信息
InputStream in = ((HotSpotVirtualMachine)vm).remoteDataDump((Object[])args);
// read to EOF and just print output
byte b[] = new byte[256];
int n;
do {
n = in.read(b);
if (n > 0) {
String s = new String(b, 0, n, "UTF-8");
System.out.print(s);
}
} while (n > 0);
in.close();
vm.detach();
}
那么VirtualMachine是如何连接到目标JVM进程的呢?请注意 VirtualMachine.Attach(pid); 这行代码,触发Attach pid的关键,如果是在Linux下具体的实现逻辑在 sun.tools.attach.LinuxVirtualMachine 的构造函数:
LinuxVirtualMachine(AttachProvider provider, String vmid) throws AttachNotSupportedException, IOException
{
super(provider, vmid);
// This provider only understands pids
int pid;
try {
pid = Integer.parseInt(vmid);
} catch (NumberFormatException x) {
throw new AttachNotSupportedException("Invalid process identifier");
}
// Find the socket file. If not found then we attempt to start the
// Attach mechanism in the target VM by sending it a QUIT signal.
// Then we attempt to find the socket file again.
path = findSocketFile(pid);
if (path == null) {
File f = createAttachFile(pid);
try {
// On LinuxThreads each thread is a process and we don't have the
// pid of the VMThread which has SIGQUIT unblocked. To workaround
// this we get the pid of the "manager thread" that is created
// by the first call to pthread_create. This is parent of all
// threads (except the initial thread).
if (isLinuxThreads) {
int mpid;
try {
mpid = getLinuxThreadsManager(pid);
} catch (IOException x) {
throw new AttachNotSupportedException(x.getMessage());
}
assert(mpid >= 1);
sendQuitToChildrenOf(mpid);
} else {
sendQuitTo(pid);
}
// give the target VM time to start the Attach mechanism
int i = 0;
long delay = 200;
int retries = (int)(AttachTimeout() / delay);
do {
try {
Thread.sleep(delay);
} catch (InterruptedException x) { }
path = findSocketFile(pid);
i++;
} while (i <= retries && path == null);
if (path == null) {
throw new AttachNotSupportedException(
"Unable to open socket file: target process not responding " +
"or HotSpot VM not loaded");
}
} finally {
f.delete();
}
}
// Check that the file owner/permission to avoid Attaching to
// bogus process
checkPermissions(path);
// Check that we can connect to the process
// - this ensures we throw the permission denied error now rather than
// later when we attempt to enqueue a command.
int s = socket();
try {
connect(s, path);
} finally {
close(s);
}
}
- 查找/tmp目录下是否存在".java_pid"+pid文件;
- 如果文件不存在,则首先创建"/proc/" + pid + "/cwd/" + ".attach_pid" + pid文件;
- 通过kill命令发送SIGQUIT信号给目标JVM进程,由于JVM里除了信号线程,其他线程都设置了对此信号的屏蔽,因此收不到该信号,于是该信号就传给了“Signal Dispatcher”;
- 目标JVM进程接收到信号之后,会在/tmp目录下创建".java_pid"+pid文件;
- 当发现/tmp目录下存在".java_pid"+pid文件,LinuxVirtualMachine会通过connect系统调用连接到该文件描述符,后续通过该fd进行双方的通讯;
JVM接受SIGQUIT信号的相关逻辑处理,则是在前面 signal_thread_entry 方法中进行实现。
前面JStack.java源码中,输出堆栈信息是通过调用remoteDataDump方法实现的,该方法就是通过往前面提到的fd中写入threaddump指令,读取返回结果,从而得到目标JVM的堆栈信息。