Preview
ceph中的每个组件在启动的时候都会去启动一个信号处理线程(异步处理信号),下面会简单分析分析其源码实现
源码分析
关键类
SignalHandler(结构体也是类)// 信号处理类
Thread // 通用线程管理类
safe_handler // 用来处理每个信号的
启动信号监听线程
程序在启动的时候会初始化一个静态SignalHandler变量g_signal_handler,姑且理解成一个简单的单例吧
入口函数:init_async_signal_handler(),当g_signal_handler为空时,new一个SignalHandler赋值给g_signal_handler
SignalHandler是Thread的子类
SignalHandler的构造函数
SignalHandler() {
// create signal pipe,这个管道主要是用来与信号处理线程通讯的,ceph中大量的使用这种方式通信
int r = pipe_cloexec(pipefd, 0);
ceph_assert(r == 0);
// 设置非阻塞
r = fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
ceph_assert(r == 0);
// create thread,create函数是Thread函数中定义的
create("signal_handler");
}
创建线程函数中的注意点
int Thread::try_create(size_t stacksize)
{
...
sigset_t old_sigset;
// block_signals 函数的作用是设置信号掩码,下面详解
if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
block_signals(NULL, &old_sigset);
}
else {
int to_block[] = { SIGPIPE , 0 };
block_signals(to_block, &old_sigset);
}
// 创建线程,线程中运行_entry_func这个方法,参数是本实例对象
r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
restore_sigset(&old_sigset);
if (thread_attr) {
pthread_attr_destroy(thread_attr);
}
...
}
block_signals
void block_signals(const int *siglist, sigset_t *old_sigset)
{
sigset_t sigset;
if (!siglist) {
// 将所有的信号加入信号集
sigfillset(&sigset);
}
else {
int i = 0;
sigemptyset(&sigset);
while (siglist[i]) {
sigaddset(&sigset, siglist[i]);
++i;
}
}
// pthread_sigmask,在多线程的程序里,希望只在主线程中处理信号,可以使用该函数。新的线程会继承主线程的信号掩码
// SIG_BLOCK: 该进程新的信号屏蔽字是其当前信号屏蔽字和set指向信号集的并集。set包含了我们希望阻塞的附加信号
// SIG_UNBLOCK: 该进程新的信号屏蔽字是其当前信号屏蔽字和set所指向信号集补集的交集。set包含了我希望解除阻塞的信号
// SIG_SETMASK: 该进程新的信号屏蔽字将被set指向的信号集的值代替
int ret = pthread_sigmask(SIG_BLOCK, &sigset, old_sigset);
ceph_assert(ret == 0);
}
_entry_func中去运行entry_wrapper函数,entry_wrapper只是做设置线程名等辅助操作,最后运行entry函数,这个时候就是运行子类实现的函数了,Thread已经完成了自己通用的代码部分
entry的代码
void *entry() override {
// 直到stop为真时停止循环,终止线程,stop在shutdown函数中设置,这种设计模式都是老套路了
while (!stop) {
// build fd list
// 这里用到了poll模型,可以去了解下其原理,总的来说就是用轮询代替系统中断的事件复用
// 这里之所以长度要设置为33,主要是第一位给了pipfd[0]用来接收主线程的通讯
struct pollfd fds[33];
lock.lock();
// 这里就只是标准的的poll操作
int num_fds = 0;
fds[num_fds].fd = pipefd[0];
fds[num_fds].events = POLLIN | POLLERR;
fds[num_fds].revents = 0;
++num_fds;
// 信号量一共有32位,这里将需要设置的信号量都赋给poll来监听
for (unsigned i=0; i<32; i++) {
if (handlers[i]) {
fds[num_fds].fd = handlers[i]->pipefd[0];
fds[num_fds].events = POLLIN | POLLERR;
fds[num_fds].revents = 0;
++num_fds;
}
}
lock.unlock();
// wait for data on any of those pipes
// 这里就是开始进行poll监听,比如信号来了、主线程通信来了都会触发,这里将超时时间设置成了-1,那就是阻塞式永久等待了
int r = poll(fds, num_fds, -1);
...
// consume byte from signal socket, if any.
// 这里其实只是把数据读出来而已,不需要做其他处理,因为本身只是唤醒作用
TEMP_FAILURE_RETRY(read(pipefd[0], &v, 1));
lock.lock();
// 下面一大段都只是为了找到触发事件相应的信号,打印一些信息,调用该信号注册的handler函数
for (unsigned signum=0; signum<32; signum++) {
if (handlers[signum]) {
r = read(handlers[signum]->pipefd[0], &v, 1);
if (r == 1) {
siginfo_t * siginfo = &handlers[signum]->info_t;
ostringstream message;
message << "received signal: " << sig_str(signum);
switch (siginfo->si_code) {
case SI_USER:
message << " from " << get_name_by_pid(siginfo->si_pid);
// If PID is undefined, it doesn't have a meaning to be displayed
if (siginfo->si_pid) {
message << " (PID: " << siginfo->si_pid << ")";
} else {
message << " ( Could be generated by pthread_kill(), raise(), abort(), alarm() )";
}
message << " UID: " << siginfo->si_uid;
break;
default:
/* As we have a not expected signal, let's report the structure to help debugging */
message << ", si_code : " << siginfo->si_code;
message << ", si_value (int): " << siginfo->si_value.sival_int;
message << ", si_value (ptr): " << siginfo->si_value.sival_ptr;
message << ", si_errno: " << siginfo->si_errno;
message << ", si_pid : " << siginfo->si_pid;
message << ", si_uid : " << siginfo->si_uid;
message << ", si_addr" << siginfo->si_addr;
message << ", si_status" << siginfo->si_status;
break;
}
derr << message.str() << dendl;
handlers[signum]->handler(signum);
}
}
}
lock.unlock();
}
}
return NULL;
}
注册需要处理的信号
入口函数是:register_async_signal_handler_oneshot
核心函数是:register_handler
void SignalHandler::register_handler(int signum, signal_handler_t handler, bool oneshot)
{
int r;
// 信号量必须是[0,31]中的一个
ceph_assert(signum >= 0 && signum < 32);
// new一个处理类
safe_handler *h = new safe_handler;
// 新建管道
r = pipe_cloexec(h->pipefd, 0);
ceph_assert(r == 0);
// 设置非阻塞
r = fcntl(h->pipefd[0], F_SETFL, O_NONBLOCK);
ceph_assert(r == 0);
// 将具体的处理函数赋给处理类
h->handler = handler;
lock.lock();
// 将处理类放在handlers对应的位置中,因为信号量是非负整数,所以可以用于数组的索引,方便
handlers[signum] = h;
lock.unlock();
// signal thread so that it sees our new handler
// 这里就是唤醒信号处理线程,还记得struct pollfd fds[33]的第一位是用于通讯的吧
signal_thread();
// install our handler
// 下面就是将信号的处理函数注册的过程了
// 主要用到的是 sigaction方法,这个方法用于查询或设置信号处理方式,详情参考:http://c.biancheng.net/cpp/html/1142.html
struct sigaction oldact;
struct sigaction act;
memset(&act, 0, sizeof(act));
// handler_signal_hook是信号处理函数,代码详情见下面
act.sa_handler = (signal_handler_t)handler_signal_hook;
sigfillset(&act.sa_mask); // mask all signals in the handler
act.sa_flags = SA_SIGINFO | (oneshot ? SA_RESETHAND : 0);
int ret = sigaction(signum, &act, &oldact);
ceph_assert(ret == 0);
}
safe_handler的源码
struct safe_handler {
safe_handler() {
// 给变量分配内存
memset(pipefd, 0, sizeof(pipefd));
memset(&handler, 0, sizeof(handler));
memset(&info_t, 0, sizeof(info_t));
}
// 简单理解这个是包含信号量的额外信息的结构体吧,姑且称作信号信息
siginfo_t info_t;
// 这个就是用于poll监听的fd
int pipefd[2]; // write to [1], read from [0]
// 这个就是该信号量对应的处理函数
signal_handler_t handler;
};
handler_signal_hook中调用了queue_signal_info函数
void queue_signal_info(int signum, siginfo_t *siginfo, void * content) {
// If this signal handler is registered, the callback must be
// defined. We can do this without the lock because we will never
// have the signal handler defined without the handlers entry also
// being filled in.
ceph_assert(handlers[signum]);
// 将真正传入的信号量的信号信息复制给handlers中对应信号的处理类
memcpy(&handlers[signum]->info_t, siginfo, sizeof(siginfo_t));
// 这一步就是真正通知信号处理线程,信号来了,要处理了
int r = write(handlers[signum]->pipefd[1], " ", 1);
ceph_assert(r == 1);
}