【ClickHouse为什么这么快?】MergeTree 表存储引擎图文实例详解

前言


ClickHouse 是俄罗斯最大的搜索引擎Yandex在2016年开源的数据库管理系统(DBMS),主要用于联机分析处理(OLAP)。其采用了面向列的存储方式,性能远超传统面向行的DBMS,近几年受到广泛关注。

本文综合介绍(东拼西凑)了 ClickHouse MergeTree系列表引擎的相关知识,并通过示例分析MergeTree存储引擎的数据存储结构。

MergeTree 引擎简介

为什么叫 MergeTree ?

ClickHouse MergeTree 的表存储引擎,在写入一批数据时,数据总会以数据片段的形式写入磁盘,且数据片段不可修改。为了避免片段过多,ClickHouse会通过后台线程定期合并这些数据片段,属于相同分区的数据片段会被合成一个新的片段。这种数据片段往复合并的特点也正是合并树的名称由来。

MergeTree 核心引擎如下:

ReplacingMergeTree:在后台数据合并期间,对具有相同排序键的数据进行去重操作。

SummingMergeTree:当合并数据时,会把具有相同主键的记录合并为一条记录。根据聚合字段设置,该字段的值为聚合后的汇总值,非聚合字段使用第一条记录的值,聚合字段类型必须为数值类型。

AggregatingMergeTree:在同一数据分区下,可以将具有相同主键的数据进行聚合。

CollapsingMergeTree:在同一数据分区下,对具有相同主键的数据进行折叠合并。

VersionedCollapsingMergeTree:基于CollapsingMergeTree引擎,增添了数据版本信息字段配置选项。在数据依据ORDER BY设置对数据进行排序的基础上,如果数据的版本信息列不在排序字段中,那么版本信息会被隐式的作为ORDER BY的最后一列从而影响数据排序。

GraphiteMergeTree:用来存储时序数据库Graphites的数据。

MergeTree是该系列引擎中最核心的引擎,其他引擎均以MergeTree为基础,并在数据合并过程中实现了不同的特性,从而构成了MergeTree表引擎家族。下面我们通过MergeTree来具体了解MergeTree表系列引擎。

ClickHouse 建表语句

建表DDL 语法

创建MergeTree的DDL如下所示:

