对于建立grpcclient的一些分析

首先,在express中建立一系列的http请求是node开发中非常常见的情况,如何去开发一套逻辑结构比较清晰又方便维护的httpclient,下面结合paypal的逻辑结构去做一些分析

  1. 请求入口,不做赘述,一个非常常见的请求函数
const { kycServClient } = require('../lib/grpc/grpcClient')

async createBizInfo(payload) {
    const res = await kycServClient.submitBusinessEntity(payload)
    logger.info('create biz info success.', res)
    return res
  },
  1. 遍历所有的service的定义准备组装,可以看到这里直接通过init函数组装,直接抛出,核心为createClient函数,下一步说
function init () {
  const result = {}
  for (const clientName in configuration) {
    console.log("clientName",configuration,clientName)
    result[clientName] = createClient(configuration[clientName])
  }
  return result
}
module.exports = init()
  1. 这里内容较多,首先protoRoot为ppcn的接口定义,grpc.loadObject则是将接口定义转换成了对应service函数,这里之后再详细讨论,serviceLookup则是通过reduce将对应的serviceConfig转换成sercice函数,例如将kyc转换成ServiceClient(address, credentials, options)即kycservice,之后通过
    grpcClient函数向service添加一系列的新功能(通过proxy复写get的方法)
{
domain:'kyc'
name:'KycService'
url:'10.221.102.142:30044'
}

const protoDescriptor = grpc.loadObject(protoRoot)

function serviceLookup ({ name, domain }) {
  const servicePath = protoDescriptor.ppcn.api
  const domainPath = domain.split('.')
  return domainPath.reduce((path, current) => {
    if (path[current][name]) {
      return path[current][name]
    }
    path = path[current]
    return path
  }, servicePath)
}

function createClient (serviceConfig = {}) {
  const Service = serviceLookup(serviceConfig)
  const client = new Service(serviceConfig.url, grpc.credentials.createInsecure(), grpcOption)
  return grpcClient(client, protoRoot, {
    metadata: {
      'ppcn-user-agent': `${config.serviceName}/${packageJson.version}`
    }
  })
}
  1. grpcClient函数,这边两个用途,第一复写了get的逻辑,主要是为了区分是否为grpc请求,如果是grpc请求才会应用grpc的接口定义以及拦截层,这就是为什么如果切换成http请求的话maskmessage逻辑需要重新做,第二注入三个中间层,具体可以在client-proxy.js中查看
...
module.exports = (client, protoRoot, options = {}) => {
  client.protoRoot = protoRoot
  client.interceptors = []
  client.interceptors.push(
    require('./interceptor/logging-client-interceptor'),
    require('./interceptor/jaeger-client-interceptor'),
    metadataInterceptorFactory({ metadata: options.metadata })
  )
  return new Proxy(client, handler)
}
  1. 到第四步为止,其实大体上建立一个grpcclient去请求相应数据这层逻辑已经完成了,那么ppcn到底是怎么将api定义转换成service函数,又是怎么去应用client中注入的interceptor去处理数据的,下面逐步进行分析

    1.当接收到protobufjs传回的api接口json形式的定义后会调用grpc包中的loadObject方法进行构建,这边首先对于protobuf_js_6_common版本号进行了判断,具体不展开,去判断是5版本还是6版本的protobufjs,分别去调用相应版本的loadObject,下面使用6版本做例子, ,这边protobufjs其实是grpc里的概念之后再去熟悉,这里不做赘述

     exports.loadObject = function loadObject(value, options) {
      options = Object.assign({}, common.defaultGrpcOptions, options);
      options = Object.assign({}, {'protobufjsVersion': 'detect'}, options);
      var protobufjsVersion;
      if (options.protobufjsVersion === 'detect') {
        if (protobuf_js_6_common.isProbablyProtobufJs6(value)) {
          protobufjsVersion = 6;
        } else if (protobuf_js_5_common.isProbablyProtobufJs5(value)) {
          protobufjsVersion = 5;
        } else {
          var error_message = 'Could not detect ProtoBuf.js version. Please ' +
              'specify the version number with the "protobufjsVersion" option';
          throw new Error(error_message);
        }
      } else {
        protobufjsVersion = options.protobufjsVersion;
      }
      switch (protobufjsVersion) {
        case 6: return protobuf_js_6_common.loadObject(value, options);
        case 5:
        return protobuf_js_5_common.loadObject(value, options);
        default:
        throw new Error('Unrecognized protobufjsVersion', protobufjsVersion);
      }
    };
    
    1. 在protobuf_js_6_common中的loadObject方法中核心代码其实就一句client.makeClientConstructor(service_attrs),这里只是拿接口定义的json去判断,只要是method那就去建立client对象,如果是nested说明是个中间节点,就是个树形结构迭代的过程
    exports.loadObject = function loadObject(value, options) {
      var result = {};
      if (!value) {
        return value;
      }
      if (value.hasOwnProperty('methods')) {
        // It's a service object
        var service_attrs = getProtobufServiceAttrs(value, options);
        return client.makeClientConstructor(service_attrs);
      }
    
      if (value.hasOwnProperty('nested')) {
        // It's a namespace or root object
        if (value.nested !== null && value.nested !== undefined) {
          var values = Object.keys(value.nested).map(key => value.nested[key]);
          values.forEach(nested => {
            result[nested.name] = loadObject(nested, options);
          });
        }
        return result;
      }
    
      // Otherwise, it's not something we need to change
      return value;
    };
    
    1. 现在我们进入makeClientConstructor函数,这个函数是创建serviceClient的幕后主使,这里其他都不重要,包括Client对象,因为这里大部分的操作只是在不断init新的属性给后续方法使用,主要的步骤其实是在定义serviceClient的prototype也就是foreach中的逻辑,这里啰嗦一句在gRPC提供四种模式:unary,client streaming,server streaming 以及 bidirectional streaming,在foreach中的判断主要是判断该请求方法处于哪种模式,然后将请求函数赋值给method_func接着再对serviceClient的prototype进行一系列的定义(例如methodname这种)
    ...
    Object.keys(methods).forEach(name => {
      const attrs = methods[name];
      if (name.indexOf('$') === 0) {
        throw new Error('Method names cannot start with $');
      }
      var method_type = common.getMethodType(attrs);
      var method_func = function() {
        return requester_funcs[method_type].apply(this,
          [ attrs.path, attrs.requestSerialize, attrs.responseDeserialize ]
          .concat([].slice.call(arguments))
        );
      };
      if (class_options.deprecatedArgumentOrder) {
        ServiceClient.prototype[name] =
          deprecated_request_wrap[method_type](method_func);
      } else {
        ServiceClient.prototype[name] = method_func;
      }
      ServiceClient.prototype.$method_names[attrs.path] = name;
      // Associate all provided attributes with the method
      Object.assign(ServiceClient.prototype[name], attrs);
      if (attrs.originalName) {
        ServiceClient.prototype[attrs.originalName] =
          ServiceClient.prototype[name];
      }
    });
    ...
    
    1. 到第三步为止,这个serverClient链路已经组装完成了,结果如下所示,现在其实可以直接发起请求,当然这时候除了请求他没有任何别的功能,包括拦截层在内的后续步骤必须结合client-proxy(node_modules/@ppcn/node-starter-lib/grpc-client/client-proxy.js)中额外定义的逻辑才能起作用


      image.png
  1. 从这步开始我们来讲讲serviceclient是如何让拦截层interceptors生效的,比如登录的时候,调用loginByCredentialMail方法
 return await credentialServClient.loginByCredentialMail({
   mail: email,
   password: password
 })
  1. 这边的call其实就是调用之前在serviceclient中建立好的loginByCredentialMail方法
       origFunc.call(target, message, options, (err, data) => {
         if (err) {
           reject(new GrpcException(err))
         } else {
           console.log("下一步",data)
           resolve(data)
         }
       })
  1. 然后便进入了我们之前在serviceClient中说的method_func
var method_func = function() {
      return requester_funcs[method_type].apply(this,
        [ attrs.path, attrs.requestSerialize, attrs.responseDeserialize ]
        .concat([].slice.call(arguments))
      );
    };
  1. 经判断该次请求是grpc的unary形式,所以进入了Client.prototype.makeUnaryRequest入口,这个函数逻辑很复杂,这边我们仅仅探讨一下serverClient如何去应用interceptors拦截层的,这里涉及到几个关键函数
...
  var intercepting_call = client_interceptors.getInterceptingCall(
  methodDefinition,
  callOptions,
  interceptors,
  callProperties.channel,
  callProperties.callback
);
...
  intercepting_call.start(callProperties.metadata, last_listener);
  intercepting_call.sendMessage(callProperties.argument);
  intercepting_call.halfClose();  

9.其中getInterceptingCall为主要方法,它的用处是建立一个用于调度拦截层的chain(即_buildChain方法), 在chain中其实组建了一个next迭代器(这里可以从代码明显看出next就是个从interceptor栈出入的过程),之后将这个next迭代器赋值给构造函数InterceptingCall的this.next_call(这个设计思路以后可以借鉴)

  function _buildChain(interceptors, options) {
var next = function(interceptors) {
  if (interceptors.length === 0) {
    return function (options) {};
  }
  var head_interceptor = interceptors[0];
  var rest_interceptors = interceptors.slice(1);
  return function (options) {
    return head_interceptor(options, next(rest_interceptors));
  };
};
var chain = next(interceptors)(options);
return new InterceptingCall(chain);
}
  1. 然后问题就简单了,只要调用了定义在intercepting_call上的start方法,chain便会被调用,从而触发interceptor达到调用拦截层的目的
InterceptingCall.prototype.start = function(metadata, listener) {
var self = this;

// If the listener provided is an InterceptingListener, use it. Otherwise, we
// must be at the end of the listener chain, and any listener operations
// should be terminated in an EndListener.
var next_listener = _getInterceptingListener(listener, new EndListener());

// Build the next method in the interceptor chain
var next = function(metadata, current_listener) {
  // If there is a next call in the chain, run it. Otherwise do nothing.
  if (self.next_call) {
    // Wire together any listener provided with the next listener
    var listener = _getInterceptingListener(current_listener, next_listener);
    self.next_call.start(metadata, listener);
  }
};
this._callNext('start', [metadata, next_listener], next);
};
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,026评论 19 139
  • (目前有点乱,先贴上来,等以后有时间在整理吧。这个问题一直想拿出来分享,还有两个博客,都是相关的,一点点发出来) ...
    kamiSDY阅读 4,469评论 0 2
  • 文章作者:Tyan博客:noahsnail.com 3.4 Dependencies A typical ente...
    SnailTyan阅读 4,220评论 2 7
  • JAVA面试题 1、作用域public,private,protected,以及不写时的区别答:区别如下:作用域 ...
    JA尐白阅读 1,187评论 1 0
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,780评论 18 399