gRPC(2):四种基本通信模式

gRPC(1):入门及简单使用(go) 中,我们实现了一个简单的 gRPC 应用程序,其中双方通信是简单的请求—响应模式,没发出一个请求都会得到一个响应,然而,借助 gRPC 可以实现不同的通信模式,这里介绍四种 gRPC 应用程序的基础通信模式:一元RPC、服务端流RPC、客户端流RPC、双向流RPC

1、一元RPC

一元 RPC 也被称为简单 RPC, 其实就是 gRPC(1):入门及简单使用(go) 中实现的请求—响应模式,每调用一次得到一个结果,这里再以一个简单的订单管理程序做说明,实现两个服务:addOrder 用于添加订单;getOrder 用于根据 id 获取订单:

  • 服务定义
syntax = "proto3";
package proto;
option go_package = "./proto";

service OrderManagement {
    rpc addOrder(Order) returns (StringValue);
    rpc getOrder(StringValue) returns (Order);
}

message Order {
    string id = 1;
    repeated string items = 2;    // repeated 表示列表
    string description = 3;
    float price = 4;
    string destination = 5;
}

message StringValue {
    string value = 1;
}
  • 服务端实现
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "strings"

    pb "order/proto"

    "github.com/gofrs/uuid"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

const (
    port = ":50051"
)

type server struct {
    pb.UnimplementedOrderManagementServer
}

// 模拟存储
var orderMap = make(map[string]*pb.Order)

func (s *server) AddOrder(ctx context.Context, order *pb.Order) (*pb.StringValue, error) {
    id, err := uuid.NewV4()
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Error while generating Product ID", err)
    }
    order.Id = id.String()
    orderMap[order.Id] = order
    log.Printf("Order %v : %v - Added.", order.Id, order.Description)
    return &pb.StringValue{Value: order.Id}, nil
}

func (s *server) GetOrder(ctx context.Context, orderID *pb.StringValue) (*pb.Order, error) {
    order, exists := orderMap[orderID.Value]
    if exists && order != nil {
        log.Printf("Order %v : %v - Retrieved.", order.Id, order.Description)
        return order, nil
    }
    return nil, status.Errorf(codes.NotFound, "Order does not exist.", orderID.Value)
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterOrderManagementServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
  • 客户端实现
package main

import (
    "context"
    "io"
    "log"
    "time"

    pb "order/proto"

    "google.golang.org/grpc"
)

const (
    address = "localhost:50051"
)

func main() {
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    c := pb.NewOrderManagementClient(conn)

    orderID, err := c.AddOrder(context.Background(),
        &pb.Order{
            Items:       []string{"XiaoMI 11"},
            Description: "XiaoMI 11",
            Price:       3999,
            Destination: "suzhou",
        })
    if err != nil {
        log.Fatalf("could not add order: %v", err)
    }
    log.Printf("Added order: %v", orderID.Value)
}

2、服务端流RPC

与一元 RPC 不同的是,流模式下响应或者请求都可以是一个序列,这个序列也被称为”流“,服务端流 RPC 下,客户端发出一个请求,但不会立即得到一个响应,而是在服务端与客户端之间建立一个单向的流,服务端可以随时向流中写入多个响应消息,最后主动关闭流,而客户端需要监听这个流,不断获取响应直到流关闭

下面以一个简单的关键词搜索功能为例,客户端发送关键字,服务端进行匹配,每找到一个就写进流中,在之前的基础上添加代码:

  • 服务定义
service OrderManagement {
    ...
    // stream 将返回参数指定为订单流
    rpc searchOrders(StringValue) returns (stream Order);
}
  • 服务端实现
func (s *server) SearchOrders(searchQuery *pb.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
    for key, order := range orderMap {
        for _, item := range order.Items {
            if strings.Contains(item, searchQuery.Value) {
                err := stream.Send(&order)
                if err != nil {
                    return fmt.Errorf("error sending message to stream: %v", err)
                }
                log.Printf("order found: " + key)
                break
            }
        }
    }
    return nil
}
  • 客户端实现
...
// 获得建立的流对象
stream, err := c.SearchOrders(context.Background(), &pb.StringValue{Value: "XiaoMI"})
if err != nil {
    log.Fatalf("search error: %v", err)
}
for {
    // 循环读取
    order, err := stream.Recv()
    if err == io.EOF {
        log.Print("EOF")
        break
    }
    if err != nil {
        log.Fatal("error: ", err)
    }
    log.Print(order)
}

3、客户端流RPC

客户端流,和服务端流一样的道理,只不过流的方向变为从客户端到服务端,可以发送多条响应,服务端只会响应一次,但何时响应取决于服务端的逻辑,以更新订单序列为例,客户端可以发送一系列订单,服务端可以选择在任意时候停止读取并发送响应:

  • 服务定义
service OrderManagement {
    ...
    rpc updateOrders(stream Order) returns (StringValue);
}
  • 服务端实现
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
    for {
        order, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StringValue{Value: "finished"})
        }
        if err != nil {
            return err
        }
        orderMap[order.Id] = order
        log.Print("OrderID " + order.Id + " updated")
    }
}
  • 客户端实现
