由上诉代码可以知道无论任务是否存在 都会有一个Assignment 对象
对 Assignment 对象进行 一些设置 框出代码是对 集群信息进行更改 将任务信息更新到zookeep上
继续点击
path 是zookeeper的路径 然后根据对象类型 更新zookeep信息
接下里 通过 watch 机制会 回调 supervisor 中的main方法
main方法 在调用run方法
通过 mkSupervisor 获得一个 SuperviosrManger对象 点击进入mkSupervisor 方法
点击进入 类 SyncProcessEvent
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
lastTime = TimeUtils.current_time_secs();
try { /**
* Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
*/
//从本地获取任务信息,请问本地的任务信息从哪里来的,由SyncSupervisorEvent类获取zookeeper的信息写入到本地。 if (localAssignments == null) {
localAssignments = new HashMap<>();
}
LOG.debug("Assigned tasks: " + localAssignments); /**
* Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
* 获取当前机器上所属的worker的状态信息
*/
Map<String, StateHeartbeat> localWorkerStats;
try {
localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
} catch (Exception e) {
LOG.error("Failed to get Local worker stats");
throw e;
}
LOG.debug("Allocated: " + localWorkerStats); /**
* Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
* 杀死一些无效的worker并移除本地状态
*/
Map<String, Integer> taskCleaupTimeoutMap;
Set<Integer> keepPorts = null;
try {
taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
} catch (IOException e) {
LOG.error("Failed to kill workers", e);
} // check new workers
checkNewWorkers(conf); // check which topology need update
checkNeedUpdateTopologys(localWorkerStats, localAssignments); // start new workers
//启动新的任务
startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); } catch (Exception e) {
LOG.error("Failed Sync Process", e);
// throw e
}}
其中startNewWorkers 需要会创建一个新的worker 点击进入
点进去 会有这个代码 会把worker 放入队列
拼装参数 讲道理 debug的时候 会有参数详情 我这里不知知道为什么没有 先看着吧
然后启动worker 这里大家可能想知道 worker这个进程是在哪里启动的下面给大家看看
点击进入方法
继续点击
继续点击呗
非后台 启动一个进程 点击进入
皆大欢喜 进程启动方法