storm源码工作流程(三)Supervisor监听zookeeper 启动worker

clipboard27.png

由上诉代码可以知道无论任务是否存在 都会有一个Assignment 对象


clipboard29.png

对 Assignment 对象进行 一些设置 框出代码是对 集群信息进行更改 将任务信息更新到zookeep上


clipboard30.png

继续点击
clipboard31.png

path 是zookeeper的路径 然后根据对象类型 更新zookeep信息

接下里 通过 watch 机制会 回调 supervisor 中的main方法


clipboard32.png

main方法 在调用run方法
clipboard33.png

通过 mkSupervisor 获得一个 SuperviosrManger对象 点击进入mkSupervisor 方法
clipboard34.png

点击进入 类 SyncProcessEvent
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
    LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
    lastTime = TimeUtils.current_time_secs();
    try { /**
         * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
         */
        //从本地获取任务信息,请问本地的任务信息从哪里来的,由SyncSupervisorEvent类获取zookeeper的信息写入到本地。 if (localAssignments == null) {
            localAssignments = new HashMap<>();
        }
        LOG.debug("Assigned tasks: " + localAssignments); /**
         * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
         * 获取当前机器上所属的worker的状态信息
         */
        Map<String, StateHeartbeat> localWorkerStats;
        try {
            localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
        } catch (Exception e) {
            LOG.error("Failed to get Local worker stats");
            throw e;
        }
        LOG.debug("Allocated: " + localWorkerStats); /**
         * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
         * 杀死一些无效的worker并移除本地状态
         */
        Map<String, Integer> taskCleaupTimeoutMap;
        Set<Integer> keepPorts = null;
        try {
            taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
            keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
            localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
        } catch (IOException e) {
            LOG.error("Failed to kill workers", e);
        } // check new workers
        checkNewWorkers(conf); // check which topology need update
        checkNeedUpdateTopologys(localWorkerStats, localAssignments); // start new workers
        //启动新的任务
        startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); } catch (Exception e) {
        LOG.error("Failed Sync Process", e);
        // throw e
    }}

其中startNewWorkers 需要会创建一个新的worker 点击进入


clipboard35.png

点进去 会有这个代码 会把worker 放入队列


clipboard36.png

拼装参数 讲道理 debug的时候 会有参数详情 我这里不知知道为什么没有 先看着吧
clipboard37.png

然后启动worker 这里大家可能想知道 worker这个进程是在哪里启动的下面给大家看看


clipboard38.png

点击进入方法
clipboard39.png

继续点击
clipboard40.png

继续点击呗
clipboard41.png

非后台 启动一个进程 点击进入
clipboard42.png

皆大欢喜 进程启动方法

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,195评论 30 60
  • 1. Storm介绍: Storm是实时流计算框架。企业中典型实时分析框架搭建模式: Flume + Kafka ...
    奉先阅读 1,694评论 0 3
  • Storm学习笔记总结 Storm概述 离线计算是什么 离线计算:批量获取数据、批量传输数据、周期性批量计算数据、...
    Waldeinsamkeit4阅读 804评论 0 10
  • 拾参姨 本想在满月前发送这文却因为无论拖延症一再滞后,今日无论如何坚持要发文,原因实在是看不惯盲目信息的流通。 不...
    珊珊Samya阅读 723评论 0 1
  • 太阳之为病,脉浮,头项强痛而恶寒。 柯韵伯:仲景作论大法,六经各立病机一条以提揭一经纲领,必择至当之脉证而表彰之....
    升明小素问阅读 241评论 0 2