python grpc拦截器

概述

python grpc拦截器的proposal:https://github.com/grpc/proposal/blob/master/L13-python-interceptors.md#server-side-implementation
python grpc拦截器的官方文档,除了API Reference, 就只有这个了,再次吐槽下grpc的官方文档
通过这个proposal来了解拦截器的使用方法,还是有些费劲

python grpc拦截器设计上,要求每个自定义拦截器要负责调用下一个拦截器,调用下一个拦截器的方法称作continuation, eg:

class _ExampleClientInterceptor(grpc.UnaryUnaryClientInterceptor):

    def intercept_unary_unary(self, continuation, client_call_details, request):
        new_details, new_request = <Pre Process Logic>
        
        # 留意这里的调用,没有这个调用,后面的拦截器以及真正的rpc函数都不会执行了
        response = continuation(new_details, new_request)

        new_response = <Post Process Logic>
        return new_response  # It's both a grpc.Call and a grpc.Future.

拦截器包括客户端实现和服务端实现

client端拦截器

使用示例:

class ExampleClientInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        metadata = []
        if client_call_details.metadata is not None:
            metadata = list(client_call_details.metadata)
        metadata.append((
            'x-custom-1',
            '123',
        ))
        client_call_details = _ClientCallDetails(
            client_call_details.method, client_call_details.timeout, metadata,
            client_call_details.credentials, client_call_details.wait_for_ready,
            client_call_details.compression)
        return continuation(client_call_details, request)

with grpc.insecure_channel('localhost:50051') as channel:
    channel = grpc.intercept_channel(channel, ExampleClientInterceptor())
    stub = helloworld_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)

