Storm LinearDrpc

以ReachTopology为例子,这个类在storm-starter里。

LinearDRPC顾名思义是线性的,bolt的处理流程是linear的。

重要的方法

1.StormTopologycreateTopology(DRPCSpout spout)

linearDRPC核心的防范就是创建createTopology,方法里首先会给DRPCSpout赋予一个spoutid,然后创建一个bolt,这个bolt接受spout的stream。


bolt的类型是PrepareRequest,PrepareRequest会发射三个stream,分别是参数(args),返回信息(returnInfo)和请求id(requestID)


接下来,会遍历topo中的每一个组件(component),遍历组件的过程主要做了几个事情:

1. 对每一个组件用BoltDeclarer包装一下

2. 遍历每一个component的group(比如shuffle group, field group, direct etc)付给新创建的BoltDeclarer


在这之后,每个linearDrpcBuilder都会有一个JoinResult和ReturnResult的bolt,其中JoinResult接受Prepare发出的request stream和最后一个bolt发出的第一个fields。

并将二者做join生成json。ReturnResult获得JoinResult的结果,并且通过drpc的result的方法放到result的队列。


从数据流转的角度,请求如下进行:

1. 首先,客户端通过execute方法,将请求提交给drpc server(server有clojure实现,请见drpc.clj)


2. 从server的角度来说,topo提交后,就会自动开始计算(参见上面的linearDrpcBuilder中的实现方式)。

3. DrpcSpout从server中获取request(详见drpc.clj的fetchRequest方法)

4. 计算结果最终放到另一个队列,并由execute返回

注意这里用信号量来控制时序, execute用信号量的许可为0来做初始化,这样会导致execute阻塞


而result方法会对相应的信号量做释放


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容