ElasticSearch数据传输机制

ElasticSearch的数据传输服务TransportService

ElasticSearch的数据传输服务是在TransportService类中实现的。TransportService的核心方法是sendRequest,如下图所示:

sendRequest方法

从上面的代码段可以看出几个有用的信息:

  1. 首先看这一句:
生成requestId

这一句表明,每个被传输的请求,均包含一个requestId,根据requestId的生成函数newRequestId()的实现来看,requestId实际是一个自增的long型变量。这一变量的作用就是为了标识每一次请求。在接收方处理完请求,并返回应答时,需要将请求的requestId带回,以便发送方收到应答后,能够确定是对哪次请求的应答。

  1. 再来看这一句:
响应回调句柄对象

在发送传输请求时,同时指定了返回数据的回调句柄对象:TransportResponseHandler<T> hander。这一回调句柄对象被注册到clientHandlers容器中。
clientHandlers可以看做是一个Map,map的key值是requestId,map的value值是对应的TransportResponseHandler。可以通过clientHandlers.get(requestId)这样的调用来获取到对应的ResponseHandler。

  1. 最后看这一段
调用不同方法发送Request

在发送请求的时候,需要指定发送的目标节点。如果目标节点是本机,直接调用sendLocalRequest方法即可,这一方法不需要通过网络协议进行传输。如果目标节点不是本机,则调用transport成员的sendRequest方法实现数据发送。

Transport组件

ElasticSearch的节点间数据传输组件被抽象成Transport接口。并通过构造函数注入的方式注入到TransportService对象中,如下图所示:

将transport对象注入到TransportService对象中

可以看到,transport对象和threadPool对象都是通过构造函数注入的方式注入到TransportService中的。
,实际上,Transport这一接口的实现类仅有NettyTransport这一个。所以,可以认为ElasticSearch的节点间通讯就是通过Netty来实现的。

NettyTransport

由于Netty基于拦截器模式实现的NIO通讯框架,因此Netty的响应处理机制要通过如下代码说明:

Netty的ChannelPipeline设置

从上图的代码可以看出,ServerChannelPipelineFactory在pipeline上主要添加了两个Handler,一个是SizeHeaderFrameDecoder,一个是MessageChannelHandler。

SizeHeaderFrameDecoder

SizeHeaderFrameDecoder在ChannelPipeline中被命名为“size”,考虑到Netty本身也内置一个类似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解为该Decoder是负责通过一个数据包长度的字段来指示包的长度的。而实际上,elasticsearch的SizeHeaderFrameDecoder的功能远比简单的一个包长度复杂,Netty的数据包头也不仅是一个包长度信息。下面详细介绍一下Netty数据包的包头数据结构。

Netty数据包头格式

NettyHeader

NettyHeader数据的格式如下:

字段名称| 字段长度(字节)| 说明|备注
----|------|----
MARKER_BYTES_SIZE| 2 | 起始标识 | “ES”两个大写字母
MESSAGE_LENGTH_SIZE| 4 | 消息长度 | int型变量
REQUEST_ID_SIZE| 8 | 消息ID | long型变量,请求发起方自增生成
STATUS_SIZE | 1 |状态变量|消息的flag集合,下面详细说明
VERSION_ID_SIZE| 4 | 版本信息 |

STATUS字段

NettyHeader中的Status字段的意义在TransportStatus类中定义。STATUS字段主要包含三个标志位:

  • STATUS_REQRES
  • STATUS_ERROR
  • STATUS_COMPRESS

TransportStatus的代码如下所示:

TransportStatus.java

下图图示中列出了STATUS字段(单字节)各个标识位的位置和意义。可以看出,只有后三位是有意义的。

7|6|5|4|3|3|2|1|0
----|----|----|----|----|----|----|----
-|-|-|-|-|-|压缩标识|Error标识|response标识(request为0,response为1)

MessageChannelHandler

MessageChannelHandler在ChannelPipeline中被命名为“dispatcher”,这说明该Decoder负责决定接收到的数据包该交给那个具体的业务逻辑去处理。在MessageChannelHandler的业务逻辑中,如下三个成员起了重要作用:

  • transport
  • threadPool
  • transportServiceAdapter

这三个成员是通过构造函数传入的,如下图所示:

MessageChannelHandler的构造函数

从上面的代码可以看出来,threadPool和transportServiceAdapter均来自于transport对象,因此,对于MessageChannelHandler来说,transport是至关重要的。

transportServiceAdapter

MessageChannelHandler的核心逻辑从messageReceived方法展开。但是,在进入messageReceived方法之前,我想先介绍一下transportServiceAdapter。这是后面关于messageReceived方法相关逻辑中需要涉及到的一个重要成员变量。

TransportServiceAdapter接口和TransportService类

