MDS Source Code Analysis (6): mdlog

Main class diagram

mdlog-uml.png

Walking through the mdlog flow

Below, we use file creation (OPENC) as an example to analyze how an mdlog entry is produced, submitted, and flushed.

First, obtain the current LogSegment:

mdr->ls = mdlog->get_current_segment();

So where does this current segment come from?

  // Straightforward: take the last element of segments
  LogSegment *get_current_segment() {
    assert(!segments.empty());
    return segments.rbegin()->second;
  }

So it is worth investigating where segments comes from.
At the end of MDSRank::start(), MDSRank::starting_done() is executed:

// That is, the first LogSegment is created when the MDS starts up
void MDSRank::starting_done()
{ 
  dout(3) << "starting_done" << dendl;
  assert(is_starting()); 
  request_state(MDSMap::STATE_ACTIVE);
    
  mdlog->start_new_segment();
}

// in MDLog.h
void start_new_segment() {
  Mutex::Locker l(submit_mutex);
  _start_new_segment();
}

// in MDLog.cc
void MDLog::_start_new_segment()
{
  _prepare_new_segment();
  _journal_segment_subtree_map(NULL);
}

void MDLog::_prepare_new_segment()
{
  assert(submit_mutex.is_locked_by_me());
  // A segment's sequence number is the seq of the first LogEvent it contains
  uint64_t seq = event_seq + 1;
  segments[seq] = new LogSegment(seq);
  ...
  mds->mdcache->advance_stray();
}

void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
{
  assert(submit_mutex.is_locked_by_me());
  ESubtreeMap *sle = mds->mdcache->create_subtree_map();
  sle->event_seq = get_last_segment_seq();

  _submit_entry(sle, new C_MDL_Flushed(this, onsync));
}

Besides startup, a new LogSegment is also created in several other places:

  1. In MDLog::submit_entry(), when the current segment's event count exceeds the mds_log_events_per_segment limit
  2. When MDCache shuts down, a new LogSegment is created so that all earlier LogSegments can be flushed:
  if (mds->mdlog->get_num_segments() > 0) {
    auto ls = mds->mdlog->get_current_segment();
    if (ls->num_events > 1 || !ls->dirty_dirfrags.empty()) {
      // Current segment contains events other than subtreemap or
      // there are dirty dirfrags (see CDir::log_mark_dirty())
      mds->mdlog->start_new_segment();
      mds->mdlog->flush();
    }
  }
  3. After an MDS scrub completes: since some data may have been repaired, all LogSegments must be flushed right away, again by starting a new LogSegment and then flushing all earlier ones
  4. An explicit call to flush_mdlog
Next, a new LogEvent of subtype EUpdate is created:
  // Initialize the EMetaBlob and set the event's sequence number
  EUpdate *le = new EUpdate(mdlog, "openc");
  mdlog->start_entry(le);
Then the EUpdate's EMetaBlob and the current LogSegment are populated with the details of this request:
  // Record the reqid and client tid in the EMetaBlob
  le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
  // Record the inode allocation in the EMetaBlob
  journal_allocated_inos(mdr, &le->metablob);
  // A complex step; roughly: update the parent directory's mtime in mdcache,
  // and mark the inode's primary dentry and dir as dirty
  mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
  // Record the primary dentry in the EMetaBlob's lump_map, so the LogEvent
  // carries the complete dentry information
  le->metablob.add_primary_dentry(dn, in, true, true, true);
  // Record the ino
  le->metablob.add_opened_ino(in->ino());
  // Record the open file
  LogSegment *ls = mds->mdlog->get_current_segment();
  ls->open_files.push_back(&in->item_open_file);
Finally, construct the completion callback and submit the log entry:
  C_MDS_openc_finish *fin = new C_MDS_openc_finish(this, mdr, dn, in, follows);
  journal_and_reply(mdr, in, dn, le, fin);

journal_and_reply is an important function, called from many request-handling paths; it proceeds as follows:

