grpc和consul结合实现分布式rpc调用

# GRPC

> 主要介绍了grpc在使用示例和原理,以及如何与consul结合

## gRPC 是什么?

> gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得我们能够更容易地创建分布式应用和服务。

参考文档:[gRPC Python Quickstart](https://link.juejin.im/?target=https%3A%2F%2Fgrpc.io%2Fdocs%2Fquickstart%2Fpython.html)

开始前确保已经安装grpcio-tools和grpcio这两个包

定义一个GRPC有如下三个步骤:

1. 定义一个消息类型

2. 编译该proto文件

3. 编写服务端代码

4. 编写客户端代码

我们以实现一个echo的grpc为例。

### 定义一个消息类型

首先定义通信双方(即客户端和服务端)交互的消息格式(protobuf消息的格式),然后定义该echo服务

如下:

```proto

syntax = "proto3";  // 声明使用 proto3 语法

//  定义客户端请求的protobuf格式,如下所示,包含一个字符串字段q

message Req {

    string q = 1;

}

//  定义服务端相应的protobuf格式,如下所示,包含一个字符串字段a

message Resp {

    string a = 1;

}

//  定义echo服务,如下所示,该服务包含一个名称为"echo"的rpc

service Echoer{

    rpc echo (Req) returns (Resp) {}

}

```

使用以下命令编译:

```Bash

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./Echoer.proto

```

生成两个py文件

- Echoer_pb2.py 此文件包含生成的 request(Req) 和 response(Resp) 类。

- Echoer_pb2_grpc.py 此文件包含生成的 客户端(EchoerStub)和服务端(EchoerServicer)的类

### 创建服务端代码

创建和运行 Echoer 服务可以分为两个部分:

- 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”的函数。

- 运行一个 gRPC 服务器,监听来自客户端的请求并传输服务的响应。


在当前目录,创建文件 Echoer_server.py,实现一个新的函数:

```Python

from concurrent import futures

import time

import grpc

import Echoer_pb2

import Echoer_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24

class Echoer(Echoer_pb2_grpc.EchoerServicer):

    # 工作函数

    def SayHello(self, request, context):

        return Echoer_pb2.Resp(a="echo")

def serve():

    # gRPC 服务器

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)

    server.add_insecure_port('[::]:50051')

    server.start()  # start() 不会阻塞,如果运行时你的代码没有其它的事情可做,你可能需要循环等待。

    try:

        while True:

            time.sleep(_ONE_DAY_IN_SECONDS)

    except KeyboardInterrupt:

        server.stop(0)

if __name__ == '__main__':

    serve()

```

### 创建客户端代码

在当前目录,打开文件 Echoer_client.py,实现一个新的函数:

```Python

from __future__ import print_function

import grpc

import Echoer_pb2

import Echoer_pb2_grpc

def run():

    channel = grpc.insecure_channel('localhost:50051') # 创建信道

    stub = Echoer_pb2_grpc.EchoerStub(channel) # 通过信道获取凭据,即Stub

    response = stub.echo(Echoer_pb2.Req(q='echo')) # 调用rpc,获取响应

    print("Echoer client received: " + response.a)

if __name__ == '__main__':

    run()

```

运行代码

首先运行服务端代码

```Bash

python Echoer_server.py

```

复制代码

然后运行客户端代码

```bash

python Echoer_client.py

# output

Echoer client received: echo

```

## 进阶

> [点击查看参考博客]( https://blog.codeship.com/exploring-security-metrics-and-error-handling-with-grpc-in-python/)

为了通信安全起见,GRPC提供了TSl\SSL的支持。

### 首先利用openssl创建一个自签名证书

```bash

$ openssl genrsa -out server.key 2048

Generating RSA private key, 2048 bit long modulus (2 primes)

............................................................+++++

................................................................................................................................+++++

e is 65537 (0x010001)

$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650

You are about to be asked to enter information that will be incorporated

into your certificate request.

What you are about to enter is what is called a Distinguished Name or a DN.

There are quite a few fields but you can leave some blank

For some fields there will be a default value,

If you enter '.', the field will be left blank.

-----

Country Name (2 letter code) [AU]:

State or Province Name (full name) [Some-State]:

Locality Name (eg, city) []:

Organization Name (eg, company) [Internet Widgits Pty Ltd]:

Organizational Unit Name (eg, section) []:

Common Name (e.g. server FQDN or YOUR name) []:Echoer

Email Address []:

```

生成了server.key和server.crt两个文件,服务端两个文件都需要,客户端只需要crt文件

### 修改服务端代码

```Python

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)

# 读取 key and certificate

with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f:

    private_key = f.read().encode()

with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f:

    certificate_chain = f.read().encode()

# 创建 server credentials

server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))

# 调用add_secure_port方法,而不是add_insesure_port方法

server.add_secure_port('localhost:50051', server_creds)

```

### 修改客户端代码

```Python

# 读取证书

with open('server.crt') as f:

    trusted_certs = f.read().encode()

# 创建 credentials

credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)

# 调用secure_channel方法,而不是insecure_channel方法

channel = grpc.secure_channel('localhost:50051', credentials)

```

启动服务端后,启动客户端,会出现以下错误:

```Bash

grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:

        status = StatusCode.UNAVAILABLE

        details = "Connect Failed"

        debug_error_string = "{"created":"@1547552759.642000000","description":"Failed to create subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2721,"referenced_errors":[{"created":"@1547552759.642000000","description":"Pick Cancelled","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":241,"referenced_errors":[{"created":"@1547552759.642000000","description":"Connect Failed","file":"src/core/ext/filters/client_channel/subchannel.cc","file_line":689,"grpc_status":14,"referenced_errors":[{"created":"@1547552759.642000000","description":"Peer name localhost is not in peer certificate","file":"src/core/lib/security/security_connector/security_connector.cc","file_line":880}]}]}]}"

>

```

!!! 警告:

这是因为TSL\SSL模式下,客户端是通过服务名称:port来获取服务的凭据,而不是ip:port, 所以对客户端做如下修改:

```Bash

# 修改前

channel = grpc.secure_channel('localhost:50051', credentials)

# 修改后

channel = grpc.secure_channel('Echoer:50051', credentials)

```

!!! 警告:

其次,在TSL\SSL模式下,客户端对服务名称:port解析时候需要dns支持,目前不知道如何解决,只能够采取以下措施解决,通过修改windows的host文件,利用host将服务名称解析为IP地址,

打开windows的host文件,地址:`C:\Windows\System32\drivers\etc\hosts`备份后修改如下,添加:

```Bash

# 服务的IP地址 服务名称

127.0.0.1 Echoer

```

保存即可

修改后,再次运行,即可运行成功

注意事项:CA证书和私钥key都是配套的,不配套的CA证书和key是无法校验成功的

## 结合consul

注意事项:确保consul已经正确启动,查看http://ip:port:8500/, 可查看consul的状态,确保已经安装python-consul这个库,否则无法操作consul

首先想象我们以上的grpc示例程序之所以成功的有限制条件,

- 我们知道服务端已经正常启动

- 我们知道了服务端的ip和端口

但在实际过程中,一般是不可能确切知道服务的ip和端口的,所以consul就起了个中间桥梁的作用,具体如下:

### 服务注册

服务注册,顾名思义,服务在启动之前,必须现在consul中注册。

服务端:当服务端启动之后,consul会利用服务注册时获得的ip和port同服务建立联系,其中最重要的就是health check即心跳检测。consul通过心跳检测来判定该服务是否正常。

客户端:客户端通过consul来查询所需服务的ip和port,若对应服务已经注册且心跳检测正常,则会返回给客户端对应的ip和port信息,然后客户端就可以利用这个来连接服务端了

服务注册示例代码如下:

```Python

def register(self, server_name, ip, port, consul_host=CONSUL_HOST):

    """

    server_name: 服务名称

    ip: 服务IP地址

    port: 服务监听的端口

    consul_host: 所连接的consul服务器的IP地址

    """

    c = consul.Consul(host=consul_host) # 获取与consul的连接

    print(f"开始注册服务{server_name}")

    check = consul.Check.tcp(ip, port, "10s") # 设置心跳检测的超时时间和对应的ip和port端口

    c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check) # 注册

```

既然有服务注册,当然会有服务注销,示例代码如下:

```Python

def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST):

    c = consul.Consul(host=consul_host)

    print(f"开始退出服务{server_name}")

    c.agent.service.deregister(f"{server_name}-{ip}-{port}")

```

### 服务查询

客户端则需要在consul中查询对应服务的IP和port,但由于在TSL/SSL模式下,所需的只是服务名称和port,故而只需要查询port端口即可。

客户端服务查询采用的是DNS的查询方式,必须确保安装dnspython库,用于创建DNS查询

服务查询示例代码如下:

```Python

# 创建一个consul dns查询的 resolver

consul_resolver = resolver.Resolver()

consul_resolver.port = 8600

consul_resolver.nameservers = [consul_host]

def get_host_port(self, server_name):

    try:

        dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", "SRV") # 查询对应服务的port,

    except DNSException as e:

        return None, None

    return server_name, dns_answer_srv[0].port # 返回服务名和端口

```

## grpc流模式

grpc总共提供了四种数据交互模式:

- simpe 简单模式 RPC:即上述的所有的grpc

- server-side streaming 服务端流式 RPC

- client-side streaming 客户端流式 RPC

- Bidirectional streaming 双向数据流模式的 gRPC

由于grpc对于消息有大小限制,diff数据过大会导致无法接收数据,我们在使用过程中,使用了流模式来解决了此类问题,

在此模式下,客户端传入的参数由具体的protobuf变为了protobuf的迭代器,客户端接收的响应也变为了迭代器,获取完整的响应则需要迭代获取。

服务端响应也变为了一个迭代器。

### 修改服务定义文件:

```proto

# 修改前

service Echoer{

    rpc echo (Req) returns (Resp) {}

}

# 修改后

service Echoer{

    rpc echo (stream Req) returns (stream Resp) {}

}

```

重新编译

### 修改服务端

将工作函数修改为如下所示, 即工作函数变成了一个迭代器:

```Python

def echo(self, request_iterator, context):

    for i in range(10):

        yield Echoer_pb2.Resp(a="echo")

```

### 修改客户端

将echo的传入参数修改为迭代器:

```Python

def qq():

    for i in range(10):

        yield Echoer_pb2.Req(q="echo")

response = stub.echo(qq())

for resp in response:

    print("Echoer client received: " + response.a)

```

重新运行,接收结果如下:

```bash

$ python Echoer_client.py

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

Echoer client received: echo

```

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容