1.背景
本人所在的部门是主要负责的职责就是根据活动的玩法来发相关的礼品,我们常见的礼品有:xxx券,券码,以及一些其他合作方的券
对于这些礼品的发放记录我这边数据量是比较大的,一天预估的是500w的量,那么一年下来就会达到18亿的数据量,这些记录保留一年,所以说峰值最高会有18亿的数据量,这些数据存放可以用Mysql分库分表。
但是现在有个问题是仅仅存放是完全可以的,但是如果要查询呢?
对于简单的查询根据分片id能查,但是对于一些复杂的多维度查询,mysql分库分表是不方便的,实现了性能也会很差。所以我们这边就打算把数据存放在ES中。下面是目前对于礼品记录写和查的方案
写
采用双写方案先写DB,写成功后再异步发kafka,写ES
查询
要求实时性高、查询简单的就走DB,复杂多维度的就走ES
2.现状问题
在方案的设计之初,对这个方案的考虑不够全面,导致出现下面的问题:
单ES索引数据量巨大:单个索引占用110G,不符合es索引推荐20-50G的范围
mapping设计不合理:存在不需要分词但是仍然分词的、以及多余的字段
没有清理数据任务:历史废数据一致存在,随着时间推移会越来越大,影响性能
针对上面的问题,这次打算用合理的方案把它优化掉
3.新架构
1.架构描述
旧的整体架构是在左边,这个架构也就会存在上述的数据量大等因为数据都存在一个索引的问题。
那么经过架构的设计升级,最新的礼品发放记录读写架构变成了右图,主要改动点:
-
mapping设计:重新设计mapping
减少不必要的字段
针对不需要分词的字段改成keyword类型
重新设置shard数量
读:es用别名的方式,多个分片索引对应同一个别名(user_award_index_all),这样查的时候都统一查user_award_index_all的索引即可
写:根据索引月份分片后,每次写入当前月对应的的索引分片,例如现在是2023年12月,那么就会写入到
user_award_index_202312
索引通过定时任务,定期清理过期的分片索引
2.创建索引和删除过期索引定时任务
这个定时任务每周跑一次,会创建这个月和下个月的索引,以及删除上一年本月-1过期索引。当然如果当前需要创建的索引已经存在就不会再创建,这样每周跑一次,保证了创建当前月和下个月的索引,以及删除旧索引能重试3次。
例如:当前是23年12月,那么就会创建user_awrad_index_2312,和user_awrad_index_2401的索引,
同时会删除上一年本月-1过期索引(user_awrad_index_2211),也就是保留一年的数据。
3.ES Alias(别名)介绍
作为本篇文章中最重要的知识点之一,对于es的别名机制这里简单说明一下:
别名可以看作是索引的一个指向,它提供了一种灵活而可靠的方式来管理和操作索引。下面是ES别名机制的几个优势:
简化索引操作:通过使用别名,您可以将复杂的索引操作(如创建、删除、重命名等)进行简化。例如,当需要更换底层索引时,只需更新别名指向新索引即可,而不需要更改应用程序代码或配置文件。
实现无缝切换:由于别名提供了指向具体索引的逻辑名称,因此可以轻松地在不影响应用程序和用户访问的情况下切换底层索引。这对于数据迁移、版本升级或故障恢复等场景非常有用。
支持滚动升级:当您需要对大型索引进行结构变更或调整时,别名机制使得滚动升级成为可能。您可以先创建一个新版本的索引,并将别名指向该新版本。然后逐步将旧版本中的数据导入到新版本中,在此过程中保持服务连续可用。
多租户支持:通过使用别名,您可以为每个租户或用户创建独立的视图,并将其映射到特定的索引。这样可以实现安全性和隔离性,并方便地控制每个租户或用户所能访问和操作的数据范围。
3.优势
每个分片索引的大小能控制在20-50G内,符合ES的规范标准,提高读写性能
删除数据的时候,只需要将别名移除,再把过期索引整个删除即可,不需要在同一个大索引操作,减少引起ES性能问题的可能
由于之前的礼品发放记录采用的是旧的方案,那么需要转变成新的方案,就涉及到数据的迁移,那么怎么迁移呢?
4.迁移方案
1.方案评估
在做数据迁移方案的时候,有以下这些考虑的点:
1.是否停机迁移?
2.如何把旧迁移迁移到新数据?
3.数据的新增、更新如何应对?
4.出问题了如何回滚?
5.如何校对数据迁移的准确性?
上面这些考虑点也是做数据迁移绕不过的考虑的点,那么针对本人的业务场景一一分析:
1.是否停机?
我们的业务是不允许停机来做数据迁移的,目前发奖峰值QPS在500,并且发奖的这个业务如果停机对于C端的业务是不能接受的
2.如何迁移旧数据到新数据?
迁移数据就是需要从老的user_award_index
迁移到分片的user_award_index_2023xx
,那么迁移方式有2种:
1.采用定时任务,深度分页查询es数据,通过scroll分页的查询,将数据同步过去
2.使用es的自带迁移命令:reindex,这个原理是通过异步任务,也是和方式1的scroll深度分页查询一样,深 度分页查询出来后,同步到目标索引
这里使用es的reindex
会更加方便,支持的功能也比较丰富,同时它是异步分页处理任务,不影响先上的读写。而且es有getTask
命令,用于查询reindex相关的任务,这种可视化命令的方式也提供了不少便利。
3.数据的新增、更新如何应对?
我们的场景都是写操作,有记录了就往ES里面写,没有更新的数据,所以这里迁移数据处理起来会更简单,直接覆盖即可。
4.出问题了如何回滚?
我们的方案采用双写,然后切读,再关旧索引的写,最后删除老的索引,去掉用于切换双写等开关代码
所以在切读以后如果出问题了,那么可以立马把读切回读老的索引。
5.如何校对数据迁移的准确性?
条数对比:使用es的命令查询新旧索引的条数,然后对比,这里需要注意不能直接比对实时全量的数据,因为新老索引写ES的速度不一样,执行统计索引数量的时间也不一致,完全实时的数量是不一定一致的。所以我们这边是对比某个时间点之前的全量数据条数。
数据性对比:礼品发放记录的索引数据是相对简洁,没有太复杂的逻辑,只需要抽样一些数据对比是否有差异
2.迁移流程
1.迁移流程图
2.上线双写
这一步上线写新老索引的代码,同时加上写新老索引的开关,方便后续出问题后切换
// 写老的es
String oldUserAwardEsWriteSwitch = sysConfigHelper.getByKeyWithDefaultVal(CommDefConstants.OLD_USER_AWARD_ES_WRITE_SWITCH, "1");
if (Objects.equals("1", oldUserAwardEsWriteSwitch)) {
esUtil.batchAddListData(CommDefConstants.EsIndex.USER_AWARD_RECORD_INDEX_OLD, esBatchList);
}
// 写新的es
try {
String newUserAwardEsWriteSwitch = sysConfigHelper.getByKeyWithDefaultVal(CommDefConstants.NEW_USER_AWARD_ES_WRITE_SWITCH, "0");
if (Objects.equals("1", newUserAwardEsWriteSwitch)) {
esUtil.batchAddListData(getSliceUserAwardIndexName(YearMonth.now()), esBatchList);
}
} catch (Exception e) {
log.error(SfJsonUtil.toJsonStr(esBatchList), e);
}
OLD_USER_AWARD_ES_WRITE_SWITCH
为写老es索引的开关
NEW_USER_AWARD_ES_WRITE_SWITCH
为写新es索引的开关,这里try-catch
住了,防止写新索引报错影响正常的业务
3.迁移数据:reindex
下面是迁移的命令:
curl -u xxx:xxx -X POST 'xxxxxx:9200/_reindex' \
-H 'Content-Type: application/json' \
--d '{
"source":{
"index":"user_award_record_index",
"size":5000,
"_source":[
"amount",
"channel",
"createTm",
"detail",
"userId",
"errorMsg",
"errorCode"
],
"query": {
"range": {
"createTm": {
"gte": "2022-12-07T18:00:00.000",
"lte": "2023-12-08T00:00:00.000"
}
}
}
},
"dest":{
"index":"slice_of_user_award_index_202312"
},
}'
souce: 表示源索引,也就是数据的来源
dest: 表示目标索引,表示需要迁移到的目标索引
size:表示每次批量处理的数据量大小
query: 查询需要迁移的数据
es的reindex还有其他很多参数,例如下面:
当涉及到 Elasticsearch 的 reindex API 参数时,下面是详细介绍:
"script"
:允许在重新索引期间对文档进行转换或修改的脚本。可以使用不同的编程语言如 Painless、Groovy 等编写自定义脚本来处理文档内容。"size"
:用于分批处理数据时指定每个批处理操作中要复制的文档数量。默认为 1000。"refresh"
:控制在每次重新索引操作后是否自动刷新目标索引以使更改立即生效。可以设置为true
表示自动刷新,或者false
表示禁用刷新。"wait_for_completion"
:决定是否等待重新索引过程完成后再返回响应。默认为true
,表示等待完成;而设置为false
会立即返回响应并在后台执行重建操作。"requests_per_second"
:限制重新索引操作每秒允许执行的请求数量。这有助于平衡性能和资源消耗。例如,"requests_per_second": 1000
将限制每秒不超过 1000 个请求。"conflicts"
:定义如何解决可能出现的冲突情况(当源和目标之间存在相同 ID 的文档)。可以选择忽略冲突、替换目标文档或更新目标文档。
我目前的场景对于冲突用的是默认更新,其他的参数就暂时用不少了,对于复杂的迁移可能会用上。
这里也提供一下查询reindex的任务命令:
这样就能看出当前的reindex任务执行进度,会包含需要迁移的数据量、当前迁移的数量、创建时间等。
4.验证数据
验证数据就是查询新老索引的数量进行对比,然后抽查一下数据的情况。
-H 'Content-Type: application/json' \
-d '{
"query": {
"range": {
"createTm": {
"gte": "2022-12-09T18:00:00.000",
"lte": "2023-12-11T10:40:00.000"
}
}
}
}'
5.切读
String readFlag = sysConfigHelper.getByKeyWithDefaultVal(CommDefConstants.USER_AWARD_ES_READ_FLAG, "old");
BoolQueryBuilder boolBuilder;
boolean old = "old".equals(readFlag);
if (old) {
boolBuilder = genBoolQueryBuilder(req);
} else {
boolBuilder = boolQueryBuilder(req);
}
切读就是通过USER_AWARD_ES_READ_FLAG
开关来控制读新索引还是老索引,这里比较简单
6.停止旧索引写入
观察几天,当线上流量读的流量验证读无问题后,那么就说明新的索引运行稳定了,就可以把写旧索引关了。这里直接把OLD_USER_AWARD_ES_WRITE_SWITCH
开关改成0即可,同时把老的索引删除。
5.成果
平滑迁移0故障、0BUG: 此次迁移未造成任何生产影响或者问题
存储占用减少:由于重新设计过了mapping,不需要分词的数据去掉了分词,也减少了不必要字段,整体的索引容量减少了一倍。2亿2千条数据从之前的110G到了56G
性能提升:查询性能相较之前提升了30%,平均耗时从125ms下降到93ms,这里是由于es集群本身没优化好,导致新老索引的整体耗时偏高