void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEvent *le, MDSLogContextBase *fin)
{
  assert(!mdr->has_completed);

  // First, pin the inode and dentry.
  mdr->tracei = in;
  if (in)
    mdr->pin(in);

  mdr->tracedn = dn;
  if (dn)
    mdr->pin(dn);
  
  // Send an early reply with unsafe status so the client can proceed; the client may block waiting for the safe reply
  early_reply(mdr, in, dn);
  
  mdr->committing = true;
  // Submit to mdlog
  submit_mdlog_entry(le, fin, mdr, __func__);
  ...
}

void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
                                const char *event)
{
  if (mdr) {
    string event_str("submit entry: ");
    event_str += event;
    mdr->mark_event_string(event_str);
  } 
  mdlog->submit_entry(le, fin);
}

void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
  assert(submit_mutex.is_locked_by_me());
  assert(!mds->is_any_replay());
  assert(!capped);

  assert(le == cur_event);
  cur_event = NULL;

  // Register the LogEvent with the LogSegment and update the segment's counters.
  // From here on the LogSegment knows about this LogEvent, so the flush path can write it back
  assert(!segments.empty());
  LogSegment *ls = segments.rbegin()->second;
  ls->num_events++;
  le->_segment = ls;
  le->update_segment();
  le->set_stamp(ceph_clock_now());
  
  // Add to the pending list and update MDLog's counters.
  // The caller's callback c is wrapped into a PendingEvent; it is invoked once the event has been flushed
  mdsmap_up_features = mds->mdsmap->get_up_features();
  pending_events[ls->seq].push_back(PendingEvent(le, c));
  num_events++;
  unflushed++;
  ...
  // A new LogSegment may need to be started here
    _start_new_segment();
  ...
}

At this point, the LogEvent (EUpdate) has been registered with MDLog.
Note that nothing has been persisted yet.
Next, the LogEvent is handed to MDLog's submit thread for journal submission.

The MDLog submit thread

During rank initialization, MDLog::open() is triggered, which creates and starts the submit thread:

void MDLog::open(MDSInternalContextBase *c)
{   
  ...
  // Create and start submit_thread
  submit_thread.create("md_submit");
} 

The submit thread's entry point is MDLog::_submit_thread():

void MDLog::_submit_thread()
{
  // Protected by the submit mutex
  submit_mutex.Lock();

  while (!mds->is_daemon_stopping()) {
    ...
    // If there are no pending LogEvents, wait
    map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
    if (it == pending_events.end()) {
      submit_cond.Wait(submit_mutex);
      continue;
    }
    ...

    // Take a pending event
    int64_t features = mdsmap_up_features;
    PendingEvent data = it->second.front();
    it->second.pop_front();

    submit_mutex.Unlock();
    if (data.le) {
      LogEvent *le = data.le;
      LogSegment *ls = le->_segment;
      // The event type's encode method serializes the features and the event itself,
      // so bl ends up containing the full event
      bufferlist bl;
      le->encode_with_header(bl, features);
      
      // Track the journaler's write position for this entry
      uint64_t write_pos = journaler->get_write_pos();
      le->set_start_off(write_pos);
      if (le->get_type() == EVENT_SUBTREEMAP)
        ls->offset = write_pos;
      
      // Hand bl to the journaler, which flushes it to RADOS asynchronously
      const uint64_t new_write_pos = journaler->append_entry(bl);
      ls->end = new_write_pos;
      
      // Handle the callback: if the event carries none, install a C_MDL_Flushed callback.
      // In the openc path, a callback is present
      MDSLogContextBase *fin;
      if (data.fin) {
        fin = dynamic_cast<MDSLogContextBase*>(data.fin);
        assert(fin);
        fin->set_write_pos(new_write_pos);
      } else {
        fin = new C_MDL_Flushed(this, new_write_pos);
      }
      // Register the callback in the journaler's waitfor_safe map, keyed by write_pos.
      // This does not block; the journaler invokes the callback once the flush completes
      journaler->wait_for_flush(fin);

      if (data.flush)
        journaler->flush();

      if (logger)
        logger->set(l_mdl_wrpos, ls->end);

      delete le;
    } else {
      // The pending event carries no log event: just wait for the journaler flush, then run the callback
      if (data.fin) {
        MDSInternalContextBase* fin =
                dynamic_cast<MDSInternalContextBase*>(data.fin);
        assert(fin);
        C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
        fin2->set_write_pos(journaler->get_write_pos());
        journaler->wait_for_flush(fin2);
      }
      if (data.flush)
        // No callback attached: just flush the journaler
        journaler->flush();
    }

    submit_mutex.Lock();
    if (data.flush)
      unflushed = 0;
    else if (data.le)
      unflushed++;
  }

  submit_mutex.Unlock();
}

