本文是对《ClickHouse原理解析与应用实践》一书的概括性总结,整体章节和结构尊重原文,由于书的出版在2019年,版本较旧,所以对应部分有修正,修正来源于clickhouse官方设计文档。因此本文是该书与clickhouse官方文档的一个互补结合。
第二章
2.1 核心特性
- 列式存储:纯列式数据库/数据压缩
- 向量化执行/SIMD
- 关系模型/标准SQL
- 存储引擎抽象/20多种存储引擎
- 多线程分布式/分区分片
- 多主架构
- 数据分片(replica, ditribute table)
第三章
两个有用的小工具:
- clickhouse-local 一个单机版的微内核,和标准的clickhouse服务完全隔离开,数据也不共享,适用于小批量数据。
-
clickhouse-benchmark: echo 'SELECT count(*) from tutorial.visits_v1' | clickhouse-benchmark -i 20000 可以对指定查询做一次benchmark。我在1核2G的服务器上跑了下,得到结果:
第四章
4.1 数据类型
clickhouse常量类型推断:最小存储代价
4.1.1 基础类型
各种长度的整数、浮点数,定点数,字符串、定长字符串、32位UUID、三种时间类型(精度最高就到亚秒)
4.1.2 复合类型
其中array和nested两种嵌套类型,可以之后再用SQL(array [left] join)打平成多行数据。
- array:[1,2.0] array(float64) 要求同类型
- tuple:(1,2,'a','2021-04-22 16:05:25') 可以不同类型,要求定长
- enum:枚举类型,类似C中的枚举
- nested:嵌套类型,但是最多只能嵌套一层,用以表明一些一对多的关系。本质上是用多维数组来存储,行内的数组必须都等长;行之间的数组不必等长。
4.1.3 特殊类型
- nullable
这是一种基本类型的修饰符,表示可以为空。但注意,被修饰的列不能被索引。这种修饰符要慎用,它会对nullable的列额外补充一个文件[column].null.bin,这意味着读取时,有双倍的I/O。 - IPv4 IPv6
这也是种特殊的类型,专门存IP的,底层是int32,提供合法性检查,插入时用字符串插入即可。
4.2 DDL
4.2.1 数据库
clickhouse共有5种库引擎:
- Memory:内存的临时库,不落盘,重启服务自动删除;
- Ordinary:默认引擎;
- Dictionary:第五章介绍;
- Lzay:只能使用Log系列表引擎,Log表引擎第八章介绍;
- Mysql:自动拉取远端Mysql数据,并创建Mysql表引擎的数据表,第八章介绍。
数据库本质上就是文件目录。
4.2.2 数据表
表也要自选引擎。
4.2.3 默认值表达式
有三种默认值表达式:
- DEFAULT:可以显示写入,进行物理存储,随SELECT *返回,插入时就计算该列的值
- MATERIALZED:不能显示写入,但会进行物理存储,不随SELECT *返回
- ALIAS:不能显示写入,不进行物理存储,不随SELECT *返回,只在需要该列时,随查询计算
4.2.4 临时表
只支持MEMORY表引擎,不属于任何数据库,生命周期和Session绑定,连接断掉,表就废掉。
一般不用,主要是内核用。
4.2.5 分区表
概念同HIVE分区表,但是它只作用于本地,没有什么分布式的概念。
只有MergeTree家族的表引擎才支持分区。
分区可以在不同的表间进行复制迁移,但两个表的结构和分区键必须一致。
第六章详细介绍。
4.2.6 视图
- 普通视图:就是简化SQL的一种手段,对存储引擎没有任何影响。
- 物化视图:实际上类似于一张表,带有一定的逻辑和它的宿主表表示同步更新(仅插入,删除和更新还不支持);初始化时可以从宿主表同步,也可以从0开始。
clickhouse的物化视图,还可以指定一个TO db.name,用于将数据从一张表同步到另一张表。
4.3 数据表基本操作
- 列级别的增删改:
- 新增一列:默认值补全数据
- 修改数据类型:需要兼容
- 表移动位置:换一个数据库,可以通过RENAME实现,但是只能在本地库中,不能是远程的。
4.4 分区
可以将一个分区内的某列数据清空,设置为初始值。
也可以将分区卸载/装载,本质上,是分区文件夹位置的迁移,不会真正的删除。
4.5 分布式DDL
需要主动声明ON CLUSTER xx_cluster,才会把DDL的SQL语句在某集群内统一执行。
数据块为单位进行操作,在块级别有原子性
4.7 修改和删除
由于列存储,clickhouse的修改和删除非常的重。会把一个表的所有分区的目录copy一次,去掉那些删除的行,直到写一次merge时,原先inactive的数据才会被删除。
而且,异步、非原子性。
第五章 数据字典
存于内存的一个scheme,可以build on top of many externel sources(clickhouse, mysql, linux file...)。
scheme以key-attributes形式存储,key是一个/多个属性,attributes就是一组属性
默认惰性读取(用到时从外部source读进内存),可以改配置为启动时读取。
外部source包括本地文件、远程文件、可执行文件、clickhouse、ODBC、DBMS(mysql/postgres/mongoDB/redis)。
这意味着,传统的ETL功能,很大程度上被代替了。但是实际上数据主要在内存,没有真正落入clickhouse表(虽然我们能以类似表的SQL去访问它)。
在2020年及之后的clickhouse中,诞生了dictionary表引擎,使得访问dictionary和访问普通数据库表的操作,完全一致。
5.2.4 扩展字典的类型
- 单数值key:
- flat(recommended): 数组形式,size有上限,全存在内存里,但性能最高;支持所有source;
- hashed(recommended): 哈希表形式,全部存于内存,size无上限;支持所有sources;
- sparse hashed:稀疏哈希,更省内存,更花费CPU;
- range_hashed: 可以由范围哈希;
- cache:固定数量的内存slot,自己控制cache的一些策略,且不支持local file,很难使用;
- direct:不存内存,直接访问外部源;
- 复合key:
- complex_key_hashed(recommended):也是哈希表,但是key可以由多个属性构成;
- complex_key_cache:复合键cache
- ip_trie:用于查ip前缀的,比较特殊。
5.2.6 更新策略
一定时间范围内随机定期更新,能增量就增量,不能增量就全量。
第六章 MergeTree原理解析
6.1 MergeTree的创建方式与存储结构
MergeTree在写入数据时,是以block为单位写入的,而且block是immutable的。clickhouse通过后台线程,定期合并这些block,属于同一个分区的block会被合成新的block,因此被称为MergeTree。
6.1.1 创建MergeTree表的重要参数
几个参数:
- PARTITION BY(optional):
- ORDER BY(required):一个block内部的数据按照什么排序,默认按照primary key排序,也可以自定义一个/几个key进行排序;
- PRIMARY KEY(optional):主键会生成主索引,在单个block内部,数据按照主索引的规则升序排序,允许存在重复数据。默认与ORDER BY一样,所以是可选的。
- SAMPLE BY(optional):抽样表达式,表示数据以何种标注抽样,必须在主键中出现;
- SETTINGS: index_granuarity(optional):重要参数,每间隔多少条数据生成一条索引,默认8192;
- SETTINGS: index_granualrity_bytes(optional):clickhouse会根据每一批次的数据的体量大小,动态划分间隔大小。本参数表名每批数据体量大小,默认10MB;
- SETTINGS:merge_with_ttl_timeout(optional):数据TTL;
- SETTINGS:storage_policy(optional):多路径的存储策略。
6.1.2 MergeTree的存储结构
物理存储结构如下:
- columns.txt:该分区各列字段信息,明文存储;
- count.txt:该分区下数据行数,明文存储;
- primary.idx:二进制文件,主索引,也是一个稀疏索引;
- [Column].bin:二进制数据文件,压缩格式存储;
- [Column].mrk:二进制文件,列的标记文件,保存了数据文件中数据的offset信息。标记文件与稀疏索引对齐,又与数据文件一一对应。查询时,首先通过主索引找到对应数据的offset信息,利用offset从数据文件读取,后面细讲。
- [Column].mrk2:与mrk类似,但是是自适应索引的标记文件;
- skp_idx_[Column].idx & skp_idx_[Colun].mrk:二级索引数据文件和标记文件;
- partition.dat:当前分区,分区表达式的值,其实就是哪个分区;
- minmax_[PartitionColumn].idx:当前分区,分区列的最大最小值;
- checksum.txt:保存分区各文件的size及哈希,用于校验完整性。
6.2 数据分区
如果分区key是整数、日期,则转化为字符串作为分区id,浮点、字符串要哈希之后作为分区id。多个key作为分区key用'-'来连接。
6.2.3 分区目录合并
MergeTree的一个重要特征就是,每次写入的时候,都会根据分区key产生一批新的分区目录,这和原先有的分区目录可能会有重复。比如,同样是'202104'的分区,可能最后产生了多个分区目录,而不是在一个分区目录下追加文件。
然后,这些相同分区的目录,通过后台任务进行合并,称为新的分区目录。旧分区目录延迟一段时间再删除。
每一个分区目录都有三个属性:
- MinBlockNum/MaxBlockNum:取同一个分区最大/最小block的num。注意这里的block含义比较特殊:可以这样理解,block num是一个分区内,产生分区目录的全局计数,每新插入一个分区目录(可能是重复的分区,之后再合并),这个计数就会+1。对于新插入的分区目录,minblocknum=maxblocknum,合并过的分区目录则不一样了。
- level:合并次数。
6.3 主索引
一般来说ORDER KEY和PRIMARY KEY都是一致的,他们指定了主索引和数据的排序顺序。
主索引是稀疏索引,默认粒度8192. 由于占用空间极小,所以常驻内存,访问速度极快。
数据文件(.bin)也是按照索引粒度进行数据块的压缩;标记文件也会被索引粒度所影响。
primary.idx按照索引粒度,将相应位置的PRIMARY KEY读出来,按照顺序紧密拼接起来,没有一个多余的字节。多个key之间也不分隔。
6.3.4 索引的查询过程
primary.idx将数据文件划分为若干个等粒度的markRange,这些步长为1的markRange可以进行连续的合并形成更大的markRange,在逻辑上构成一个树形。查询的目标,就是定位到,哪些markRange(步长为1的)可能会含有QUERY值。
- 首先根据查询解析到QUERY的PRIMARY KEY范围;
- 从最大的数据范围内开始进行递归查找,如果QUERY范围和数据范围有交集,则划分成8个子区间(可以配置),如果已经不能拆8份了,即数据范围的markrange步长<8,那么就返回;如果没交集,那直接剪枝掉。
- 最终把返回的markRange区间合并起来。
6.4 二级索引(data_skipping_index)
二级索引要在建表时候主动声明。merge tree会为每个二级索引,建一个skp_idx_[Column].idx索引文件和skp_idx_[Column].mrk的标记文件。
CREATE TABLE table_name
(
u64 UInt64,
i32 Int32,
s String,
...
INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3,
INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4
) ENGINE = MergeTree()
...
注意到,这里的GRANULARITY和主索引中的index_granularity的含义是不一样的。GRANULARITY控制的是每几个index_granularity,在索引文件中,输出一条汇总聚合信息。
skip_index的种类:
- minmax:计算表达式的极值;
- set(max_rows):计算表达式的唯一值,max_rows表示每个index_granularity里,索引最多记录的行数,0表示无限制;
- bloom_fliter(false positive rate):支持大多数类型的bf;
- ngrambf_v1(token_length, size_of_bf, num_of_hash_functions,random_seed):仅用于字符串的bf,对于in,notin,like,equals,notequals查询有帮助,但是是按照固定长度进行分割;
- tokenbf_v1(size_of_bf, num_of_hash_functions,random_seed):这个也是字符串bf,但是分割方式不一样,是按照非字符数字的字符串作为分割token。
6.5 数据存储
列存储,每个列都有一个.bin数据文件。每列数据都是经过压缩的,目前支持LZ4,ZSTD,Multiple,Delta,数据按照ORDER KEY进行排序,以压缩数据块的格式写入数据文件。
6.5.2 压缩数据块
数据文件的写入,首先是以批次为单位的,即index_granularity所规定的条数。然而这些条数的size是不确定的,对于这个size,有两个极限值来限制(min_compress_block_size(64KB), max_compress_block_size(1MB))。
考虑三种情况:
- 单批次size正好在64KB-1MB之间:那么直接压缩落盘成一个压缩块;
- 单批次size超过了1MB:那么每1MB都要截断生成一个压缩块落盘,剩下最后一个如果不到64KB,见第三种情况;
-
单批次size不足64KB:暂时不压缩,继续获取下一批次,累积到64KB再压缩落盘。
clickhouse对于数据最细粒度的使用就是这个压缩块,也就意味着,即使是只查询一个分区的一列数据,也可以通过只读部分压缩块来减少I/O。
6.6 数据标记
.mrk文件为索引文件.idx和数据文件.bin建立联系。
标记文件的逻辑结构如下图所示,每一行都代表了一个index_granularity,文件中记录了granularity在压缩文件中的位置,以及将对应的压缩块解压缩后,在解压缩块中的偏移量。
标记文件并不常驻内存,而是使用LRU策略缓存。
6.6.2 数据标记的工作方式
- 给定index_granularity的标号,去.mrk文件里面读出压缩块偏移量,然后再找到下一个有变化的偏移量,二者之间就是本个granularity的压缩块的位置;
- 把压缩块拉到内存里进行解压;
- 根据.mrk文件中的解压缩块的偏移量(类似于第一步),扫描相应位置,读到数据。
6.7 MegeTree Summarization
6.7.1 写入
6.7.2 查询
查询的性能主要取决于WHERE条件的Selectivity是否命中了索引,如果没有索引,也只能顺序扫描全部,可以多线程并行扫描。
第七章 MergeTree系列表引擎
MergeTree家族是clickhouse最核心的存储引擎。
7.1 MergeTree
本章只讲两个额外的mergeTree特性。
7.1.1 TTL
clickhouse可以对列/表设置TTL,表示清除它们的时间;对列清除是把它们变成默认值,对表清除是把过期的行删掉。
TTL的设置必须依赖于表中已有的一个日期/时间字段,在它的基础上定义过期的绝对时间。
可以在表定义的时候指定TTL,也可以后面再修改。
具体到实现层面,TTL的实现会在每个分区目录下写一个ttl.txt文件,用json格式配置TTL信息:
- columns:列级别TTL
- tables:表级别TTL
- min/max:保存当前分区内,过期时间戳的最小值和最大值。
只有在MergeTree合并分区时,才会触发TTL删除机制。
7.1.2 多路径存储
可以以分区为最小单元,将数据写入多个磁盘目录。
- 默认策略:所有分区写入path指定的位置;
- JBOD(Just a Bunch Of Disks)策略:round rubin方式写入多个磁盘位置;
- HOT/COLD策略:分成HOT/COLD区域,开始写都向SSD区域写热数据,热数据分区数据囤聚累计到阈值了,数据就自行移动到COLD区域。每个区域内部可以用JBOD策略。
7.2 ReplacingMergeTree
MergeTree允许主键重复,ReplacingMergeTree可以在分区内部保证在充分merge之后(OPTIMIZE命令),数据按照ORDER KEY不重复。
- 仅在merge分区时才会触发删除重复数据;
- 不同分区不去重(因为不会一起进行排序);
- 去重默认根据数据插入时间,保留最新的;但也可以在建表时设置一个ver参数,ver代表表的一个列,去重会保留最大的该列数据。
7.3 SummingMergeTree
如果用户只关心该表的汇总数据(SUM),不关心明细数据,而且GROUP BY条件预先都设置好的,则可以用SummingMergeTree。
这其实就是数仓中的DWS层,只不过clickhouse在存储引擎层做了这个事情。
- 根据ORDER BY作为聚合数据的最细粒度;
- 以分区为单位进行聚合(SUM),不同分区不聚合;
- 可以建表时指定需要聚合的列,默认是所有数值列;非聚合列则使用第一行数据;
- 嵌套数据也可以内部按照key聚合。
注意在SummingMergeTree和后面的AggregatingMergeTree中,出现一种use case,就是PRIMARY KEY(索引包含字段)和ORDER KEY(数据排列顺序)不一致。
这种不一致只能是PRIMARY KEY是ORDER KEY的前缀。
用户可能想让汇总表按照A、B、C、D四个字段为粒度的汇总,但是从查询过滤的角度来说,只需要过滤A即可,后面的区分度不大,那么可以让数据按ABCD进行ORDER BY排序,主索引只有按A排列。
后面如果业务上不需要按照ABCD为粒度进行汇总了,那么也可以修改为ABC或者AB。
7.4 AggregatingMergeTree
是一个增强版的SummingMergeTree,可以对各个需要聚合的字段在建表时确定聚合函数。
虽然提供了非常强大的DWS功能,但是使用起来,插入/查询十分不便,因为不能指定列名,要显式写特殊函数,因此AggregatingMergeTree通常会作为明细表的物化视图的引擎而出现。插入,正常插入底表,同步至物化视图;查询,则需要特殊函数指定聚合列。
CREATE MATERIALIZED VIEW test.basic
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate)
AS SELECT
CounterID,
StartDate,
sumState(Sign) AS Visits,
uniqState(UserID) AS Users
FROM test.visits
GROUP BY CounterID, StartDate;
SELECT
StartDate,
sumMerge(Visits) AS Visits,
uniqMerge(Users) AS Users
FROM test.basic
GROUP BY StartDate
ORDER BY StartDate;
7.5 CollapsingMergeTree
支持行级别的数据修改和删除的引擎。
以增代删,需要指定一个列,作为删除的标记字段。
7.6 VersionedCollapsingMergeTree
通过指定一个version列,保证最大的version可以被保留,使得分区内的修改和删除顺序,是可以保证的,即使是在多线程写入的情况下。
7.7 MergeTree家族关系
MergeTree家族的各个引擎,仅是在Merge的过程中,按照ORDERY KEY排序的过程中各展所长而已(折叠、聚合、去重)。
除此之外,每个引擎,还可以通过zookeeper广播log entry来实现replicated版本。
第八章 其他表引擎
8.1 外部存储引擎
clichhouse存储元数据,数据文件由外部提供。
- HDFS:clickhouse可以通过HDFS引擎,操控HDFS文件,查询、写入等;也可以直接去读HDFS上已有的文件。
- MySQL:远程操作表。
- JDBC
- Kafka:可以用表的形式做消费端。每隔一段时间拉一次数据,先放缓存,然后入表。仅支持at least once语义。
- File:直接读取本地文件,可以读写。
8.2 内存类型
内存引擎都是直接在内存里读数据,在内存里进行查询。实际上,某些引擎也会将数据落盘以防止丢失。数据表加载时再读到内存中。没有主键,order key等。
- Memory:将数据原样保存于内存,没有压缩、序列化、落盘,重启就会丢失。clickhouse内部将其用于临时广播表。
- Set:自动去重。数据首先写入内存,然后同步至磁盘。可以INSERT,不能SELECT查询,只能位于IN查询的右侧条件。
- [num].bin:num表示数据写入的批次,自增的,不合并,所有列的数据都在这个文件里;
- tmp:写入临时目录,进入数据文件后清理掉。
- Join:和Set引擎的逻辑非常像,但是可以直接SELECT查询,主要用于和其他表进行join。
- Buffer:某个主表的缓冲区,写入首先写入内存Buffer表,然后定期/定量的并行地刷入主表,以减少主表merge。不用于查询,不持久化。
8.3 日志类型
数据量很小(不到100W行),一次写入多次查询,查询不复杂,才会用日志引擎。不支持索引、分区,不支持并发读写,完全同步,有物理存储。
- TinyLog:只有每列一个.bin数据文件,单线程读写;
- StripeLog:所有列都在一个.bin数据文件,但是有标记文件.mrk,可以并行读。
- Log:性能最高的日志引擎。各列都有.bin数据文件,但是每个表只有一个.mrk文件,存着各列数据文件的位置信息。
8.4 接口类型
本身不存储任何数据,作为“胶水”整合其他数据表。
- Merge:不能写入,可以作为代理整合同库同表结构的其他表(分区定义可以不同),对它的查询会dispatch下去,异步并行执行,最终合成结果集返回。
- Dictionary:之前说过,是磁盘上数据scheme的一个内存映射,可以以表的形式访问,创建一个Dictionary库,可以把所有Dictionary弄成表。
- Distributed:分布式分shard的中间件,后面介绍。
第九章 数据查询
这里只标注一些clickhouse特殊的数据查询形式。
- Join strictness:ch的join不仅有传统的内外连接,还有一个连接严格度
- ALL:这和我们普通SQL所支持的是一样的A ALL INNER JOIN B ON A.key=B.key,如果A中一条记录匹配B中多条记录,那么全部emit;
- ANY:只返回匹配的第一条;
- ASOF:模糊查询,可以定义一个模糊key,满足>=关系即认为是匹配的,然后只emit第一条匹配的。这个主要是要由于clickhouse只支持equal join,不支持其他的join 匹配方式。
clickhouse执行join(本地)时会把右表当做小表完全拉进内存与左表比较;而且JOIN没有任何缓存,频繁使用的右表,最好都做成JOIN引擎表来进行缓存;clickhouse是大表模式,join非常吃力,如果需要连续补充多个维度,可以将维度表作为数据字典来join。
- prewhere:这是where的一个优化版本。除了有索引的条件外,where一般会在select之后,返回之前执行,作为筛选;而prewhere则会在最初在相应的列选出满足条件的数据,之后在select过程中再补充其他列。
第十章 副本与分片
10.2 副本
插入数据时,数据首先写入内存缓冲,然后刷到磁盘tmp目录,全部刷完后,整合进正式分区,然后将日志entry同步至zookeeper。数据写入以block为基本单元和最小粒度(max_insert_block_size),对block的写入保持原子性。
clickhouse副本写入依赖zk,但是查询并不依赖zk。同一个shard写进同一个zk_path,各个副本保持不一样的replica_name。
ch的副本是表级别的,而且是多主架构,这种架构使得副本不仅仅为了容错,每个副本都可以作为读写的入口,用以负载均衡。
10.3 ReplicatedMergeTree原理解析
replicatedMergeTree会在zk_path上为这张表创建一组监听节点,分成以下几类:
- 元数据:表metadata,列字段,副本们
- 判断标识:主副本的选举工作;block数据块的哈希值和所属的partition_id;quorun数量,最少写成功副本;block_nums,数据块的写入顺序;
- 操作日志:常规log和mutations(被称为logEntry和MutationEntry);log执行任务队列;log/mutation执行offset;
LogEntry包含以下信息:
- source_replica
- type(get,merge,mutate)
- block_id
- partition_name
MutationEntry包含以下信息:
- source_replica
- commands(DELETE/UPDATE)
- mutation_id
- partition_id
需要主副同步的操作主要有INSERT,MERGE,MUTATION,ALTER四种,即数据写入,分区合并,数据修改,元数据修改四种。
对于其他SQL指令,如SELECT,CREATE,DROP,RENAME,ATTACH等,不支持分布式,要不然登录每台机器结点分别执行,要不然用一些trick,后面讲。
- INSERT:哪台服务器作为接口提供写入,哪台服务器就是主服务器,首先在本地写成分区的形式,然后把写入log放到zk,写入log中只包含分区信息,不含具体数据,其他副本监听log,读取后放入队列,移动log pointer。副本的后台线程从zk队列中读取log,并选择log pointer最大,队列最短的节点,建立http连接,读取分区。
- MERGE:无论在哪个副本上触发了MERGE条件,MERGE最终都是主节点来进行的。follower向主副本进行通信,主副本制定MERGE计划,做成LOG推送到zk中。与此同时,主副本锁住执行线程,监听MERGE执行情况,各副本将LOG拉到自己的zk QUEUE里,然后进行消费,在本地执行MERGE,直到达到用户设置的个数之后,主副本线程解锁。
- MUTATION:和MERGE类似,通知主副本指定MUTATION计划,各副本执行即可;
- ALTER:直接修改zk元数据,更改zk元数据版本;各副本会对zk元数据进行监听,有变化时会进行对照修改。全部修改完成后谁执行谁负责结束。
10.4 数据分片
clickhouse可以灵活配置多个cluster,在每个cluster,一个表可以由多个shard(水平数据分区),每个shard内部还可以有数据副本(垂直数据冗余)。
有了cluster name的配置后,一些DDL语句可以用ON CLUSTER cluster_name进行分布式执行,原理就是根据配置,在每个replica都执行相同的指令。
其中{replica},{shard}可以以宏的形式写,因为在集群每台机器内,都有表存储了当前机器的replica shard分别是什么值。
DDL的分布式执行也是借助于zk,task的发布、状态、完成情况都记录在zk,秉着谁发起谁负责的策略,发起者负责监视是否cluster内所有节点任务完成,完成则结束返回,否则要转入后台执行。
10.5 Distributed原理解析
分布式表对应多个本地表,在多个本地shard表提供一个分布式透明性的服务。分布式表的INSERT SELECT直接作用于其管理的本地表,但是CREATE DROP RENAME之类的元数据操作只作用于自身,不作用与本地表。不支持MUTATION。
分布式表要求所有本地表结构一致,命名相同,仅shard和replica参数不同,在读时检查;
分布式表创建时也要指定ON CLUSTER,在集群所有节点创建分布式表,使得整个集群都可以成为读写入口。
分布式表创建时要指定一个sharding key和sharding function进行分区,function最终返回一个整数即可; 每个shard在配置文件中都有一个权重,代表数据流入的比率,clickhouse按照weights将sharding function的值域划分为几个连续区间,承接数据写入。
10.5.4 DISTRIBUTED写入
DISTRIBUTED表的数据写入一个shard节点,首先是,把本shard内的数据写入分区,然后把其他shard分区的挑出来,分别写到一个固定位置去,形成分区;本shard内有一个对固定位置的监视器,监测到分区目录变化后,会根据目录名,立即与远程shard建立联系,压缩传递数据。秉着谁执行谁负责的原则,写入shard负责确保数据都已经正确写入,结束返回。
写入的过程可以设置同步/异步,异步不用等待远程写完,同步需要设置超时。
上面的过程只考虑了sharding,没有考虑replica,replica的同步写入可以有两种模式,通过配置文件写死配置。第一种可以通过上述的方式,由distribute引擎写入replica,然而这种方式,写入节点要传输和写入的replica太多了,容易造成单点瓶颈;另一种方式是通过Repliacted-MergeTree,利用zk传输日志来进行同步,这样写入节点只需在每个shard选一个replica写入即可,具体选哪个可以根据一个全局计数器errors_count来选择。
10.5.5 DISTRIBUTED查询
分布式查询,要在每一个shard选择一个replica,这就涉及一个负载均衡算法,由参数控制,有以下四种
- random:默认算法。选择errors_count最小的replica,相等的话就随机选择;
- nearest_hostname:还是首先选择errors_count最小的,然后选hostname最接近的;
- in_order:先选errors_count,相等的话看配置文件的顺序;
- first_or_random:先选errors_count,然后按配置文件的第一个看,如果第一个不可用就随机选了。
可想而知,查询也是谁执行谁负责,谁是入口查询节点,谁就要串联整个查询过程。包括分割分布式查询为本地子查询,选择连接其他shard的节点,传递SQL,收到结果数据,UNION返回结果。
- 使用GLOBAL优化分布式子查询
对于SQL中的子查询和JOIN,很可能在一条SQL中出现两次分布式表,那么就会出现SQL被反复传递以获取信息,导致查询请求幂次扩大的问题,为了解决这个问题,在第二/N次出现分布式表的时候,加入GLOBAL字段,查询中首先执行GLOBAL子查询,得到结果返回,构成内存表,再广播给各个节点,最后执行主查询。
由于这种分布式IN/JOIN方式,子句返回的结果不能太大,要在内存中放得下,因此最好要提前DISTINCT或者筛选掉一部分。