JStorm源码分析-6.Worker

Worker由Supervisor根据分配的任务启动。主要负责启动由各个组件封装后的task。

1.程序分析

1.1 入口

JStorm源码分析-5.Supervisor中的最后,我们已经知道了Worker由com.alibaba.jstorm.daemon.worker.Worker启动。启动时,Supervisor向Worker传递了如下几个参数:

  • 拓扑ID,topology_id
  • Supervisor的ID,supervisor_id
  • 当前Worker要使用的port
  • 当前Worker的ID worker_id
  • 拓扑代码所在的本地路径jar_path

2 主要流程

2.1 预处理

  1. 首先判断传入的参数是否符合要求;
  2. 读取配置,确认是否为分布式模式;
  3. 将System.out重定向到/dev/null

2.2 Worker

mk_worker方法会创建一个Worker对象,并执行其execute方法,创建时首先会将Worker所需要的数据都封装到WorkerData中。WorkerData中的记录worker运行过程中所需要的所有必要数据。

WorkerData
下面是一些关键的属性

  • taskids - worker中包含的task列表
  • nodeportSocket - WorkerSlot是节点id和port,IConnection用于与其他Worker通信
  • taskNodeport - 每个task对应的Worker,使用WorkerSlotSlot记录
  • taskToResource - 每个task的资源分配ResourceAssignment
  • innerTaskTransfer - 用于内部的task之间互相传递tuple
  • tasksToComponent - task与组件的映射
  • componentToSortedTasks - 组件的task列表
  • transferQueue - 向其他worker发送tuple时,放入这个队列即可

创建task

Task的主要工作就是启动组件,Spout会不断调用nextTuple方法并发出tuple,Bolt会不断的收到tuple处理后发出给其他Bolt。创建的流程如下:

  1. TaskHeartbeatRunable:创建Task心跳线程,每隔task.heartbeat.frequency.secs默认10秒,将心跳TaskHeartbeat写入节点/jstorm/taskbeats/{topologyId}/{taskId}中。
  2. 创建Transport层,由于是非distribute的,并不会创建连接,而是将taskid绑定一个queue,服务器收到tuple后会将消息保存到queue中。Transport层默认使用MQContext,也可以使用NettyContext。
  3. TaskSendTargets:这个对象用来获得task发送tuple到某个指定的steam时,计算需要发送到哪些task上;创建之后,还会向SYSTEM_STREAM_ID的stream发送startup消息。
  4. 创建Executor之前,先创建了一个TaskReportError类,用于处理task的异常;TaskReportErrorAndDie会在收到异常后停止jvm;
  5. 创建Executor:由于task对应组件有IBolt和ISpout两种情况,会分别创建BoltExecutorsSpoutExecutors,这两个Executor都是RunnableCallback,会被放在AsyncLoopThread中不断的执行。

SpoutExecutors

  1. 初始化时,创建一个disruptorRecvQueue,用于接受发给task的tuple;之后将这个queue设置到registerInnerTransfer中,意味着当前Worker中的其他task发来的消息会保存到这个queue中;最后创建一个处理queueu的RecvRunnable,这个线程会不断接收当前Worker中的task发来的tuple并保存到这个queue中。
  2. 初始化时,读取参数topology.max.spout.pending默认为null,即spout需要等待的正在处理的tuple数量;如果为null或者大于1,那么就需要启动一个AckerRunnable线程执行executeEvent,否则就在SpoutExecutors中执行executeEvent。
  3. 调用spout的open方法,传入SpoutOutputCollector。

SpoutExecutors是放在AsyncLoopThread中不断执行的,在run方法中,首次启动时,会休眠spout.delay.run默认为30秒,这是为了等待bolt启动。 后续流程:

  1. 如果需要则在当前线程执行executeEvent,executeEvent会从disruptorRecvQueue读取tuple,由于是Spout类型,只会处理ack和fail类型的tuple。如果为IAckMsg,执行event;如果是Tuple类型,获取tuple的源stream,如果为ACKER_ACK_STREAM_ID,说明收到了ack消息,使用AckSpoutMsg调用spout的ack方法;如果源stream是ACKER_FAIL_STREAM_ID,说明收到了fail消息,使用FailSpoutMsg调用spout的fail方法;具体的Ack机制见下一节。
  2. 处理完ack消息后,如果task的状态不为TaskStatus.RUN,休眠后返回,等待其他线程将task停止。
  3. 如果不设置max_spout_pending或如果小于当前等待中的tuple数量,调用nextTuple方法获取tuple并发送;否则休眠后返回等待下次执行。

