转述自:Lifecycle of a Storm Topology
本文介绍的storm topology生命周期是基于0.7.1版本的,之后版本可能已发生了一些变化
我们从执行storm jar命令提交topology给nimbus开始,到supervisor启动或停止worker,再到task执行整个过程进行描述,这其中也包括nimbus是如何监控topology的。
关于topology的两点说明:
1. 实际运行中的topology与我们看到的是不同的。运行过程中会有stream和acker bolt加入进来以保证数据处理的可靠性,system-topology函数负责topology的创建
2.system-topology用在a. nimbus创建task时 b.worker route消息时
启动topology
storm jar命令会设置storm.jar环境变量在StormSubmitter上传jar时使用,�然后带着命令行参数执行指定的class。StormSubmitter.submitTopology按以下步骤执行:
* upload未上传过的jar文件
* 使用nimbus的thrift接口实现uploading jars
* uploadChunk每次上传15kb的数据
* 上传完毕时调用finishFileUpload
* topology的配置用json格式序列化
nimbus接收topology提交的请求,并对每个topology的配置进行规范格式化,完成topology一些静态属性的设置:
* jars和configs存放在本地文件系统中,具体为:{nimbus local dir}/stormdist/{topology id}
* setup-storm-static 将task--->component的映射写入zookeeper
* setup-heartbeats在zk中创建一个目录来存放task心跳
nimbus调用mk-assignment给各个节点机分配任务,使用到以下信息:
* master-code-dir: supervisors用来下载jars/configs
* task->node+port: 任务id到worker的映射关系,worker由(node,port)对来标识
* node->host: node id到hostname的映射关系。workers用这个映射关系来与其他worker进行通信,node id用来标识supervisors,因为多个supervisors可以运行在同一台机器上
* task->start-time-secs: 任务启动的时间戳,nimbus用来监控topology,launch time out需要设置的比心跳超时时间大一些,因为启动时有很多初始任务要做,由nimbus.task.launch.secs设定
任务分配完处于deactivated模式,start-storm将相关数据写到zk之后进入active模式spouts开始emit tuples
supervisor默默的做两件事:
* 调用synchronize-supervisor,zk任务分配变化时就会执行,另外每10s也会定时执行,执行时下载新的topology jars,将node要执行的任务写到本地文件系统,其实是一个映射关系 port->localAssignment, LocalAssignment包含一个topo id还有task ids
* 调用sync-processes, 读取第一件事写到本地文件的内容并与运行的topology对比以决定启停worker
mk-worker函数用来启动worker
* worker之间互连并启动一个线程监控变化,如果worker任务变更会与启停worker重连
* 监控topology是否active并将这个状态赋给storm-active-atom变量,task根据这个变量决定是否调用spouts的nextTuple
* worker启动线程来执行具体的tasks
mk-task函数用来启动task
* task启动一个routing函数,接收stream输出tuple返回task ids(用来发送tuple)
* task执行spout和bolt业务逻辑
Topology监控
nimbus对topology的整个生命周期进行监控
* 定时线程执行日常任务的检查
* nimbus按一个有限状态机转动,包含:active\inactive\killed\rebalancing五个状态
* nimbus.monitor.freq.secs设定检测周期,调用reassign-topology触发monitor事件完成
* reassign-topology调用mk-assignments来执行topology的更新,更新时会启停workers
杀掉Topology
storm kill调用nimbus thrift接口完成这个任务,可以用-w 指定remove topology的timeout,
也给workers时机来处理完正在执行的指令。kill命令是fault-tolerant的,当nimbus恢复时会remove killed状态的topology,之后删除zk中该topology的信息和心跳目录\jars\configs,这个由单独的线程do-cleanup 完成