Worker由Supervisor根据分配的任务启动。主要负责启动由各个组件封装后的task。
1.程序分析
1.1 入口
JStorm源码分析-5.Supervisor中的最后,我们已经知道了Worker由com.alibaba.jstorm.daemon.worker.Worker
启动。启动时,Supervisor向Worker传递了如下几个参数:
- 拓扑ID,topology_id
- Supervisor的ID,supervisor_id
- 当前Worker要使用的port
- 当前Worker的ID worker_id
- 拓扑代码所在的本地路径jar_path
2 主要流程
2.1 预处理
- 首先判断传入的参数是否符合要求;
- 读取配置,确认是否为分布式模式;
- 将System.out重定向到
/dev/null
2.2 Worker
mk_worker方法会创建一个Worker对象,并执行其execute方法,创建时首先会将Worker所需要的数据都封装到WorkerData中。WorkerData中的记录worker运行过程中所需要的所有必要数据。
WorkerData
下面是一些关键的属性
- taskids - worker中包含的task列表
- nodeportSocket - WorkerSlot是节点id和port,IConnection用于与其他Worker通信
- taskNodeport - 每个task对应的Worker,使用WorkerSlotSlot记录
- taskToResource - 每个task的资源分配ResourceAssignment
- innerTaskTransfer - 用于内部的task之间互相传递tuple
- tasksToComponent - task与组件的映射
- componentToSortedTasks - 组件的task列表
- transferQueue - 向其他worker发送tuple时,放入这个队列即可
创建task
Task的主要工作就是启动组件,Spout会不断调用nextTuple方法并发出tuple,Bolt会不断的收到tuple处理后发出给其他Bolt。创建的流程如下:
- TaskHeartbeatRunable:创建Task心跳线程,每隔
task.heartbeat.frequency.secs
默认10秒,将心跳TaskHeartbeat写入节点/jstorm/taskbeats/{topologyId}/{taskId}
中。 - 创建Transport层,由于是非distribute的,并不会创建连接,而是将taskid绑定一个queue,服务器收到tuple后会将消息保存到queue中。Transport层默认使用MQContext,也可以使用NettyContext。
- TaskSendTargets:这个对象用来获得task发送tuple到某个指定的steam时,计算需要发送到哪些task上;创建之后,还会向
SYSTEM_STREAM_ID
的stream发送startup
消息。 - 创建Executor之前,先创建了一个TaskReportError类,用于处理task的异常;TaskReportErrorAndDie会在收到异常后停止jvm;
- 创建Executor:由于task对应组件有IBolt和ISpout两种情况,会分别创建
BoltExecutors
和SpoutExecutors
,这两个Executor都是RunnableCallback,会被放在AsyncLoopThread中不断的执行。
SpoutExecutors
- 初始化时,创建一个disruptorRecvQueue,用于接受发给task的tuple;之后将这个queue设置到registerInnerTransfer中,意味着当前Worker中的其他task发来的消息会保存到这个queue中;最后创建一个处理queueu的RecvRunnable,这个线程会不断接收当前Worker中的task发来的tuple并保存到这个queue中。
- 初始化时,读取参数
topology.max.spout.pending
默认为null,即spout需要等待的正在处理的tuple数量;如果为null或者大于1,那么就需要启动一个AckerRunnable线程执行executeEvent,否则就在SpoutExecutors中执行executeEvent。 - 调用spout的open方法,传入SpoutOutputCollector。
SpoutExecutors是放在AsyncLoopThread中不断执行的,在run方法中,首次启动时,会休眠spout.delay.run
默认为30秒,这是为了等待bolt启动。 后续流程:
- 如果需要则在当前线程执行executeEvent,executeEvent会从disruptorRecvQueue读取tuple,由于是Spout类型,只会处理ack和fail类型的tuple。如果为IAckMsg,执行event;如果是Tuple类型,获取tuple的源stream,如果为ACKER_ACK_STREAM_ID,说明收到了ack消息,使用AckSpoutMsg调用spout的ack方法;如果源stream是ACKER_FAIL_STREAM_ID,说明收到了fail消息,使用FailSpoutMsg调用spout的fail方法;具体的Ack机制见下一节。
- 处理完ack消息后,如果task的状态不为TaskStatus.RUN,休眠后返回,等待其他线程将task停止。
- 如果不设置max_spout_pending或如果小于当前等待中的tuple数量,调用nextTuple方法获取tuple并发送;否则休眠后返回等待下次执行。
BoltExecutors
BoltExecutors在初始化时,也会创建一个disruptorRecvQueue,使用RecvRunnable处理收到的当前Worker发给task的tuple;还会调用bolt的prepare方法。
在循环执行中,Bolt会不断收到tuple,调用bolt的execute方法处理即可。
TaskShutdownDameon
创建task后返回的对象,用来操作每个task,可以active、deactive和shutdown。会为worker中的task创建一个TaskShutdownDameon,能够控制task的状态。如关闭时,会移除zk上task的心跳节点、执行组件的clean方法等。active和deactive会修改task的状态并执行相应的方法。
有了WorkerData的支持后,就开始启动worker内部众多的功能线程了。
WorkerVirtualPort
由于Worker中运行着多个task,每个Worker只使用分配给自己的portsolt启动服务器端。为了便于分发tuple,WorkerVirtualPort内部使用VirtualPortDispatch对服务器收到的数据进行分发。如果目标task在当前worker中,会使用IContext创建一个非distribute
的IConnection,这是一个虚拟连接,然后向分发器注册<task,IConnection>,分发器收到数据后会发到正确的task上,在实现上非distribute
使用QueueConnection的方式,每个连接有自己的disruptorQueue,收到数据后放到queue中即可,task会处理对queue的消费。
RefreshConnections
用于刷新连接,周期性的查看zk上的配置,更新连接配置,然后创建所需的新连接,关闭不需要的连接。
RefreshActive
用于周期性检查zk上的拓扑状态,切换active和deactive
WorkerHeartbeatRunable
用于周期性修改worker的心跳信息,保存到LocalState的orker-heartbeat中
DrainerRunable
用于不断处理workerdata的transferQueue,向其他worker发送消息;Worker中所有task需要发送数据时,会加入transferQueue,等待这个现场处理
2.3 消息的传递
我们知道,消息都是从Spout发出的,先看Spout是如何发出消息的,在上面我们可以知道在SpoutExecutors的run方法中会被不断会调用nextTuple(),在这个方法中,我们通常会使用open时候传入的SpoutOutputCollector
来发送数据。SpoutOutputCollector使用了代理模式,内部通过SpoutCollector来emit消息。
SpoutCollector
emit最终调用了sendSpoutMsg,需要传入几个参数:
- out_stream_id要发送到哪个stream
- values tuple
- message_id 消息的id
- out_task_id 要发送到的task
- 如果out_task_id为null,由于我们制定要发送到哪个stream,那么根据topology结构,可以计算出要发送到哪些task,通过sendTargets可以计算出来out_tasks。
- tuple发出的消息,生成一个MessageId作为root_id,使用uuid的64位bit。如果传入的message_id不为null且设置的ackerNum大于0,说明需要ack机制。
- 遍历需要发送到的task,先生成msgid,如果需要ack,msgid为root_id和task的组合;如果不需要ack,msgid是空的。创建一个TupleImplExt,设置目标task,然后使用transfer_fn发送。transfer_fn使用的是TaskTransfer类。
TaskTransfer
task发送使用的taskTransfer在Task的构造方法中初始化,TaskTransfer的工作很简单,task所在Worker的WorkerData中有两种queue,分别用于与接收和发送tuple:
- innerTaskTransfer:在WorkerData中初始化,SpoutExecutors/BoltExecutors都继承了BaseExecutors,在BaseExecutors中,会使用registerInnerTransfer注册task对应的disruptorRecvQueue,注册的目的是当前Worker内的其他task发送tulple写入这个queue即可;之后使用RecvRunnable处理其他Worker的task发送的tuple,也会写入这个queue。在SpoutExecutors/BoltExecutors的run方法中,都是从disruptorRecvQueue读取tuple并处理的。
- transferQueue:这个是整个Worker用于与外部Worker通信的queue,也是在WorkerData中初始化。当需要向外部Worker的task发送消息时,写入这个queue即可。DrainerRunable用于处理发送的过程。
TaskTransfer首先从innerTaskTransfer中获取当前task的disruptorQueue,如果有,说明tuple的目的task是当前Worker,将其放入queue中即可。否则,保存到serializeQueue中,TransferRunnable线程会处理这个queue,会将tuple序列化后保存到TaskMessage中,传入transferQueue中等待发送到指定Worker的task中。
BoltCollector
Bolt组件最终发送使用boltEmit方法:
- String out_stream_id:发送到的stream
- Collection<Tuple> anchors:锚,用于ack或fail
- List<Object> values:发送的tuple
- Integer out_task_id:要发送到的task
Bolt组件与SpoutCollector类似,不同的是对anchors的处理。anchors用于ack。我们在下一节进行分析。
自此,我们知道了Worker是如何运行不同组件的task、以及各个task直接是如何进行通信的了,如何对收到的tuple进行处理。