// 取得流
updateStream, err := c.UpdateOrders(context.Background())
if err != nil {
    log.Fatalf("update err: %v", err)
}
// 发送 Order1
if err = updateStream.Send(&pb.Order{
    Id:          "1",
    Items:       []string{"Huawei P50"},
    Description: "Huawei P50",
    Price:       5999,
    Destination: "suzhou",
}); err != nil {
    log.Fatalf("send error: %v", err)
}
// 发送 Order2
if err = updateStream.Send(&pb.Order{
    Id:          "2",
    Items:       []string{"iphone 12"},
    Description: "iphone 12",
    Price:       8999,
    Destination: "suzhou",
}); err != nil {
    log.Fatalf("send error: %v", err)
}
...
// 关闭流,结束发送
updateRes, err := updateStream.CloseAndRecv()
if err != nil {
    log.Fatalf("update stream close error: %v", err)
}
log.Printf("update res: %v", updateRes)

4、双向流RPC

双向流,顾名思义,由客户端发起调用后,将建立起双向的流,在这之后,通信将完全基于双方的应用逻辑,流的操作完全独立,客户端和服务端可以按照任意顺序进行读取和写入,以一个订单筛选过程为例,客户端发送一串订单 ID 序列,服务端进行检查,每遇到一个有效的 ID 就写入流中响应:

  • 服务定义
service OrderManagement {
    ...
    rpc processOrders(stream StringValue) returns (stream StringValue);
}
  • 服务端实现
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
    for {
        orderId, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        order, exists := orderMap[orderId.Value]
        if exists && order != nil {
            stream.Send(&pb.StringValue{Value: order.Id})
        }
    }
}
  • 客户端实现
...
// 取得双向流
processStream, err := c.ProcessOrders(context.Background())
// 同步channel,防止主程序提前退出
waitc := make(chan struct{})
// 双向流是完全异步的,开一个协程用于读取响应
go func() {
    for {
        orderId, err := processStream.Recv()
        if err == io.EOF {
            close(waitc)
            return
        }
        if err != nil {
            log.Fatalf("recv error: %v", err)
        }
        log.Print("recv " + orderId.Value)
    }
}()
// 请求
if err = processStream.Send(&pb.StringValue{Value: "1"}); err != nil {
    log.Fatalf("1 send error: %v", err)
}
if err = processStream.Send(&pb.StringValue{Value: "2"}); err != nil {
    log.Fatalf("2 send error: %v", err)
}
if err = processStream.Send(&pb.StringValue{Value: "3"}); err != nil {
    log.Fatalf("3 send error: %v", err)
}
if err = processStream.CloseSend(); err != nil {
    log.Fatal(err)
}
// 等待读取结束
<-waitc

这就是 gRPC 中主要的四种通信模式,基于它们可以实现各种 gRPC 场景下的交互,至于选择哪种,还需根据具体的场景考虑

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355