signal

Preview

ceph中的每个组件在启动的时候都会去启动一个信号处理线程(异步处理信号),下面会简单分析分析其源码实现

源码分析

关键类

SignalHandler(结构体也是类)// 信号处理类
Thread  // 通用线程管理类
safe_handler // 用来处理每个信号的

启动信号监听线程

程序在启动的时候会初始化一个静态SignalHandler变量g_signal_handler,姑且理解成一个简单的单例吧

入口函数:init_async_signal_handler(),当g_signal_handler为空时,new一个SignalHandler赋值给g_signal_handler

SignalHandler是Thread的子类

SignalHandler的构造函数

SignalHandler() {
    // create signal pipe,这个管道主要是用来与信号处理线程通讯的,ceph中大量的使用这种方式通信
    int r = pipe_cloexec(pipefd, 0);
    ceph_assert(r == 0);
    // 设置非阻塞
    r = fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
    ceph_assert(r == 0);

    // create thread,create函数是Thread函数中定义的
    create("signal_handler");
}

创建线程函数中的注意点

int Thread::try_create(size_t stacksize)
{
    ...
    sigset_t old_sigset;
    // block_signals 函数的作用是设置信号掩码,下面详解
    if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
        block_signals(NULL, &old_sigset);
    }
    else {
        int to_block[] = { SIGPIPE , 0 };
        block_signals(to_block, &old_sigset);
    }
    // 创建线程,线程中运行_entry_func这个方法,参数是本实例对象
    r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
    restore_sigset(&old_sigset);

    if (thread_attr) {
    pthread_attr_destroy(thread_attr);  
    }
    ...
}

block_signals

void block_signals(const int *siglist, sigset_t *old_sigset)
{
  sigset_t sigset;
  if (!siglist) {
    // 将所有的信号加入信号集
    sigfillset(&sigset);
  }
  else {
    int i = 0;
    sigemptyset(&sigset);
    while (siglist[i]) {
      sigaddset(&sigset, siglist[i]);
      ++i;
    }
  }
  // pthread_sigmask,在多线程的程序里,希望只在主线程中处理信号,可以使用该函数。新的线程会继承主线程的信号掩码
  //  SIG_BLOCK: 该进程新的信号屏蔽字是其当前信号屏蔽字和set指向信号集的并集。set包含了我们希望阻塞的附加信号
  // SIG_UNBLOCK: 该进程新的信号屏蔽字是其当前信号屏蔽字和set所指向信号集补集的交集。set包含了我希望解除阻塞的信号
  // SIG_SETMASK: 该进程新的信号屏蔽字将被set指向的信号集的值代替
  int ret = pthread_sigmask(SIG_BLOCK, &sigset, old_sigset);
  ceph_assert(ret == 0);
}

_entry_func中去运行entry_wrapper函数,entry_wrapper只是做设置线程名等辅助操作,最后运行entry函数,这个时候就是运行子类实现的函数了,Thread已经完成了自己通用的代码部分

entry的代码

void *entry() override {
    // 直到stop为真时停止循环,终止线程,stop在shutdown函数中设置,这种设计模式都是老套路了
    while (!stop) {
      // build fd list
      // 这里用到了poll模型,可以去了解下其原理,总的来说就是用轮询代替系统中断的事件复用
      // 这里之所以长度要设置为33,主要是第一位给了pipfd[0]用来接收主线程的通讯
      struct pollfd fds[33];

      lock.lock();
      // 这里就只是标准的的poll操作
      int num_fds = 0;
      fds[num_fds].fd = pipefd[0];
      fds[num_fds].events = POLLIN | POLLERR;
      fds[num_fds].revents = 0;
      ++num_fds;

      // 信号量一共有32位,这里将需要设置的信号量都赋给poll来监听
      for (unsigned i=0; i<32; i++) {
    if (handlers[i]) {
      fds[num_fds].fd = handlers[i]->pipefd[0];
      fds[num_fds].events = POLLIN | POLLERR;
      fds[num_fds].revents = 0;
      ++num_fds;
    }
      }
      lock.unlock();

      // wait for data on any of those pipes
      // 这里就是开始进行poll监听,比如信号来了、主线程通信来了都会触发,这里将超时时间设置成了-1,那就是阻塞式永久等待了
      int r = poll(fds, num_fds, -1);

      ...

    // consume byte from signal socket, if any.
    // 这里其实只是把数据读出来而已,不需要做其他处理,因为本身只是唤醒作用
    TEMP_FAILURE_RETRY(read(pipefd[0], &v, 1));

    lock.lock();
    // 下面一大段都只是为了找到触发事件相应的信号,打印一些信息,调用该信号注册的handler函数
    for (unsigned signum=0; signum<32; signum++) {
      if (handlers[signum]) {
        r = read(handlers[signum]->pipefd[0], &v, 1);
        if (r == 1) {
          siginfo_t * siginfo = &handlers[signum]->info_t;
              ostringstream message;
              message << "received  signal: " << sig_str(signum);
              switch (siginfo->si_code) {
                case SI_USER:
                  message << " from " << get_name_by_pid(siginfo->si_pid);
                  // If PID is undefined, it doesn't have a meaning to be displayed
                  if (siginfo->si_pid) {
                    message << " (PID: " << siginfo->si_pid << ")";
                  } else {
                    message << " ( Could be generated by pthread_kill(), raise(), abort(), alarm() )";
                  }
                  message << " UID: " << siginfo->si_uid;
                  break;
                default:
                  /* As we have a not expected signal, let's report the structure to help debugging */
                  message << ", si_code : " << siginfo->si_code;
                  message << ", si_value (int): " << siginfo->si_value.sival_int;
                  message << ", si_value (ptr): " << siginfo->si_value.sival_ptr;
                  message << ", si_errno: " << siginfo->si_errno;
                  message << ", si_pid : " << siginfo->si_pid;
                  message << ", si_uid : " << siginfo->si_uid;
                  message << ", si_addr" << siginfo->si_addr;
                  message << ", si_status" << siginfo->si_status;
                  break;
              }
              derr << message.str() << dendl;
              handlers[signum]->handler(signum);
        }
      }
    }
    lock.unlock();
      } 
    }
    return NULL;
  }

