聊聊 Elasticsearch 中的任务管理机制

Elasticsearch 对外提供了一个 _tasks 接口,用于获取当前各个节点正在执行的任务,这里要避免和 pending_tasks 搞混,后者是用于获取在 master leader 节点排队等待修改 cluster state 的处理任务。

WHY

Task 管理功能不是一开始就有的,是从 5.0 开始推出并不断完善的,如下是相关的 issue 链接。

摘取上面的部分描述如下:

We have identified several potential features of elasticsearch that can spawn long running tasks and therefore require a common management mechanism for this tasks.

This issue will introduce task management API that will provide a mechanism for communicating with and controlling currently running tasks.

The task management API will be based on the top of existing TransportAction framework, which will allow any transport action to become a task.

The tasks will maintain parent/child relationship between tasks running on the coordinating nodes and subtasks that are spawn by the coordinating node on other nodes.

上面这段话把引入 Task 的原因、作用都讲解的很清楚了,简单总结如下:

  • 因为现在 es 里面会有一些长时间运行的任务,所以需要构建一个通用的管理机制,方便去查看这些任务的运行状态、进度,甚至能进行一定的管理操作,比如取消。
  • 该机制会提供一个 task management 的 API 来查看和获取当前在执行的所有任务
  • 实现上会基于现有的 TransportAction 框架,这样就可以将所有的 transport action 请求都变成 task 进行管理。
  • 还会提供父子关系的 Task 类型,主要是由协调节点触发的父任务和下发到其他节点的子任务组成。

看到这里,相信大家已经对于 Task 管理的来历以及要实现的功能有了大致的理解,最后再补充一句。

Task 管理功能也极大提升了 Elasticsearch 系统的可观测性,对于各个节点当前在执行的任务有了统一观测的手段,不用再猜了。

接下来我们就看看什么是 Task。

WHAT

Task 通常包含如下信息:

  • id,id of the task with the node
    • Node 级别的 id,每个 node 独立维护的单调递增的整型
  • task_id ,unique task id
    • Cluster 级别的 id,由 Node Id 和 Node 级别的 Task Id 组成,格式为:{Node Id}:{Task Node Id}
  • node,运行任务的 node
  • parent_task_id,parent task id
    • 如果该任务有父子关系,那么该 id 不为空
  • type,task type,任务类型,主要有下面几种:
    • transport: 其他 Node 发送来的 transport 请求任务
    • direct:直接在本地 node 执行的请求任务
    • persistent:持久化的 task,这些 task 是存储在 cluster state 里面的,比如 rollup/transform 等,不会因为 node 挂掉而丢失
  • action,task action
    • 创建该 task 的 transport action 名字
    • 通过 action 可以知道这个 task 在具体做的任务,比如 indices:data/write/bulk 是一个写请求,cluster:monitor/tasks/lists 是一个获取 task 列表的请求
    • 有些 action 后面有一些后缀,比如 [n] [s] 等,这些后缀有一定的标识作用,这里简单解释下
      • [n],TransportNodesAction,节点与节点间通过 transport 请求发送的 action
      • [s],transportShardAction,发生在分片上的 action 操作
      • [p],PrimaryShardAction,发生在主分片上的 action 操作
      • [r],ReplicaShardAction,发生在副本分片上的 action 操作
  • start_time,任务开始的时间戳
  • running_time,任务已经运行的总时间
  • x_opaque_id,client 发起请求时可以设定该 http header,这样便可以跟踪该请求发起的所有 task
  • description,任务的详情描述
  • status,描述任务的状态
  • cancellable,是否可以取消

Task 的相关接口主要是下面两个:

  • _cat/tasks,以列表的形式展现所有节点的任务,只展现部分内容,属于简洁模式
  • _tasks,以 JSON 的形式展现所有节点的任务详情,属于详情模式

下面是通过 GET _tasks?detailed=true 获取的一个样例数据:

{
  "-Tws1PJEQ4WW_GSsRLTSLg:35061634": {
    "node": "-Tws1PJEQ4WW_GSsRLTSLg",
    "id": 35061634,
    "type": "transport",
    "action": "indices:data/write/bulk[s]",
    "status": {
      "phase": "waiting_on_primary"
    },
    "description": "requests[1], index[.monitoring-es-7-2022.12.11][0]",
    "start_time_in_millis": 1670740946462,
    "running_time_in_nanos": 359046,
    "cancellable": false,
    "parent_task_id": "_Z3jXvvvQnqR6pye8zwZtg:69312208",
    "headers": {}
  }
}
  • -Tws1PJEQ4WW_GSsRLTSLg:35061634 是 task id,这个 task 运行在 node id 为 -Tws1PJEQ4WW_GSsRLTSLg 的 node 上,id 为 35061634
  • 这个 task 是一个子任务 subtask,其 parent task id 为 _Z3jXvvvQnqR6pye8zwZtg:69312208
  • task type 为 transport,这说明是其他 node 发送来的请求
  • action 为 indices:data/write/bulk[s],是在 shard 上执行的一个 task,从 description 上可以看到是在 index .monitoring-es-7-2022.12.11 shard 0 上执行的

其他字段大家可以自行解释,这里不再展开讲解。

通过这一小节,我们已经知道了 Task 的组成,那么接下来我们看看 Task 管理是如何实现的。

HOW

TaskManager 是核心管理类,它提供两个核心方法供外部调用。

  • register,注册一个新的 task
  • unregister,在 task 执行结束后,注销之前的 task

TaskManager 内部使用一个 Map<Long,Task> 的结构维护当前 Node 的所有 task,即 task node idtask 的映射管理。

