📚 Catalog 表管理
Catalog 在整体 havenask 中的位置,主要负责表管理。
具体的代码目录在 aios/catalog
先介绍里面涉及几个概念:
- 🗂️ catalog:目录
- 🗄️ database:数据库
- 📋 table:表
- 🧩 Partition:分区
- 📝 TableStructure:表结构
- 👥 TableGroup:表组
- 🔄 LoadStrategy:加载策略
- 🔧 Function:函数
- 🏗️ Build:构建
我们下面会挨个介绍,先看顶层的 Catalog。
🗂️ Catalog 目录
目录 (Catalog) 是 catalog 系统中用来组织和管理多个数据库 (Database) 的顶层容器。它是用户访问 catalog 系统元信息的入口点。
整个实体的关系是:
目录 (Catalog) -> 数据库 (Database) -> 表 (Table) / 表组 (TableGroup) / 函数 (Function)
这些实体都继承自一个基类:EntityBase
,将上面实体的共性都抽象出来,包括:
- 🆔 它们都有一个唯一的标识符(ID)
- 🔢 它们都有一个版本号(CatalogVersion),用于跟踪状态变化
- 🚦 它们都有一个状态(proto::EntityStatus),表示当前生命周期阶段(如正常、待发布、待删除等)
- 🔄 它们都需要支持克隆自身(创建一个完全独立且相同的副本)
- 🔄 它们可能都需要支持更新自身的状态,并且状态的更新可能需要传播给它们的子实体
在实现技巧上:EntityBase
是一个模板类,它使用了一种叫做 Curiously Recurring Template Pattern (CRTP) 的技巧,他大概的实现如下:
// File: entity/EntityBase.h (片段)
template <typename Self> // Self 是指继承 EntityBase 的具体类
class EntityBase {
private:
// CRTP 技巧:通过 static_cast 访问派生类方法
const Self *self() const { return static_cast<const Self*>(this); }
Self *self() { return static_cast<Self*>(this); }
}
其中 (template <typename Self>
和 self()
方法): 这是 EntityBase
设计的核心。基类 EntityBase
将继承它的派生类 Self
作为模板参数。通过 static_cast<const Self*>(this)
(在 const 方法中) 或 static_cast<Self*>(this)
(在非 const 方法中),基类方法可以在编译时"知道"自己是哪个派生类,并安全地调用派生类中实现的特定方法(如 copyDetail
和 updateChildStatus
)。这是一种静态多态,相比虚函数有更低的运行时开销。
🗄️ Database 数据库
1个 database 属于 1个 catalog,管理了下面的 table、tablegroup、function 这些。database 的主要功能包括:
🔄 自身生命周期管理:
create
,update
,drop
方法用于创建、修改和删除数据库本身。这些操作会影响数据库的状态 (_status
) 和版本 (_version
),并将变更记录到传递进来的UpdateResult
对象中👨👩👧👦 子实体管理:提供一系列方法用于在其下创建、修改、删除、获取和列出表 (Table)、表组 (TableGroup) 和函数 (Function)
🔄 更下层实体操作的委托:类似 Catalog 将表、分区等操作委托给 Database,Database 也将针对更下层实体(如表结构、分区、列、索引等)的操作委托给其包含的正确的 Table 或 TableGroup 实体去执行
📋 Table 表
内部包括 TableStructure 和 Partition 两个对象,前者代表表结构:定义了有哪些列以及有哪些索引,后者代表管理的分区。
🧩 Partition 分区
核心成员包括分区的配置 (_partitionConfig
)、数据源位置 (_dataSource
),分区是实现数据分布式存储和管理的关键概念。它允许我们将一个大表的完整数据拆分成若干个更小的、易于管理的块,并将这些块存放在不同的物理位置上。
👥 TableGroup 表组
它是一个逻辑概念,作用是对同一批表应用相同的策略,当前是维护了加载策略。
🔄 LoadStrategy 加载策略
每张表的数据来源可能不同(文件、数据库、消息队列),加载方式也不同(全量导入、增量更新、实时同步),需要的配置参数也多种多样(文件路径、数据格式、分隔符、过滤条件、认证信息)。
加载策略的作用就是将这些加载相关的细节标准化和集中化。它是一个独立的实体,依附于一个特定的表组中的特定表,详细说明了如何将数据加载到这张表里。
🔧 Function 函数
代表了一个注册在目录系统中的用户自定义函数(UDF)或存储过程等可执行代码块。
以上一些重要的实体概念,下一步我们从目录服务开始介绍。
🌐 CatalogServiceImpl 目录服务
CatalogServiceImpl
是 catalog 系统对外提供的 RPC (Remote Procedure Call) 服务的具体实现,CatalogServiceImpl
本身不直接管理元信息实体(目录、数据库、表等),它只是一个调度者和请求转发器。真正的业务逻辑和实体管理发生在它背后更深层的组件中(特别是 CatalogController
)。
他内部会持有两个重要的组件:存储 (IStore
) 和 CatalogControllerManager
。在 start
方法中,会调用 _controllerManager->recover()
,这个调用会触发管理器从存储中加载所有现有的目录 (Catalog),并为每个目录创建相应的目录控制器 (CatalogController
)。这是服务启动后能够响应请求的基础。
🔍 CatalogControllerManager 目录控制器管理
目录控制器管理器 (CatalogControllerManager
)。它扮演着"总管理员"的角色,负责管理系统中所有目录控制器 (CatalogController
) 的生命周期,处理目录 (Catalog) 层级的创建、删除和恢复操作,并维护全局的根目录 (Root) 实体,确保系统中 Catalog 列表的全局一致性。
本质上 CatalogControllerManager
是一个全局单例,内部的操作通过一把锁强制进行串行,保证并发操作的正确性,内部会调用存储 (IStore
) 进行持久化。
🎮 CatalogController 目录控制器
目录控制器 (CatalogController
)。它是负责管理单个目录 (Catalog) 及其内部所有元信息(数据库、表、分区等)的核心业务逻辑单元。目录控制器接收来自服务层路由的请求,维护其负责的目录的内存快照和版本,协调与存储层的交互进行持久化,并通过通用的 checkAndUpdate
框架执行各种元信息操作。此外,它还负责周期性地将分区的变化同步到 Build 系统。
这里面有几个关键点:
📸 目录快照 (
CatalogSnapshot
),表示内存中的最新目录信息,每次修改都是先修改内存中的数据,然后做持久化💾 存储接口和读写器:如果
CatalogController
直接与具体的文件系统 API 或数据库客户端 API 交互,代码就会与特定的存储实现(例如本地文件系统、HDFS、OSS、或者某个具体的KV存储)紧密耦合。一旦需要切换存储方式,就需要修改大量代码,于是引入了抽象的存储接口(IStore
)和具体的读写器(ICatalogReader
,ICatalogWriter
等)
通过这种分层和接口化的设计,service 项目的存储模块实现了高度的灵活性和可维护性:
- 🔝 上层组件(
CatalogController
)只关心元数据业务逻辑和快照概念,无需关心具体存储细节 - 🔄 中间层接口(
IStore
,ICatalogReader
,ICatalogWriter
)定义了不同层次的存储操作规范。IStore
负责 root 的读写,ICatalogReader
和ICatalogWriter
负责 Catalog 的读写 - 🔽 底层实现(例如
LocalStore
、ZkStore
)则负责与具体的物理存储(如文件系统)进行交互
除了上面持久化相关的工作外,目录控制器还有个重要的工作是:
- 🔄 同步状态到 Build 系统: 负责将目录中关于分区 (Partition) 的变化同步到 Build 系统,触发数据的构建或更新(例如创建新的索引)
具体来说,会在后台启动一个 sync 线程,
- 它会不断地比对当前目录快照中的分区信息,与上次盘点时记录的分区信息
- 一旦发现有新的分区、分区信息发生变化,或者有分区被删除了,它就会自动触发相应的构建任务创建、更新或删除操作
CatalogController
内部维护着一个 std::vector<proto::Build> _builds
成员变量。这个列表就像一个"构建任务清单",记录了当前所有与分区相关的构建任务的元数据,包括:
- 🆔 BuildId:构建任务的唯一标识,通常包含分区名称、表名、数据库名、目录名,以及一个 generation_id(通常是时间戳,代表了本次构建的版本)
- 🎯 BuildTarget:期望的构建状态和配置(例如,构建类型、期望达到的状态、以及构建任务所需的配置路径)
- 📊 BuildCurrent:构建任务当前的实际状态(例如,运行状态、当前进度、错误信息、最终生成的数据路径)
_builds
列表同样需要被持久化,以便在服务重启后能恢复所有进行中的或已完成的构建任务状态。
sync
线程中 syncLoop
方法是构建流程管理的大脑。它周期性地被 _syncThread
调用,执行以下步骤:
- 📥 获取最新分区信息:从当前的目录快照 (
_snapshot
) 中获取所有最新的分区信息 - 📋 获取当前构建任务ID:从
_builds
列表中获取当前所有构建任务的ID - 🔍 比对与协调:
- 新增或更新分区:如果发现当前快照中有一个分区是
_lastPartitions
中没有的,或者版本号发生变化,或者其结构(schema)发生变化,则触发创建新构建任务的流程 - 删除分区:如果
_lastPartitions
中有某个分区,但在当前快照中已经不存在,则触发删除对应构建任务的流程
- 新增或更新分区:如果发现当前快照中有一个分区是
- 🔄 更新历史状态:将当前的分区信息保存到
_lastPartitions
,以便下一次循环进行比对
以上就是 havenask 中表管理的主要功能,下面我们来看一个具体的例子,帮助理解。
🏗️ 创建默认目录
发出的请求 {"catalog": "{"catalogName": "catalog"}"}
,接口是:/CatalogService/createCatalog
,suez_admin
收到请求后,会将其最终持久化到 zk 上(我这环境是分布式),持久化的地址 _storeUri
是在 suez_admin
启动的时候,通过命令行参数传入。
我们直接看:zfs://10.4.64.14:2181/havenask/havenask-sql-remote/appmaster
地址的数据,下面包括了几个文件:
catalog # 新建的catalog目录名称
catalog.pb # 详细信息
root.pb # 存放 root信息
cluster_state.pb # 存放集群状态
root.pb
和 catalog.pb
信息如下:
message Root {
int64 version = 1;
string root_name = 2;
repeated string catalog_names = 3;
OperationMeta operation_meta = 4;
}
message CatalogStateEntity {
repeated Catalog catalogs = 1; # 存储了多个版本信息
repeated Build builds = 2; # 存储了build信息
}
每次读写都是全量覆盖写。
🏗️ 创建默认数据库
接口:CatalogService/createDatabase
,1个示例请求格式:
{
"databaseConfig": {
"storeRoot": "hdfs://10.4.64.14:9000/xxx/havenask/database_store"
},
"catalogName": "catalog",
"databaseName": "database",
"tableGroups": [
{
"tableGroupName": "tablegroup",
"databaseName": "database",
"catalogName": "catalog"
}
]
}
每次操作都会带上一个版本信息,用来做 cas 操作,最终持久化到 zfs 上。
🏗️ 创建直写表
创建完默认目录:catalog,默认数据库:database 后,下一步就是通过命令创建表
先看第一个信息:表结构
包括了字段定义,索引以及表结构配置,里面重点说下表结构配置的:
里面包括了这个表的一些关键配置,表类型:倒排,表构建类型:直写表,逻辑分片信息:2分片。
除此之外还有个需要关注的信息是分区信息。
这个分区信息跟我之前理解有些 gap,我以前以为 partition 就是分片,但是这个表是 2 分片,但是分区信息只有 1 个,分区是物理概念,而上面的 shard 则是逻辑上的。
上面标创建完后,还会创建消息队列,消息队列会根据表创建对应的 topic,关于消息队列后面讲解。
ps:此处有个疑问,集群只有 1 列,但是我创建了 2 列的表,集群是否会自动扩到 2 列?
🏗️ 创建读写分离表
和创建直写表的区别在分区上会包括全量的文件。
并且会有构建任务,会去查看 build 任务的状态,接口:CatalogService/getBuild
,
这里我们梳理下构建的过程,当我们新建表后,他是怎么创建构建任务的。前面在目录控制器中提过,目录控制器中会有个后台线程,结合代码,逻辑如下:
// File: service/CatalogController.cpp
void CatalogController::syncLoop() {
// 周期性执行,同步 Partition 变化到 Build 系统
// 1. 获取当前 Catalog 的所有 Partition
auto curCatalog = std::make_unique<Catalog>();
std::map<PartitionId, const Partition *> partitions;
if (auto status = getCatalog(curCatalog.get()); !isOk(status)) { // getCatalog 会获取 _snapshot 的内存副本
return; // 如果获取 Catalog 失败,跳过本次同步
}
if (!isOk(getCurrentPartitions(curCatalog.get(), partitions))) { // 从 Catalog 实体中提取所有 Partition
AUTIL_LOG(ERROR, "get current partitions failed");
return;
}
// 2. 获取当前 Build 系统记录的 Build 列表
std::map<PartitionId, /*generation id*/ uint32_t> buildIds;
if (!isOk(getCurrentBuildIds(curCatalog->id(), buildIds))) { // 调用内部方法获取 Build 信息
AUTIL_LOG(ERROR, "get current builds failed");
return;
}
// 3. 比较当前 Partition 列表与上次同步时的列表 (_lastPartitions)
// 判断哪些 Partition 是新增的、更新的、或被删除的
for (const auto &[partitionId, partition] : partitions) { // 遍历当前 Catalog 中的所有 Partition
auto iter = _lastPartitions.find(partitionId);
// 如果 Partition 未变化 (版本号与上次同步相同)
if (iter != _lastPartitions.end() && partition->version() == iter->second->version()) {
continue; // 跳过
}
auto idIter = buildIds.find(partitionId);
// 如果是新增的 Partition (Build 系统中没有记录)
if (idIter == buildIds.end()) {
createBuild(partition, getStoreRoot(curCatalog, partitionId)); // 调用 createBuild 方法创建 Build
continue;
}
// 如果 Partition 的 TableStructure 改变了,也需要重新创建 Build
if (iter != _lastPartitions.end() && isSchemaChanged(partition, iter->second)) {
createBuild(partition, getStoreRoot(curCatalog, partitionId));
continue;
}
// 如果 Partition 发生了其他更新 (但不是 TableStructure 变化),更新 Build
updateBuild(partition, getStoreRoot(curCatalog, partitionId), idIter->second); // 调用 updateBuild 方法更新 Build
}
// 4. 处理被删除的 Partition
for (const auto &[partitionId, _] : _lastPartitions) { // 遍历上次同步时的 Partition 列表
if (partitions.count(partitionId) == 0) { // 如果当前列表中不存在
dropBuild(partitionId); // 调用 dropBuild 方法删除 Build
}
}
// 5. 更新 _lastPartitions 和 _lastCatalog 为当前状态,供下次循环使用
_lastPartitions.swap(partitions);
_lastCatalog = std::move(curCatalog);
}
// File: service/CatalogController.cpp
bool CatalogController::isSchemaChanged(const Partition *newPart, const Partition *oldPart) {
// 比较 Partition 的 TableStructure 和 DataSource 是否有变化
// ... 比较 TableStructure ...
// ... 比较 DataSource ...
return false; // 返回是否变化
}
有几个未解决的疑问:partitionId 和 build 是怎么清理的?创建完 build 后,真正执行构建是怎么触发的?这部分留到 build_service 部分再讲的。
🛠️ BS 配置生成器
catalog 服务提供了表的元信息,他的一个重要下游就是 BS(build_service)索引构建服务,Build Service (BS) 负责数据的导入、处理、索引构建等任务。为了正确地执行这些任务,BS 需要知道很多关于数据的细节:数据长什么样(Schema),数据存放在哪里(数据源),数据如何分片等等。这些信息正是 Catalog 系统所拥有的。问题在于,Catalog 内部存储的元信息是 Protobuf 格式的实体对象,而 Build Service 需要的是特定格式的配置文件(例如 JSON 或其他格式的文件)。这就需要在 Catalog 的元信息和 Build Service 所需的配置之间进行转换。在 catalog 项目中,BS 配置生成器 (BSConfigMaker) 承载了这个角色。
在生产配置的过程中,他会采用一套内置的配置模版,这个模版是在创建 havenask 的时候,上传到了 hdfs 上,具体就是 aios/tools/hape/hape_conf/remote/cluster_templates
目录下的 direct_table
和 offline_table
两个目录,根据模板生成的配置有:
struct BSConfig {
std::string analyzerJson;
std::string bsHippoJson;
std::string buildAppJson;
std::string clusterJson;
std::string tableJson;
std::string schemaJson;
};
具体每个文件的作用,在 BS 部分讲解。
结尾
通过以上内容,我们详细解析了 havenask 中 Catalog 模块的架构设计与核心功能。从顶层的 Catalog 到具体的 Table、Partition,再到加载策略与函数支持,Catalog 系统为整个数据管理提供了坚实的基础。同时,通过 CatalogController
和 CatalogControllerManager
的协同工作,确保了元信息的高效管理和一致性维护。
此外,Catalog 服务不仅为表管理提供了全面的支持,还通过 BS 配置生成器将元信息转化为下游构建服务所需的配置文件,从而实现了从元数据到实际构建任务的无缝衔接。这种设计不仅提升了系统的灵活性,还为未来的扩展奠定了良好的基础。
尽管文中对部分细节如 partitionId
和 build
的清理机制、构建任务的具体触发流程尚未深入探讨,但这些内容将在后续的 build_service
部分进一步展开。希望本文能帮助读者对 havenask 的 Catalog 系统有一个全面而深入的理解。