开源搜索引擎 Havenask 架构解密系列之表管理

📚 Catalog 表管理

Catalog 在整体 havenask 中的位置,主要负责表管理。

具体的代码目录在 aios/catalog

先介绍里面涉及几个概念:

  • 🗂️ catalog:目录
  • 🗄️ database:数据库
  • 📋 table:表
  • 🧩 Partition:分区
  • 📝 TableStructure:表结构
  • 👥 TableGroup:表组
  • 🔄 LoadStrategy:加载策略
  • 🔧 Function:函数
  • 🏗️ Build:构建

我们下面会挨个介绍,先看顶层的 Catalog。

🗂️ Catalog 目录

目录 (Catalog) 是 catalog 系统中用来组织和管理多个数据库 (Database) 的顶层容器。它是用户访问 catalog 系统元信息的入口点。

整个实体的关系是:
目录 (Catalog) -> 数据库 (Database) -> 表 (Table) / 表组 (TableGroup) / 函数 (Function)

这些实体都继承自一个基类:EntityBase,将上面实体的共性都抽象出来,包括:

  • 🆔 它们都有一个唯一的标识符(ID)
  • 🔢 它们都有一个版本号(CatalogVersion),用于跟踪状态变化
  • 🚦 它们都有一个状态(proto::EntityStatus),表示当前生命周期阶段(如正常、待发布、待删除等)
  • 🔄 它们都需要支持克隆自身(创建一个完全独立且相同的副本)
  • 🔄 它们可能都需要支持更新自身的状态,并且状态的更新可能需要传播给它们的子实体

在实现技巧上:EntityBase 是一个模板类,它使用了一种叫做 Curiously Recurring Template Pattern (CRTP) 的技巧,他大概的实现如下:

// File: entity/EntityBase.h (片段)  
template <typename Self>  // Self 是指继承 EntityBase 的具体类  
class EntityBase {  
private:  
    // CRTP 技巧:通过 static_cast 访问派生类方法  
    const Self *self() const { return static_cast<const Self*>(this); }  
    Self *self() { return static_cast<Self*>(this); }  
}

其中 (template <typename Self>self() 方法): 这是 EntityBase 设计的核心。基类 EntityBase 将继承它的派生类 Self 作为模板参数。通过 static_cast<const Self*>(this) (在 const 方法中) 或 static_cast<Self*>(this) (在非 const 方法中),基类方法可以在编译时"知道"自己是哪个派生类,并安全地调用派生类中实现的特定方法(如 copyDetailupdateChildStatus)。这是一种静态多态,相比虚函数有更低的运行时开销。

🗄️ Database 数据库

1个 database 属于 1个 catalog,管理了下面的 table、tablegroup、function 这些。database 的主要功能包括:

  1. 🔄 自身生命周期管理create, update, drop 方法用于创建、修改和删除数据库本身。这些操作会影响数据库的状态 (_status) 和版本 (_version),并将变更记录到传递进来的 UpdateResult 对象中

  2. 👨👩👧👦 子实体管理:提供一系列方法用于在其下创建、修改、删除、获取和列出表 (Table)、表组 (TableGroup) 和函数 (Function)

  3. 🔄 更下层实体操作的委托:类似 Catalog 将表、分区等操作委托给 Database,Database 也将针对更下层实体(如表结构、分区、列、索引等)的操作委托给其包含的正确的 Table 或 TableGroup 实体去执行

📋 Table 表

内部包括 TableStructure 和 Partition 两个对象,前者代表表结构:定义了有哪些列以及有哪些索引,后者代表管理的分区。

🧩 Partition 分区

核心成员包括分区的配置 (_partitionConfig)、数据源位置 (_dataSource),分区是实现数据分布式存储和管理的关键概念。它允许我们将一个大表的完整数据拆分成若干个更小的、易于管理的块,并将这些块存放在不同的物理位置上。

👥 TableGroup 表组

它是一个逻辑概念,作用是对同一批表应用相同的策略,当前是维护了加载策略。

🔄 LoadStrategy 加载策略

每张表的数据来源可能不同(文件、数据库、消息队列),加载方式也不同(全量导入、增量更新、实时同步),需要的配置参数也多种多样(文件路径、数据格式、分隔符、过滤条件、认证信息)。

加载策略的作用就是将这些加载相关的细节标准化和集中化。它是一个独立的实体,依附于一个特定的表组中的特定表,详细说明了如何将数据加载到这张表里。

🔧 Function 函数

代表了一个注册在目录系统中的用户自定义函数(UDF)或存储过程等可执行代码块。

