ES Watcher的初次接触

问题场景

最近碰到这样的场景,ES1的X索引有100亿数据,当X数据有变更时,将变更数据同步到ES2中。
ES内部是否支持,类似Mysql的Binlog机制呢?
问了AI,提到了Watcher机制,就决定先了解看看

官网关于watcher的介绍

image.png

大致总结下来

  • 用途:观察数据的变化和异常,满足条件后,可以去执行操作和响应
  • 官方建议场景
  1. 监控业务场景下(银行、票务系统等)统计数据的变化,并且发送通知。
  2. 监控架构、硬件等指标信息,并进行预警。
  3. 监控网络活动检测恶意活动,主动改变防火墙进行保护。
  4. 监控集群运行情况(节点、吞吐量等),进行告警。
  5. 监控应用数据加载时间,进行告警。

看了以上介绍,可能不太适合我当前场景,再实施下看看。

配置步骤

  • 用来测试的ES版本为 8.1
  • watcher是商用版本,免费许可证(basic)不支持该功能,只能试用,用以下命令开通30天的试用许可
# 开通许可
POST  /_license/start_trial
# 查看许可状态
GET _license
试用许可
  • 创建watcher
{
  "trigger": {
    "schedule": {
      "interval": "3s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          "test_index"
        ],
        "body": {
          "query": {
            "match_all": {}
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "actions": {
    "send_to_kafka": {
      "webhook": {
        "method": "POST",
        "url": "http://ip:8081/test/receiveEsWatcher",
        "body": "{{#toJson}}ctx.payload{{/toJson}}",
        "headers": {
          "Content-Type": "application/json"
        }
      }
    }
  }
}

简要说明,一个完整的watcher包含以下4个部分

  1. trigger 触发器,配置定时执行时间间隔,例如3s执行1次。
  2. input 查询条件,是否执行通知的源数据查询结果。例如 查询某个索引某条件下查询的数据量。
  3. condition 触发条件,只有这个条件满足,才会执行4中的action, 例如 统计错误 > 5
  4. actions 执行器,满足条件后的执行动作,例如告警。

其中action的通知方式有7种:Email、Webhook、Index(索引)、Logging、Slack、PagerDuty、Jira ,我们可以用 Webhook的方式进行接收,即自定义API。

以上示例的逻辑为:查询test_index索引下全量数据(默认是查询10000条),当查询结果大于0条时,将查询的结果发送到 "http://ip:8081/test/receiveEsWatcher" 接口中,并且每3s执行一次。

  • Webhook结果分析
    接收到的结果为


    image.png

    可以看出,发送过来的数据,只有10条

结论

Watcher适合的场景为统计报警,不适合大量列表数据的同步场景,有以下几点

  1. 大量数据需要同步时,难以支持
    • 一次10000,只能发10条
    • 一次性变更10w条,不支持连续性分页拉取
    • 出现异常,无法异常恢复
    • 无法控制速率
  2. 6.X及以前版本、7.X版本、8.X版本支持各有差异,兼容成本比较高。
  3. 那么,ES -> ES 变更同步场景,只能用程序批量拉取变更来进行处理了? 还有更好的方式吗?
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容