From the analysis above, the submit thread asynchronously moves LogEvents from the pending_events list into the journaler,
which then flushes them asynchronously and runs the callbacks.
Note that the log is still not persisted at this point.

Creating and flushing the journaler

The journaler is initialized in MDLog::create():

void MDLog::create(MDSInternalContextBase *c)
{
  ...
  // Create the journaler,
  // using the mdsmap's metadata pool
  journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
                            CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
                            l_mdl_jlat, mds->finisher);
  assert(journaler->is_readonly());
  journaler->set_write_error_handler(new C_MDL_WriteError(this));
  journaler->set_writeable();
  journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
  journaler->write_head(gather.new_sub());

  ...
}

Note that there are several Journaler implementations; the one used here is declared in src/osdc/Journaler.h and implemented in src/osdc/Journaler.cc.
Continuing the submit flow above, journaler->append_entry(bl) is executed:

uint64_t Journaler::append_entry(bufferlist& bl)
{
  unique_lock l(lock);
  ...

  size_t delta = bl.length() + journal_stream.get_envelope_size();
  // Throttle write_buf to keep memory usage bounded
  if (!write_buf_throttle.get_or_fail(delta)) {
    l.unlock();
    ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
    write_buf_throttle.get(delta);
    l.lock();
  }

  // Write the buffer into journal_stream
  size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
  write_pos += wrote;

  // Compute positions based on the journal object layout.
  // If this write filled the previous journal object (4MB by default), trigger a
  // flush immediately so the completed object is persisted to RADOS
  uint64_t su = get_layout_period();
  assert(su > 0);
  uint64_t write_off = write_pos % su;
  uint64_t write_obj = write_pos / su;
  uint64_t flush_obj = flush_pos / su;
  if (write_obj != flush_obj) {
    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
                   << write_obj << " flo " << flush_obj << ")" << dendl;
    _do_flush(write_buf.length() - write_off);

    // if _do_flush() skips flushing some data, it does do a best effort to
    // update next_safe_pos.
    if (write_buf.length() > 0 && 
        write_buf.length() <= wrote) { // the unflushed data are within this entry
      // set next_safe_pos to end of previous entry
      next_safe_pos = write_pos - wrote;
    }    
  }

  return write_pos;
}

At this point the log buffer has been written into journal_stream, but it has not necessarily been persisted to RADOS.
The journaler's flush must be triggered from the MDLog layer, via mdlog->flush():

void MDLog::flush()
{
  submit_mutex.Lock();

  bool do_flush = unflushed > 0; 
  unflushed = 0; 
  if (!pending_events.empty()) {
    pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
    do_flush = false;
    submit_cond.Signal();
  }

  submit_mutex.Unlock();

  if (do_flush)
    journaler->flush();
}

mdlog->flush() is called from many places; the main categories are:

  1. src/mds/Locker.cc: paths that must force a flush to guarantee consistency
  2. src/mds/MDCache.cc: likewise, paths that must force a flush to guarantee consistency
  3. src/mds/Migrator.cc: directory migration, which requires an immediate flush
  4. src/mds/Server.cc: some mutation paths that may need an immediate flush
  5. src/mds/MDSRank.cc: the periodic tick() calls mdlog->flush()