以上一些重要的实体概念,下一步我们从目录服务开始介绍。

🌐 CatalogServiceImpl 目录服务

CatalogServiceImpl 是 catalog 系统对外提供的 RPC (Remote Procedure Call) 服务的具体实现,CatalogServiceImpl 本身不直接管理元信息实体(目录、数据库、表等),它只是一个调度者和请求转发器。真正的业务逻辑和实体管理发生在它背后更深层的组件中(特别是 CatalogController)。

他内部会持有两个重要的组件:存储 (IStore) 和 CatalogControllerManager。在 start 方法中,会调用 _controllerManager->recover(),这个调用会触发管理器从存储中加载所有现有的目录 (Catalog),并为每个目录创建相应的目录控制器 (CatalogController)。这是服务启动后能够响应请求的基础。

🔍 CatalogControllerManager 目录控制器管理

目录控制器管理器 (CatalogControllerManager)。它扮演着"总管理员"的角色,负责管理系统中所有目录控制器 (CatalogController) 的生命周期,处理目录 (Catalog) 层级的创建、删除和恢复操作,并维护全局的根目录 (Root) 实体,确保系统中 Catalog 列表的全局一致性。

本质上 CatalogControllerManager 是一个全局单例,内部的操作通过一把锁强制进行串行,保证并发操作的正确性,内部会调用存储 (IStore) 进行持久化。

🎮 CatalogController 目录控制器

目录控制器 (CatalogController)。它是负责管理单个目录 (Catalog) 及其内部所有元信息(数据库、表、分区等)的核心业务逻辑单元。目录控制器接收来自服务层路由的请求,维护其负责的目录的内存快照和版本,协调与存储层的交互进行持久化,并通过通用的 checkAndUpdate 框架执行各种元信息操作。此外,它还负责周期性地将分区的变化同步到 Build 系统。

这里面有几个关键点:

  1. 📸 目录快照 (CatalogSnapshot),表示内存中的最新目录信息,每次修改都是先修改内存中的数据,然后做持久化

  2. 💾 存储接口和读写器:如果 CatalogController 直接与具体的文件系统 API 或数据库客户端 API 交互,代码就会与特定的存储实现(例如本地文件系统、HDFS、OSS、或者某个具体的KV存储)紧密耦合。一旦需要切换存储方式,就需要修改大量代码,于是引入了抽象的存储接口(IStore)和具体的读写器(ICatalogReader, ICatalogWriter 等)

通过这种分层和接口化的设计,service 项目的存储模块实现了高度的灵活性和可维护性:

  • 🔝 上层组件(CatalogController)只关心元数据业务逻辑和快照概念,无需关心具体存储细节
  • 🔄 中间层接口(IStore, ICatalogReader, ICatalogWriter)定义了不同层次的存储操作规范。IStore 负责 root 的读写,ICatalogReaderICatalogWriter 负责 Catalog 的读写
  • 🔽 底层实现(例如 LocalStoreZkStore)则负责与具体的物理存储(如文件系统)进行交互

除了上面持久化相关的工作外,目录控制器还有个重要的工作是:

  1. 🔄 同步状态到 Build 系统: 负责将目录中关于分区 (Partition) 的变化同步到 Build 系统,触发数据的构建或更新(例如创建新的索引)

具体来说,会在后台启动一个 sync 线程,

  • 它会不断地比对当前目录快照中的分区信息,与上次盘点时记录的分区信息
  • 一旦发现有新的分区、分区信息发生变化,或者有分区被删除了,它就会自动触发相应的构建任务创建、更新或删除操作

CatalogController 内部维护着一个 std::vector<proto::Build> _builds 成员变量。这个列表就像一个"构建任务清单",记录了当前所有与分区相关的构建任务的元数据,包括:

  • 🆔 BuildId:构建任务的唯一标识,通常包含分区名称、表名、数据库名、目录名,以及一个 generation_id(通常是时间戳,代表了本次构建的版本)
  • 🎯 BuildTarget:期望的构建状态和配置(例如,构建类型、期望达到的状态、以及构建任务所需的配置路径)
  • 📊 BuildCurrent:构建任务当前的实际状态(例如,运行状态、当前进度、错误信息、最终生成的数据路径)

_builds 列表同样需要被持久化,以便在服务重启后能恢复所有进行中的或已完成的构建任务状态。

sync 线程中 syncLoop 方法是构建流程管理的大脑。它周期性地被 _syncThread 调用,执行以下步骤:

  1. 📥 获取最新分区信息:从当前的目录快照 (_snapshot) 中获取所有最新的分区信息
  2. 📋 获取当前构建任务ID:从 _builds 列表中获取当前所有构建任务的ID
  3. 🔍 比对与协调:
    • 新增或更新分区:如果发现当前快照中有一个分区是 _lastPartitions 中没有的,或者版本号发生变化,或者其结构(schema)发生变化,则触发创建新构建任务的流程
    • 删除分区:如果 _lastPartitions 中有某个分区,但在当前快照中已经不存在,则触发删除对应构建任务的流程
  4. 🔄 更新历史状态:将当前的分区信息保存到 _lastPartitions,以便下一次循环进行比对

以上就是 havenask 中表管理的主要功能,下面我们来看一个具体的例子,帮助理解。

🏗️ 创建默认目录

发出的请求 {"catalog": "{"catalogName": "catalog"}"} ,接口是:/CatalogService/createCatalogsuez_admin 收到请求后,会将其最终持久化到 zk 上(我这环境是分布式),持久化的地址 _storeUri 是在 suez_admin 启动的时候,通过命令行参数传入。

我们直接看:zfs://10.4.64.14:2181/havenask/havenask-sql-remote/appmaster 地址的数据,下面包括了几个文件:

catalog # 新建的catalog目录名称
  catalog.pb # 详细信息
root.pb # 存放 root信息
cluster_state.pb # 存放集群状态

root.pbcatalog.pb 信息如下:

message Root {
    int64 version = 1;
    string root_name = 2;
    repeated string catalog_names = 3;
    OperationMeta operation_meta = 4;
}

message CatalogStateEntity {
    repeated Catalog catalogs = 1; # 存储了多个版本信息
    repeated Build builds = 2; # 存储了build信息
}

每次读写都是全量覆盖写。

🏗️ 创建默认数据库

接口:CatalogService/createDatabase,1个示例请求格式:

{
  "databaseConfig": {
    "storeRoot": "hdfs://10.4.64.14:9000/xxx/havenask/database_store"
  },
  "catalogName": "catalog",
  "databaseName": "database",
  "tableGroups": [
    {
      "tableGroupName": "tablegroup",
      "databaseName": "database",
      "catalogName": "catalog"
    }
  ]
}

每次操作都会带上一个版本信息,用来做 cas 操作,最终持久化到 zfs 上。

🏗️ 创建直写表

创建完默认目录:catalog,默认数据库:database 后,下一步就是通过命令创建表

先看第一个信息:表结构

包括了字段定义,索引以及表结构配置,里面重点说下表结构配置的:

里面包括了这个表的一些关键配置,表类型:倒排,表构建类型:直写表,逻辑分片信息:2分片。
除此之外还有个需要关注的信息是分区信息。

这个分区信息跟我之前理解有些 gap,我以前以为 partition 就是分片,但是这个表是 2 分片,但是分区信息只有 1 个,分区是物理概念,而上面的 shard 则是逻辑上的。
上面标创建完后,还会创建消息队列,消息队列会根据表创建对应的 topic,关于消息队列后面讲解。

ps:此处有个疑问,集群只有 1 列,但是我创建了 2 列的表,集群是否会自动扩到 2 列?

🏗️ 创建读写分离表

和创建直写表的区别在分区上会包括全量的文件。

并且会有构建任务,会去查看 build 任务的状态,接口:CatalogService/getBuild

这里我们梳理下构建的过程,当我们新建表后,他是怎么创建构建任务的。前面在目录控制器中提过,目录控制器中会有个后台线程,结合代码,逻辑如下:

// File: service/CatalogController.cpp
void CatalogController::syncLoop() {
    // 周期性执行,同步 Partition 变化到 Build 系统

    // 1. 获取当前 Catalog 的所有 Partition
    auto curCatalog = std::make_unique<Catalog>();
    std::map<PartitionId, const Partition *> partitions;
    if (auto status = getCatalog(curCatalog.get()); !isOk(status)) { // getCatalog 会获取 _snapshot 的内存副本
        return; // 如果获取 Catalog 失败,跳过本次同步
    }
    if (!isOk(getCurrentPartitions(curCatalog.get(), partitions))) { // 从 Catalog 实体中提取所有 Partition
        AUTIL_LOG(ERROR, "get current partitions failed");
        return;
    }

    // 2. 获取当前 Build 系统记录的 Build 列表
    std::map<PartitionId, /*generation id*/ uint32_t> buildIds;
    if (!isOk(getCurrentBuildIds(curCatalog->id(), buildIds))) { // 调用内部方法获取 Build 信息
        AUTIL_LOG(ERROR, "get current builds failed");
        return;
    }

    // 3. 比较当前 Partition 列表与上次同步时的列表 (_lastPartitions)
    //    判断哪些 Partition 是新增的、更新的、或被删除的

    for (const auto &[partitionId, partition] : partitions) { // 遍历当前 Catalog 中的所有 Partition
        auto iter = _lastPartitions.find(partitionId);
        // 如果 Partition 未变化 (版本号与上次同步相同)
        if (iter != _lastPartitions.end() && partition->version() == iter->second->version()) {
            continue; // 跳过
        }
        auto idIter = buildIds.find(partitionId);
        // 如果是新增的 Partition (Build 系统中没有记录)
        if (idIter == buildIds.end()) {
            createBuild(partition, getStoreRoot(curCatalog, partitionId)); // 调用 createBuild 方法创建 Build
            continue;
        }
        // 如果 Partition 的 TableStructure 改变了,也需要重新创建 Build
        if (iter != _lastPartitions.end() && isSchemaChanged(partition, iter->second)) {
            createBuild(partition, getStoreRoot(curCatalog, partitionId));
            continue;
        }
        // 如果 Partition 发生了其他更新 (但不是 TableStructure 变化),更新 Build
        updateBuild(partition, getStoreRoot(curCatalog, partitionId), idIter->second); // 调用 updateBuild 方法更新 Build
    }

    // 4. 处理被删除的 Partition
    for (const auto &[partitionId, _] : _lastPartitions) { // 遍历上次同步时的 Partition 列表
        if (partitions.count(partitionId) == 0) { // 如果当前列表中不存在
            dropBuild(partitionId); // 调用 dropBuild 方法删除 Build
        }
    }

    // 5. 更新 _lastPartitions 和 _lastCatalog 为当前状态,供下次循环使用
    _lastPartitions.swap(partitions);
    _lastCatalog = std::move(curCatalog);
}

// File: service/CatalogController.cpp
bool CatalogController::isSchemaChanged(const Partition *newPart, const Partition *oldPart) {
    // 比较 Partition 的 TableStructure 和 DataSource 是否有变化
    // ... 比较 TableStructure ...
    // ... 比较 DataSource ...
    return false; // 返回是否变化
}

有几个未解决的疑问:partitionId 和 build 是怎么清理的?创建完 build 后,真正执行构建是怎么触发的?这部分留到 build_service 部分再讲的。

🛠️ BS 配置生成器

catalog 服务提供了表的元信息,他的一个重要下游就是 BS(build_service)索引构建服务,Build Service (BS) 负责数据的导入、处理、索引构建等任务。为了正确地执行这些任务,BS 需要知道很多关于数据的细节:数据长什么样(Schema),数据存放在哪里(数据源),数据如何分片等等。这些信息正是 Catalog 系统所拥有的。问题在于,Catalog 内部存储的元信息是 Protobuf 格式的实体对象,而 Build Service 需要的是特定格式的配置文件(例如 JSON 或其他格式的文件)。这就需要在 Catalog 的元信息和 Build Service 所需的配置之间进行转换。在 catalog 项目中,BS 配置生成器 (BSConfigMaker) 承载了这个角色。

在生产配置的过程中,他会采用一套内置的配置模版,这个模版是在创建 havenask 的时候,上传到了 hdfs 上,具体就是 aios/tools/hape/hape_conf/remote/cluster_templates 目录下的 direct_tableoffline_table 两个目录,根据模板生成的配置有:

struct BSConfig {
    std::string analyzerJson;
    std::string bsHippoJson;
    std::string buildAppJson;
    std::string clusterJson;
    std::string tableJson;
    std::string schemaJson;
};

具体每个文件的作用,在 BS 部分讲解。

结尾

通过以上内容,我们详细解析了 havenask 中 Catalog 模块的架构设计与核心功能。从顶层的 Catalog 到具体的 Table、Partition,再到加载策略与函数支持,Catalog 系统为整个数据管理提供了坚实的基础。同时,通过 CatalogControllerCatalogControllerManager 的协同工作,确保了元信息的高效管理和一致性维护。

此外,Catalog 服务不仅为表管理提供了全面的支持,还通过 BS 配置生成器将元信息转化为下游构建服务所需的配置文件,从而实现了从元数据到实际构建任务的无缝衔接。这种设计不仅提升了系统的灵活性,还为未来的扩展奠定了良好的基础。

尽管文中对部分细节如 partitionIdbuild 的清理机制、构建任务的具体触发流程尚未深入探讨,但这些内容将在后续的 build_service 部分进一步展开。希望本文能帮助读者对 havenask 的 Catalog 系统有一个全面而深入的理解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容