Thrift

架构图

业务层:根据业务逻辑,实现thrift文件中接口
接口层:根据thrift文件,生成框架代码
协议层:对数据流进行序列化(二进制、json)
传输层:负责网络传输

C/S模型

Client端

在Client端,用户首先需要依次指定transport类型、protocol类型和client对象。client对象负责将函数名以及参数发送给server端,并且解析server端返回的结果。

Server端
  1. 在Processor的初始化过程,绑定用户实现的handler。
class MyService:
  def func(self, n1, n2):
        pass
handler = MyService()
processor = MyService.Processor(handler)
  1. Server端建立transport,并设置protocol。
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
  1. 创建server对象并开始服务。
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
server.serve()

Thrift内部流程

序列化支持
  1. ttypes.py
    thrift文件中定义的数据格式,编译后的存放在ttypes.py中。Thrift将用户自定义的struct、enum和exception都转换为python class。
class SharedStruct {
  thrift_spec = (
    None, #0
    (1, TType.I32, 'key', None, None, ) #1
    (2, TType.STRING, 'value', None, None, ) # 2
  )
  
  def __init__(self, key=None, value=None):
    self.key = key
    self.value = value

  def read(self, iprot):
    .....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      elif:
        if ftype == TType.STRING:
          self.value = iprot.readString()
        else:
          iport.skip(ftype)  
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprort.readStructEnd()

    def write(self, oprot):
      oprot.writeStructBegin('SharedStruct')
      if self.key is not  None:
        oprot.writeFieldBegin('key', TType.I32, 1)
        oprot.wirteI32(self.key)
        oprot.writeFieldEnd()
      ....

可以看出,Thrift为自定义类型生成了spec结构、read方法和write方法。在之后的分析中,spec结构对应于自定义类型的内部数据参数,
对数据结构的解析将起到指导性的作用。read方法和write方法是互逆的过程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的顺序将SharedStruct的内容写入output protocol中,而read方法按照相应的顺序,从input protocol中读取各个字段。
注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以虽然传参不同,但是实际上是互斥空操作。

Client端
client.open()

连接指定的ip地址、端口。

sum_ = client.add(1, 1)

调用client对象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,将参数发送给Server端;在recv_xxx中,使用xxx_result的read方法来解析Server端返回的结果。Server端会在返回结果的末尾设置TType.Stop标识来表征消息的结束。

Server端
  • 当设置了Server端的server对象后,会调用其serve方法,该方法会建立监听,等待Client端的连接。
    self.serverTransport.listen()
  • 当检测到Client端发起rpc调用后,会建立相应的数据对象,并调动Processor的process进行处理。
    while True:
      client = self.serverTransport.accept()
      if not client:
        continue
      itrans = self.inputTransportFactory.getTransport(client)
      otrans = self.outputTransportFactory.getTransport(client)
      iprot = self.inputProtocolFactory.getProtocol(itrans)
      oprot = self.outputProtocolFactory.getProtocol(otrans)
      try:
        while True:
          self.processor.process(iprot, oprot)
      itrans.close()
      otrans.close()

itrans、otrans、iprot、oprot都是通过工厂方法实例化。itrans、otrans负责Server端与Client端的数据传输,iprot、oprot负责解码工作(应该相同)。

  • process会依据Client端发送的函数标识(add)进行分发,交由process_add方法进行处理。process_add方法会解析参数并调用handler的add方法,并使用xxx_result的write方法来返回结果。
序列化的流程
  1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,消息seqId
  2. 写消息体。如果参数是一个类,就writeStructBegin
  3. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
  4. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
  5. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
  6. 读消息时根据数据类型读取相应的长度
Protocol

用于信息的序列化过程,分为write和read对称的两部分。

方法/字段 含义 内容
Message 消息传输的头部 name(方法名)+ type + seqid
FieldB/E/Stop type + id(IDL中的索引) / None / STOP
Struct name
Map ktype + vtype + size
List etype + size
Set etype + size
Bool/Byte/I16/I32/I64/Double/String/Binary bool_val
skip ttype

STRUCT

struct_begin
while True:  
      field_begin
      if ttype == STOP:
          break
     skip(ttype)
    field_end
struct_end

MAP

map_begin
for i in range(size):
  skip(ktype)
  skip(vtype)
map_end

SET / LIST

begin
for i in range(size):
  skip(ttype)
end
HelloService.py
  • Iface
    service的接口描述类;

  • Client
    客户端发送请求 + 接收回馈的方法类;

class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iport = self._oprot = None
    if oport is not None:
      self._oprot = oprot
    self._seqid = 0
  def getStruct(self, key):
    self.send_getStruct(key)
    return self.recv_getStruct()

  def send_getStruct(self, key):
    self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
    args = getStruct_args()
    args.key = key
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()

  def recv_getStruct(self):
    iprot = self._iprot
    (fname, mtype, rseqid) = iprot.readMessageBegin()
    if mtype == TMessageType.EXCEPTION:
      ...
    result = getStruct_result()
    result.read(iprot)
    iprot.readMessageEnd()
    if result.success is not None:
      return result.success
    raise TApplicationException(...)

Client实现的用户声明方法的方式为send_xxx然后recv_xxx,这个模式符合rpc的调用思想,先将请求发出去,然后等待接收远端的回应。

send_xxx && recv_xxx

在send_xxx中,首先会写入name、ttyp和seqid的信息。然后通过获取getStruct方法的参数信息,并写入到输出流中。
获取参数信息的方式如下:

class getStruct_args:
  thrift_spec = (
    None, # 0
    (1, TType.I32, 'key', None, None, ), # 1
  )

  def __init__(self, key=None,):
    self.key = key

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_args')
    if self.key is not None:
      oprot.writeFieldBegin('key', TType.I32, 1)
      oprot.writeI32(self.key)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

send_xxx通过调用getStruct_args的write方法来发送参数信息到远端,可以猜想到,server端会调用getStruct_args的read方法来解析参数信息。
当server端处理完成后,会发送结果到client端。对结果的处理类似于参数处理的逆过程。由getStruct_result负责:

class getStruct_result:
  thrift_spec = (
    (0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
  )

  def __init__(self, success=None,):
    self.success = success

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 0:
        if ftype == TType.STRUCT:
          self.success = SharedStruct()
          self.success.read(iprot)
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_result')
    if self.success is not None:
      oprot.writeFieldBegin('success', TType.STRUCT, 0)
      self.success.write(oprot)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

server端使用getStruct_result的write来发送结果,client端会调用read来响应的解析结果。从代码中可以看出,结果success为TType.STRUCT类型,也就是SharedStruct结构体,而对SharedStruct结构体的解释就位于SharedStruct类中的thrift_spec字段。

  • Processor
    服务端接收请求 + 调用处理函数 + 返回结果的方法类;
    在Processor中注册有函数名所对应的内部处理方法,当调用process时,会执行预先给定的handler并返回处理结果。
    对于执行被调用函数的情况,会返回INTERNAL_ERROR的错误。

    1. process function
      name, type, seqid = readMessageBegin
      校验name是否存在
      call process_name
    
    1. process_xxx
      调用Get_args来读取输入
      调用处理函数并获取返回值
      作为TMessageType.REPLY写回
    
  • Get_args

    1. write
      writeStructBeigin('Get_args')
      writeField(参数名称,TType.STRUCT, 1(索引))
      自定义类型的write
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容