什么是时序索引?
其主要特点体现在两个方面,
一存,以时间为轴,数据只有增加,没有变更,并且必须包含timestamp(日期时间,名称随意)字段,其作用和意义要大于数据的id字段,常见的数据比如我们通常要记录的操作日志、用户行为日志、或股市行情数据、服务器CPU、内存、网络的使用率等;
二取,一定是以时间范围为第一过滤条件,然后是其它查询条件,比如近一天、一周、本月等等,然后在这个范围内进行二次过滤,比如性别或地域等,查询结果中比较关注的是每条数据和timestamp字段具体发生的时间点,而非id。
此类数据一般用于OLAP、监控分析等场景。
最近的一个项目是风控过程数据实时统计分析和聚合的一个OLAP分析监控平台,日流量峰值在10到12亿上下,每年数据约4000亿条,占用空间大概200T,面对这样一个数据量级的需求,我们的数据如何存储和实现实时查询将是一个严峻的挑战,经过对Elasticsearch多方调研和超过几百亿条数据的插入和聚合查询的验证之后,总结出以下几种能够有效提升性能和解决这一问题的方案,包括从集群规划、存储策略、索引拆分、压缩、冷热分区等几个维度的优化方案,在本文中逐一介绍,希望对你有所帮助和启发。
本文所使用的Elasticsearch版本为5.3.3。
一,集群部署规划
我们都知道在Elasticsearch(下称ES)集群中有两个主要角色,Master Node和Data Node,其它如Tribe Node等节点可根据业务需要另行设立。为了让集群有更好的性能表现,我们应该对这两个角色有一个更好的规划,在Nodes之间做读取分离,保证集群的稳定性和快速响应,在大规模的数据存储和查询的压力之下能够坦然面对,各自愉快的协作:)
Master Nodes
Master Node,整个集群的管理者,负有对index的管理、shards的分配,以及整个集群拓扑信息的管理等功能,众所周知,Master Node可以通过Data Node兼任,但是,如果对群集规模和稳定要求很高的话,就要职责分离,Master Node推荐独立,它状态关乎整个集群的存活,Master的配置
node.master: true
node.data: false
node.ingest: false
这样Master不参与I、O,从数据的搜索和索引操作中解脱出来,专门负责集群的管理工作,因此Master Node的节点配置可以相对低一些。
另外防止ES集群 split brain(脑裂),合理配置discovery.zen.minimum_master_nodes参数,官方推荐master-eligible nodes / 2 + 1向下取整的个数,这个参数决定选举Master的Node个数,太小容易发生“脑裂”,可能会出现多个Master,太大Master将无法选举。
更多Master选举相关内容请参考:modules-discovery-zen#master-election
Data Nodes
Data Node是数据的承载者,对索引的数据存储、查询、聚合等操作提供支持,这些操作严重消耗系统的CPU、内存、IO等资源,因此,应该把最好的资源分配给Data Node,因为它们是真正干累活的角色,同样Data Node也不兼任Master的功能,配置:
node.master: false
node.data: true
node.ingest: false
Coordinating Only Nodes
ES本身是一个分布式的计算集群,每个Node都可以响应用户的请求,包括Master Node、Data Node,它们都有完整的cluster state信息,正如我们知道的一样,在某个Node收到用户请求的时候,会将请求转发到集群中所有索引相关的Node上,之后将每个Node的计算结果合并后返回给请求方,我们暂且将这个Node称为查询节点,整个过程跟分布式数据库原理类似。那问题来了,这个查询节点如果在并发和数据量比较大的情况下,由于数据的聚合可能会让内存和网络出现瓶颈,因此,在职责分离指导思想的前提下,这些操作我们也应该从这些角色中剥离出来,官方称它是 coordinating nodes,只负责路由用户的请求,包括读、写等操作,对内存、网络和CPU要求比较高,本质上,coordinating only nodes可以笼统的理解为是一个负载均衡器,或者反向代理,只负责读,本身不写数据,它的配置是:
node.master: false
node.data: false
node.ingest: false
search.remote.connect: false
增加coordinating nodes的数量可以提高API请求响应的性能,我们也可以针对不同量级的index分配独立的coordinating nodes来满足请求性能,那是不是越多越好呢,在一定范围内是肯定的,但凡事有个度,过了负作用就会突显,太多的话会给集群增加负担,在做Master选举的时候会先确保所有Node的cluster state是一致的,同步的时候会等待每个Node的acknowledgement确认,所以适量分配可以让集群畅快的工作。
search.remote.connect是禁用跨集群查询,防止在进行集群之间查询时发生二次路由,modules-cross-cluster-search。
二,Routing
类似于分布式数据库中的分片原则,将符合规则的数据存储到同一分片。ES通过哈希算法来决定数据存储于哪个shard,
shard_num = hash(_routing) % num_primary_shards
其中hash(_routing)得出一个数字,然后除以主shards的数量得到一个余数,余数的范围是0到number_of_primary_shards - 1,这个数字就是文档所在的shard。
Routing默认是id值,当然可以自定义,合理指定Routing能够大幅提升查询效率,Routing支持GET、Delete、Update、Post、Put等操作。
如:
PUT my_index/my_type/1?routing=user1
{
"title": "This is a document"
}
GET my_index/my_type/1?routing=user1
不指定Routing的查询过程:
简单的来说,一个查询请求过来以后会查询每个shard,然后做结果聚合,总的时间大概就是所有shard查询所消耗的时间之和。
指定Routing以后:
会根据Routing查询特定的一个或多个shard,这样就大大减少了查询时间,提高了查询效率,当然,如何设置Routing是一个难点,需要一点技巧,要根据业务特点合理组合Routing的值,来划分shard的存储,最终保持数据量相对均衡。
可以组合几个维度做为Routing ,有点类似于hbase key,例如不同的业务线加不同的类别,不同的城市和不同的类型等等,如
- _search?routing=beijing按城市
- _search?routing=beijing_user123按城市和用户
- _search?routing=beijing_android,shanghai_android按城市和手机类型等。
数据不均衡?
假如你的业务在北京、上海的数据远远大于其它二三线城市的数据,再例如我们的业务场景,A业务线的数据量级远远大于B业务线,有时候很难通过Routing指定个一个值保证数据在所有shards上均匀分布,会让部分shard变的越来越大,影响查询性能,怎么办?
一种解决办法是单独为这些数据量大的渠道创建独立的index,如http://localhost:9200/shanghai,beijing,other/_search?routing=android
,这样可以根据需要在不同index之间查询,然而每个index中shards的数据可以做到相对均衡。
另一种办法是指定index参数index.routing_partition_size,来解决最终可能产生群集不均衡的问题,指定这个参数后新的算法如下:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
,
index.routing_partition_size 应具有大于1且小于 index.number_of_shards 的值,最终数据会在routing_partition_size几个shard上均匀存储,是哪个shard取决于hash(_id) % routing_partition_size的计算结果。
指定参数index.routing_partition_size后,索引中的mappings必须指定_routing为"required": true,另外mappings不支持parent-child父子关系。
很多情况下,指定Routing后会大幅提升查询性能,毕竟查询的shard只有那么几个,但是如何设置Routing是个难题,可根据业务特性巧妙组合。
三,索引拆分
Index通过横向扩展shards实现分布式存储,这样可以解决index大数据存储的问题,但在一个index变的越来越大,单个shard也越来越大,查询和存储的速度也越来越慢,更重要的是一个index其实是有存储上限的(除非你设置足够多的shards和机器),如官方声明单个shard的文档数不能超过20亿(受限于Lucene index,每个shard是一个Lucene index),考虑到I、O,针对index每个node的shards数最好不超过3个,那面对这样一个庞大的index,我们是采用更多的shards,还是更多的index我们如何选择,index的shards总量也不宜太多,更多的shards会带来更多的I、O开销,其实答案就已经很明确,除非你能接受长时间的查询等待。
Index拆分的思路很简单,时序索引有一个好处就是只有增加,没有变更,按时间累积,天然对索引的拆分友好支持,可以按照时间和数据量做任意时间段的拆分,ES提供的Rollover Api + Index Template可以非常便捷和友好的实现index的拆分工作,把单个index docs数量控制在百亿内,也就是一个index默认5个shards左右即可,保证查询的即时响应。
简单介绍一下Rollover Api 和 Index Template这两个东西,如何实现index的拆分
Index template
我们知道ES可以为同一目的或同一类索引创建一个Index template,之后创建的索引只要符合匹配规则就会套用这个template,不必每次指定settings和mappings等属性。
一个index可以被多个template匹配,那settings和mappings就是多个template合并后的结果,有冲突通过template的属性"order" : 0从低到高覆盖(这部分据说会在ES6中会做调整,更好的解决template匹配冲突问题)。
示例:
PUT _template/template_1
{
"index_patterns" : ["log-*"],
"order" : 0,
"settings" : {
"number_of_shards" : 5
},
"aliases" : {
"alias1" : {}
}
}
Rollover Index
Rollover Index可以将现有的索引通过一定的规则,如数据量和时间,索引的命名必须是logs-000001这种格式,并指定aliases,示例:
PUT /logs-000001
{
"aliases": {
"logs_write": {}
}
}
# Add > 1000 documents to logs-000001
POST /logs_write/_rollover
{
"conditions": {
"max_age": "7d",
"max_docs": 1000
}
}
先创建索引并指定别名logs_write,插入1000条数据,然后请求_rollover api并指定拆分规则,如果索引中的数据大于规则中指定的数据量或者时间过时,新的索引将被创建,索引名称为logs-000002,并根据规则套用index template,同时别名logs_write也将被变更到logs-000002。
注意事项:
- 索引命名规则必须如同:logs-000001
- 索引必须指定aliases
- Rollover Index Api调用时才去检查索引是否超出指定规则,不会自动触发,需要手动调用,可以通过Curator实现自动化
- 如果符合条件会创建新的索引,老索引的数据不会发生变化,如果你已经插入2000条,拆分后还是2000条
- 插入数据时一定要用别名,否则你可能一直在往一个索引里追加数据
技巧:
按日期滚动索引
PUT /<logs-{now/d}-1>
{
"aliases": {
"logs_write": {}
}
}
假如生成的索引名为logs-2017.04.13-1,如果你在当天执行Rollover会生成logs-2017.04.13-000001,次日的话是logs-2017.04.14-000001。
这样就会按日期进行切割索引,那如果你想查询3天内的数据可以通过日期规则来匹配索引名,如:
GET /<logs-{now/d}-*>,<logs-{now/d-1d}-*>,<logs-{now/d-2d}-*>/_search
到此,我们就可以通过Index Template和Rollover Api实现对index的任意拆分,并按照需要进行任意时间段的合并查询,这样只要你的时间跨度不是很大,查询速度一般可以控制在毫秒级,存储性能也不会遇到瓶颈。
四,Hot-Warm架构
冷热架构,为了保证大规模时序索引实时数据分析的时效性,可以根据资源配置不同将Data Nodes进行分类形成分层或分组架构,一部分支持新数据的读写,另一部分仅支持历史数据的存储,存放一些查询发生机率较低的数据,即Hot-Warm架构,对CPU,磁盘、内存等硬件资源合理的规划和利用,达到性能和效率的最大化。
我们可以通过ES的Shard Allocation Filtering来实现Hot-Warm的架构。
实现思路:
- 将Data Node根据不同的资源配比打上标签,如:host、warm
- 定义2个时序索引的index template,包括hot template和warm template,hot template可以多分配一些shard和拥有更好资源的Hot Node
- 用hot template创建一个active index名为active-logs-1,别名active-logs,支持索引切割,插入一定数据后,通过roller over api将active-logs切割,并将切割前的index移动到warm nodes上,如active-logs-1,并阻止写入
- 通过Shrinking API收缩索引active-logs-1为inactive-logs-1,原shard为5,适当收缩到2或3,可以在warm template中指定,减少检索的shard,使查询更快
- 通过force-merging api合并inactive-logs-1索引每个shard的segment,节省存储空间
- 删除active-logs-1
Hot,Warm Nodes
Hot Nodes
拥有最好资源的Data Nodes,如更高性能的CPU、SSD磁盘、内存等资源,这些特殊的Nodes支持索引所有的读、写操作,如果你计划以100亿为单位来切割index,那至少需要三个这样的Data Nodes,index的shard数为5,每个shard支持20亿documents数据的存储
为这类Data Nodes打上标签,以便我们在template中识别和指定
启动参数:
./bin/elasticsearch -Enode.attr.box_type=hot
或者配置文件:
node.attr.box_type: hot
Warm Nodes
存储只读数据,并且查询量较少,但用于存储长多时间历史数据的Data Nodes,这类Nodes相对Hot Nodes较差的硬件配置,根据需求配置稍差的CPU、机械磁盘和其它硬件资源,至于数量根据需要保留多长时间的数据来配比,同样需要打上标签,方法跟hot nodes一样,指定为warm,box_type: warm。
Hot,Warm Template
Hot Template
我们可以通过指定参数"routing.allocation.include.box_type": "hot",让所有符合命名规则索引的shard都将被分配到hot nodes上。
PUT _template/active-logs
{
"template": "active-logs-*",
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"routing.allocation.include.box_type": "hot",
"routing.allocation.total_shards_per_node": 2
},
"aliases": {
"active-logs": {}
}
}
Warm Template
同样符合命名规则索引的shard会被分配到warm nodes上,我们指定了更少的shards数量和复本数,注意,这里的复本数为0,和best_compression级别的压缩,方便做迁移等操作,以及进行一些数据的压缩。
PUT _template/inactive-logs
{
"template": "inactive-logs-*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"routing.allocation.include.box_type": "warm",
"codec": "best_compression"
}
}
假如我们已经创建了索引active-logs-1 ,当然你可以通过_bulk API快速写入测试的数据,然后参考上文中介绍的rollover api进行切割。
Shrink Index
Rollover api切割以后,active-logs-1将变成一个冷索引,我们将它移动到warm nodes上,先将索引置为只读状态,拒绝任何写入操作,然后修改index.routing.allocation.include.box_type属性,ES会自动移动所有shards到目标Data Nodes上。
PUT active-logs-1/_settings
{
"index.blocks.write": true,
"index.routing.allocation.include.box_type": "warm"
}
cluster health API可以查看迁移状态,完成后进行收缩操作,其实相当于复制出来一个新的索引,旧的索引还存在。
POST active-logs-1/_shrink/inactive-logs-1
我们可以通过Head插件查看整个集群索引的变化情况。
关于shard的分配请参考Shard Allocation Filtering
Forcemerge
到目前为止我们已经实现了索引的冷热分离,和索引的收缩,我们知道每个shard下面由多个segment组成,那inactive-logs-1的shard数是1,但segment还是多个。
这类索引不会在接受写入操作,为了节约空间和改善查询性能,通过Forcemerge Api将segment适量合并,
PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 }
ES的Forcemerge过程是先创建新的segment删除旧的,所以旧segment的压缩方式best_compression不会在新的segment中生效,新的segment还是默认的压缩方式。
现在inactive-logs-1的复本是还是0,如果有需要的话,可以分配新的复本数
PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 }
最后删除active-logs-1,因为我们已经为它做了一个查询复本inactive-logs-1,
DELETE active-logs-1
走到这里,我们就已经实现了index的Hot-Warm架构,根据业务特点将一段时间的数据放在Hot Nodes,更多的历史数据存储于Warm Nodes。
五,其它优化方案
索引隔离
在多条业务线的索引共用一个ES集群时会发生流量被独吃独占的情况,因为大家都共用相同的集群资源,流量大的索引会占用大部分计算资源而流量小的也会被拖慢,得不到即时响应,或者说业务流量大的索引可以按天拆分,几个流量小的索引可以按周或月拆分。
这种情况下我们可以将规模大的索引和其它相对小规模的索引独立存储,分开查询或合并查询,除了Master Nodes以外,Data Nodes和Coordinating Nodes都可以独立使用(其实如果每个索引的量都特别大也应该采用这种架构),还有一个好处是对方的某个Node挂掉,自己不受影响。
同样利用ES支持Shard Allocation Filtering功能来实现索引的资源独立分配,先将nodes进行打标签,划分区域,类似于Hot-Warm架构node.attr.zone=zone_a、node.attr.zone=zone_b
或者
node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b
等打标签的方式来区别对应不同的索引,然后在各自的index template中指定不同的node.attr.zone即可,如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",
或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"
分配到zone_hot_b以外的所有Data Nodes上。
更多用法可以参考Hot-Warm架构,或shard-allocation-filtering。
跨数据中心
如果你的业务遍布全国各地,四海八荒,如果你数据要存储到多个机房,如果你的index有几万个甚至更多( index特别多,集群庞大会导致cluster state信息量特变大,因为cluster state包含了所有shard、index、node等所有相关信息,它存储在每个node上,这些数据发生变化都会实时同步到所有node上,当这个数据很大的时候会对集群的性能造成影响),这些情况下我们会考虑部署多个es cluster,那我们将如何解决跨集群查询的问题呢?目前es针对跨集群操作提供了两种方案Tribe node和Cross Cluster Search。
Tribe node
需要一个独立的node节点,加入其它es cluster,用法有点类似于Coordinating Only Node,所不同的是tribe是针对多个es集群之间的所有节点,tribe node收到请求广播到相关集群中所有节点,将结果合并处理后返回,表面上看起来tribe node将多个集群串连成一个了一个整体,遇到同名index发生冲突,会选择其中一个index,也可以指定。
tribe:
on_conflict: prefer_t1
t1:
cluster.name: us-cluster
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ['usm1','usm2','usm3']
t2:
cluster.name: eu-cluster
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ['eum1','eum2','eum3']
Cross Cluster Search
Cross Cluster Search可以让集群中的任意一个节点联合查询其它集群中的数据, 通过配置elasticsearch.yml或者API来启用这个功能,API示例
PUT _cluster/settings
{
"persistent": {
"search": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
...
}
}
}
}
}
提交以后整个集群所有节点都会生效,都可以做为代理去做跨集群联合查询,不过我们最好还是通过Coordinating Only Nodes去发起请求。
POST /cluster_one:decision,decision/_search
{
"match_all": {}
}
对集群cluster_one和本集群中名为decision的索引联合查询。
目前这个功能还在测试阶段,但未来可能会取代tribe node,之间的最大的差异是tribe node需要设置独立的节点,而Cross Cluster Search不需要,集群中的任意一个节点都可以兼任,比如可以用我们的Coordinating Only Nodes做为联合查询节点,另一个优点是配置是动态的,不需要重启节点,实际上可以理解为是一个ES集群之间特定的动态代理工具,支持所有操作,包括index的创建和修改,并且通过namespace对index进行隔离,也解决了tribe node之index名称冲突的问题。
小结
我们在文中介绍了几种方案用来解决时序索引的海量数据存储和查询的问题,根据业务特点和使用场景来单独或组合使用能够发挥出意想不到的效果,特别是nodes之间的读写分离、索引拆分、Hot-Warm等方案的组合应用对索引的查询和存储性能有显著的提升,另外routing在新版本中增加了routing_partition_size解决了shard难以均衡的问题,如果你的索引mapping中没有parent-child关联关系可以考虑使用,对查询的性能提升非常有效。
本文参考内容: