什么是分发引擎?
分发引擎在业务用来建立all表时使用。
all表的概念可以理解为一个视图。
在all表上读数据时CH数据流程如下:
1.分发SQL到对应多个shard上执行SQL
2.执行SQL后的数据的中间结果发送到主server上
3.数据再次汇总过滤。
如下图所示:
在各shard执行SQL是自动且并行化的,无需参数配置或手动干预。读数时随机选择某个shard的replica进行读书。如果表有索引优先使用索引。
分布式引擎接受参数有:服务器配置文件中的集群名称,远程数据库的名称,远程表的名称以及(可选)分片键。例:
Distributed(logs, default, hits[, sharding_key])
以上面的建表引擎作为例子。
参数说明:
logs : 服务器配置文件中的群集名称。rpm包装好后的配置文件在/etc/clickhouse-server/config.xml
default: 库名,也可以使用常量表达式来代替数据库名称,如currentDatabase()
hits:表名
sharding_key:路由算法
上面引擎的隐喻是:定位logs集群,从位于群集中每个服务器上的“default.hits”表中读取数据。数据将在远程服务器部分处理。例如,对于使用GROUP BY的查询,将在远程服务器上聚合数据,聚合函数的中间状态将发送到请求者服务器。数据将会进一步聚合。
配置信息详解
这个配置文件一般叫做metrika.xml,当然你也可以给这个文件自定义一个名字,但是别忘了在主config.xml中通过include_from标签包含进去。否则,服务会报找不到cluster的错误。metrika.xml 文件中一般存集群配置、ZK配置、分片配置等。
这个配置文件告诉我们说:
我定义了一个名字为"logs"的集群,由两个shard(分片)组成,每个shard包含两个副本。其中shard的概念,是指包含不同数据部分的服务器(所有Shard上的数据加起来是整个数据集)。副本是复制服务器(可以访问任何副本上的数据,副本是主主复制)。
对每个服务器来说,有两个强制参数(host,port),两个可选参数(user,password)。
host: 远程服务器的主机地址。可以指定为域名、IPv4或IPv6地址。如果指定域,服务器将在启动时执行DNS查找。如果DNS请求失败,服务器将无法启动。如果修改了DNS记录,请重新启动服务器以使新记录生效。
port:服务器之间通信的TCP端口(配置文件中的tcp_port,默认是9000)
user:连接到远程服务器的用户名称。默认情况下,用户是“default”。此用户必须具有连接到远程服务器的访问权限。在users.xml配置文件访问权限的管理。
password:以明文登录到远程服务器的密码。默认为空字符串。
当一个读取的sql落到一个shard上时,分片将选择一个可用副本。当然你也可以通过配置负载均衡算法(副本访问的优先级,参阅‘load_balancing’设置)。如果与服务器的连接没有建立成功,则在短暂超时后继续尝试连接。如果连接失败,下一个副本将被选中,以此类推。如果所有副本的连接尝试失败,会以相同的方式重复几次尝试。与此同时,远程服务器仍然会接受连接,但大概率不能正常提供服务。
在写配置文件的时候,你可以只写一个分片,这时候就没有分发的概念了,因为所有的数据都将落在这一个shard上。你也可以搞N个分片,每个分片搞N个副本。每个分片的副本数量可以不同。
你也可以在配置文件中配任意数量的cluster。
查看集群
查看我当前有几个集群,运行命令:
分发引擎让你使用起来就好像在用本地服务器(其实它已经在分布式的工作了)。注意,群集是不可扩展的:必须将其配置写入每个服务器的配置文件。
不支持查看其他分布式表的Distributed表(除非分布式表只有一个分片)。作为替代方法,使分布式表查看“最终”表。
分发引擎需要编写集群配置文件。修改后的配置的可热更新,不需要重新启动服务器。如果需要每次都向未知的分片和副本发送查询,无需创建分布式表,推荐使用“远程”表格功能。请参阅“table functions”。
有两种将数据写入集群的方法:
1. 你想往哪些服务器写哪些数据,直接通过分片去写入。
2.通过distributed表灌数,引擎将通过sharding key (最后一个参数,必须指定)算法分散数据落到不同的服务器上。如果只有一个分片的情况下,无需指定sharding key。
每个分片都可以在配置文件中定义一个权重。默认情况下,权重等于1。数据以与分片权重成正比的量分布在分片上。例如,如果有两个分片,第一个的权重是9,第二个的权重是10,则第一个将存放9/19数据集,第二个将存放10/19。
每个分片都可以在配置文件中定义“internal_replication”参数。如果此参数设置为“true”,则写入操作会选择第一个健康的replica并向其写入数据。然后各个replica之间通过zookeeper自动同步数据,类似于replicated表的数据同步模式。如果它设置为'false'(默认),数据将被写入所有副本。实质上,这意味着Distributed表本身复制数据。这比使用复制表要糟糕,因为副本的一致性未被检查,并且随着时间的推移,它们的数据会存在部分不一致的情况。
分片表达式(sharding key)
分析分片的表达式(sharding key)用来决定将数据写入到哪个分片。
最后一项sharding_key是可选的,可以是一个表达式,例如rand(),可以是一列 例如user_id,(integer类型),通过对余数值进行取余分片,(rand()函数返回值/shards总权重)【这有个好处就是特定的Userid会落到特定的分片上,从而简化了用户的in和join操作】。如果担心分片数据不均匀,也可以加上hash函数,如intHash64(user_id)
假设现在来了一条数据,这条数据要写到哪个分片呢?我们来看看这里的算法实现。
首先假设我们有3个分片,权重分别是9,10,11。
3个shard分割出如下3个权重空间[0,9);[9,19);[19,30]。其中,第一个shard拥有第一个权重空间,第二个shard拥有第二个权重空间,第3个shard拥有第三个权重空间。
权重空间分割的计算方式:[prev_weight,pre_weights+weight) ... 其中,prev_weights是最左分片权重的总和,weight表示当前shard权重,示例如上。
现在来了一条待写入的row,我们看看写入时发生了什么?
1.sharding key表达式先被解析,假定sharding key表达式是rand()函数,函数返回值是43。
2.函数返回值/shards权重总和=43/(9+10+11) = 13
3.查找13属于范围[9,19),这个权重空间属于第二个shard,于是,数据将落到第二个shard上。
对于sharding key的选择来说,划分的一个简单的余数(rand算法)是分片的有限解决方案,并不总是合适的。它适用于大中型数据(数十台服务器),但不适用于大量数据(数百台服务器或更多)。在后一种情况下,使用主题区域所需的分片方案,不建议使用distribute表中的条目。
当使用复制表格时,可以复制数据 - 查看“resharding”部分。但在许多情况下,最好不要这样做。SELECT查询被发送到所有的分片,并且无论数据如何分布在分片上(它们可以完全随机地分配),都可以工作。当你添加一个新的分片时,可以给这个shard配一个较重的权重 - 数据会有一部分落到这个shard上,这导致整体cluster的数据分布稍微不均匀,但不会影响正常查询。
分片方案
在以下场景,应该关注设计分片方案:
- 使用查询需要通过特定的键来连接数据(IN或JOIN)。如果数据被这个键分割,你可以使用本地IN或JOIN而不是GLOBAL IN或GLOBAL JOIN,这样更有效率。
- 使用大量的服务器(数百个或更多)以及大量的小型查询(来自各个不同个体的客户端 、 网站,广告商或合作伙伴)。为了使小型查询不影响整个集群,在单个分片上定位单个客户端的数据是很有意义的。或者,正如我们在Yandex.Metrica中所做的那样,您可以设置双层分片:将整个集群划分为“层”,其中一个层可以由多个分片组成。单个客户端的数据位于单个图层上,但是可以根据需要将分片添加到图层中,在他们内部分发。为每个层创建分布式表,并为全局查询创建一个共享的分布式表。(听上去很高大上,这对提供大数据的基础服务提供了一点儿新思路)
数据是异步写入的。对于分配给表的INSERT ,数据块只写入本地文件系统。数据尽快发送到后台的远程服务器。你应该通过检查表目录/var/lib/clickhouse/data/database/table /中的文件列表(等待发送的数据)来检查数据是否成功发送。
如果服务器在INSERT到Distributed表之后宕机或重启(例如,在设备出现故障后),插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到“已损坏”的子目录中,不再使用。