CREATETABLE[IFNOTEXISTS][db.]table_name[ONCLUSTERcluster](name1[type1][DEFAULT|MATERIALIZED|ALIASexpr1][TTLexpr1],name2[type2][DEFAULT|MATERIALIZED|ALIASexpr2][TTLexpr2],...)ENGINE=MergeTree()ORDERBYexpr[PARTITIONBYexpr][PRIMARYKEYexpr][SAMPLEBYexpr][TTLexpr[DELETE|TODISK'xxx'|TOVOLUME'xxx'],...][SETTINGSname=value,...

这里说明一下MergeTree引擎的主要参数:

[必填选项]

ENGINE:引擎名字,MergeTree引擎无参数。

ORDER BY:排序键,可以由一列或多列组成,决定了数据以何种方式进行排序,例如ORDER BY(CounterID, EventDate)。如果没有显示指定PRIMARY KEY,那么将使用ORDER BY作为PRIMARY KEY。通常只指定ORDER BY即可。

[选填选项]

PARTITION BY:分区键,指明表中的数据以何种规则进行分区。分区是在一个表中通过指定的规则划分而成的逻辑数据集。分区可以按任意标准进行,如按月、按日或按事件类型。为了减少需要操作的数据,每个分区都是分开存储的。

PRIMARY KEY:主键,设置后会按照主键生成一级索引(primary.idx),数据会依据索引的设置进行排序,从而加速查询性能。默认情况下,PRIMARY KEY与ORDER BY设置相同,所以通常情况下直接使用ORDER BY设置来替代主键设置。

SAMPLE BY:数据采样设置,如果显示配置了该选项,那么主键配置中也应该包括此配置。例如 ORDER BY CounterID / EventDate / intHash32(UserID)、SAMPLE BY intHash32(UserID)。

TTL:数据存活时间,可以为某一字段列或者一整张表设置TTL,设置中必须包含Date或DateTime字段类型。如果设置在列上,那么会删除字段中过期的数据。如果设置的是表级的TTL,那么会删除表中过期的数据。如果设置了两种类型,那么按先到期的为准。例如,TTL createtime + INTERVAL 1 DAY,即一天后过期。使用场景包括定期删除数据,或者定期将数据进行归档。

index_granularity:索引间隔粒度。MergeTree索引为稀疏索引,每index_granularity个数据产生一条索引。index_granularity默认设置为8092。

enable_mixed_granularity_parts:是否启动index_granularity_bytes来控制索引粒度大小。

index_granularity_bytes:索引粒度,以字节为单位,默认10Mb。

merge_max_block_size:数据块合并最大记录个数,默认8192。

merge_with_ttl_timeout:合并频率最小时间间隔,默认1天。

建表 SQL 实例

CREATE TABLE IF NOT EXISTS mergetree_sample_table

(

name    String,

price    UInt64,

shop_id  UInt64,

quantity UInt64,

p_date  DateTime

)

ENGINE =MergeTree()

partition by p_date

        order by (name,shop_id,p_date)

SETTINGS index_granularity =2;

插入数据:

INSERT INTO mergetree_sample_table

VALUES ('Apple',2,1,40,now()) ('Apple',2,3,35,now()) ('Apple',3,2,45,now()) ('Apple',3,4,35,now()) ('Orange',1,2,40,now()) ('Orange',3,3,50,now()) ('Banana',2,1,25,now()) ('Banana',2,2,55,now());

查询数据:

SELECT t.*

      FROM mydb.mergetree_sample_table t

      LIMIT 501



底层文件存储


MergeTree 表引擎底层的物理存储文件目录如下:

MergeTree 表引擎的物理文件存储目录结构:

├── 1638121099_1_1_0

│   ├── checksums.txt

│   ├── columns.txt

│   ├── count.txt

│   ├── data.bin

│   ├── data.mrk3

│   ├── default_compression_codec.txt

│   ├── minmax_p_date.idx

│   ├── partition.dat

│   └── primary.idx

├── detached

└── format_version.txt

2 directories, 10 files

其中,

$cat default_compression_codec.txt 

CODEC(LZ4)% 

$cat columns.txt 

columns format version: 1

5 columns:

`name` String

`price` UInt64

`shop_id` UInt64

`quantity` UInt64

`p_date` DateTime

$cat count.txt 

8%         

数据分区目录命名规则

目录命名规则如下:

PartitionId_MinBlockNum_MaxBlockNum_Level

PartitionID:分区id,例如20210301。

MinBlockNum:最小分区块编号,自增类型,从1开始向上递增。每产生一个新的目录分区就向上递增一个数字。

MaxBlockNum:最大分区块编号,新创建的分区MinBlockNum等于MaxBlockNum的编号。

Level:合并的层级,被合并的次数。合并次数越多,层级值越大。

level为0,表示此分区没有合并过。


索引文件:稀疏索引

MergeTree索引为稀疏索引,它并不索引单条数据,而是索引一定范围的数据。也就是从已排序的全量数据中,间隔性的选取一些数据记录主键字段的值来生成primary.idx索引文件,从而加快表查询效率。间隔设置参数为index_granularity。



标记文件

mrk标记文件在primary.idx索引文件和bin数据文件之间起到了桥梁作用。primary.idx文件中的每条索引在mrk文件中都有对应的一条记录。


一条记录的组成包括:

offset-compressed bin file:表示指向的压缩数据块在bin文件中的偏移量。

offset-decompressed data block:表示指向的数据在解压数据块中的偏移量。

row counts:代表数据记录行数,小于等于index_granularity所设置的值。

索引、标记和数据文件下图所示:


MergeTree表引擎家族详解

在ClickHouse的整个体系里面,MergeTree表引擎绝对是一等公民,使用ClickHouse就是在使用MergeTree,这种说法一点也不为过。MergeTree表引擎是一个家族系列,目前整个系列一共包含了14种不同类型的MergeTree。

MergeTree(合并树)系列表引擎是ClickHouse提供的最具特色的存储引擎。MergeTree引擎支持数据按主键、数据分区、数据副本以及数据采样等特性。官方提供了包括MergeTree、ReplacingMergeTree、SummingMergeTree、AggregatingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree、GraphiteMergeTree等7种不同类型的MergeTree引擎的实现,以及与其相对应的支持数据副本的MergeTree引擎(Replicated*)。

这么多表引擎,它们之间是什么关系?

我们可以使用两种关系,来理解整个MergeTree系列:

继承关系

首先,为了便于理解,可以使用继承关系来看待MergeTree。通过最基础的MergeTree表引擎,向下派生出6个变种表引擎,如下图所示


在ClickHouse底层具体的实现方法中,上述7种表引擎的区别主要体现在Merge合并的逻辑部分。如下图所示:

在具体的实现逻辑部分,7种MergeTree共用一个主体,在触发Merge动作时,调用了各自独有的合并逻辑。特殊功能只会在Merge合并时才会触发。



组合关系

刚才已经介绍了7种MergeTree的关系,余下的7种是ReplicatedMergeTree系列。


ReplicatedMergeTree与普通的MergeTree又有什么区别呢?  我们接着看下面这张图:


图中的虚线框部分是MergeTree的能力边界,而ReplicatedMergeTree在它的基础之上增加了分布式协同的能力(HA)。ClickHouse 集群借助ZooKeeper的消息日志广播,实现了副本实例之间的数据同步功能。

ReplicatedMergeTree系列可以用组合关系来理解,如下图所示:



当我们为7种MergeTree加上Replicated前缀后,又能组合出7种新的表引擎,这些ReplicatedMergeTree 拥有副本协同的能力。

我们到底应该使用哪一种表引擎?

现在回答第二个问题,按照使用的场景划分,可以将上述14种表引擎大致分成以下6类应用场景:

默认情况

在没有特殊要求的场合,使用基础的MergeTree表引擎即可,它不仅拥有高效的性能,也提供了所有MergeTree共有的基础功能,包括列存、数据分区、分区索引、一级索引、二级索引、TTL、多路径存储等等。

与此同时,它也定义了整个MergeTree家族的基调,例如:

ORDER BY 决定了每个分区中数据的排序规则;

PRIMARY KEY 决定了一级索引(primary.idx);

ORDER BY 可以指代PRIMARY KEY, 通常只用声明ORDER BY 即可。

接下来将要介绍的其他表引擎,除开ReplicatedMergeTree系列外,都是在Merge合并动作时添加了各自独有的逻辑。

数据去重

ReplacingMergeTree 使用示例

1.建表

CREATE TABLE IF NOT EXISTS replacingmergetree_test

(

ID          String,

Name        String,

DateOfBirth Date

)

ENGINE =ReplacingMergeTree()

PARTITION BY ID

        ORDER BY (ID,DateOfBirth)

SETTINGS

            index_granularity =1024;

2.插入数据

INSERT INTO replacingmergetree_test

VALUES ('a1','Jim','1995-05-01'),

('a1','Jim','1995-05-01'),

('a1','Jim','1995-05-02'),

('a2','Jil','1995-06-01'),

('a2','Jil','1995-06-01'),

('a2','Jil','1995-06-02');

查询数据,看看效果:

 desc replacingmergetree_test

DESCRIBE TABLE  replacingmergetree_test

Query id: 59ab6932-9912-4d86-b97d-7782a8e11f65

┌─name────────┬─type───┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐

│ ID          │ String │              │                    │        │                  │                │

│ Name        │ String │              │                    │        │                  │                │

│ DateOfBirth │ Date  │              │                    │        │                  │                │

└─────────────┴────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

3 rows in set. Elapsed: 0.001 sec. 

SELECT *

FROM replacingmergetree_test

Query id: 082de655-db24-4679-ab10-8d431320fae2

┌─ID─┬─Name─┬─DateOfBirth─┐

│ a2 │ Jil  │  1995-06-01 │

│ a2 │ Jil  │  1995-06-02 │

└────┴──────┴─────────────┘

┌─ID─┬─Name─┬─DateOfBirth─┐

│ a1 │ Jim  │  1995-05-01 │

│ a1 │ Jim  │  1995-05-02 │

└────┴──────┴─────────────┘

4 rows in set. Elapsed: 0.003 sec. 


3.执行merge

可以手动执行:

OPTIMIZE TABLE replacingmergetree_test FINAL;

一般ClickHouse会由后台程序自动执行 merge 操作。

4.查询数据

通过刚才的说明,大家应该明白,MergeTree的主键(PRIMARY KEY)只是用来生成一级索引(primary.idx)的,并没有唯一性约束这样的语义。

一些朋友在使用MergeTree的时候,用传统数据库的思维来理解MergeTree就会出现问题。

如果业务上不允许数据重复,遇到这类场景就可以使用ReplacingMergeTree,如下图所示:

ReplacingMergeTree通过ORDER BY,表示判断唯一约束的条件。当分区合并之时,根据ORDER BY排序后,相邻重复的数据会被排除。

由此,可以得出几点结论:

第一,使用ORDER BY作为特殊判断标识,而不是PRIMARY KEY。关于这一点网上有一些误传,但是如果理解了ORDER BYPRIMARY KEY的作用,以及合并逻辑之后,都能够推理出应该是由ORDER BY决定。

ORDER BY的作用, 负责分区内数据排序;

PRIMARY KEY的作用, 负责一级索引生成;

Merge的逻辑, 分区内数据排序后,找到相邻的数据,做特殊处理。

第二,只有在触发合并之后,才能触发特殊逻辑。以去重为例,在没有合并的时候,还是会出现重复数据。

第三,只对同一分区内的数据有效。以去重为例,只有属于相同分区的数据才能去重,跨越不同分区的重复数据不能去重。

上述几点结论,适用于包含ReplacingMergeTree在内的6种MergeTree,所以后面不在赘述。

小结:ReplacingMergeTree 引擎会把相同索引的数据进行替换,但仅限本地单台机器。如果使用分布式表,就要确保相同索引的数据入到同一台机器,否则每台机器可能会有一条相同索引的数据。

该索引只有在 merge 的时候才会执行替换,因为 merge 是不定时的,如果没有 merge 的情况下,会出现多条数据的情况。因此必要的话,可以进行手动进行 merge。手动 merge 命令:optimize table db.table;

该索引的建表语句如果没有用某个字段标定版本,该字段可以是 int、double、date 类型,数据库就一定会把后入库的覆盖新入库 (如果有区分版本的字段,则会留下数值大的那条记录)。

预聚合(数据立方体)

有这么一类场景,它的查询主题是非常明确的,也就是说聚合查询的维度字段是固定,并且没有明细数据的查询需求,这类场合就可以使用SummingMergeTree或是AggregatingMergeTree,如下图所示:

可以看到,在新分区合并后,在同一分区内,ORDER BY条件相同的数据会进行合并。如此一来,首先表内的数据行实现了有效的减少,其次度量值被预先聚合,进一步减少了后续计算开销。聚合类MergeTree通常可以和MergeTree表引擎协同使用,如下图所示:

可以将物化视图设置成聚合类MergeTree,将其作为固定主题的查询表使用。

值得一提的是,通常只有在使用SummingMergeTree或AggregatingMergeTree的时候,才需要同时设置ORDER BYPRIMARY KEY。

显式的设置PRIMARY KEY,是为了将主键和排序键设置成不同的值,是进一步优化的体现。

例如某个场景的查询需求如下:

聚合条件,GROUP BY A,B,C

过滤条件,WHERE A

此时,如下设置将会是一种较优的选择:

GROUP BY A,B,C

PRIMARY KEY A

BTW,如果ORDER BYPRIMARY KEY不同,PRIMARY KEY必须是ORDER BY的前缀(为了保证分区内数据和主键的有序性)。

SummingMergeTree 引擎测试

该引擎会把索引以为的所有 number 型字段(包含 int 和 double)自动进行聚合。

该引擎在分布式情况下并不是完全聚合,而是每台机器有一条同纬度的数据。SummingMergeTree 是按 part 纬度来聚合,数据刚导入 clickhouse 可能会产生多个 part,但是 clickhouse 会定期把 part merge,从而实现一台机器只有一条同纬度的数据。

如果将字段设为索引,则不会继续聚合,对于非设为索引的字段,如果是 int 类型会进行聚合,非 int 类型,会随机选取一个字段进行覆盖。

数据更新

数据的更新在ClickHouse中有多种实现手段,例如按照分区Partition重新写入、使用Mutation的DELETE和UPDATE查询。

使用CollapsingMergeTree或VersionedCollapsingMergeTree也能实现数据更新,这是一种使用标记位,以增代删的数据更新方法,如下图所示:


通过增加一个 sign 标志字段(例如图中的sign字段),作为数据有效性的判断依据。

可以看到,在新分区合并后,在同一分区内,ORDER BY条件相同的数据,其标志值为1和-1的数据行会进行抵消。

下图是另外一种便于理解的视角,就如同挤压瓦楞纸一般,数据被抵消了:

VersionedCollapsingMergeTree: 带版本的CollapsingMergeTree

CollapsingMergeTree 和 VersionedCollapsingMergeTree的区别又是什么呢?

CollapsingMergeTree 对数据写入的顺序是敏感的,它要求标志位需要按照正确的顺序排序。例如按照1,-1的写入顺序是正确的; 而如果按照-1,1的错误顺序写入,CollapsingMergeTree就无法正确抵消。

试想,如果在一个多线程并行的写入场景,我们是无法保证这种顺序写入的,此时就需要使用VersionedCollapsingMergeTree了。

VersionedCollapsingMergeTree 在 CollapsingMergeTree基础之上,额外要求指定一个version字段,在分区Merge合并时,它会自动将version字段追加到ORERY BY的末尾,从而保证了标志位的有序性。

ENGINE=VersionedCollapsingMergeTree(sign,ver) ORDER BY id //等效于ORDER BY id,ver

监控集成

GraphiteMergeTree可以与Graphite集成,如果你使用了Graphite作为系统的运行监控系统, 则可以通过GraphiteMergeTree存储指标数据,加速查询性能、降低存储成本。

高可用

Replicated* 拥有数据副本的能力,如下图所示:


结合刚才的5类场景,如果进一步需要高可用的需求,选择一种MergeTree和Replicated组合即可,例如 ReplicatedMergeTree、ReplicatedReplacingMergeTree 等等。


合并算法概述 Overview of the merge algorithm

每个合并按块顺序执行。

Each merge is executed sequentially block by block.

合并算法的主要思想是,确保合并操作不是一个在线程池中执行的子例程(因为它可能会占用一段时间的线程),而是使合并操作在一个协程中完成。它可以在某些点暂停执行,然后从该点恢复执行。

The main idea is to make a merge not a subroutine which is executed in a thread pool and may occupy a thread for a period of time,   but to make a merge a coroutine which can suspend the execution  in some points and then resume the execution from this point.

挂起执行的最佳点是在一个块上的工作完成之后。

任务本身将通过  BackgroundJobExecutor 执行。

任务的接口很简单。主要方法是' execute() ',如果任务想要再次执行,它将返回true,否则返回false。

A perfect point where to suspend the execution is after the work over a block is finished.

The task itself will be executed via BackgroundJobExecutor.

The interface of the task is simple. The main method is `execute()` which will return true, if the task wants to be executed again and false otherwise.

对于这种任务,我们可以给合并一个优先级。 优先级很简单:

合并的大小越小,优先级越高。 

By default priority queue will have max element at top。

所以,如果ClickHouse想要将一些真正大的部分合并成一个更大的部分,那么它将被执行很长一段时间,因为合并的结果并不是真正需要立即。 最好尽快合并小部分。

With this kind of task we can give a merge a priority. A priority is simple :

 the lower the size of the merge, the higher priority. 

So, if ClickHouse wants to merge some really big parts into a bigger part, then it will be executed for a long time, because the result of the merge is not really needed immediately. It is better to merge small parts as soon as possible.

合并任务后台执行器:MergeTreeBackgroundExecutor

一个 MergeTreeBackgroundExecutor 任务有两个队列:Pending 挂起队列(所有任务的主队列)和 Active 活动队列(当前正在执行)。

Pending 挂起队列是需要的,因为任务的数量将超过线程执行。

MergeTreeBackgroundExecutor.h  代码:

#pragma once

#include <deque>

#include <functional>

#include <atomic>

#include <mutex>

#include <future>

#include <condition_variable>

#include <set>

#include <iostream>

#include <boost/circular_buffer.hpp>

#include <base/shared_ptr_helper.h>

#include <base/logger_useful.h>

#include <Common/ThreadPool.h>

#include <Common/Stopwatch.h>

#include <Storages/MergeTree/IExecutableTask.h>

namespace DB

{

namespace ErrorCodes

{

    extern const int LOGICAL_ERROR;

}

struct TaskRuntimeData;

using TaskRuntimeDataPtr = std::shared_ptr<TaskRuntimeData>;

/**

* Has RAII class to determine how many tasks are waiting for the execution and executing at the moment.

* Also has some flags and primitives to wait for current task to be executed.

*/

struct TaskRuntimeData

{

    TaskRuntimeData(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)

        : task(std::move(task_))

        , increment(std::move(metric_))

    {}

    ExecutableTaskPtr task;

    CurrentMetrics::Increment increment;

    std::atomic_bool is_currently_deleting{false};

    /// Actually autoreset=false is needed only for unit test

    /// where multiple threads could remove tasks corresponding to the same storage

    /// This scenario in not possible in reality.

    Poco::Event is_done{/*autoreset=*/false};

    /// This is equal to task->getPriority() not to do useless virtual calls in comparator

    UInt64 priority{0};

    /// By default priority queue will have max element at top

    static bool comparePtrByPriority(const TaskRuntimeDataPtr & lhs, const TaskRuntimeDataPtr & rhs)

    {

        return lhs->priority > rhs->priority;

    }

};

class OrdinaryRuntimeQueue

{

public:

    TaskRuntimeDataPtr pop()

    {

        auto result = std::move(queue.front());

        queue.pop_front();

        return result;

    }

    void push(TaskRuntimeDataPtr item) { queue.push_back(std::move(item));}

    void remove(StorageID id)

    {

        auto it = std::remove_if(queue.begin(), queue.end(),

            [&] (auto item) -> bool { return item->task->getStorageID() == id; });

        queue.erase(it, queue.end());

    }

    void setCapacity(size_t count) { queue.set_capacity(count); }

    bool empty() { return queue.empty(); }

private:

    boost::circular_buffer<TaskRuntimeDataPtr> queue{0};

};

/// Uses a heap to pop a task with minimal priority

class MergeMutateRuntimeQueue

{

public:

    TaskRuntimeDataPtr pop()

    {

        std::pop_heap(buffer.begin(), buffer.end(), TaskRuntimeData::comparePtrByPriority);

        auto result = std::move(buffer.back());

        buffer.pop_back();

        return result;

    }

    void push(TaskRuntimeDataPtr item)

    {

        item->priority = item->task->getPriority();

        buffer.push_back(std::move(item));

        std::push_heap(buffer.begin(), buffer.end(), TaskRuntimeData::comparePtrByPriority);

    }

    void remove(StorageID id)

    {

        auto it = std::remove_if(buffer.begin(), buffer.end(),

            [&] (auto item) -> bool { return item->task->getStorageID() == id; });

        buffer.erase(it, buffer.end());

        std::make_heap(buffer.begin(), buffer.end(), TaskRuntimeData::comparePtrByPriority);

    }

    void setCapacity(size_t count) { buffer.reserve(count); }

    bool empty() { return buffer.empty(); }

private:

    std::vector<TaskRuntimeDataPtr> buffer{};

};

/**

*  Executor for a background MergeTree related operations such as merges, mutations, fetches an so on.

*  It can execute only successors of ExecutableTask interface.

*  Which is a self-written coroutine. It suspends, when returns true from executeStep() method.

*

*  There are two queues of a tasks: pending (main queue for all the tasks) and active (currently executing).

*  Pending queue is needed since the number of tasks will be more than thread to execute.

*  Pending tasks are tasks that successfully scheduled to an executor or tasks that have some extra steps to execute.

*  There is an invariant, that task may occur only in one of these queue. It can occur in both queues only in critical sections.

*

*  Pending:                                              Active:

*

*  |s| |s| |s| |s| |s| |s| |s| |s| |s| |s|              |s|

*  |s| |s| |s| |s| |s| |s| |s| |s| |s|                  |s|

*  |s| |s|    |s|    |s| |s|    |s|                  |s|

*      |s|            |s| |s|                          |s|

*      |s|                |s|

*                          |s|

*

*  Each task is simply a sequence of steps. Heavier tasks have longer sequences.

*  When a step of a task is executed, we move tasks to pending queue. And take another from the queue's head.

*  With these architecture all small merges / mutations will be executed faster, than bigger ones.

*

*  We use boost::circular_buffer as a container for queues not to do any allocations.

*

*  Another nuisance that we faces with is than background operations always interact with an associated Storage.

*  So, when a Storage want to shutdown, it must wait until all its background operaions are finished.

*/

template <class Queue>

class MergeTreeBackgroundExecutor final : public shared_ptr_helper<MergeTreeBackgroundExecutor<Queue>>

{

public:

    MergeTreeBackgroundExecutor(

        String name_,

        size_t threads_count_,

        size_t max_tasks_count_,

        CurrentMetrics::Metric metric_)

        : name(name_)

        , threads_count(threads_count_)

        , max_tasks_count(max_tasks_count_)

        , metric(metric_)

    {

        if (max_tasks_count == 0)

            throw Exception(ErrorCodes::LOGICAL_ERROR, "Task count for MergeTreeBackgroundExecutor must not be zero");

        pending.setCapacity(max_tasks_count);

        active.set_capacity(max_tasks_count);

        pool.setMaxThreads(std::max(1UL, threads_count));

        pool.setMaxFreeThreads(std::max(1UL, threads_count));

        pool.setQueueSize(std::max(1UL, threads_count));

        for (size_t number = 0; number < threads_count; ++number)

            pool.scheduleOrThrowOnError([this] { threadFunction(); });

    }

    ~MergeTreeBackgroundExecutor()

    {

        wait();

    }

    bool trySchedule(ExecutableTaskPtr task);

    void removeTasksCorrespondingToStorage(StorageID id);

    void wait();

private:

    String name;

    size_t threads_count{0};

    size_t max_tasks_count{0};

    CurrentMetrics::Metric metric;

    void routine(TaskRuntimeDataPtr item);

    void threadFunction();

    /// Initially it will be empty

    Queue pending{}; // 等待队列

    boost::circular_buffer<TaskRuntimeDataPtr> active{0}; // 执行队列

    std::mutex mutex; // 互斥锁

    std::condition_variable has_tasks;

    std::atomic_bool shutdown{false};

    ThreadPool pool;

};

extern template class MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;

extern template class MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;

using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;

using OrdinaryBackgroundExecutor = MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;

}

MergeTreeBackgroundExecutor.cpp 源代码:

#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>

#include <algorithm>

#include <Common/setThreadName.h>

#include <Storages/MergeTree/BackgroundJobsAssignee.h>

namespace DB

{

template <class Queue>

void MergeTreeBackgroundExecutor<Queue>::wait() // 等待执行

{

    {

        std::lock_guard lock(mutex);

        shutdown = true;

        has_tasks.notify_all();

    }

    pool.wait();

}

template <class Queue>

// 调度任务执行

bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task)

{

    std::lock_guard lock(mutex); // 上锁

    if (shutdown)

        return false;

    auto & value = CurrentMetrics::values[metric];

    if (value.load() >= static_cast<int64_t>(max_tasks_count))

        return false;

    pending.push(std::make_shared<TaskRuntimeData>(std::move(task), metric));

    has_tasks.notify_one();

    return true;

}

template <class Queue>

void MergeTreeBackgroundExecutor<Queue>::removeTasksCorrespondingToStorage(StorageID id)

{

    std::vector<TaskRuntimeDataPtr> tasks_to_wait;

    {

        std::lock_guard lock(mutex); // 上锁

        /// Erase storage related tasks from pending and select active tasks to wait for

        pending.remove(id);

        /// Copy items to wait for their completion

        std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait),

            [&] (auto item) -> bool { return item->task->getStorageID() == id; });

        for (auto & item : tasks_to_wait)

            item->is_currently_deleting = true;

    }

    /// Wait for each task to be executed

    for (auto & item : tasks_to_wait)

    {

        item->is_done.wait();

        item.reset();

    }

}

template <class Queue>

void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)

{

    DENY_ALLOCATIONS_IN_SCOPE;

    /// All operations with queues are considered no to do any allocations

    auto erase_from_active = [this, item]

    {

        active.erase(std::remove(active.begin(), active.end(), item), active.end());

    };

    bool need_execute_again = false;

    try

    {

        ALLOW_ALLOCATIONS_IN_SCOPE;

        need_execute_again = item->task->executeStep();

    }

    catch (...)

    {

        tryLogCurrentException(__PRETTY_FUNCTION__);

    }

    if (need_execute_again)

    {

        std::lock_guard guard(mutex);  // 上锁

        if (item->is_currently_deleting)

        {

            erase_from_active();

            /// This is significant to order the destructors.

            item->task.reset();

            item->is_done.set();

            item = nullptr;

            return;

        }

        /// After the `guard` destruction `item` has to be in moved from state

        /// Not to own the object it points to.

        /// Otherwise the destruction of the task won't be ordered with the destruction of the

        /// storage.

        pending.push(std::move(item));

        erase_from_active();

        has_tasks.notify_one();

        item = nullptr;

        return;

    }

    {

        std::lock_guard guard(mutex);  // 上锁

        erase_from_active();

        has_tasks.notify_one();

        try

        {

            ALLOW_ALLOCATIONS_IN_SCOPE;

            /// In a situation of a lack of memory this method can throw an exception,

            /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory

            /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.

            item->task->onCompleted();

        }

        catch (...)

        {

            tryLogCurrentException(__PRETTY_FUNCTION__);

        }

        /// We have to call reset() under a lock, otherwise a race is possible.

        /// Imagine, that task is finally completed (last execution returned false),

        /// we removed the task from both queues, but still have pointer.

        /// The thread that shutdowns storage will scan queues in order to find some tasks to wait for, but will find nothing.

        /// So, the destructor of a task and the destructor of a storage will be executed concurrently.

        item->task.reset();

        item->is_done.set();

        item = nullptr;

    }

}


template <class Queue>

void MergeTreeBackgroundExecutor<Queue>::threadFunction()

{

    setThreadName(name.c_str());

    DENY_ALLOCATIONS_IN_SCOPE;

    while (true) // MergeTreeBackgroundExecutor 常驻线程池 

    {

        try

        {

            TaskRuntimeDataPtr item;

            {

                std::unique_lock lock(mutex);   // 上锁

                has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });

                if (shutdown)

                    break;

                item = std::move(pending.pop());

                active.push_back(item);

            }

            routine(std::move(item));

        }

        catch (...)

        {

            tryLogCurrentException(__PRETTY_FUNCTION__);

        }

    }

}

template class MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;

template class MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;

}


附录:ClickHouse SQL 表达式语法

参考:ASTSelectQuery.h


#pragma once

#include

#include

namespace DB

{

struct ASTTablesInSelectQueryElement;

struct StorageID;

/** SELECT query

*/

class ASTSelectQuery :public IAST

{

public:

enum class Expression : uint8_t

{

WITH,

SELECT,

TABLES,

PREWHERE,

WHERE,

GROUP_BY,

HAVING,

WINDOW,

ORDER_BY,

LIMIT_BY_OFFSET,

LIMIT_BY_LENGTH,

LIMIT_BY,

LIMIT_OFFSET,

LIMIT_LENGTH,

SETTINGS

    };

static String expressionToString(Expression expr)

{

switch (expr)

{

case Expression::WITH:

return "WITH";

case Expression::SELECT:

return "SELECT";

case Expression::TABLES:

return "TABLES";

case Expression::PREWHERE:

return "PREWHERE";

case Expression::WHERE:

return "WHERE";

case Expression::GROUP_BY:

return "GROUP BY";

case Expression::HAVING:

return "HAVING";

case Expression::WINDOW:

return "WINDOW";

case Expression::ORDER_BY:

return "ORDER BY";

case Expression::LIMIT_BY_OFFSET:

return "LIMIT BY OFFSET";

case Expression::LIMIT_BY_LENGTH:

return "LIMIT BY LENGTH";

case Expression::LIMIT_BY:

return "LIMIT BY";

case Expression::LIMIT_OFFSET:

return "LIMIT OFFSET";

case Expression::LIMIT_LENGTH:

return "LIMIT LENGTH";

case Expression::SETTINGS:

return "SETTINGS";

}

return "";

}

/** Get the text that identifies this element. */

