这几天刚实操了一把shard 的relocation, 在cerebro 上看着图标移来晃去觉得很爽,但之前并没深究,也没读过这部分的代码。前几晚夜间上线需要重启所有的ES机器,大家也对ES的relocation 讨论了一番,这次就顺便读读这部分的代码。
这里不再讨论shard 在分配的一些算法,和一些基础概念,如果对分片如何分到一个节点的算法有兴趣,或者对一些概念还不熟悉的话,建议可以看下面文章,或者我之前的文章:
Elasticsearch 5.x 源码分析(7)Shard Allocation 的一些小细节
elasticsearch源码分析之Allocation模块
这次打算用一个例子来贯穿大部分的allocation的入口,如下图:
先假设我们有三个节点,并创建了一个index,2分片2replica。0p 和1r 坐落在Node1 上,1p 和0r 坐落在Node2, 现在我们先假设Node1 Node2都shutdown了,现在这4个分片都是
UNassigned
状态。
1. 重启,初始化分配
这里先撇开集群把shard 进行飘移的问题,简单看一下shard是怎么 assign 的,node1 和node2 启动之前,在集群里面,分片0 和分片1 都会出于 UNassigned
状态;当Node1启动完,它会主动去ping Master节点,在前面的文章都知道了,这时会完整data node的注册的所有步骤,同时Master会下发最新的clusterState 下来。然后在Node1 内部会触发ClusterStateChangeEvent
,这个事件会触发多个modules或者Service去处理逻辑,这里我们关心的逻辑在 IndicesClusterStateService
里。
当处理到检测分配到自己的shard是否存在时,Node1 会检测这个shard的状态,如没有,则会去创建一个shard,有的话则会检测是否需要更新状态
由于Node1 刚刚启动,它还没有去加载 0p 和1r,因此会进入createShard()流程,并在后面跑去加载它本机的shard,这部分的逻辑在createShard() 里面的indicesService.createShard()中
最后Node1 启动完后会把0p 和1r 标记成
initializing
并上报给Master,这样Master就知道这个shard已经被assigned
了并标记成initialzing
了。这里介绍的是一个正向的例子,那么一些反向的例子,假如,在Node1 启动的时候,其实过了很久了,0p早就已经分配到其他机器了,那么Master 发过来的
ClusterState
中的0p 就已经是assigned状态了,那么这段逻辑就不是在 createOrUpdateShards(state);
处理了,而是之前的removeUnallocatedIndices(state)
,failMissingShards(state)
和removeShards(state)
来处理,大家有兴趣可以分别过过这两个代码。
现在回到这个正向的例子,稍等片刻之后,Node1 已经完全初始化完0p 和1r 了,那么node1 就会把这两个 shard 置成started
状态,等下一个时间间隔的ClusterState
心跳过来时,显而易见就会进入updateShard(nodes, shardRouting, shard, routingTable, state);
在这里Node1 就会主动发送一个Action 给master 来请求 做
startedShard
这个Action(最后一行)。在
ShardStateAction
里,有两种请求需要处理,分别是
ShardStartedClusterStateTaskExecutor
ShardFailedClusterStateTaskExecutor
分别处理启用或者mark fail,其中我们看看ShardStartedClusterStateTaskExecutor
的逻辑
从代码看出,这里最终要的一个地方就是,并不是 node上报什么状态,Master就会不分青红皂白一味记录,它需要通过
allocationService.applyStartedShards()
来自己去校验和消费这个状态,最后则生成一个新的ClusterState
,这会体现在下一次的心跳下发。
而ShardFailedClusterStateTaskExecutor
则是处理Node上报的failShard的事情,这种情况我觉得比较少见
经过这一轮后,Node1 上的所有shards都恢复正常,那么Node2 也按这个流程,最后Node1 Node2 启动完后就是下图
2. Node1 挂了,Master标记0p,1r
刚刚上面说到,ShardFailedClusterStateTaskExecutor
这种主动上报一般是比较少见的,我们见更多一般是Node1 重启或者进程挂掉,从
我们曾经提过,ZenDiscovery用于处理一切的Node事件变更,那我们就找找这段代码
继续跟下去,这次我们只关心和shard allocation 相关的代码,处理Node fail 的话会执行一个NodeRemovalClusterStateTaskExecutor.Task
在这里我们找到了我们想找到的东西
这里就会把这台机的所有shard进行 failShard()处理,也就是会mark UNassigned 等等。那么在下一次的
ClusterState
同步事件时其他所有节点就会知道了
3. Cluster Reroute
有时候运维需要,比如我们这次需要全部升级ES,那么往往我们需要手工的去锁住禁止集群进行 allocation,rebalance 等,甚至我们在加机器之后我们希望是手工自己去分配分片,那么就要用到cluster Reroute。相对应的Action是RestClusterRerouteAction
和TransportClusterRerouteAction
,而最后会是由 allocationService.reroute()
来承载,在这个例子里,加入我手工吧0p从Node1 移动到Node3 去,看会发生什么事情。
上面代码最重要就是那句
RoutingExplanations explanations = commands.execute(allocation, explain);
在MoveAllocationCommand
中会对将要Reroute的操作进行一系列的判断,比如canAllocate()
仔细留意的话,这里的targetRelocatingShard 其实是一个
PeerRecoverySource
的shard,这是就会把这个shard标记成INITIALIZING
状态。
那这个状态就会在ClusterState 中并下次心跳同步到目标节点去。也就是说Node3 在下次获取ClusterState时,就会得到新的ShardRouting 请求了。
这时Node3 还是回到 第1章中的方法里,只是这时Node3 并没有0r这个Shard,并且,Node3就会开始尝试从Node1 处去回复这个Shard了
后话
我写这篇文章的初衷是,我当时很好奇,Node3 是如何恢复0p的呢,因为0p好歹是个Primary 呀,并且这时,Node1 会继续服务吗?
继续服务这个是肯定的,至于Node3是如何恢复的,我的一个猜测是首先Node3会像恢复一个replica 那样把Node1的分片拷贝过去,接着,在这段时间的增量数据将会继续同步translog 的方式同步到Node3,到最后,直到同步到一个最新的点,这时Node1 和Node3的分片都是同步的,然后就直接做个切换,0p就成功切换到Node3去了,关于Recovery这块我没有再继续读下去了,知道的朋友欢迎也顺便告知我一下。
由于Allocation这部分代码非常复杂,因此我也没有完全弄懂,有些点也是我自己的推测而成,如有错误欢迎指正和讨论。