日志模块

Previews

日志系统是一个系统的核心组成部分,是展示系统运行状态、错误追溯的最好方式,搞明白日志的相关代码有利于我们调试ceph
代码中的各种锁就不解释了哈

总体设计思路

  1. 日志记录采用异步模式
  2. 尽量使用宏定义来提高程序运行速度
  3. 使用thread_local缓存一些东西

源码解析

启动

关键类

还是需要看代码,大概知道里面的数据结构

CephContext  // ceph的全局上下文,保存一些全局的东西
SubsystemMap // 这个是用来表示不同子系统的map,比如mds osd等
ConfigValues // 这个就是存放配置文件的参数,这个里面有SubsystemMap类型的成员变量
Log // 日志类,继承Thread类

在程序启动时会解析参数列表,将参数的key-value存放在ConfigValues的实例对象的map中,同时初始化SubsystemMap(调用的是ConfigValues::set_logging函数)

void ConfigValues::set_logging(int which, const char* val)
{
  // 至少你应该直到ceph是怎样设置日志等级的哈,不然你看不懂
  int log, gather;
  int r = sscanf(val, "%d/%d", &log, &gather);
  if (r >= 1) {
    // 如果只传了1而不是1/5这种,那么gather等于log
    if (r < 2) {
      gather = log;
    }
    // 设置日志等级,就是我们设置的哪个1/2这个,前面的1就是log的值,2就是gather的值
    subsys.set_log_level(which, log);
    subsys.set_gather_level(which, gather);
  }
}

然后调用全局初始化函数global_init,这个函数里面会初始化ceph的上下文类CephContext,这个类的构造函数里面会去实例化Log类,日志类里面也有SubsystemMap类型的成员变量,用来标识
当前模块和当前日志等级

CephContext::CephContext()
{
  ...
  // 将上面实例化的ConfigValues对象中的SubsystemMap类型变量传个Log
  _log = new ceph::logging::Log(&_conf->subsys);
  ...
}

然后会common_init_finish函数中调用Log::start()函数,去启动日志线程,如何启动的参考之前的信号处理的分析文章(都是继承的Thread类)

日志线程

下面有必要简单介绍下Log类中的成员的作用

class Log : private Thread
{
  using EntryRing = boost::circular_buffer<ConcreteEntry>;
  using EntryVector = std::vector<ConcreteEntry>;

  static const std::size_t DEFAULT_MAX_NEW = 100;
  static const std::size_t DEFAULT_MAX_RECENT = 10000;

  Log **m_indirect_this;
  // 用来标识子系统类型,和日志等级
  const SubsystemMap *m_subs;
  // 日志类进出队列的锁
  std::mutex m_queue_mutex;
  // 日志文件flush到磁盘的锁
  std::mutex m_flush_mutex;
  // 条件变量,当m_new的容量不足时需要等待m_new中的日志拷贝到m_flush中
  std::condition_variable m_cond_loggers;
  // 条件变量,当新的日志进入m_new中时,需要通知日志线程进行flush操作
  std::condition_variable m_cond_flusher;

  // 这两个其实就是用来判断程序是否处于加锁状态的
  pthread_t m_queue_mutex_holder;
  pthread_t m_flush_mutex_holder;

  // 保存新来的日志,还未进入flush阶段
  EntryVector m_new;    ///< new entries
  // 保存flush阶段的日志
  EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
  // 用一个环形缓冲区来保存变量,当缓冲区满了,新的数据会覆盖旧的,正好实现了保存最近数据的功能
  EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
  // 日志文件的路径
  std::string m_log_file;
  // 日志文件的文件描述符
  int m_fd = -1;
  // user id 和group id
  uid_t m_uid = 0;
  gid_t m_gid = 0;

  int m_fd_last_error = 0;  ///< last error we say writing to fd (if any)

  int m_syslog_log = -2, m_syslog_crash = -2;
  int m_stderr_log = -1, m_stderr_crash = -1;
  int m_graylog_log = -3, m_graylog_crash = -3;

  std::string m_log_stderr_prefix;

  std::shared_ptr<Graylog> m_graylog;

  // 在flush阶段用来保存日志内容
  std::vector<char> m_log_buf;
  // 用来标志日志线程是否需要停止,线程会一直去检测这个变量,一旦为true就停止线程
  bool m_stop = false;
  // 容器size
  std::size_t m_max_new = DEFAULT_MAX_NEW;
  std::size_t m_max_recent = DEFAULT_MAX_RECENT;