    String getID(char)const override {return "SelectQuery"; }

ASTPtr clone()const override;

bool distinct =false;

bool group_by_with_totals =false;

bool group_by_with_rollup =false;

bool group_by_with_cube =false;

bool group_by_with_constant_keys =false;

bool limit_with_ties =false;

ASTPtr & refSelect()    {return getExpression(Expression::SELECT); }

ASTPtr & refTables()    {return getExpression(Expression::TABLES); }

ASTPtr & refPrewhere()  {return getExpression(Expression::PREWHERE); }

ASTPtr & refWhere()    {return getExpression(Expression::WHERE); }

ASTPtr & refHaving()    {return getExpression(Expression::HAVING); }

const ASTPtr with()const {return getExpression(Expression::WITH); }

const ASTPtr select()const {return getExpression(Expression::SELECT); }

const ASTPtr tables()const {return getExpression(Expression::TABLES); }

const ASTPtr prewhere()const {return getExpression(Expression::PREWHERE); }

const ASTPtr where()const {return getExpression(Expression::WHERE); }

const ASTPtr groupBy()const {return getExpression(Expression::GROUP_BY); }

const ASTPtr having()const {return getExpression(Expression::HAVING); }

const ASTPtr window()const {return getExpression(Expression::WINDOW); }

const ASTPtr orderBy()const {return getExpression(Expression::ORDER_BY); }

const ASTPtr limitByOffset()const {return getExpression(Expression::LIMIT_BY_OFFSET); }

const ASTPtr limitByLength()const {return getExpression(Expression::LIMIT_BY_LENGTH); }

const ASTPtr limitBy()const {return getExpression(Expression::LIMIT_BY); }

const ASTPtr limitOffset()const {return getExpression(Expression::LIMIT_OFFSET); }

const ASTPtr limitLength()const {return getExpression(Expression::LIMIT_LENGTH); }

const ASTPtr settings()const {return getExpression(Expression::SETTINGS); }

bool hasFiltration()const {return where() || prewhere() || having(); }

/// Set/Reset/Remove expression.

    void setExpression(Expression expr, ASTPtr && ast);

ASTPtr getExpression(Expression expr,bool clone =false)const

    {

auto it = positions.find(expr);

if (it != positions.end())

return clone ? children[it->second]->clone() : children[it->second];

return {};

}

/// Compatibility with old parser of tables list. TODO remove

    ASTPtr sampleSize()const;

ASTPtr sampleOffset()const;

std::pair arrayJoinExpressionList()const;

const ASTTablesInSelectQueryElement * join()const;

bool final()const;

bool withFill()const;

void replaceDatabaseAndTable(const String & database_name,const String & table_name);

void replaceDatabaseAndTable(const StorageID & table_id);

void addTableFunction(ASTPtr & table_function_ptr);

void updateTreeHashImpl(SipHash & hash_state)const override;

void setFinal();

const char * getQueryKindString()const override {return "Select"; }

protected:

void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame)const override;

private:

std::unordered_mappositions;

ASTPtr & getExpression(Expression expr);

};

}


参考资料

https://cloud.tencent.com/developer/article/1604965

https://zhuanlan.zhihu.com/p/361622782

https://clickhouse.com/docs/en/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容