注册需要处理的信号

入口函数是:register_async_signal_handler_oneshot

核心函数是:register_handler

void SignalHandler::register_handler(int signum, signal_handler_t handler, bool oneshot)
{
  int r;

  // 信号量必须是[0,31]中的一个
  ceph_assert(signum >= 0 && signum < 32);
  // new一个处理类
  safe_handler *h = new safe_handler;
  // 新建管道
  r = pipe_cloexec(h->pipefd, 0);
  ceph_assert(r == 0);
  // 设置非阻塞
  r = fcntl(h->pipefd[0], F_SETFL, O_NONBLOCK);
  ceph_assert(r == 0);

  // 将具体的处理函数赋给处理类
  h->handler = handler;
  lock.lock();
  // 将处理类放在handlers对应的位置中,因为信号量是非负整数,所以可以用于数组的索引,方便
  handlers[signum] = h;
  lock.unlock();

  // signal thread so that it sees our new handler
  // 这里就是唤醒信号处理线程,还记得struct pollfd fds[33]的第一位是用于通讯的吧
  signal_thread();
  
  // install our handler
  // 下面就是将信号的处理函数注册的过程了
  // 主要用到的是 sigaction方法,这个方法用于查询或设置信号处理方式,详情参考:http://c.biancheng.net/cpp/html/1142.html
  struct sigaction oldact;
  struct sigaction act;
  memset(&act, 0, sizeof(act));
  // handler_signal_hook是信号处理函数,代码详情见下面
  act.sa_handler = (signal_handler_t)handler_signal_hook;
  sigfillset(&act.sa_mask);  // mask all signals in the handler
  act.sa_flags = SA_SIGINFO | (oneshot ? SA_RESETHAND : 0);
  int ret = sigaction(signum, &act, &oldact);
  ceph_assert(ret == 0);
}

safe_handler的源码

struct safe_handler {

    safe_handler() {
      // 给变量分配内存
      memset(pipefd, 0, sizeof(pipefd));
      memset(&handler, 0, sizeof(handler));
      memset(&info_t, 0, sizeof(info_t));    
    }
    // 简单理解这个是包含信号量的额外信息的结构体吧,姑且称作信号信息
    siginfo_t info_t; 
    // 这个就是用于poll监听的fd
    int pipefd[2];  // write to [1], read from [0]
    // 这个就是该信号量对应的处理函数
    signal_handler_t handler;
  };

handler_signal_hook中调用了queue_signal_info函数

void queue_signal_info(int signum, siginfo_t *siginfo, void * content) {
    // If this signal handler is registered, the callback must be
    // defined.  We can do this without the lock because we will never
    // have the signal handler defined without the handlers entry also
    // being filled in.
    ceph_assert(handlers[signum]);
    // 将真正传入的信号量的信号信息复制给handlers中对应信号的处理类
    memcpy(&handlers[signum]->info_t, siginfo, sizeof(siginfo_t));
    // 这一步就是真正通知信号处理线程,信号来了,要处理了
    int r = write(handlers[signum]->pipefd[1], " ", 1);
    ceph_assert(r == 1);
  }
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容