主要类图
可放大后查看原图

mdlog流程梳理
下面以创建文件(OPENC)为例,来分析mdlog产生、提交及flush的过程
首先,获取到当前的LogSegment
// Attach the request to the segment that new journal entries will join.
mdr->ls = mdlog->get_current_segment();
那么,当前segment怎么来的呢?
// The code is trivial: simply return the last element of `segments`.
LogSegment *get_current_segment() {
assert(!segments.empty());
return segments.rbegin()->second;
}
这就有必要探究一下segments从何而来
MDSRank::start()函数的尾段,会执行MDSRank::starting_done()
// That is, the first LogSegment is created when the MDS starts up.
void MDSRank::starting_done()
{
dout(3) << "starting_done" << dendl;
assert(is_starting());
request_state(MDSMap::STATE_ACTIVE);
// Open the very first log segment of this rank's journal.
mdlog->start_new_segment();
}
// in MDLog.h
// Public entry point: takes submit_mutex, then delegates to _start_new_segment().
void start_new_segment() {
Mutex::Locker l(submit_mutex);
_start_new_segment();
}
// in MDLog.cc
void MDLog::_start_new_segment()
{
// Allocate the new segment, then journal an ESubtreeMap as its first event.
_prepare_new_segment();
_journal_segment_subtree_map(NULL);
}
void MDLog::_prepare_new_segment()
{
assert(submit_mutex.is_locked_by_me());
// Note: a segment's sequence number is the seq of the first LogEvent it contains.
uint64_t seq = event_seq + 1;
segments[seq] = new LogSegment(seq);
...
mds->mdcache->advance_stray();
}
void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
{
assert(submit_mutex.is_locked_by_me());
// Every segment begins with a subtree-map event snapshotting the cache's subtrees.
ESubtreeMap *sle = mds->mdcache->create_subtree_map();
sle->event_seq = get_last_segment_seq();
_submit_entry(sle, new C_MDL_Flushed(this, onsync));
}
除了启动以外,在其它几处,也会产生新的LogSegment:
- MDLog::submit_entry()时发现当前Segment的event数量超过mds_log_events_per_segment限制时
- MDCache关闭时,会产生一个新的LogSegment,便于flush之前所有的LogSegment
// On MDCache shutdown: start a fresh segment so all previous segments can be flushed.
if (mds->mdlog->get_num_segments() > 0) {
auto ls = mds->mdlog->get_current_segment();
if (ls->num_events > 1 || !ls->dirty_dirfrags.empty()) {
// Current segment contains events other than subtreemap or
// there are dirty dirfrags (see CDir::log_mark_dirty())
mds->mdlog->start_new_segment();
mds->mdlog->flush();
}
}
- mds scrub完成后,因为可能修复了部分数据,所以需要立即回刷一次所有LogSegment,采取的方式也是产生一个新的LogSegment,然后flush之前的所有
- 显式调用flush_mdlog时
产生新的LogEvent,子类型为EUpdate
// Initialize the EMetaBlob and assign the event its sequence number.
EUpdate *le = new EUpdate(mdlog, "openc");
mdlog->start_entry(le);
为EUpdate和LogSegment填充本次请求相关内容
// Record the reqid and the client's oldest tid in the EMetaBlob.
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
// Record the inode allocation in the EMetaBlob.
journal_allocated_inos(mdr, &le->metablob);
// A complex step; roughly: update the parent directory's mtime in mdcache,
// and mark the inode's primary dentry and dir as dirty.
mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
// Record the primary dentry in the EMetaBlob's lump_map, meaning the
// LogEvent carries the complete dentry information.
le->metablob.add_primary_dentry(dn, in, true, true, true);
// Record the ino.
le->metablob.add_opened_ino(in->ino());
// Record the open file on the current segment.
LogSegment *ls = mds->mdlog->get_current_segment();
ls->open_files.push_back(&in->item_open_file);
构造回调,然后提交log
// Build the completion callback, then journal the entry and reply.
C_MDS_openc_finish *fin = new C_MDS_openc_finish(this, mdr, dn, in, follows);
journal_and_reply(mdr, in, dn, le, fin);
journal_and_reply是一个比较重要的函数,在多种请求流程中都会调用,其过程如下
void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEvent *le, MDSLogContextBase *fin)
{
assert(!mdr->has_completed);
// First pin the inode and dentry so they stay in cache.
mdr->tracei = in;
if (in)
mdr->pin(in);
mdr->tracedn = dn;
if (dn)
mdr->pin(dn);
// Send an early (unsafe) reply so the client can proceed with its own flow;
// the client may later block waiting for the safe reply.
early_reply(mdr, in, dn);
mdr->committing = true;
// Submit the mdlog entry.
submit_mdlog_entry(le, fin, mdr, __func__);
...
}
void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
const char *event)
{
// Tag the request with a "submit entry" marker for op tracking.
if (mdr) {
string event_str("submit entry: ");
event_str += event;
mdr->mark_event_string(event_str);
}
mdlog->submit_entry(le, fin);
}
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
assert(submit_mutex.is_locked_by_me());
assert(!mds->is_any_replay());
assert(!capped);
assert(le == cur_event);
cur_event = NULL;
// Register the LogEvent with the current LogSegment and bump its counters.
// From here on, the segment knows about this event, so the flush path can
// write it back.
assert(!segments.empty());
LogSegment *ls = segments.rbegin()->second;
ls->num_events++;
le->_segment = ls;
le->update_segment();
le->set_stamp(ceph_clock_now());
// Add to the pending list and update MDLog counters.
// The caller-supplied callback `c` is also wrapped into a PendingEvent;
// it will be invoked once the event has been flushed.
mdsmap_up_features = mds->mdsmap->get_up_features();
pending_events[ls->seq].push_back(PendingEvent(le, c));
num_events++;
unflushed++;
...
// May need to start a new LogSegment (e.g. when the current segment has
// too many events).
_start_new_segment();
...
}
至此,LogEvent(EUpdate)被注册到了MDLog,
注意现在还没有持久化
接下来LogEvent交由MDLog的submit线程做journal提交处理
MDLog的submit线程
在RANK初始化时,会触发MDLog的open,open中会启动submit线程
void MDLog::open(MDSInternalContextBase *c)
{
...
// Create and start the submit thread ("md_submit").
submit_thread.create("md_submit");
}
submit_thread的入口函数为log->_submit_thread()
void MDLog::_submit_thread()
{
// Protected by submit_mutex throughout the loop.
submit_mutex.Lock();
while (!mds->is_daemon_stopping()) {
...
// No pending LogEvents: wait on the condition variable.
map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
if (it == pending_events.end()) {
submit_cond.Wait(submit_mutex);
continue;
}
...
// Take one pending event off the queue.
int64_t features = mdsmap_up_features;
PendingEvent data = it->second.front();
it->second.pop_front();
submit_mutex.Unlock();
if (data.le) {
LogEvent *le = data.le;
LogSegment *ls = le->_segment;
// Encode the features plus the event itself with the per-event-type
// encode methods; afterwards `bl` holds the full event payload.
bufferlist bl;
le->encode_with_header(bl, features);
// Track the journaler's log write position for this event.
uint64_t write_pos = journaler->get_write_pos();
le->set_start_off(write_pos);
if (le->get_type() == EVENT_SUBTREEMAP)
ls->offset = write_pos;
// Hand `bl` to the journaler, which writes it back asynchronously.
const uint64_t new_write_pos = journaler->append_entry(bl);
ls->end = new_write_pos;
// Handle the callback: if the event carries none, install a
// C_MDL_Flushed. For the openc path, a callback is present.
MDSLogContextBase *fin;
if (data.fin) {
fin = dynamic_cast<MDSLogContextBase*>(data.fin);
assert(fin);
fin->set_write_pos(new_write_pos);
} else {
fin = new C_MDL_Flushed(this, new_write_pos);
}
// Register the callback in the journaler's waitfor_safe map, keyed by
// write_pos. This does NOT block: it only registers the callback,
// which the journaler invokes after its flush completes.
journaler->wait_for_flush(fin);
if (data.flush)
journaler->flush();
if (logger)
logger->set(l_mdl_wrpos, ls->end);
delete le;
} else {
// No LogEvent attached: just wait for the journaler flush, then
// complete the callback.
if (data.fin) {
MDSInternalContextBase* fin =
dynamic_cast<MDSInternalContextBase*>(data.fin);
assert(fin);
C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
fin2->set_write_pos(journaler->get_write_pos());
journaler->wait_for_flush(fin2);
}
if (data.flush)
// No callback at all: only perform the journaler flush.
journaler->flush();
}
submit_mutex.Lock();
if (data.flush)
unflushed = 0;
else if (data.le)
unflushed++;
}
submit_mutex.Unlock();
}
从上面的分析可知,LogEvent会被submit thread异步的从pending events列表提交到journaler,
journaler再进行异步下刷和回调
注意到现在为止,Log仍然未持久化
journaler的创建以及回刷
在MDLog::create中,会初始化journaler
void MDLog::create(MDSInternalContextBase *c)
{
...
// Create the journaler.
// The pool used is the mdsmap's metadata pool.
journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
l_mdl_jlat, mds->finisher);
assert(journaler->is_readonly());
journaler->set_write_error_handler(new C_MDL_WriteError(this));
journaler->set_writeable();
journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
journaler->write_head(gather.new_sub());
...
}
需要注意,Journaler有多处实现,此处使用的是src/osdc/Journaler.h中定义的,其实现在src/osdc/Journaler.cc中
接着上面的提交流程,会执行journaler->append_entry(bl)
uint64_t Journaler::append_entry(bufferlist& bl)
{
unique_lock l(lock);
...
size_t delta = bl.length() + journal_stream.get_envelope_size();
// Flow control on write_buf: keeps memory usage bounded.
if (!write_buf_throttle.get_or_fail(delta)) {
l.unlock();
ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
write_buf_throttle.get(delta);
l.lock();
}
// Write the buffer into journal_stream.
size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
write_pos += wrote;
// Compute positions from the journal object layout.
// If this write filled the previous journal object (4MB by default),
// trigger a flush immediately so the filled object is persisted to RADOS.
uint64_t su = get_layout_period();
assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
uint64_t flush_obj = flush_pos / su;
if (write_obj != flush_obj) {
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
// if _do_flush() skips flushing some data, it does do a best effort to
// update next_safe_pos.
if (write_buf.length() > 0 &&
write_buf.length() <= wrote) { // the unflushed data are within this entry
// set next_safe_pos to end of previous entry
next_safe_pos = write_pos - wrote;
}
}
return write_pos;
}
至此,Log buffer被写入了journal_stream,但不一定被持久化到了RADOS
journaler的回刷需要从MDLog层触发,由mdlog->flush()触发
void MDLog::flush()
{
submit_mutex.Lock();
// Flush directly only when there are unflushed events and nothing pending.
bool do_flush = unflushed > 0;
unflushed = 0;
if (!pending_events.empty()) {
// Queue a flush-only PendingEvent and wake the submit thread; it will
// perform the journaler flush after draining the pending events.
pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
do_flush = false;
submit_cond.Signal();
}
submit_mutex.Unlock();
if (do_flush)
journaler->flush();
}
调用mdlog->flush()的地方比较多,主要有几类:
- src/mds/Locker.cc中,需要强制flush以确保一致性的流程
- src/mds/MDCache.cc中,需要强制flush以确保一致性的流程
- src/mds/Migrator.cc中,目录迁移的场景,需要立即回刷
- src/mds/Server.cc中,一些变更类流程中,可能需要立即回刷
- src/mds/MDSRank.cc中,在周期性的tick()流程中,调用mdlog->flush()
journaler回刷完成以后,会调用注册的回调,以驱动上层流程继续执行
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
lock_guard l(lock);
assert(!readonly);
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
handle_write_error(r);
return;
}
...
// adjust safe_pos
auto it = pending_safe.find(start);
assert(it != pending_safe.end());
uint64_t min_next_safe_pos = pending_safe.begin()->second;
pending_safe.erase(it);
if (pending_safe.empty())
safe_pos = next_safe_pos;
else
safe_pos = min_next_safe_pos;
// kick waiters <= safe_pos
if (!waitfor_safe.empty()) {
list<Context*> ls;
while (!waitfor_safe.empty()) {
auto it = waitfor_safe.begin();
if (it->first > safe_pos)
break;
ls.splice(ls.end(), it->second);
waitfor_safe.erase(it);
}
// Complete the registered callbacks.
finish_contexts(cct, ls);
}
}
// Generic helper: complete every context in `finished` with `result`.
template<class A>
inline void finish_contexts(CephContext *cct, std::list<A*>& finished,
int result = 0)
{
if (finished.empty())
return;
list<A*> ls;
ls.swap(finished); // swap out of place to avoid weird loops
if (cct)
mydout(cct, 10) << ls.size() << " contexts to finish with " << result << dendl;
typename std::list<A*>::iterator it;
for (it = ls.begin(); it != ls.end(); it++) {
A *c = *it;
if (cct)
mydout(cct,10) << "---- " << c << dendl;
c->complete(result);
}
}
最终执行各种回调的complete,沿着Context及其派生类的继承关系,最终调用到各自的finish
以C_MDS_openc_finish为例
class C_MDS_openc_finish : public ServerLogContext {
CDentry *dn;
CInode *newi;
snapid_t follows;
public:
C_MDS_openc_finish(Server *s, MDRequestRef& r, CDentry *d, CInode *ni, snapid_t f) :
ServerLogContext(s, r), dn(d), newi(ni), follows(f) {}
// Invoked once the corresponding mdlog entry has been flushed.
void finish(int r) override {
assert(r == 0);
// The projected (tentative) inode linkage is no longer needed; drop it.
dn->pop_projected_linkage();
// Mark the inode, dentry and dir dirty in mdcache, pending writeback to RADOS.
newi->inode.version--; // a bit hacky, see C_MDS_mknod_finish
newi->mark_dirty(newi->inode.version+1, mdr->ls);
newi->mark_dirty_parent(mdr->ls, true);
mdr->apply();
get_mds()->locker->share_inode_max_size(newi);
MDRequestRef null_ref;
get_mds()->mdcache->send_dentry_link(dn, null_ref);
// Heat/popularity accounting — presumably feeds the balancer's stats; confirm.
utime_t now = ceph_clock_now();
get_mds()->balancer->hit_inode(now, newi, META_POP_IWR);
// Send the (safe) reply back to the client.
server->respond_to_request(mdr, 0);
assert(g_conf->mds_kill_openc_at != 1);
}
};
总结
mdlog会经历以下步骤:
- 在各种请求的handler函数中产生各种类型的LogEvent
- LogEvent被注册到当前LogSegment中
- MDLog的submit线程将LogSegment中注册的pending events提交给journaler
- 通过MDLog的flush完成journaler的flush
- MDSRank的tick周期过程中会调用MDLog的flush
- 在各种需要一致性保障的流程中,会显式进行强制MDLog的flush
- journaler的flush将log event顺序写入RADOS中的journal对象
- log event提交journal时,会检查是否写满一个4M对象,写满后该4M对象会被立即回刷
- 上层显式调用flush时,会检查是否有未回刷数据,若有,则会以追加写方式回刷RADOS对象