rebalance
We know that in Storm a rebalance can be triggered from the UI, from the command line, or from code, and that it redistributes the topology's workers.
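For example, from the command line this is `storm rebalance mytopology -w 30 -n 5 -e blue-spout=3` (wait 30 seconds, then move the topology to 5 workers and the blue-spout component to 3 executors). Programmatically, a minimal sketch using the Thrift client could look like this (the topology name and option values are made up; assumes the org.apache.storm package layout):

```java
import java.util.Map;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig(); // reads storm.yaml from the classpath
        Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();

        RebalanceOptions options = new RebalanceOptions();
        options.set_wait_secs(30);   // delay before the rebalance takes effect
        options.set_num_workers(5);  // new worker count for the topology
        client.rebalance("mytopology", options); // = send_rebalance + recv_rebalance
    }
}
```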
On the wire, the Thrift-generated client implements this call as a pair of methods, send_rebalance(String name, RebalanceOptions options) and recv_rebalance(), which send the request to Nimbus and receive its reply:
```java
public void send_rebalance(String name, RebalanceOptions options) throws TException {
    Nimbus.rebalance_args args = new Nimbus.rebalance_args();
    args.set_name(name);
    args.set_options(options);
    this.sendBase("rebalance", args);
}

public void recv_rebalance() throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
    Nimbus.rebalance_result result = new Nimbus.rebalance_result();
    this.receiveBase(result, "rebalance");
    if (result.e != null) {
        throw result.e;
    } else if (result.ite != null) {
        throw result.ite;
    } else if (result.aze != null) {
        throw result.aze;
    }
}
```
After receiving the rebalance signal, the topology transitions from the active state to the rebalancing state.
rebalance actually invokes the rebalance-transition function. As the code shows, the status is changed to :rebalancing, and a delayed :do-rebalance event is scheduled; do-rebalance is what actually redoes the assignment.
```clojure
(defn rebalance-transition [nimbus storm-id status]
  (fn [time num-workers executor-overrides]
    (let [delay (if time
                  time
                  (get (read-storm-conf (:conf nimbus) storm-id)
                       TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
      (delay-event nimbus
                   storm-id
                   delay
                   :do-rebalance)
      {:type :rebalancing
       :delay-secs delay
       :old-status status
       :num-workers num-workers
       :executor-overrides executor-overrides})))
```
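Two details worth noting: if no wait time is passed in, the delay defaults to the topology's TOPOLOGY-MESSAGE-TIMEOUT-SECS, so in-flight tuples get a chance to be acked or to time out before workers are torn down; and delay-event merely schedules the :do-rebalance event on Nimbus's timer thread. A rough Java analogy of that scheduling (hypothetical names, not Storm code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for delay-event: fire a state-machine transition
// for a given topology after delaySecs, on a shared timer thread.
class DelayedTransitionSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    void delayEvent(String stormId, int delaySecs, Runnable transition) {
        // In Nimbus this enqueues the :do-rebalance event for stormId;
        // here the event is modeled as a plain callback.
        timer.schedule(transition, delaySecs, TimeUnit.SECONDS);
    }
}
```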
do-rebalance first writes the :num-workers and :component->executors from the rebalance options into the topology's storm-base, then calls mk-assignments to reassign. Passing :scratch-topology-id makes mk-assignments ignore the topology's existing assignment, so its slots are treated as free and it is scheduled as if it were new.
```clojure
(defn do-rebalance [nimbus storm-id status storm-base]
  (let [rebalance-options (:topology-action-options storm-base)]
    (.update-storm! (:storm-cluster-state nimbus)
                    storm-id
                    (-> {}
                        (assoc-non-nil :component->executors (:component->executors rebalance-options))
                        (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
  (mk-assignments nimbus :scratch-topology-id storm-id))
```
What is mk-assignments
Its main job is to produce the executor->node+port mapping, i.e. to decide which slot on which node each executor runs on (a port identifies a slot; one slot runs one worker process, and one worker hosts multiple executor threads).
The full mk-assignments source is not reproduced here.
mk-assignments breaks down into the following steps (a minimal sketch of the resulting mapping follows the list):
- Read the information of all active topologies
- Read the current assignments
- Based on the topologies and the existing assignments, compute a new assignment for the current topology
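To make the shape of the result concrete, here is a minimal, hypothetical sketch of the executor->node+port mapping (round-robin placement; Storm's real scheduler is considerably smarter about slot reuse and balance):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of mk-assignments' output shape: each executor
// (a range of task ids) is mapped to a slot (node + port).
class AssignmentSketch {
    record Executor(int startTask, int endTask) {}
    record Slot(String nodeId, int port) {}

    // Round-robin the executors onto the available slots.
    static Map<Executor, Slot> assign(List<Executor> executors, List<Slot> slots) {
        Map<Executor, Slot> assignment = new HashMap<>();
        for (int i = 0; i < executors.size(); i++) {
            assignment.put(executors.get(i), slots.get(i % slots.size()));
        }
        return assignment;
    }
}
```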
Storm's reliability guarantee
Storm itself already provides a reliability guarantee that covers this problem. The rough idea is:
Every message emitted by a spout is tracked by an acker, and the spout is notified whether processing succeeds, fails, or times out. If the spout learns that a message failed or was lost, it re-emits that message.
Combined with the rebalance process: the topology is first deactivated and its state is saved, while unprocessed messages remain tracked by the acker.
After the topology has been reassigned, the spout re-emits any messages that were sent out but never fully processed.
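A minimal sketch of the spout side of this contract (assuming Storm 2.x's org.apache.storm API; the message source is a made-up placeholder): tuples are emitted with a message id so the acker can track them, and fail(), which is also what fires when in-flight tuples are lost during a rebalance and time out, re-emits the message.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Minimal reliable-spout sketch: emit with a message id, replay on fail().
public class ReliableSpoutSketch extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<UUID, String> pending = new ConcurrentHashMap<>();

    @Override
    public void open(Map<String, Object> conf, TopologyContext ctx,
                     SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String msg = fetchNextMessage(); // hypothetical input source
        if (msg == null) return;
        UUID id = UUID.randomUUID();
        pending.put(id, msg);
        collector.emit(new Values(msg), id); // the id lets the acker track the tuple tree
    }

    @Override
    public void ack(Object id) { pending.remove(id); }  // fully processed

    @Override
    public void fail(Object id) {                       // failed, lost, or timed out
        String msg = pending.get(id);
        if (msg != null) collector.emit(new Values(msg), id); // replay
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msg"));
    }

    private String fetchNextMessage() { return null; } // placeholder
}
```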
reassign
The third step of mk-assignments computes the missing-assignment-topologies that need to be reassigned (the current logic does not actually use this value; the scheduler performs the same check itself, with identical logic).
A topology is missing-assignment if any of the following holds (a plain-Java rendering of this predicate follows the snippet below):
- topology->executors has no entry for it, i.e. the topology has no assignment information (new, or being assigned from scratch)
- topology->executors != topology->alive-executors, i.e. some of its executors are dead
- the actual worker count in topology->scheduler-assignment is smaller than the topology's configured worker count (possibly because not enough slots were free during the last assignment, or because of dead slots)
```clojure
missing-assignment-topologies (->> topologies
                                   .getTopologies
                                   (map (memfn getId))
                                   (filter (fn [t]
                                             (let [alle (get topology->executors t)
                                                   alivee (get topology->alive-executors t)]
                                               (or (empty? alle)
                                                   (not= alle alivee)
                                                   (< (-> topology->scheduler-assignment
                                                          (get t)
                                                          num-used-workers)
                                                      (-> topologies (.getById t) .getNumWorkers)))))))
```
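The same predicate, transliterated into plain Java for readability (hypothetical helper, not Storm code):

```java
import java.util.Set;

// A topology needs reassignment if it has no executors recorded, if some
// of its executors are dead, or if it holds fewer workers than configured.
class MissingAssignmentCheck {
    static boolean isMissingAssignment(Set<String> executors,       // topology->executors
                                       Set<String> aliveExecutors,  // topology->alive-executors
                                       int usedWorkers,             // from topology->scheduler-assignment
                                       int configuredWorkers) {     // from the topology config
        return executors == null || executors.isEmpty()
                || !executors.equals(aliveExecutors)
                || usedWorkers < configuredWorkers;
    }
}
```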
The supervisor periodically pulls the topologies, the current assignments, and the various heartbeats from ZooKeeper, and performs its work based on them.
During each periodic sync, the supervisor starts new workers or kills old ones according to the new assignment, so that the reassignment and load balancing take effect; a rough sketch of this loop follows.
Workers, for their part, periodically refresh their connection information so they know which other workers they should be talking to.
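A rough, hypothetical sketch of that supervisor sync loop (the Cluster interface stands in for the ZooKeeper reads; process management is elided):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodically read this node's assignment, start newly assigned workers,
// and kill workers whose ports are no longer assigned.
class SupervisorSyncSketch {
    interface Cluster { Set<Integer> assignedPorts(String nodeId); } // stands in for ZK reads

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Set<Integer> runningPorts = new HashSet<>();

    void start(Cluster cluster, String nodeId, int freqSecs) {
        timer.scheduleAtFixedRate(() -> {
            Set<Integer> assigned = cluster.assignedPorts(nodeId);
            for (int port : assigned) {
                if (runningPorts.add(port)) launchWorker(port); // newly assigned
            }
            runningPorts.removeIf(port -> {
                if (!assigned.contains(port)) { killWorker(port); return true; }
                return false;
            });
        }, 0, freqSecs, TimeUnit.SECONDS);
    }

    private void launchWorker(int port) { /* fork a worker process */ }
    private void killWorker(int port)   { /* stop the worker process */ }
}
```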
For details, see Lifecycle-of-a-topology.md:
- Nimbus monitors the topology during its lifetime
- Schedules a recurring task on the timer thread to check the topologies [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623)
- Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98)
- The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497)
- `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
- `mk-assignments` checks heartbeats and reassigns workers as necessary
- Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers