问题场景
最近碰到这样的场景,ES1的X索引有100亿数据,当X数据有变更时,将变更数据同步到ES2中。
ES内部是否支持,类似Mysql的Binlog机制呢?
问了AI,提到了Watcher机制,就决定先了解看看
官网关于watcher的介绍
image.png
大致总结下来
- 用途:观察数据的变化和异常,满足条件后,可以去执行操作和响应
- 官方建议场景
- 监控业务场景下(银行、票务系统等)统计数据的变化,并且发送通知。
- 监控架构、硬件等指标信息,并进行预警。
- 监控网络活动检测恶意活动,主动改变防火墙进行保护。
- 监控集群运行情况(节点、吞吐量等),进行告警。
- 监控应用数据加载时间,进行告警。
看了以上介绍,可能不太适合我当前场景,再实施下看看。
配置步骤
- 用来测试的ES版本为 8.1
- watcher是商用版本,免费许可证(basic)不支持该功能,只能试用,用以下命令开通30天的试用许可
# 开通许可
POST /_license/start_trial
# 查看许可状态
GET _license
试用许可
- 创建watcher
{
"trigger": {
"schedule": {
"interval": "3s"
}
},
"input": {
"search": {
"request": {
"indices": [
"test_index"
],
"body": {
"query": {
"match_all": {}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"actions": {
"send_to_kafka": {
"webhook": {
"method": "POST",
"url": "http://ip:8081/test/receiveEsWatcher",
"body": "{{#toJson}}ctx.payload{{/toJson}}",
"headers": {
"Content-Type": "application/json"
}
}
}
}
}
简要说明,一个完整的watcher包含以下4个部分
- trigger 触发器,配置定时执行时间间隔,例如3s执行1次。
- input 查询条件,是否执行通知的源数据查询结果。例如 查询某个索引某条件下查询的数据量。
- condition 触发条件,只有这个条件满足,才会执行4中的action, 例如 统计错误 > 5
- actions 执行器,满足条件后的执行动作,例如告警。
其中action的通知方式有7种:Email、Webhook、Index(索引)、Logging、Slack、PagerDuty、Jira ,我们可以用 Webhook的方式进行接收,即自定义API。
以上示例的逻辑为:查询test_index索引下全量数据(默认是查询10000条),当查询结果大于0条时,将查询的结果发送到 "http://ip:8081/test/receiveEsWatcher" 接口中,并且每3s执行一次。
-
Webhook结果分析
接收到的结果为
image.png
可以看出,发送过来的数据,只有10条
结论
Watcher适合的场景为统计报警,不适合大量列表数据的同步场景,有以下几点
- 大量数据需要同步时,难以支持
- 一次10000,只能发10条
- 一次性变更10w条,不支持连续性分页拉取
- 出现异常,无法异常恢复
- 无法控制速率
- 6.X及以前版本、7.X版本、8.X版本支持各有差异,兼容成本比较高。
- 那么,ES -> ES 变更同步场景,只能用程序批量拉取变更来进行处理了? 还有更好的方式吗?