前言
RocketMQ中存在四个角色:Producer、Consumer、Broker、NameServer,在前一篇文章中已经讨论过Consumer的运行原理和线程模型,这篇文章主要探讨Producer的运行原理。参考Consumer的运行原理,我们可以大胆的猜测Producer的线程模型存在两种情况:1、存在一个发送线程负责源源不断的将数据发送到Broker,它和Message产生和包装线程通过阻塞队列通信;2、消息的产生、包装、发送在同一个线程中,只不过在发送时有同步发送和异步发送两种方式。究竟时哪种方式,需要在源码中找到答案。
Producer的启动过程
RocketMQ源码中的example包中给出了Producer的示例用例。整个过程分为四步:1、设定Producer的group;2、启动Producer;3、发送消息;4、关闭Producer。Group用于标志Producer,一般以App为界限。在该节,我们重点分析启动过程。
追踪代码发现,start函数调用将会来到DefaultMQProducerImpl类中的start函数,在该函数中将会一次执行:1、配置检查;2、向Client中注册生产者;3、添加Topic的路由信息(初始为空);4、开启MQClientFactory;5、开启心跳检测;6、开启定时任务,负责清理过期的Request。具体代码如图所示:
那么Producer具体有没有开启一个后台线程去专职负责消息发送,需要进入到MQClientFactory中寻找答案。
进入到MQClientFactory中我们发现,不论是Producer还是Consumer的启动过程均公用MQCLientFactory的启动逻辑。而且可以从该类的名字看出,这个类主要负责执行网络通信。MQClientFactory中首先检测NameServer的地址有没有正确设置,若否则去配置中心拉取;其次启动MQClientAPIImpl,此处主要是开启消息发送的Channel,即链接远端的组件(NameServer、Broker);其次开启定时任务,主要定时从NameServer拉取数据;随后会开启消息拉取线程;然后开启消息拉取负载均衡线程,这些都是为了给Consumer服务的,实际上对于Producer来说最主要的是开启Channel以及和NameServer数据同步。具体代码如下:
总结来说,Producer的开启过程中最主要的是开启了一个后台线程用于和NameServer同步数据,此外打通了和远程组件之间的数据Channel,显然Producer和Consumer不一样,并没有一个专门发送数据的后台线程
Producer的发送流程
在用户调用send函数后,send函数中首先会做消息的检查,再做Topic的替换,随后调用到DefaultMQProducerImpl中的send函数,默认采用同步发送,最终调用到DefaultMQProducerImpl中的sendDefaultImpl函数中。主要流程是:1、Channel状态检查;2、消息检查;3、设置消息发送的唯一ID,此处主要是用于Broker确认消息的标志;4、提取Topic的路由数据;5、发送消息。整个发送逻辑还包括了错误重试的逻辑。
上述的逻辑最终会调用到sendKernelImpl,在该函数中存在以下的逻辑:确认Topic的路由信息,若Topic的路由信息有误则会从NameServer获取最新的路由数据;随后执行相应的钩子函数;其后构造消息发送的Request(数据包)。最后通过CLient的API发送数据。
在默认的情况下,Producer采用SYNC的方式发送数据,且没有注册SendCallBack,数据最终会由NettyRemotingClient发送。众所周知,Netty是一个典型的非阻塞网络工具,那么Producer是如何实现同步发送的呢?实现同步的关键就是RocketMQ的ResponseFeature。在发送时会返回一个ResponseFeature,Feature中核心为CountDownLatch。调用Feature中的waiteResponse则会被阻塞,直到Response真正到达,在Netty的响应线程中会putResponse,随后CountDownLatch会唤醒阻塞线程以拿到真正的Response。
总结:Producer默认采用SYNC的方式进行发送,并且发送逻辑中已经实现了重试的逻辑,同步发送的关键是ResponseFeature。