1.组件
- Nimbus: 集群主节点,负责资源管理、任务分配
- Supervisor:集群工作节点,接收任务,管理worker进程
- Worker:工作进程,每个工作进程中都有多个Task线程
- Task:任务,每个任务都是一个线程,执行具体的算子代码
- Zookeeper:集群管理
2.工作流程
3.Topology结构
- Spout : 数据来源(Kafka,DB...)
void open(Map, TopologyContext, SpoutOutputCollector); // 初始化操作
void close(); //关闭时操作
void activate(); //task激活时操作
void deactivate(); //task暂停时操作
void nextTuple(); //发射下一个tuple时操作
void ack(Object msgId); //ack
void fail(Object msgId); //unack
- Bolt : 数据处理单元
void prepare(Map, TopologyContext); // 初始化操作
void execute(Tuple, BasicOutputCollector); //逻辑处理操作
void cleanup(); //关闭时操作
4.启动过程
根据jstorm 2.4.0版本源码整理
-
Nimbus
-
任务调度原则(jstorm将服务器资源通过port、cpu、net、disk四个维度进行分配,按以下4个优先级分配)
· P0: 自定义分配规则
· P1: 尽量将同组件的task分配到不同的worker和supervisor上,提高拓扑稳定性 (ComponentNumSelector)
· P2: 尽量将task分配到supervisor上worker分得的线程数基本一致,保证负载均衡(TotalTaskNumSelector)
· P3: 尽量将有直接消息传输的task放到同一个worker上,保证消息走Worker进程内通信,无需序列化/反序列化,利用disruptor无锁队列提升性能(InputComponentNumSelector)
-
Supervisor
-
Worker
启动过程比较简单,主要是根据分配的任务,利用反射执行指定的代码(bolt或spout),下图是worker工作流程图
-
Disruptor
在jstorm中延用了storm中队列的数据结构disruptor,这是LMAX开发的用于线程间高性能通信的无锁队列,每秒可处理线程间百万级别数据,Log4J2用的也是该数据结构。JStorm也是一样注重组件流水型工作,所以使用disruptor来进行线程间通信。
画了一个草图,disruptor主要是用了一个数组(逻辑环形),一般环形数组使用取余来定位当前下标,disruptor强制要求数组大小是2的倍数,这样使用位计算定位下标会更高效。其次,ring buffer只维护一个seq,表示哪一个slot是可以被消费到的,每一个消费者自身维护一个seq标示自己消费到哪一个slot。
Consumer消费:调用waitStrategy的waitFor()方法,使用不同策略等待消费。
Producer生产:使用2PC方式,先调用next()方法cas获取自己的slot,然后获取该slot设置成自己生产的数据,最后publish()提交,表示该slot对所有消费者可见。
PS:Disruptor为什么这么高效?
1)Ringbuffer数组使用内存预分配,减少GC
2)消除false sharing,填充缓存行
3)无锁
4)批处理效应,消费者追赶,例如生产者一瞬间生产N条消息,消费者不会一条一条请求,而是直接追赶消费到最新的seq
5.特性
-
Topology Master(减少zk压力,storm对zk依赖太过严重的优化)
缓解zk压力,由TM汇总后与zk/nimbus交互,在nimbus分配任务时,将TM作为系统bolt添加到等待分配的任务中(TM本质就是一个Bolt)
Metrics(采样)
Heartbeat(心跳)
-
Control Event(控制事件)
-
反压(生产速度远大于消费速度)
Jstorm使用二级反压,使用high_water_mark,low_water_mark控制反压(当队列大小大于high_water_mark时就开始限制上游发送速度,直到队列大小小于low_water_mark才取消对上游发送速度的限制),有效避免tps抖动。以下内容可参考图6 worker工作流程图了解反压机制。
- emit发送反压
1) 把消息放入send buffer时,判断是否需要流控(queue是否达到high_water_mark),等待至无需阻塞
2) Netty client线程异步从send buffer取消息发送给下游时,判断是否需要流控(根据netty server端response/Topology Master发送的控制消息判断),等待至无需阻塞 - receive接收反压
1)Netty Server接收到消息,缓存到recevie buffer时,判断是否需要流控(queue是否达到high_water_mark)
-
Acker(At least once语义)
Acker本质就是一个Bolt,源码上直接实现Bolt接口
- 启用方法
1)设置acker数量:conf.setNumAckers(1),在nimbus分配任务时,将acker作为系统bolt添加到等待分配的任务中
2)spout发射tuple指定msgId(全局唯一,作为状态监控的依据) -
原理(异或计算)
1)Spout每发送一个数据都会生成一个随机的root_id,并发送ack_init事件给Acker
2)Acker使用RoatingMap数据结构(具有清除超时Object的Map)维护root_id<=>AckObject(val,spout_task,fail)
3)每一个算子都会把自己接收的上游数据和生成的下游数据做异或计算发送给Acker。如bolt接收了上游r1产生了下游r3,则给Acker发送<root_id,r1^r3>;如bolt接收了上游r2,没有生成下游,则给Acker发送<root_id,r2>。直到某一时刻root_id的val为0时说明这个数据被所有的bolt处理完毕。
4)如何判断处理失败?上面提到Acker使用RoatingMap数据结构(具有清除超时Object的Map)维护root_id<=>AckObject,当Acker接收到非ack_init事件,并且在RoatingMap中找不到对应的AckObject,则表示超时
-
Exactly-Once语义
-
Chandy-Lamport Algorithm exactly-once语义
-
Chandy-Lamport Algorithm at least once 语义
如果上面的exactly-once语义缺少了流对齐过程则变成at least once 语义
-
Jstorm实现
1)角色
· spout(数据源)
· non-stateful bolt(状态无关节点)
· stateful bolt(状态节点,做流对齐以及状态保存)
· end bolt(结束节点)
2)流程
3)缺少流对齐的At least once 与 Acker At least once对比,发现新框架相比Acker实现At least once性能更高,应该是避免了各个算子频繁与acker通信产生的优势