  bool m_inject_segv = false;

调用了start函数后就启动了一个日志线程,该线程运行的是Log::entry()这个函数,下面看看这个函数

Log::entry()

void *Log::entry()
{
  // 打开日志文件,获得日志文件的文件描述符(多说一句其实文件描述符就是内核中记录已打开文件数组的索引)
  reopen_log_file();
  {
    std::unique_lock lock(m_queue_mutex);
    m_queue_mutex_holder = pthread_self();
    // 只有m_stop不为真,就一直循环,(Log::stop函数才能置m_stop为真)
    while (!m_stop) {
      // 
      if (!m_new.empty()) {
        m_queue_mutex_holder = 0;
        lock.unlock();
        // 进入flush阶段,将日志内容输出到指定地方(日志文件、控制台、系统日志等)
        flush();
        lock.lock();
        m_queue_mutex_holder = pthread_self();
        // 这里continue是为了防止在执行flush函数时又有日志进入,在这continue就不用在下面wait了,就可以实时打印日志
        continue;
      }
      // 等待主程序调用submit_entry,将新的日志加入m_new中
      m_cond_flusher.wait(lock);
    }
    m_queue_mutex_holder = 0;
  }
  flush();
  return NULL;
}

flush函数

void Log::flush()
{
  std::scoped_lock lock1(m_flush_mutex);
  m_flush_mutex_holder = pthread_self();

  {
    std::scoped_lock lock2(m_queue_mutex);
    m_queue_mutex_holder = pthread_self();
    assert(m_flush.empty());
    // 将m_new中的日志转移到m_flush中,日志进入flush阶段
    m_flush.swap(m_new);
    // 这个时候m_new空了,通知submit_entry可以接收新的日志了
    m_cond_loggers.notify_all();
    m_queue_mutex_holder = 0;
  }

  _flush(m_flush, false);
  m_flush_mutex_holder = 0;
}

_flush函数(日志输出的核心函数)

void Log::_flush(EntryVector& t, bool crash)
{
  long len = 0;
  if (t.empty()) {
    assert(m_log_buf.empty());
    return;
  }
  if (crash) {
    len = t.size();
  }
  for (auto& e : t) {
    auto prio = e.m_prio;  // 打印日志时传进来的
    auto stamp = e.m_stamp;  // 时间戳
    auto sub = e.m_subsys;  // 用来标识不同子系统的, 比如osd, mds
    auto thread = e.m_thread;  // 这个是指向要打印日志的那个线程
    auto str = e.strv();  // 日志的内容
    // 判断是否需要打印,比如你设置的日志等级为3,本日志为1(dout(1)),那么就为真
    bool should_log = crash || m_subs->get_log_level(sub) >= prio;
    bool do_fd = m_fd >= 0 && should_log;
    // 系统日志,和上面同理
    bool do_syslog = m_syslog_crash >= prio && should_log;
    bool do_stderr = m_stderr_crash >= prio && should_log;
    bool do_graylog2 = m_graylog_crash >= prio && should_log;

    if (do_fd || do_syslog || do_stderr) {
      const std::size_t cur = m_log_buf.size();
      std::size_t used = 0;
      const std::size_t allocated = e.size() + 80;
      m_log_buf.resize(cur + allocated);
      // 获取m_log_buf的头指针
      char* const start = m_log_buf.data();
      // 将指针移动到第一个为空的位置
      char* pos = start + cur;

      if (crash) {
        used += (std::size_t)snprintf(pos + used, allocated - used, "%6ld> ", -(--len));
      }
      // 加上时间戳
      used += (std::size_t)append_time(stamp, pos + used, allocated - used);
      // 加上是哪个线程打印的日志,不是指本日志线程哈
      used += (std::size_t)snprintf(pos + used, allocated - used, " %lx %2d ", (unsigned long)thread, prio);
      // 加上日志数据
      memcpy(pos + used, str.data(), str.size());
      used += str.size();
      pos[used] = '\0';
      ceph_assert((used + 1 /* '\n' */) < allocated);
      
      // 输出到系统日志里
      if (do_syslog) {
        syslog(LOG_USER|LOG_INFO, "%s", pos);
      }
      // 输出到标准输出
      if (do_stderr) {
        std::cerr << m_log_stderr_prefix << std::string_view(pos, used) << std::endl;
      }

      /* now add newline */
      pos[used++] = '\n';
      
      if (do_fd) {
        m_log_buf.resize(cur + used);
      } else {
        m_log_buf.resize(0);
      }
      // 你懂得吧,每次写文件都是一个耗时的操作,这里的处理是将所有日志累计到一个量再一起写入
      if (m_log_buf.size() > MAX_LOG_BUF) {
        _flush_logbuf();
      }
    }

    if (do_graylog2 && m_graylog) {
      m_graylog->log_entry(e);
    }
    
    // 记录最近的日志
    m_recent.push_back(std::move(e));
  }
  t.clear();

  _flush_logbuf();
}

写日志

ceph提供了很多中写日志的宏,我这只分析dout

应用举例

dout(1) << "asok_command: " << command << " " << cmdmap
      << " (starting...)" << dendl;

定义, 简单的宏定义,应该很好看懂

#define dout(v) ldout((dout_context), (v))
#define dout_prefix *_dout

#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
....
#elif defined(WITH_SEASTAR) && defined(WITH_ALIEN)
...
#define dout_impl(cct, sub, v)                      \
  do {                                  \
  // 匿名函数,判断这个日志要不要打印
  const bool should_gather = [&](const auto cctX) {         \
    if constexpr (ceph::dout::is_dynamic<decltype(sub)>::value ||   \
          ceph::dout::is_dynamic<decltype(v)>::value) {     \
      return cctX->_conf->subsys.should_gather(sub, v);         \
    } else {                                \
      /* The parentheses are **essential** because commas in angle  \
       * brackets are NOT ignored on macro expansion! A language's  \
       * limitation, sorry. */                      \
      return (cctX->_conf->subsys.template should_gather<sub, v>());    \
    }                                   \
  }(cct);                               \
                                    \
  if (should_gather) {                          \
    ceph::logging::MutableEntry _dout_e(v, sub);                        \
    static_assert(std::is_convertible<decltype(&*cct),          \
                      CephContext* >::value,        \
          "provided cct must be compatible with CephContext*"); \
    auto _dout_cct = cct;                       \
    std::ostream* _dout = &_dout_e.get_ostream();

#define dendl_impl std::flush;                                          \
    _dout_cct->_log->submit_entry(std::move(_dout_e));                  \
  }                                                                     \
  } while (0)
#endif  // WITH_SEASTAR

#define ldout(cct, v)  dout_impl(cct, dout_subsys, v) dout_prefix

注意这个dout_prefix,在编译时会替换成*_dout,调用dout时变成一个std::ostream类型的指针,所以可以用<< 操作

上面这个宏在最后会调用Log::submit_entry函数,将这个日志实例对象加入到m_new中,

submit_entry函数

void Log::submit_entry(Entry&& e)
{
  ...
  if (unlikely(m_inject_segv))
    *(volatile int *)(0) = 0xdead;

  // wait for flush to catch up
  // 这里是如果m_new超过了最大容量,需要等待m_new中的数据被挪到m_flush中
  while (is_started() &&
     m_new.size() > m_max_new) {
    if (m_stop) break; // force addition
    m_cond_loggers.wait(lock);
  }
  // 将日志对象放到m_new中
  m_new.emplace_back(std::move(e));
  // 通知日志线程处理这些日志
  m_cond_flusher.notify_all();
  ...
}

一些小细节

都知道ceph是超级大佬写的,那肯定是有很多细节可以学习的

最近日志记录的实现

ceph会记录最近的日志打印,使用了boost::circular_buffer<ConcreteEntry>,这个数据结构,这个数据结构的特点就是它是一个大小固定的环形结构,
新的数据一直向里面写,写满之后会直接覆盖老的数据,这样就实现了记录最近日志的功能

日志分级

我们在开发功能的时候,为了调试会加很多日志打印,但是上线的时候就把这些日志打印给干掉了,这其实不是很好,不能每次调试就又去加上吧,分级这个理念很不错

线程级缓存

我们知道ceph的日志是包装成了一个Entry类的,但是每次实例化都会花费很多性能,所以ceph做了一个线程级缓存
代码在./src/common/StackStringStream.h中,有兴趣可以看看,总的来说它搞了个inline static thread_local修饰的结构体,这个结构体只要实例化一次,
只有在线程被销毁时才会被销毁,这就避免了每次打印日志都要实例化

记录两个知识点

c++中父类引用可以去指向子类实例对象,但是不能通过这个父类引用对象直接访问子类对象的特有成员和函数,如果这个子类继承并实现了父类的方法,那么调用这个父类引用的这个方法,
其实是在调子类,通过这个特性可以访问到子类特有的一些数据,比如父类有方法:virtual int test(){} ,子类继承实现了这个方法:int test(){return a;} int a;,那么
父类就可以访问到这个子类的a的值了, 具体可以看./src/log/Entry.h中的实现,

父类没法直接隐式转化成子类对象,但是子类要是实现了对应的类型转化函数就可以了,例子可以查看./src/log/Entry.h中的ConcreteEntry::ConcreteEntry& operator=(const Entry& e)

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容