Task 创建的时机主要是两个地方:

  • client 通过 rest api 接口发起请求后,大部分请求都会转成一个 transport action 请求发送出去再处理,那么在这个新的 transport action request 发送之前会生成一个 task
  • Node 与 Node 之间一直在不停地互相通信,这个通信也是通过 transport action 请求完成的,在目标 Node 收到请求后,也会生成一个 task

client rest 请求的处理流程大概如下:

image
  1. client 发起 http 请求(e.g. GET _tasks),被路由到相关的 RestAction 中,比如 RestListTasksAction
  2. RestAction 对请求做验证和处理后,会转换成对应的 transport action request 发送出去,转交本地对应的 TransportAction 处理,比如 TransportListTasksAction
  3. TransportAction 在执行任务之前会通过 TaskManager 注册一个 task
  4. TransportAction 处理相关的请求
  5. 在第 4 步处理结束后通过 TaskManger 注销之前注册的 task
  6. 一路返回给 client 相关结果

TransportAction 中的代码如下,注 1 为 register,注 2 为 unregister。

image

Node 与 Node 之间请求的处理流程大致如下:

image
  1. Node1 通过 TransportService 的 sendReqeust 方法向 Node2 发送请求,这其中使用的 actionName 是相关 TransportAction 中定义的 transportNodeAction,比如 TransportListTasksAction 发送的 action name 是 cluster:monitor/tasks/lists[n]
  2. Node2 接收到请求,在 InboudHandler 中,通过 action name 从 ReqeustHandlers 中获取对应的 request handler,然后进行处理
  3. 在 RequestHandler 的 processMessageReceived 方法中,会通过 TaskManager 注册一个 task
  4. RequestHandler 处理相关的请求
  5. 在第 4 步处理结束后通过 TaskManger 注销之前注册的 task
  6. 一路返回给 Node1 相关结果

RequestHandlerRegistry 中的代码如下,注 1 为 register,注 2 为 unregister。

image

关于 register 和 unregister 的实现逻辑,这里就不展开讲了,感兴趣的同学可以自行去查看相关代码。

其他

持久化结果:.tasks 索引

细心的同学可能会发现在 elasticsearch 中有一个系统索引 .tasks,如果去查询这个索引的内容,会得到类似如下的文档内容。

GET .tasks/_search
{
        "_index" : ".tasks",
        "_type" : "task",
        "_id" : "9m1T5Qx6RnaRXER7Z:1715505780",
        "_score" : 2.8134105,
        "_source" : {
          "completed" : true,
          "task" : {
            "node" : "9m1T5Qx6RnaRXER7Z",
            "id" : 1715505780,
            "type" : "transport",
            "action" : "indices:data/write/reindex",
            "status" : {
              "total" : 34556,
              "updated" : 0,
              "created" : 34556,
              "deleted" : 0,
              "batches" : 35,
              "version_conflicts" : 0,
              "noops" : 0,
              "retries" : {
                "bulk" : 0,
                "search" : 0
              },
              "throttled_millis" : 0,
              "requests_per_second" : -1.0,
              "throttled_until_millis" : 0
            },
            "description" : "reindex from [.indexA] to [.indexA_reindex][_doc]",
            "start_time_in_millis" : 1657627161222,
            "running_time_in_nanos" : 7259281625,
            "cancellable" : true,
            "headers" : { }
          },
          "response" : {
            "took" : 7239,
            "timed_out" : false,
            "total" : 34556,
            "updated" : 0,
            "created" : 34556,
            "deleted" : 0,
            "batches" : 35,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled" : "0s",
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until" : "0s",
            "throttled_until_millis" : 0,
            "failures" : [ ]
          }
        }
      }

这个返回的文档记录了一个 reindex task 的详情和结果。

需要注意的是,并非所有的 task 都可以持久化结果到 .tasks 索引中,这只支持某些 long running task ,如下:

  • DeleteByQuery
  • Reindex
  • UpdateByQuery

在发起相关请求时,只要加上一个参数 wait_for_completion=true,请求会返回一个 task id,然后该 task 的结果会被记录到 .tasks 索引中。如果不加该参数,则不会记录。

另外 .tasks 索引是按需创建的,只有在需要记录结果时才会创建该索引,如果你的 cluster 里面没有,也没有什么问题。

取消任务 Cancel Task

部分 Task 在执行过程中可以被取消(Cancel),相关接口为 POST _tasks/[task_id/_cancel 。但不是所有 Task 都可以被取消,只有 Cancellable 为 true 的才可以。

可以取消的任务主要是一些 long running 的task,比如 reindex、update by query、delete by query、search 等,它们的 task 都继承了 CancellableTask

另外 ES 还引入了自动 cancel search 任务的机制,如下是相关 issue:

当 ES 发现 client 主动断开连接时,会主动 cancel 当前正在执行的 search 任务,以便减轻集群负载。

Persistent Task

Persistent Task 是一类比较特殊的任务,一般的 Task 在 Node 停止或者 Crash 后就结束了,即便 Node 重启也无法继续之前在执行的 Task,但是 Persistent Task 通过将自身持久化到 Cluster State 中,即便相关 Node 停止,它依然可以被重新分配到其他 Node 上继续运行。

这部分使用的不多,主要是 x-pack 增加的一些如 ml、rollup、transform 等功能在用,了解下即可。

总结

本文主要讲解了 Elasticsearch 中 Task 的来历、组成和实现,希望能对大家有所帮助,以后可以正确的使用 Task Management API 来解决使用中的问题。

引用

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容