Once the journaler finishes a flush, it invokes the registered callbacks to drive the upper-layer flow forward:

void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
  lock_guard l(lock);
  assert(!readonly);
  
  if (r < 0) { 
    lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }
  
  ...
  
  // adjust safe_pos
  auto it = pending_safe.find(start);
  assert(it != pending_safe.end());
  uint64_t min_next_safe_pos = pending_safe.begin()->second;
  pending_safe.erase(it);
  if (pending_safe.empty())
    safe_pos = next_safe_pos;
  else
    safe_pos = min_next_safe_pos;

  // kick waiters <= safe_pos
  if (!waitfor_safe.empty()) {
    list<Context*> ls;
    while (!waitfor_safe.empty()) {
      auto it = waitfor_safe.begin();
      if (it->first > safe_pos)
        break;
      ls.splice(ls.end(), it->second);
      waitfor_safe.erase(it);
    } 
    // Run the callbacks
    finish_contexts(cct, ls);
  } 
}

template<class A>
inline void finish_contexts(CephContext *cct, std::list<A*>& finished, 
                            int result = 0)
{
  if (finished.empty())
    return;

  list<A*> ls;
  ls.swap(finished); // swap out of place to avoid weird loops

  if (cct)
    mydout(cct, 10) << ls.size() << " contexts to finish with " << result << dendl;
  typename std::list<A*>::iterator it;
  for (it = ls.begin(); it != ls.end(); it++) {
    A *c = *it;
    if (cct)
      mydout(cct,10) << "---- " << c << dendl;
    c->complete(result);
  }
}

Ultimately each callback's complete() runs and, through Context's inheritance chain, its finish() is eventually called.
Take C_MDS_openc_finish as an example:

class C_MDS_openc_finish : public ServerLogContext { 
  CDentry *dn; 
  CInode *newi;
  snapid_t follows;
public:
  C_MDS_openc_finish(Server *s, MDRequestRef& r, CDentry *d, CInode *ni, snapid_t f) : 
    ServerLogContext(s, r), dn(d), newi(ni), follows(f) {}

  // Called once the corresponding mdlog entry has been flushed
  void finish(int r) override {
    assert(r == 0);
    // The projected (temporary) inode linkage is no longer needed; drop it
    dn->pop_projected_linkage();

    // Mark the inode, dentry, and dir in mdcache as dirty, to be written back to RADOS later
    newi->inode.version--;   // a bit hacky, see C_MDS_mknod_finish
    newi->mark_dirty(newi->inode.version+1, mdr->ls);
    newi->mark_dirty_parent(mdr->ls, true);
    mdr->apply();
    get_mds()->locker->share_inode_max_size(newi);
    MDRequestRef null_ref;
    get_mds()->mdcache->send_dentry_link(dn, null_ref);
    
    // Popularity ("heat") accounting
    utime_t now = ceph_clock_now();
    get_mds()->balancer->hit_inode(now, newi, META_POP_IWR);

    // Reply to the client
    server->respond_to_request(mdr, 0);

    assert(g_conf->mds_kill_openc_at != 1);
  }
};

Summary

An mdlog entry goes through the following steps:

  1. The handler for each request type produces a LogEvent of the corresponding type
  2. The LogEvent is registered with the current LogSegment
  3. MDLog's submit thread submits the pending events registered in the LogSegment to the journaler
  4. The journaler's flush is driven by MDLog's flush
  • MDSRank's periodic tick() calls MDLog's flush
  • Paths that need consistency guarantees force an explicit MDLog flush
  5. The journaler's flush writes log events sequentially into journal objects in RADOS
  • When an event is submitted to the journal, the journaler checks whether a 4MB object has been filled; a full object is flushed immediately
  • When the upper layer calls flush explicitly, any unflushed data is appended to the RADOS object