rebalance和reassign,读这篇文章就够了

rebalance

我们知道,在storm中rebalance可以通过ui、命令行、代码的方式来调用,对Topology的worker数进行重新分配。
rebalance通过rebalance(String name, RebalanceOptions options)、recv_rebalance()方法,向nimbus传输数据和接收数据。

public void send_rebalance(String name, RebalanceOptions options) throws TException {
            Nimbus.rebalance_args args = new Nimbus.rebalance_args();
            args.set_name(name);
            args.set_options(options);
            this.sendBase("rebalance", args);
        }

        public void recv_rebalance() throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
            Nimbus.rebalance_result result = new Nimbus.rebalance_result();
            this.receiveBase(result, "rebalance");
            if (result.e != null) {
                throw result.e;
            } else if (result.ite != null) {
                throw result.ite;
            } else if (result.aze != null) {
                throw result.aze;
            }
        }

在接收到rebalance信号后,Topology由active状态转换为rebalance状态
rebalance: 实际上是调用了 rebalance-transition 函数,从代码可以看出,会将状态改成 rebalancing, 然后再转换成 do-rebalance 。 do-rebalance 其实也是重新分配任务

(defn rebalance-transition [nimbus storm-id status]  
  (fn [time num-workers executor-overrides]  
    (let [delay (if time  
                  time  
                  (get (read-storm-conf (:conf nimbus) storm-id)  
                       TOPOLOGY-MESSAGE-TIMEOUT-SECS))]  
      (delay-event nimbus  
                   storm-id  
                   delay  
                   :do-rebalance)  
      {:type :rebalancing  
       :delay-secs delay  
       :old-status status  
       :num-workers num-workers  
       :executor-overrides executor-overrides  
       })))  

在do-rebalance中调用mk-assignments重新分配任务

(defn do-rebalance [nimbus storm-id status storm-base]
  (let [rebalance-options (:topology-action-options storm-base)]
    (.update-storm! (:storm-cluster-state nimbus)
      storm-id
        (-> {}
          (assoc-non-nil :component->executors (:component->executors rebalance-options))
          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
  (mk-assignments nimbus :scratch-topology-id storm-id))
什么是mk-assignment

主要就是产生executor->node+port关系, 将executor分配到哪个node的哪个slot上(port代表slot, 一个slot可以run一个worker进程, 一个worker包含多个executor线程)
mk-assignment源码在此处贴出:mk-assignment源码

在mk-assignment中分为以下几个方面

  1. 读出所有active topology信息
  2. 读出当前的assignemnt情况
  3. 根据取到的Topology和Assignement情况, 对当前topology进行新的assignment
Storm的可靠性保证

其实Storm本身已经提供了该问题可靠性保证。大致的原理是:
spout发出的所有数据,都有一个acker对其进行追踪,无论处理成功、失败或者超时,都会告知spout。如果spout发现消息处理失败或丢失,则会重新发送该消息。
结合Topology rebalance的过程,首先de-active,这时候topology的状态被保存。未被处理的消息由acker追踪。
当topology重新分配后,spout发现已发出的消息未被处理,则重新发射这些消息。


reassign

在mk-assignment中的第三步,找出missing-assignment-topologies, 需要从新assign (当前逻辑没有用到, 在sechduler里面会自己判断(判断逻辑相同))
什么叫missing-assignment, 满足下面任一条件

  • topology->executors, 其中没有该topolgy, 说明该topology没有assignment信息, 新的或scratch
  • topology->executors != topology->alive-executors, 说明有executor dead
  • topology->scheduler-assignment中的实际worker数小于topology配置的worker数 (可能上次assign的时候可用slot不够, 也可能由于dead slot造成)
        missing-assignment-topologies (->> topologies
                                           .getTopologies
                                           (map (memfn getId))
                                           (filter (fn [t]
                                                     (let [alle (get topology->executors t)
                                                           alivee (get topology->alive-executors t)]
                                                       (or (empty? alle)
                                                           (not= alle alivee)
                                                           (< (-> topology->scheduler-assignment
                                                                  (get t)
                                                                  num-used-workers )
                                                              (-> topologies (.getById t) .getNumWorkers)))))))

supervisor会定时从zookeeper获取topologies、已分配的任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。
在supervisor周期性地进行同步时,会根据新的任务分配来启动新的worker或者关闭旧的worker,以响应任务分配和负载均衡。
worker通过定期的更新connections信息,来获知其应该通讯的其它worker。

详见Lifecycle-of-a-topology.md

- Nimbus monitors the topology during its lifetime
   - Schedules recurring task on the timer thread to check the topologies [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623)
   - Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98)
   - The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497)
   - `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
      - `mk-assignments` checks heartbeats and reassigns workers as necessary
      - Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
所以关于reassign,其实就是nimbus重新调用了mk-assignment,并且根据负载均衡重新分配任务。

参考文章:

mk-assignment源码解析
storm如何分配任务和负载均衡?
Storm中Topology的状态

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354