以ReachTopology为例子,这个类在storm-starter里。
LinearDRPC顾名思义是线性的,bolt的处理流程是linear的。
重要的方法
1.StormTopologycreateTopology(DRPCSpout spout)
linearDRPC核心的防范就是创建createTopology,方法里首先会给DRPCSpout赋予一个spoutid,然后创建一个bolt,这个bolt接受spout的stream。
bolt的类型是PrepareRequest,PrepareRequest会发射三个stream,分别是参数(args),返回信息(returnInfo)和请求id(requestID)
接下来,会遍历topo中的每一个组件(component),遍历组件的过程主要做了几个事情:
1. 对每一个组件用BoltDeclarer包装一下
2. 遍历每一个component的group(比如shuffle group, field group, direct etc)付给新创建的BoltDeclarer
在这之后,每个linearDrpcBuilder都会有一个JoinResult和ReturnResult的bolt,其中JoinResult接受Prepare发出的request stream和最后一个bolt发出的第一个fields。
并将二者做join生成json。ReturnResult获得JoinResult的结果,并且通过drpc的result的方法放到result的队列。
从数据流转的角度,请求如下进行:
1. 首先,客户端通过execute方法,将请求提交给drpc server(server有clojure实现,请见drpc.clj)
2. 从server的角度来说,topo提交后,就会自动开始计算(参见上面的linearDrpcBuilder中的实现方式)。
3. DrpcSpout从server中获取request(详见drpc.clj的fetchRequest方法)
4. 计算结果最终放到另一个队列,并由execute返回
注意这里用信号量来控制时序, execute用信号量的许可为0来做初始化,这样会导致execute阻塞
而result方法会对相应的信号量做释放