ElasticSearch的数据传输服务TransportService
ElasticSearch的数据传输服务是在TransportService类中实现的。TransportService的核心方法是sendRequest,如下图所示:
从上面的代码段可以看出几个有用的信息:
- 首先看这一句:
这一句表明,每个被传输的请求,均包含一个requestId,根据requestId的生成函数newRequestId()的实现来看,requestId实际是一个自增的long型变量。这一变量的作用就是为了标识每一次请求。在接收方处理完请求,并返回应答时,需要将请求的requestId带回,以便发送方收到应答后,能够确定是对哪次请求的应答。
- 再来看这一句:
在发送传输请求时,同时指定了返回数据的回调句柄对象:TransportResponseHandler<T> hander。这一回调句柄对象被注册到clientHandlers容器中。
clientHandlers可以看做是一个Map,map的key值是requestId,map的value值是对应的TransportResponseHandler。可以通过clientHandlers.get(requestId)这样的调用来获取到对应的ResponseHandler。
- 最后看这一段
在发送请求的时候,需要指定发送的目标节点。如果目标节点是本机,直接调用sendLocalRequest方法即可,这一方法不需要通过网络协议进行传输。如果目标节点不是本机,则调用transport成员的sendRequest方法实现数据发送。
Transport组件
ElasticSearch的节点间数据传输组件被抽象成Transport接口。并通过构造函数注入的方式注入到TransportService对象中,如下图所示:
可以看到,transport对象和threadPool对象都是通过构造函数注入的方式注入到TransportService中的。
,实际上,Transport这一接口的实现类仅有NettyTransport这一个。所以,可以认为ElasticSearch的节点间通讯就是通过Netty来实现的。
NettyTransport
由于Netty基于拦截器模式实现的NIO通讯框架,因此Netty的响应处理机制要通过如下代码说明:
从上图的代码可以看出,ServerChannelPipelineFactory在pipeline上主要添加了两个Handler,一个是SizeHeaderFrameDecoder,一个是MessageChannelHandler。
SizeHeaderFrameDecoder
SizeHeaderFrameDecoder在ChannelPipeline中被命名为“size”,考虑到Netty本身也内置一个类似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解为该Decoder是负责通过一个数据包长度的字段来指示包的长度的。而实际上,elasticsearch的SizeHeaderFrameDecoder的功能远比简单的一个包长度复杂,Netty的数据包头也不仅是一个包长度信息。下面详细介绍一下Netty数据包的包头数据结构。
Netty数据包头格式
NettyHeader
NettyHeader数据的格式如下:
字段名称| 字段长度(字节)| 说明|备注
----|------|----
MARKER_BYTES_SIZE| 2 | 起始标识 | “ES”两个大写字母
MESSAGE_LENGTH_SIZE| 4 | 消息长度 | int型变量
REQUEST_ID_SIZE| 8 | 消息ID | long型变量,请求发起方自增生成
STATUS_SIZE | 1 |状态变量|消息的flag集合,下面详细说明
VERSION_ID_SIZE| 4 | 版本信息 |
STATUS字段
NettyHeader中的Status字段的意义在TransportStatus类中定义。STATUS字段主要包含三个标志位:
- STATUS_REQRES
- STATUS_ERROR
- STATUS_COMPRESS
TransportStatus的代码如下所示:
下图图示中列出了STATUS字段(单字节)各个标识位的位置和意义。可以看出,只有后三位是有意义的。
7|6|5|4|3|3|2|1|0
----|----|----|----|----|----|----|----
-|-|-|-|-|-|压缩标识|Error标识|response标识(request为0,response为1)
MessageChannelHandler
MessageChannelHandler在ChannelPipeline中被命名为“dispatcher”,这说明该Decoder负责决定接收到的数据包该交给那个具体的业务逻辑去处理。在MessageChannelHandler的业务逻辑中,如下三个成员起了重要作用:
- transport
- threadPool
- transportServiceAdapter
这三个成员是通过构造函数传入的,如下图所示:
从上面的代码可以看出来,threadPool和transportServiceAdapter均来自于transport对象,因此,对于MessageChannelHandler来说,transport是至关重要的。
transportServiceAdapter
MessageChannelHandler的核心逻辑从messageReceived方法展开。但是,在进入messageReceived方法之前,我想先介绍一下transportServiceAdapter。这是后面关于messageReceived方法相关逻辑中需要涉及到的一个重要成员变量。
TransportServiceAdapter接口和TransportService类
transportServiceAdapter成员是TransportServiceAdapter接口的一个实现类,该接口的代码如下所示:
该接口只有一个实现类,即TransportService.Adapter。其实,TransportServiceAdapter虽然命名为Adapter,但是,它的设计原意可能更接近门面模式。因为目标是使用一个更简单的接口来调用TransportService。TransportServiceAdapter接口有两个主要的获取消息处理句柄的方法,分别是:
- onResponseReceived
- getRequestHandler
下面针对这两个函数,来看一下TransportService.Adapter的代码。
TransportService.Adapter.onResponseReceived
从代码中可以看到,这部分代码的主要逻辑是从clientHandlers容器中,获取到response的处理句柄——ResponseHandler。关于clientHandlers,之前在介绍TransportService.sendRequest方法时,介绍过了。下面结合此部分代码,重新回忆和梳理一下request的发送和响应流程:
- 发送方构建Request,在提交Request的同时,还需要提供responseHandler的响应信息回调处理句柄对象。
- 发送方将构建的Request对象和responseHandler句柄传递给TransportService的sendRequest方法。
- TransportService的sendRequest方法首先给request分配一个requestId,然后将requestId和responseHandler已key-value对的方式存储到clientHandlers容器中。随后,sendRequest调用transport成员变量的sendRequest方法执行数据发送操作。
- 接收方接收到request,进行处理,并返回response。(这部分操作在下面会进一步描述)。然后通过请求的通道(channel)将response返回。
- 发送方通过Netty框架完成接收数据包的处理,根据数据包的status字段,判断这是一个Response,然后调用MessageChannelHandler的相应函数进行处理。MessageChannelHandler最终通过调用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根据requestId查找到该response对应的handler处理句柄对象。
- 调用handler的handleResponse方法进行返回结果的处理。根据handler的执行线程选择,可能在数据接收线程里面直接进行处理,也可能在线程池调用线程进行处理。
下面,首先对上述第5步的数据包接收处理过程进行详细描述。
TransportService.Adapter.getRequestHandler方法
代码很简单,就是直接调用requestHandlers的get方法。requestHandlers也是个map,key值是action,value值是RequestHandlerRegistry,这个Registry中包含相应消息的hander句柄对象。
那么requestHandlers是由谁构建的呢,这个requestHandler是在系统启动时,由各个消息相应的Action对象通过调用registerRequestHandler方法,注册到TransportService中的。整个ElasticSearch各个模块,其中大量功能是需要用到节点间通讯的。因此,ElasticSearch各个模块均会调用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction为例进行说明,代码如下:
从代码中可以看到,SearchServiceTransportAction中注册了大量不同类型的Request的处理句柄。
MessageChannelHandler的messageReceived方法
MessageChannelHandler作为Netty的Decoder的实现类。需要重载messageReceived方法。在该方法中,根据消息的status信息,来决定如何对消息进行处理。具体代码如下:
可以看到,实际处理消息内容的函数有如下几个:
- handleRequest
- handlerResponseError
- handleResponse
以上这三个函数均是MessageChannelHandler的proteted方法。根据上面的业务逻辑,数据包的status标志位中,只有response才会出现error的情况。
handleRequest函数
handleRequest函数的代码如下图所示:
上述代码主要有三个需要注意的地方,已经在上面的代码中通过红色方框标出
获取requestHandler
通过调用transportServiceAdapter.getRequestHandler方法实现,这部分代码在前面介绍transportServiceAdpater成员变量的时候,已经进行了较为详细的说明。
执行request的消息处理函数
从代码上看,是根据request的处理句柄对象的执行方式设定来决定是在当前线程(Netty的消息处理线程)中进行消息处理还是在特定的线程池中完成消息处理。
异常信息返回
如果在request消息处理过程中发生异常,则调用transportChannel.sendResponse(Throwable e)方法将错误信息返回给request请求节点。
handleResponse函数
handleResponse函数的代码如下图所示:
handleResponse方法中并无特殊需要注意的代码。大致逻辑与handleRequest相同。