transportServiceAdapter成员是TransportServiceAdapter接口的一个实现类,该接口的代码如下所示:

TransportServiceAdapter接口

该接口只有一个实现类,即TransportService.Adapter。其实,TransportServiceAdapter虽然命名为Adapter,但是,它的设计原意可能更接近门面模式。因为目标是使用一个更简单的接口来调用TransportService。TransportServiceAdapter接口有两个主要的获取消息处理句柄的方法,分别是:

  • onResponseReceived
  • getRequestHandler
    下面针对这两个函数,来看一下TransportService.Adapter的代码。
TransportService.Adapter.onResponseReceived
TransportService.Adapter.onResponseReceived

从代码中可以看到,这部分代码的主要逻辑是从clientHandlers容器中,获取到response的处理句柄——ResponseHandler。关于clientHandlers,之前在介绍TransportService.sendRequest方法时,介绍过了。下面结合此部分代码,重新回忆和梳理一下request的发送和响应流程:

  1. 发送方构建Request,在提交Request的同时,还需要提供responseHandler的响应信息回调处理句柄对象。
  2. 发送方将构建的Request对象和responseHandler句柄传递给TransportService的sendRequest方法。
  3. TransportService的sendRequest方法首先给request分配一个requestId,然后将requestId和responseHandler已key-value对的方式存储到clientHandlers容器中。随后,sendRequest调用transport成员变量的sendRequest方法执行数据发送操作。
  4. 接收方接收到request,进行处理,并返回response。(这部分操作在下面会进一步描述)。然后通过请求的通道(channel)将response返回。
  5. 发送方通过Netty框架完成接收数据包的处理,根据数据包的status字段,判断这是一个Response,然后调用MessageChannelHandler的相应函数进行处理。MessageChannelHandler最终通过调用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根据requestId查找到该response对应的handler处理句柄对象。
  6. 调用handler的handleResponse方法进行返回结果的处理。根据handler的执行线程选择,可能在数据接收线程里面直接进行处理,也可能在线程池调用线程进行处理。
    下面,首先对上述第5步的数据包接收处理过程进行详细描述。
TransportService.Adapter.getRequestHandler方法
TransportService.Adapter.getRequestHandler

代码很简单,就是直接调用requestHandlers的get方法。requestHandlers也是个map,key值是action,value值是RequestHandlerRegistry,这个Registry中包含相应消息的hander句柄对象。

那么requestHandlers是由谁构建的呢,这个requestHandler是在系统启动时,由各个消息相应的Action对象通过调用registerRequestHandler方法,注册到TransportService中的。整个ElasticSearch各个模块,其中大量功能是需要用到节点间通讯的。因此,ElasticSearch各个模块均会调用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction为例进行说明,代码如下:

SearchServiceTransportAction

从代码中可以看到,SearchServiceTransportAction中注册了大量不同类型的Request的处理句柄。

MessageChannelHandler的messageReceived方法

MessageChannelHandler作为Netty的Decoder的实现类。需要重载messageReceived方法。在该方法中,根据消息的status信息,来决定如何对消息进行处理。具体代码如下:

MessageChannelHandler的消息分发逻辑

可以看到,实际处理消息内容的函数有如下几个:

  • handleRequest
  • handlerResponseError
  • handleResponse
    以上这三个函数均是MessageChannelHandler的proteted方法。根据上面的业务逻辑,数据包的status标志位中,只有response才会出现error的情况。

handleRequest函数

handleRequest函数的代码如下图所示:

MessageChannelHandler.handleRequest

上述代码主要有三个需要注意的地方,已经在上面的代码中通过红色方框标出

获取requestHandler

通过调用transportServiceAdapter.getRequestHandler方法实现,这部分代码在前面介绍transportServiceAdpater成员变量的时候,已经进行了较为详细的说明。

执行request的消息处理函数

从代码上看,是根据request的处理句柄对象的执行方式设定来决定是在当前线程(Netty的消息处理线程)中进行消息处理还是在特定的线程池中完成消息处理。

异常信息返回

如果在request消息处理过程中发生异常,则调用transportChannel.sendResponse(Throwable e)方法将错误信息返回给request请求节点。

handleResponse函数

handleResponse函数的代码如下图所示:

MessageChannelHandler.handleResponse

handleResponse方法中并无特殊需要注意的代码。大致逻辑与handleRequest相同。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,230评论 11 349
  • 10#数据类型 合并数组和非合并数组 合并数组:存储方式是连续的,中间没有闲置空间。例如,32bit的寄存器,可以...
    constant007阅读 32,864评论 0 18
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,006评论 25 707
  • 休息不代表一定要睡觉,其实只是换个脑子去工作而已。 优秀不够,你是否无可替代
    芣苢_0413阅读 194评论 0 0