客户端拦截器通过包装channel来实现, continuation封装了对原函数的调用,client_call_details就是把grpc调用的参数封装为一个对象逐层传递, 如下所示:
grpc函数的调用参数:

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        state, call, = self._blocking(request, timeout, metadata, credentials,
                                      wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

client_call_details的封装:

 def _with_call(self,
                   request,
                   timeout=None,
                   metadata=None,
                   credentials=None,
                   wait_for_ready=None,
                   compression=None):
        client_call_details = _ClientCallDetails(self._method, timeout,
                                                 metadata, credentials,
                                                 wait_for_ready, compression)

        def continuation(new_details, request):
            (new_method, new_timeout, new_metadata, new_credentials,
             new_wait_for_ready,
             new_compression) = (_unwrap_client_call_details(
                 new_details, client_call_details))
            try:
                response, call = self._thunk(new_method).with_call(
                    request,
                    timeout=new_timeout,
                    metadata=new_metadata,
                    credentials=new_credentials,
                    wait_for_ready=new_wait_for_ready,
                    compression=new_compression)
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        call = self._interceptor.intercept_unary_unary(continuation,
                                                       client_call_details,
                                                       request)
        return call.result(), call

客户端拦截器的执行流程

Application Invokes an RPC ->
    Interceptor A Start ->
        Interceptor B Start ->
            Interceptor C Start ->
                Invoke Original '*MultiCallable' ->
                Return the Response from the Server ->
            Interceptor C Returns ->
        Interceptor B Returns ->
    Interceptor A Returns ->
Application Gets the Response

客户端拦截器有如下4种:

class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def intercept_unary_unary(self, continuation, client_call_details,
                              request): pass

class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def intercept_unary_stream(self, continuation, client_call_details,
                               request): pass

class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def intercept_stream_unary(self, continuation, client_call_details,
                               request_iterator): pass

class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def intercept_stream_stream(self, continuation, client_call_details,
                                request_iterator): pass

sever端拦截器

服务端拦截器的执行流程:

Server Receives a Request ->
    Interceptor A Start ->
        Interceptor B Start ->
            Interceptor C Start ->
                The Original Handler
            Interceptor C Returns Updated Handler C ->
        Interceptor B Returns Updated Handler B ->
    Interceptor A Returns Updated Handler A ->

    Invoke the Updated Handler A with the Request ->
    Updated Handler A Returns Response ->
Server Replies

这里留意服务端拦截器的执行流程和客户端是不一样的,拦截器的逻辑执行完毕之后,实际返的是一个grpc.RpcMethodHandler, 再下一步才是拿这个grpc.RpcMethodHandler执行调用,返回Response

拦截器的定义如下:

class ServerInterceptor(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def intercept_service(self, continuation, handler_call_details): 
        pass

这里的handler_call_details是一个grpc._server.HandlerCallDetails的实例, eg:

_HandlerCallDetails(method=u'/helloworld.Greeter/SayHello', invocation_metadata=(_Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (osx; chttp2)'),))

和如下的grpc的方法中的context不是同一个东西

class GreeterServicer(object):
  """The greeting service definition.
  """

  def SayHello(self, request, context):
    """Sends a greeting
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

所以在拦截器的intercept_service方法中,无法访问request和context,如果拦截器中要求终止请求返回错误,实现方法如下:

class _ExampleServerInterceptor(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        auth_metadata = ('custom-auth-header', 'secret-key')
        if auth_metadata in handler_call_details.invocation_metadata:
            return continuation(handler_call_details)
        else:
            return grpc.unary_unary_rpc_method_handler(
                lambda request, context: context.abort(<code>, <details>))

第三方库grpc-interceptor

git地址: https://github.com/d5h-foss/grpc-interceptor

从上面的介绍可以看到grpc的interceptor使用起来比较繁琐,且server端interceptor无法访问request、response和context,grpc-interceptor的目标是:Simplified Python gRPC interceptors

服务端例子:

from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException

class ExceptionToStatusInterceptor(ServerInterceptor):
    def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
    ) -> Any:
        """Override this method to implement a custom interceptor.
         You should call method(request, context) to invoke the
         next handler (either the RPC method implementation, or the
         next interceptor in the list).
         Args:
             method: The next interceptor, or method implementation.
             request: The RPC request, as a protobuf message.
             context: The ServicerContext pass by gRPC to the service.
             method_name: A string of the form
                 "/protobuf.package.Service/Method"
         Returns:
             This should generally return the result of
             method(request, context), which is typically the RPC
             method response, as a protobuf message. The interceptor
             is free to modify this in some way, however.
         """
        try:
            return method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
            
interceptors = [ExceptionToStatusInterceptor()]
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=interceptors
)

客户端例子:

from grpc_interceptor import ClientCallDetails, ClientInterceptor

class MetadataClientInterceptor(ClientInterceptor):

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ):
        """Override this method to implement a custom interceptor.

        This method is called for all unary and streaming RPCs. The interceptor
        implementation should call `method` using a `grpc.ClientCallDetails` and the
        `request_or_iterator` object as parameters. The `request_or_iterator`
        parameter may be type checked to determine if this is a singluar request
        for unary RPCs or an iterator for client-streaming or client-server streaming
        RPCs.

        Args:
            method: A function that proceeds with the invocation by executing the next
                interceptor in the chain or invoking the actual RPC on the underlying
                channel.
            request_or_iterator: RPC request message or iterator of request messages
                for streaming requests.
            call_details: Describes an RPC to be invoked.

        Returns:
            The type of the return should match the type of the return value received
            by calling `method`. This is an object that is both a
            `Call <https://grpc.github.io/grpc/python/grpc.html#grpc.Call>`_ for the
            RPC and a `Future <https://grpc.github.io/grpc/python/grpc.html#grpc.Future>`_.

            The actual result from the RPC can be got by calling `.result()` on the
            value returned from `method`.
        """
        new_details = ClientCallDetails(
            call_details.method,
            call_details.timeout,
            [("authorization", "Bearer mysecrettoken")],
            call_details.credentials,
            call_details.wait_for_ready,
            call_details.compression,
        )

        return method(request_or_iterator, new_details)
        
interceptors = [MetadataClientInterceptor()]
with grpc.insecure_channel("grpc-server:50051") as channel:
    channel = grpc.intercept_channel(channel, *interceptors)
    ...

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,252评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,886评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,814评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,869评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,888评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,475评论 1 312
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,010评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,924评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,469评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,552评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,680评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,362评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,037评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,519评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,621评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,099评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,691评论 2 361

推荐阅读更多精彩内容