昨晚写完 151. 【go 语言】gRPC 环境搭建(二) 后,今早爬起来继续踩坑,发现事情并不简单,版本更新太快,部分文档的滞后性相当明显,不过都已经解决了,相关内容,已经在昨晚那篇新增了,第一次看的话,直接跟着走就行。
好了,今天从继续往后面探索。
gRPC 的调用方式简介
- Unary RPC:一元 RPC。
- Server-side streaming RPC:服务端流式 RPC。
- Client-side streaming RPC:客户端流式 RPC。
- Bidirectional streaming RPC:双向流式 RPC。
1. Unary RPC:一元 RPC
一元 RPC 也称为单次 RPC,简单来讲,就是客户单发起一次普通的 RPC 请求,是最基础的调用,也是最常用的方式,如下图所示,
服务端代码如下,
package main
import (
"context"
"google.golang.org/grpc"
"net"
pb "project_protoc/proto/helloworld"
)
type GreeterServer struct {
pb.UnimplementedGreeterServer
}
func (g GreeterServer) SayHello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "hello world, QiJ"}, nil
}
func main() {
server := grpc.NewServer()
pb.RegisterGreeterServer(server, &GreeterServer{})
lis, _ := net.Listen("tcp", ":"+"2021")
server.Serve(lis)
}
代码说明,
- 创建 gRPC Server 对象,可以把它理解为 Server 端的抽象对象。
- 将 GreeterServer(其包含需要被调用的服务端接口)注册到 gRPC Server 的内部注册中心,这样在接收请求时,即可通过内部的“服务发现”发现该服务端接口,并进行逻辑处理。
- 创建 Listen,监听 TCP 端口。
- gRPC Server 开始 lis.Accept,直到 Stop 或 GracefulStop。
Cannot use '&GreeterServer{}' (type *GreeterServer) as type GreeterServer Type does not implement 'GreeterServer' as some methods are missing: mustEmbedUnimplementedGreeterServer()
客户端代码如下,
package main
import (
"context"
"google.golang.org/grpc"
"log"
pb "project_protoc/proto/helloworld"
)
func main() {
conn, _ := grpc.Dial(":"+"2021", grpc.WithInsecure())
defer conn.Close()
client := pb.NewGreeterClient(conn)
_ = SayHello(client)
}
func SayHello(client pb.GreeterClient) error {
resp, _ := client.SayHello(context.Background(), &pb.HelloRequest{Name: "七镜"})
log.Printf("client.SayHello resp: %s", resp.Message)
return nil
}
代码说明,
- 创建与给定目标(服务端)的连接句柄。
- 创建 Greeter 的客户端对象。
- 发送 RPC 请求,等待同步响应,得到回调后返回响应结果。
二、Service-side streaming RPC:服务端流式 RPC
服务端流式 RPC 是一个单向流,指 Server 为 Stream、Client 为普通的一元 RPC 请求。
简单来讲,就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集,如下图所示,
proto 文件调整如下,
...
service Greeter {
...
rpc SayList (HelloRequest) returns (stream HelloReply) {};
...
}
...
意思是新增一个 SayList 服务。加完之后,继续执行下述命令,重新生成 *.pb.go 和 *_grpc.pb.go 。
protoc -I=. --go_out=. --go-grpc_out=. ./proto/*.proto
服务端代码调整如下,
...
func (s GreeterServer) SayList(r *pb.HelloRequest, stream pb.Greeter_SayListServer) error {
for n := 0; n <= 6; n++ {
_ = stream.Send(&pb.HelloReply{Message: "hello list, QiJ"})
}
return nil
}
...
在 Server 端,需重点留意 stream.Send 方法。通过阅读源码,可以得知 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调用内部的 SendMsg 方法,改方法涉及以下流程:
- 消息体(对象)序列化。
- 压缩序列化后的消息体。
- 为正在传输的消息体增加 5 字节的 header(标志位)。
- 判断压缩 + 序列化后的消息体总字节长度是否大于预设的 maxStreamMessageSize(预设值为 math.MaxInt32),若超出则提示错误。
- 写入流的数据集。
客户端代码调整如下,
...
func SayList(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayList(context.Background(), r)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: %v", resp.Message)
}
return nil
}
...
在 Client 端,需重点留意 stream.Recv 方法,并思考一下,在什么情况下会出现 io.EOF,又在什么情况下会出现错误信息呢?实际上,stream.Recv 方法是对 ClientStream.RecvMsg 方法的封装,而 RecvMsg 方法会从流中读取完整的 gRPC 消息体,有次可以得知:
- RecvMsg 是阻塞等待的。
- 当流成功或结束(调用了 Close)是,RecvMsg 会返回 io.EOF。
- 当流出现任何错误时,流都会被中止。错误信息中会包含 RPC 错误码。RecvMsg 中可能出现的错误如下,
- io.EOF、io.ErrUnexpectedEOF。
- transport.ConnectionError。
- google.golang.org/grpc/codes(gRPC 的预定义错误码)
需要注意的是,默认的 MaxReceiveMessageSize 值为 1024x1024x4。若有特殊需求,则可以适当调整。
三、 Client-side streaming RPC:客户端流式 RPC
客户端流式 RPC 是一个单向流,客户端通过流式发起多次 RPC 请求给服务端,而服务端又发起一次响应给客户端,如下图所示,
proto 文件调整如下
...
rpc SayRecord (stream HelloRequest) returns (HelloReply) {};
...
再次执行下述命令,重新生成 *.pb.go 和 *_grpc.pb.go
protoc -I=. --go_out=. --go-grpc_out=. ./proto/*.proto
服务端代码调整如下
...
func (s *GreeterServer) SayRecord(stream pb.Greeter_SayRecordServer) error {
for {
resp, err := stream.Recv()
if err == io.EOF {
message := &pb.HelloReply{Message: "hello record, QiJ"}
return stream.SendAndClose(message)
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
}
...
上述代码中,我们对每一个 Recv 都进行了处理。当 io.EOF(流关闭)后,需要通过 stream.SendAndClose 方法将最终的响应结果发送给客户端,同时关闭在另外一侧等待的 Recv。
客户端代码调整如下,
...
func SayRecord(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayRecord(context.Background())
for n := 0; n < 6; n++ {
_ = stream.Send(r)
}
resp, _ := stream.CloseAndRecv()
log.Printf("client.SayRecord resp: %v", resp.Message)
return nil
}
...
Server 端的 stream.SendAndClose 方法与 Client 端的 stream.CloseAndRecv 方法是配套使用的。
四、Bidirectional streaming RPC:双向流式 RPC
双向流式 RPC,顾名思义是双向流,由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求。
首个请求一定是客户端发起的,但具体的交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)则由程序编写的方式来确定(可以结合协程)。
假设双向流是按顺序发送的,则大致如下图所示,
proto 文件调整如下
...
rpc SayRoute (stream HelloRequest) returns (stream HelloReply) {};
...
再次执行下述命令,重新生成 *.pb.go 和 *_grpc.pb.go
protoc -I=. --go_out=. --go-grpc_out=. ./proto/*.proto
服务端代码调整如下,
...
func (s *GreeterServer) SayRoute(stream pb.Greeter_SayRouteServer) error {
n := 0
for {
_ = stream.Send(&pb.HelloReply{Message: "hello route"})
resp, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
n++
log.Printf("server.SayRoute resp: %v", resp)
}
}
...
客户端代码调整如下,
...
func SayRoute(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayRoute(context.Background())
for n := 0; n <= 6; n++ {
_ = stream.Send(r)
resp, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("client.SayRoute resp: %v", resp.Message)
}
_ = stream.CloseSend()
return nil
}
...
四种 gRPC 调用方式的结果展示汇总
Client 端:
Server 端:
项目工程下载
下载地址:点击下载