BoltExecutors
BoltExecutors在初始化时,也会创建一个disruptorRecvQueue,使用RecvRunnable处理收到的当前Worker发给task的tuple;还会调用bolt的prepare方法。
在循环执行中,Bolt会不断收到tuple,调用bolt的execute方法处理即可。

TaskShutdownDameon
创建task后返回的对象,用来操作每个task,可以active、deactive和shutdown。会为worker中的task创建一个TaskShutdownDameon,能够控制task的状态。如关闭时,会移除zk上task的心跳节点、执行组件的clean方法等。active和deactive会修改task的状态并执行相应的方法。

有了WorkerData的支持后,就开始启动worker内部众多的功能线程了。

WorkerVirtualPort
由于Worker中运行着多个task,每个Worker只使用分配给自己的portsolt启动服务器端。为了便于分发tuple,WorkerVirtualPort内部使用VirtualPortDispatch对服务器收到的数据进行分发。如果目标task在当前worker中,会使用IContext创建一个非distribute的IConnection,这是一个虚拟连接,然后向分发器注册<task,IConnection>,分发器收到数据后会发到正确的task上,在实现上非distribute使用QueueConnection的方式,每个连接有自己的disruptorQueue,收到数据后放到queue中即可,task会处理对queue的消费。

RefreshConnections
用于刷新连接,周期性的查看zk上的配置,更新连接配置,然后创建所需的新连接,关闭不需要的连接。

RefreshActive
用于周期性检查zk上的拓扑状态,切换active和deactive

WorkerHeartbeatRunable
用于周期性修改worker的心跳信息,保存到LocalState的orker-heartbeat中

DrainerRunable

用于不断处理workerdata的transferQueue,向其他worker发送消息;Worker中所有task需要发送数据时,会加入transferQueue,等待这个现场处理

2.3 消息的传递

我们知道,消息都是从Spout发出的,先看Spout是如何发出消息的,在上面我们可以知道在SpoutExecutors的run方法中会被不断会调用nextTuple(),在这个方法中,我们通常会使用open时候传入的SpoutOutputCollector来发送数据。SpoutOutputCollector使用了代理模式,内部通过SpoutCollector来emit消息。

SpoutCollector
emit最终调用了sendSpoutMsg,需要传入几个参数:

  • out_stream_id要发送到哪个stream
  • values tuple
  • message_id 消息的id
  • out_task_id 要发送到的task
  1. 如果out_task_id为null,由于我们制定要发送到哪个stream,那么根据topology结构,可以计算出要发送到哪些task,通过sendTargets可以计算出来out_tasks。
  2. tuple发出的消息,生成一个MessageId作为root_id,使用uuid的64位bit。如果传入的message_id不为null且设置的ackerNum大于0,说明需要ack机制。
  3. 遍历需要发送到的task,先生成msgid,如果需要ack,msgid为root_id和task的组合;如果不需要ack,msgid是空的。创建一个TupleImplExt,设置目标task,然后使用transfer_fn发送。transfer_fn使用的是TaskTransfer类。

TaskTransfer

task发送使用的taskTransfer在Task的构造方法中初始化,TaskTransfer的工作很简单,task所在Worker的WorkerData中有两种queue,分别用于与接收和发送tuple:

  • innerTaskTransfer:在WorkerData中初始化,SpoutExecutors/BoltExecutors都继承了BaseExecutors,在BaseExecutors中,会使用registerInnerTransfer注册task对应的disruptorRecvQueue,注册的目的是当前Worker内的其他task发送tulple写入这个queue即可;之后使用RecvRunnable处理其他Worker的task发送的tuple,也会写入这个queue。在SpoutExecutors/BoltExecutors的run方法中,都是从disruptorRecvQueue读取tuple并处理的。
  • transferQueue:这个是整个Worker用于与外部Worker通信的queue,也是在WorkerData中初始化。当需要向外部Worker的task发送消息时,写入这个queue即可。DrainerRunable用于处理发送的过程。

TaskTransfer首先从innerTaskTransfer中获取当前task的disruptorQueue,如果有,说明tuple的目的task是当前Worker,将其放入queue中即可。否则,保存到serializeQueue中,TransferRunnable线程会处理这个queue,会将tuple序列化后保存到TaskMessage中,传入transferQueue中等待发送到指定Worker的task中。

BoltCollector
Bolt组件最终发送使用boltEmit方法:

  • String out_stream_id:发送到的stream
  • Collection<Tuple> anchors:锚,用于ack或fail
  • List<Object> values:发送的tuple
  • Integer out_task_id:要发送到的task

Bolt组件与SpoutCollector类似,不同的是对anchors的处理。anchors用于ack。我们在下一节进行分析。

自此,我们知道了Worker是如何运行不同组件的task、以及各个task直接是如何进行通信的了,如何对收到的tuple进行处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容