一.NettyRpcEnv主要组件
子组件TransportConf,Dispatcher,TransportClientFactory,TransportServer
TransportConf 为RPC框架的中的配置类
Dispatcher 可以有效提高NettyRpcEnv消息异步处理能力和并行处理能力,负责将RPC消息路由到应该对此消息处理的RpcEndpoint端点。
TransportContext 是NettyRpcEnv提供服务端和客户端能力的前提,内部的NettyRpcHandler用于接受远程客户端或服务端发送过来的消息,并将ByteBuffer反序列化成RequestMessage,调用相应处理消息的方法。
TransportClientFactory 是NettyRpcEnv向远端服务发起请求的基础,Spark与远端RpcEnv进行通信都依赖于其生成的TransportClient。
TransportServer 为NettyRpcEnv提供了对外接受请求,处理请求,回复客户端的服务。
二.Dispatcher的构成介绍
endpoints:端点实例名称与端点数据EndpointData之间映射关系的缓存,有了这个缓存,就可以使用端点名称从中快速获取或者删除EndpointData
endpointRefs:端点实例RpcEndpoint与端点实例引用RpcEndpointRef之间的映射关系的缓存,可以使用端点实例从中快速获取或者删除端点实例的引用。
receivers:存储端点数据EndpointData的阻塞队列,只有Inbox中有消息的EndpointData才会被放入到此阻塞队列。
threadpool:用于对消息进行调度的线程池,此线程池运行的任务都是MessageLoop。
三.接受消息的处理过程
- 调用Inbox的post方法,将消息放入到message列表中
- 将有消息的Inbox相关联的EndpointData放入到receivers中
- MessageLoop每次循环首先从receiver中获取EndpointData
- 执行EndpointData中Inbox中的process方法对消息进行具体的处理。
四.发送消息到远端和本地的调用流程
NettyRpcEndpointRef中的ask方法和send方法都是首先将message封装成RequestMessage,然后通过调用NettyRpcEnv的send方法和ask方法对消息目的地进行判断,如果是发送到本地的消息,就调用Dispatcher中对应的postLocalMessage/ postOneWayMessage方法发送到本地的RpcEndpoint对应EndpointData中的Inbox中。
如果是发送到远程RpcEndpoint的消息,则调用NettyRpcEnv的postToOutbox方法,从outboxes中根据远端地址,取出相应的Outbox,然后将消息放入到远端RpcEndpoint的地址所对应的Outbox的message列表中。然后Outbox中会调用drainOutbox方法不断循环,从messages列表中取得OutboxMessage,通过TransportClient向外发送消息到对应的NettyRpcEnv中的RpcEndpoint。
五.总结(Spark消息通信全过程)
调用NettyRpcEndpointRef的send和ask方法,向本地节点的RpcEndpoint发送消息,由于是在同一节点,所以直接调用Dispatcher的postLocalMessage或postOneWayMessage方法,将消息放入EndpointData内部的Inbox的message列表中,此EndpointData也会被加入到Dispatcher中的消息队列receivers,消息队列中有消息,触发MessageLoop线程处理消息,执行EndpointData中Inbox中的process方法对消息进行具体的处理,其实最后调用的都是RpcEndpoint中的receiveAndReply或者receive等方法。
通过NettyRpcEndpointRef的send方法和ask方法向远端节点的RpcEndpoint发送消息,在这种情况下,首先将消息封装成OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的message列表中。
每个Outbox中会调用drainOutbox方法不断循环,从messages列表中取得OutboxMessage。
Outbox中会使用内部的TransportClient向远端的NettyRpcEnv发送OutboxMessage。
和远端的NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,然后经由NettyRpcHandler的处理,最后来自服务端NettyRpcServer的回复消息会触发NettyRpcHandler的receive方法,进而调用Dispatcher的postRemoteMessage或者postOneWayMessage方法。首先是根据端点名称endpointName从缓存endpoints中获取EndpointData,将消息放入到EndpointData内部的Inbox的message列表中,然后将EndpointData推入到receviers中,最后触发MessageLoop线程处理消息,执行EndpointData中Inbox中的process方法对消息进行具体的处理,其实最后调用的都是RpcEndpoint中的receiveAndReply或者receive等方法。