前言
我们知道,列式存储的数据组织形式使得它适合海量数据在线查询、分析的场景,而写入性能相对于读取性能似乎并不那么重要(传统的ORC / Parquet on Hive方案就可以满足多数小时级到天级新鲜度的需求)。但是实际业务的发展是无止境的,越来越多的看板、报表要求分钟级甚至秒级刷新,对数据系统的实时性提出了极大挑战。当然,原始的列存格式并不支持行存风格的in-place update,高频更新势必会造成严重的写放大,这就需要一些trick来解决这个痛点。Deletion Vector就是其中之一,下面简要介绍。
Deletion Vector
Deletion Vector是在OLAP数据库、数据湖等系统中的一种优化设计。顾名思义,Deletion Vector是记录删除标记的向量(本质是位图),用于标记一个特定版本的列存文件中哪些行的数据已经失效。这样,通过把UPDATE语义改写成DELETE + INSERT语义,就可以无需更新旧版本的列存文件,只是在写新版本数据的同时写一个小得多的位图文件而已,吞吐量大大提升了。
下面的示意图来自Delta Lake,可见为了节省空间,Deletion Vector还可以用压缩位图结构(e.g. RoaringBitmap)来存储。
下图示出更新一个Parquet文件中的两行数据的过程,注意file_a_dv_1.bin
就是DV文件,且生成新版本的Parquet文件file_c.parquet
。
而在读取数据时则是经典的Merge-on-Read流程:对于附带有DV的旧版本文件file_a.parquet
,会根据DV过滤掉失效数据(相当于为查询增加一个虚拟谓词,图中为_skip_row_
),并与新版本数据file_c.parquet
做合并,产生最新的数据集。当然,Compaction过程也会根据DV过滤失效数据,并同时更新DV文件的标记。
大家耳熟能详的数据湖四剑客中,Delta Lake和Iceberg的MOR都采用了DV设计(Iceberg有两种删除标记,DV方案称为Position Delete)。而Paimon则是在最新的0.8-SNAPSHOT版本中加入了DV支持,具体可以参考社区公众号的这篇文章。作为比较,Hudi MOR走了相对传统的Base File + Log Files的路线,很明显这种方案可以达到更高的写性能(Log是顺序写),但是Merge阶段要合并的Log Files数据较多,读性能有一定折扣。
下面看看DV在StarRocks中是如何发挥作用的。
DV Implementation in StarRocks
StarRocks的主键模型表能够同时支持高效更新和查询,它的Tablet(即最小存储单元)的结构与传统的明细、聚合、更新模型有较大差别,示意图如下。
其中,Rowset是列存文件,Meta是列存文件的元信息(版本、Delta等),DelVector就是DV。存算一体部署时,Meta和DV都持久化在RocksDB中;存算分离部署时,我们可以在文件系统中直接观察到扩展名为.delvec
的文件,如下图。
StarRocks代码中的DV数据结构名为DelVector
,底层存储直接复用了C++ RoaringBitmap,部分操作代码如下,本质上是对Roaring
容器的操作,简单易懂。
void DelVector::_add_dels(const std::vector<uint32_t>& dels) {
if (!_roaring) {
_roaring = std::make_unique<Roaring>(dels.size(), dels.data());
} else {
_roaring->addMany(dels.size(), dels.data());
}
_update_stats();
}
void DelVector::add_dels_as_new_version(const std::vector<uint32_t>& dels, int64_t version,
std::shared_ptr<DelVector>* pdelvec) const {
CHECK(this != pdelvec->get());
DelVectorPtr tmp(new DelVector());
if (_roaring) {
tmp->_roaring = std::make_unique<Roaring>(*_roaring);
}
tmp->_version = version;
tmp->_loaded = true;
tmp->_add_dels(dels);
tmp.swap(*pdelvec);
}
Status DelVector::load(int64_t version, const char* data, size_t length) {
if (length < 1) {
return Status::Corruption("zero length");
}
if (*data != 0x01) {
return Status::Corruption("invalid flag");
}
data += 1;
length -= 1;
_loaded = true;
_version = version;
if (length > 0) {
_roaring = std::make_unique<Roaring>(Roaring::readSafe(data, length));
}
_update_stats();
return Status::OK();
}
void DelVector::init(int64_t version, const uint32_t* data, size_t length) {
_loaded = true;
_version = version;
if (length > 0) {
_roaring = std::make_unique<Roaring>(length, data);
}
_update_stats();
}
string DelVector::save() const {
string ret;
auto roaring_size = _roaring ? _roaring->getSizeInBytes() : 0;
ret.resize(roaring_size + 1);
ret[0] = 0x01; // one byte flag.
if (roaring_size > 0) {
_roaring->write(ret.data() + 1);
}
return ret;
}
void DelVector::save_to(std::string* str) const {
auto roaring_size = _roaring ? _roaring->getSizeInBytes() : 0;
str->resize(roaring_size + 1);
str->at(0) = 0x01; // one byte flag.
if (roaring_size > 0) {
_roaring->write(str->data() + 1);
}
}
数据写入StarRocks主键模型分为Write和Commit两个阶段,DV在Commit阶段生成,如下图所示。
对应的方法为TabletUpdates::_apply_normal_rowset_commit()
,相关的部分逻辑节选如下。
PrimaryIndex::DeletesMap new_deletes;
// ............
size_t ndelvec = new_deletes.size();
vector<std::pair<uint32_t, DelVectorPtr>> new_del_vecs(ndelvec);
size_t idx = 0;
size_t old_total_del = 0;
size_t new_del = 0;
size_t total_del = 0;
string delvec_change_info;
for (auto& new_delete : new_deletes) {
uint32_t rssid = new_delete.first;
if (rssid >= rowset_id && rssid < rowset_id + rowset->num_segments()) {
// it's newly added rowset's segment, do not have latest delvec yet
new_del_vecs[idx].first = rssid;
new_del_vecs[idx].second = std::make_shared<DelVector>();
auto& del_ids = new_delete.second;
new_del_vecs[idx].second->init(version.major_number(), del_ids.data(), del_ids.size());
if (VLOG_IS_ON(1)) {
StringAppendF(&delvec_change_info, " %u:+%zu", rssid, del_ids.size());
}
new_del += del_ids.size();
total_del += del_ids.size();
} else {
TabletSegmentId tsid;
tsid.tablet_id = tablet_id;
tsid.segment_id = rssid;
DelVectorPtr old_del_vec;
// TODO(cbl): should get the version before this apply version, to be safe
st = manager->get_latest_del_vec(_tablet.data_dir()->get_meta(), tsid, &old_del_vec);
if (!st.ok()) {
std::string msg = strings::Substitute("_apply_rowset_commit error: get_latest_del_vec failed: $0 $1",
st.to_string(), debug_string());
failure_handler(msg, false);
return;
}
new_del_vecs[idx].first = rssid;
old_del_vec->add_dels_as_new_version(new_delete.second, version.major_number(),
&(new_del_vecs[idx].second));
size_t cur_old = old_del_vec->cardinality();
size_t cur_add = new_delete.second.size();
size_t cur_new = new_del_vecs[idx].second->cardinality();
if (cur_old + cur_add != cur_new) {
// should not happen, data inconsistent
LOG(FATAL) << strings::Substitute(
"delvec inconsistent tablet:$0 rssid:$1 #old:$2 #add:$3 #new:$4 old_v:$5 "
"v:$6",
_tablet.tablet_id(), rssid, cur_old, cur_add, cur_new, old_del_vec->version(),
version.major_number());
}
if (VLOG_IS_ON(1)) {
StringAppendF(&delvec_change_info, " %u:%zu(%ld)+%zu=%zu", rssid, cur_old, old_del_vec->version(),
cur_add, cur_new);
}
old_total_del += cur_old;
new_del += cur_add;
total_del += cur_new;
}
// ............
}
在这段逻辑中,首先根据主键索引(PrimaryIndex
)找到所有更新和删除数据的映射表,称为DeletesMap
。接下来根据源RowSet ID rssid
确定这是否为一个新写入的Segment,如果是,只需直接生成新的DV,否则需要将旧的DV与更新数据的标记做合并,作为新版本DV,同时还要将新旧DV的基数做校验,防止数据损坏。
至于Compaction过程,则是通过Compaction Policy和Score确定候选Rowsets并合并完成后,在Commit阶段生成新的DV并更新元数据,逻辑与写入基本相同,不再赘述。