对grpc双向流的理解

[toc]

对grpc双向流的理解

一. 双向流的实现

1.1 proto

service RouteGuide {
  // 定义一个双向流方法
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

1.2 服务端

  • 自动生成代码部分
// 服务端接口RouteGuideServer
type RouteGuideServer interface {
    RouteChat(RouteGuide_RouteChatServer) error
}
// 发送/接收RouteGuide_RouteChatServer接口
type RouteGuide_RouteChatServer interface {
    Send(*RouteNote) error
    Recv() (*RouteNote, error)
    grpc.ServerStream
}
// RouteGuide_RouteChatServer接口的实现
type routeGuideRouteChatServer struct {
    grpc.ServerStream
}

func (x *routeGuideRouteChatServer) Send(m *RouteNote) error {
    return x.ServerStream.SendMsg(m)
}

func (x *routeGuideRouteChatServer) Recv() (*RouteNote, error) {
    m := new(RouteNote)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}
// 注册服务
func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) {
    s.RegisterService(&_RouteGuide_serviceDesc, srv)
}
  • 深层源码
// 服务注册
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) {
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}
// 核心逻辑
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
  // 赋值非流式方法
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
  // 赋值流式方法
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
  // 注册服务
    s.m[sd.ServiceName] = srv
}
  • 调用部分
func main() {
  // 监听,返回listener
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
    grpcServer := grpc.NewServer(opts...)
  // 注册服务
  pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
  // 使用服务  ,grpcServer.Serve(lis)内部调用 rawConn, err := lis.Accept() ->   s.handleRawConn(rawConn)
    grpcServer.Serve(lis)
}
// 实现RouteGuideServer接口
type routeGuideServer struct {
    pb.UnimplementedRouteGuideServer
}


func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
      ...
      // 接收 
      in, err := stream.Recv()
      ...
      // 发送
      err := stream.Send(note)
  }
}

1.3 客户端

  • 自动生成代码部分
// 客户端接口RouteGuideClient
type RouteGuideClient interface {
    RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
}

// 发送/接收RouteGuide_RouteChatClient
type RouteGuide_RouteChatClient interface {
    Send(*RouteNote) error
    Recv() (*RouteNote, error)
    grpc.ClientStream
}

// 接口RouteGuide_RouteChatClient的实现
type routeGuideRouteChatClient struct {
    grpc.ClientStream
}
// 发送
func (x *routeGuideRouteChatClient) Send(m *RouteNote) error {
    return x.ClientStream.SendMsg(m)
}
// 接收
func (x *routeGuideRouteChatClient) Recv() (*RouteNote, error) {
    m := new(RouteNote)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

// RouteGuideClient接口的实现
type routeGuideClient struct {
    cc grpc.ClientConnInterface
}

// 实现RouteGuideClient接口的实现
func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
    stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
    if err != nil {
        return nil, err
    }
  // 将stream赋值给grpc.ClientStream
    x := &routeGuideRouteChatClient{stream}
    return x, nil
}

// 客户的构造函数
// grpc.ClientConnInterface是一个grpc的连接
func NewRouteGuideClient(cc grpc.ClientConnInterface) RouteGuideClient {
    return &routeGuideClient{cc}
}

  • 客户端调用代码
// grpc拨号并返回socket连接
conn, err := grpc.Dial(*serverAddr, opts...)
// 传入grpc连接,生成服务的客户端
client := pb.NewRouteGuideClient(conn)
// 调用RouteChat返回流
stream, err := client.RouteChat返回流(ctx)
go func() {
        for {
      // 连续读取数据
            in, err := stream.Recv()
        }
}()
for _, note := range notes {
  // 连续发送数据
    stream.Send(note);
}
// 使用完记得关闭
stream.CloseSend()

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。