整体架构
-
BlueStore
中,对KV存储进行了一层抽象,以支持多种不同的KV存储引擎,抽象类型为KeyValueDB
,BlueStore
包含了一个KeyValueDB
类型的成员db
,元数据的存取均由db完成 -
KeyValueDB
的缺省实现类型为RocksDBStore
,其中包含一个rocksdb::DB
实例 -
rocksdb::DB
实例中包含一个rocksdb::Env
的实现,通过Env完成RocksDB与实际存储介质之间的连接 - 由于
BlueStore
采用裸块设备保存数据,而RocksDB
是基于文件系统语义的,所以需要中间层BlueFS
来完成RocksDB与BlockDevice
之间的连接 - 为保持
BlueFS
单一的文件系统职责,于是在其上又增加了一层BlueRocksEnv
实现,用来对接RocksDB
和BlueFS
,完成Env接口到文件系统接口的转换 -
BlueFS
作为bluestore的一部分,底层使用了一致的BlockDevice
接口,使得BlueFS
能够运行在多种块设备实现之上,充分解耦
主要类图如下:
bluefs-rocksdb.png
在以RocksDB为KV存储引擎,KernelDevice为实际存储设备的情况下,可将类图简化为如下结构:
bluefs-rocksdb-simple.png
进一步简化为如下层次结构:
bluefs-rocksdb.jpg
DB初始化
在执行OSD部署时,会由ceph-volume等工具来执行ceph-osd --mkfs ......
命令,此命令会触发DB的初始化过程
src/ceph_osd.cc
int main(int argc, const char **argv)
{
if (mkfs) {
...
int err = OSD::mkfs(g_ceph_context, store, g_conf->osd_data,
mc.monmap.fsid, whoami);
...
}
}
src/osd/OSD.cc
int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
uuid_d fsid, int whoami)
{
...
ret = store->mkfs();
...
}
src/os/bluestore/BlueStore.cc
int BlueStore::mkfs()
{
...
// 注意参数为true,代表创建DB
r = _open_db(true);
...
}
int BlueStore::_open_db(bool create)
{
int r;
assert(!db);
string fn = path + "/db";
string options;
stringstream err;
ceph::shared_ptr<Int64ArrayMergeOperator> merge_op(new Int64ArrayMergeOperator);
string kv_backend;
if (create) {
// kv_backend = "rocksdb"
kv_backend = cct->_conf->bluestore_kvbackend;
} else {
...
}
bool do_bluefs;
if (create) {
// do_bluefs = true
do_bluefs = cct->_conf->bluestore_bluefs;
} else {
...
}
//开始构建
rocksdb::Env *env = NULL;
if (do_bluefs) {
...
// 生成BlueFS实例
bluefs = new BlueFS(cct);
string bfn;
struct stat st;
bfn = path + "/block.db"; // DB设备路径
// DB设备路径存在且读取正常
if (::stat(bfn.c_str(), &st) == 0) {
//向BlueFS添加DB设备
r = bluefs->add_block_device(BlueFS::BDEV_DB, bfn);
...
// 设置设备label
...
if (create) {
// 预留8K空间,label (4k) + bluefs super (4k),数据区域从8K开始
bluefs->add_block_extent(
BlueFS::BDEV_DB,
SUPER_RESERVED,
bluefs->get_block_device_size(BlueFS::BDEV_DB) - SUPER_RESERVED);
}
bluefs_shared_bdev = BlueFS::BDEV_SLOW;
bluefs_single_shared_device = false;
} else {
// DB设备路径不存在
bluefs_shared_bdev = BlueFS::BDEV_DB
...
}
// 添加Block设备到BlueFS
bfn = path + "/block";
r = bluefs->add_block_device(bluefs_shared_bdev, bfn);
...
if (create) {
// 由于block设备是共享空间的,只能使用一部分用于DB存储
// 初始大小 = (最小比例 + 每次最大扩展比例 = 0.02 + 0,02)* block设备总容量
uint64_t initial =
bdev->get_size() * (cct->_conf->bluestore_bluefs_min_ratio +
cct->_conf->bluestore_bluefs_gift_ratio);
// 初始容量取计算值和配置的最小值(1GB)中较大的,初始DB占用至少为1GB
initial = MAX(initial, cct->_conf->bluestore_bluefs_min);
...
// 基于 alloc_size做对齐,从设备的中间位置开始,分配初始空间给bluefs
initial = P2ROUNDUP(initial, cct->_conf->bluefs_alloc_size);
// put bluefs in the middle of the device in case it is an HDD
uint64_t start = P2ALIGN((bdev->get_size() - initial) / 2,
cct->_conf->bluefs_alloc_size);
bluefs->add_block_extent(bluefs_shared_bdev, start, initial);
bluefs_extents.insert(start, initial);
}
bfn = path + "/block.wal";
if (::stat(bfn.c_str(), &st) == 0) {
// WAL设备存在且正常读取的情况,添加WAL设备到bluefs
r = bluefs->add_block_device(BlueFS::BDEV_WAL, bfn);
...
if (create) {
// WAL设备仅预留Label的4K空间
bluefs->add_block_extent(
BlueFS::BDEV_WAL, BDEV_LABEL_BLOCK_SIZE,
bluefs->get_block_device_size(BlueFS::BDEV_WAL) -
BDEV_LABEL_BLOCK_SIZE);
}
cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
bluefs_single_shared_device = false;
} else {
r = -errno;
if (::lstat(bfn.c_str(), &st) == -1) {
// WAL设备不存在的情况
r = 0;
cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
} else {
derr << __func__ << " " << bfn << " symlink exists but target unusable: "
<< cpp_strerror(r) << dendl;
goto free_bluefs;
}
}
// WAL、DB、Block设备分别设置好以后,执行mkfs初始化
if (create) {
bluefs->mkfs(fsid);
}
// 然后做mount
r = bluefs->mount();
...
if (cct->_conf->bluestore_bluefs_env_mirror) {
// 启用mirror env 做debug的情况
} else {
// 将初始化好的bluefs传入env,进行对接
env = new BlueRocksEnv(bluefs);
// 设置db目录名为db, 逻辑上为/db目录
fn = "db";
}
if (bluefs_shared_bdev == BlueFS::BDEV_SLOW) {
// DB和block设备分离的情况,那么db设备将会是两个,不包括WAL设备
// 两个设备将分别对应一个目录,为: /db /db.slow
ostringstream db_paths;
uint64_t db_size = bluefs->get_block_device_size(BlueFS::BDEV_DB);
uint64_t slow_size = bluefs->get_block_device_size(BlueFS::BDEV_SLOW);
db_paths << fn << ","
<< (uint64_t)(db_size * 95 / 100) << " "
<< fn + ".slow" << ","
<< (uint64_t)(slow_size * 95 / 100);
cct->_conf->set_val("rocksdb_db_paths", db_paths.str(), false);
}
if (create) {
// /db目录总是会创建
env->CreateDir(fn);
// 存在独立WAL设备的情况,创建/db.wal目录
if (cct->_conf->rocksdb_separate_wal_dir)
env->CreateDir(fn + ".wal");
// DB和block设备分离的情况,创建/db.slow目录
if (cct->_conf->get_val<std::string>("rocksdb_db_paths").length())
env->CreateDir(fn + ".slow");
}
} else if (create) {
// 不使用bluefs,创建本地目录的情况
}
// 创建出RocksDBStore实例,传入包含了bluefs的env,完成全部层次的对接
db = KeyValueDB::create(cct,
kv_backend,
fn,
static_cast<void*>(env));
if (!db) {
// 创建DB失败的情况
}
FreelistManager::setup_merge_operators(db);
db->set_merge_operator(PREFIX_STAT, merge_op);
db->set_cache_size(cache_kv_ratio * cache_size);
if (kv_backend == "rocksdb"){
// 使用ceph.conf中的自定义rocksdb参数
options = cct->_conf->bluestore_rocksdb_options;
}
// 执行db的初始化过程,使用自定义的参数
db->init(options);
// open DB
if (create)
r = db->create_and_open(err);
else
r = db->open(err);
if (r) {
//打开失败的情况
}
return 0;
}
在上面的_open_db
过程中,依次经历了四个重要的过程,下面分别进行分析:
bluefs->mkfs(fsid)
int BlueFS::mkfs(uuid_d osd_uuid)
{
std::unique_lock<std::mutex> l(lock);
// 为每一个bdev生成一个allocator,将设备的可用空间全部添加为allocator的free空间
_init_alloc();
// 初始化bluefs的各种性能计数器
_init_logger();
// 初始化superblock信息
super.version = 1;
super.block_size = bdev[BDEV_DB]->get_block_size();
super.osd_uuid = osd_uuid;
// 初始化superblock的唯一标识,生成随机的uuid
super.uuid.generate_random();
dout(1) << __func__ << " uuid " << super.uuid << dendl;
// 初始化log文件
FileRef log_file = new File;
// 注意log文件的inode number为1
log_file->fnode.ino = 1;
// 指定log文件的优选设备为WAL设备,log文件空间分配时,设备优先级为WAL > DB > Block
log_file->fnode.prefer_bdev = BDEV_WAL;
// 为log文件分配空间,预分配 bluefs_max_log_runway=4MB 空间
int r = _allocate(
log_file->fnode.prefer_bdev,
cct->_conf->bluefs_max_log_runway,
&log_file->fnode);
assert(r == 0);
// 创建一个BlueFS::FileWriter,其为支持内存Buffered write的结构
log_writer = _create_writer(log_file);
// 构建日志事务,记录空间分配操作,因为此前为log文件做了4MB的空间分配
log_t.op_init();
for (unsigned bdev = 0; bdev < MAX_BDEV; ++bdev) {
interval_set<uint64_t>& p = block_all[bdev];
if (p.empty())
continue;
for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
log_t.op_alloc_add(bdev, q.get_start(), q.get_len());
}
}
// 下刷日志事务,同步等待
// 此过程较为复杂,先简单理解为将log file writer中的内存buffer下刷到bdev中,并触发bdev->flush()操作,确保日志数据落盘
// 在后续的写流程分析中再做详细分析
_flush_and_sync_log(l);
// 将superblock信息写入到DB设备的4KB开始位置
super.log_fnode = log_file->fnode;
_write_super();
// 依次调用各bdev的flush操作,确保数据落盘
flush_bdev();
// 至此,superblock被写盘,且已经产生了日志文件,该文件的空间分配情况也被落盘记录
// 接下来是释放操作并返回
super = bluefs_super_t();
_close_writer(log_writer);
log_writer = NULL;
block_all.clear();
block_total.clear();
_stop_alloc();
_shutdown_logger();
dout(10) << __func__ << " success" << dendl;
return 0;
}
bluefs->mount()
int BlueFS::mount()
{
// 读取superblock并做crc校验
int r = _open_super();
...
// block_all保存了每个bdev已分配的extend记录
block_all.clear();
block_all.resize(MAX_BDEV);
// block_total记录每个bdev分配的extend长度总和,即已分配空间
block_total.clear();
block_total.resize(MAX_BDEV, 0);
// 为每个bdev初始化allocator
_init_alloc();
// 过程略为复杂,读取ino为1的文件即log文件,
// 遍历文件中的bluefs transaction,遍历每个transaction中的每个op,针对不同的op类型做不同的回放操作
// 基于op记录在内存中重建allocator的free空间,重建目录和文件信息,更新block_all和block_total记录
// bluefs没有记录inode、dentry等等常见的文件系统元数据,每次mount时对log做回放操作,来还原内存的文件系统状态
// 这是基于一种假设:rocksdb中的文件数量比较少,且空间分配的连续性比较好,log量就不会太大
r = _replay(false);
...
// 将文件已占用的空间从allocator的free list移除,还原allocator的空间分配情况
for (auto& p : file_map) {
dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
for (auto& q : p.second->fnode.extents) {
alloc[q.bdev]->init_rm_free(q.offset, q.length);
}
}
// 初始化bluefs的log writer和下次写入位置
log_writer = _create_writer(_get_file(1));
assert(log_writer->file->fnode.ino == 1);
log_writer->pos = log_writer->file->fnode.size;
_init_logger();
return 0;
out:
super = bluefs_super_t();
return r;
}
-
db->init(options)
src/kv/RocksDBStore.cc
int RocksDBStore::init(string _options_str)
{
// 将传入的rocksdb配置参数串保存下来,供后续的create_and_open使用
options_str = _options_str;
// 接下来只是对传入配置参数串做一次参数解析,以确保配置串的正确、合法
rocksdb::Options opt;
//try parse options
if (options_str.length()) {
int r = ParseOptionsFromString(options_str, opt);
if (r != 0) {
return -EINVAL;
}
}
return 0;
}
-
db->create_and_open(err)
src/kv/RocksDBStore.cc
int RocksDBStore::create_and_open(ostream &out)
{
if (env) {
// 通过env创建底层FS的db目录
unique_ptr<rocksdb::Directory> dir;
env->NewDirectory(path, &dir);
} else {
// 使用本地目录的情况
...
}
// 主要逻辑均在do_open里面,第二个参数表示是否为新创建db
return do_open(out, true);
}
int RocksDBStore::do_open(ostream &out, bool create_if_missing)
{
rocksdb::Options opt;
rocksdb::Status status;
// 正式转换配置串为rocksdb options
if (options_str.length()) {
int r = ParseOptionsFromString(options_str, opt);
if (r != 0) {
return -EINVAL;
}
}
// 打开rocksdb性能统计时的计数器初始化,默认不打开性能统计,因为它会降低rocksdb大约10%的性能
if (g_conf->rocksdb_perf) {
dbstats = rocksdb::CreateDBStatistics();
opt.statistics = dbstats;
}
opt.create_if_missing = create_if_missing;
// 有WAL设备时,需要建立独立的db.wal目录
if (g_conf->rocksdb_separate_wal_dir) {
opt.wal_dir = path + ".wal";
}
// 将Bluestore中生成的db设备path和size配置字符串转换为opt.db_paths列表格式
if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
list<string> paths;
get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
for (auto& p : paths) {
size_t pos = p.find(',');
if (pos == std::string::npos) {
return -EINVAL;
}
string path = p.substr(0, pos);
string size_str = p.substr(pos + 1);
uint64_t size = atoll(size_str.c_str());
if (!size) {
return -EINVAL;
}
opt.db_paths.push_back(rocksdb::DbPath(path, size));
dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
}
}
// 将rocksdb的日志输出对接到ceph的日志输出
if (g_conf->rocksdb_log_to_ceph_log) {
opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
}
// 将自定义的env设置到options中,此处为BlueRocksEnv实现,对接到BlueFS
if (priv) {
dout(10) << __func__ << " using custom Env " << priv << dendl;
opt.env = static_cast<rocksdb::Env*>(priv);
}
// rocksdb cache相关的设置,省略
...
// 打开DB,由于前面在opt中设置了create_if_missing,所以首次打开时,会创建DB
status = rocksdb::DB::Open(opt, path, &db);
if (!status.ok()) {
derr << status.ToString() << dendl;
return -EINVAL;
}
// 性能计数器PerfCounters相关的初始化,省略
...
// 若在rocksdb的配置串中指定了:compact_on_mount = true,则在open之后立即执行一次compact操作,默认未指定此设置
if (compact_on_mount) {
derr << "Compacting rocksdb store..." << dendl;
compact();
derr << "Finished compacting rocksdb store" << dendl;
}
return 0;
}