grpc

基本概念

基本组件

  • Stub (打桩, 定义接口层): 起到了客户端的作用,用来发起proto请求和接收proto响应
  • Channel: 在传输层上的抽象, 适合用于做切面(用于拦截, 装饰), 一般用于日志, 监控, 鉴权
  • Transport: 传输层, grpc 有三种实现
    • netty-based: 用于 client 和 server
    • okhttp-based: 一般用于安卓, 且只有 client端
    • in-process: 线程内通信, 用于client 和 server 在同一进程内情况

打桩有2类意思。(来自知乎)

  1. 容易扩展的块。可以是一个宏,一个函数调用,或者数个宏或函数调用的组合。这里的桩是起着辅助的作用,与一般的代码块的不同是这个桩具体做什么不是那么固定,而只是一个块放在这里。类似面向方面编程的横切点,只不过是显式插入的。
  2. 另一个就是做PRC时的一个代理的点,它不真正做事,而是通过PRC或者类似的机制由外部完成真正的工作。

grpc 提供了4中类型的 service 方法

// 简单的rpc调用, 调用完后等待服务端返回
rpc GetFeature(Point) returns (Feature) {}

// 服务端流 : 客户端发送请求后, 服务端返回 Stream, 客户端用它来读取消息序列, 直到没有消息读取完
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// 客户端流: 客户端通过流写入消息序列发送到服务端, 当客户端完成写入, 会等待服务端读取完成并返回
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// 双向流: 客户端和服务端都通过 Stream 发送消息序列, 两端的操作完成独立, 因此客户端和服务端可以读取或写入而不用管是否读取完成, 消息序列在stream中是有序的
// 例如: 服务端可以选择在返回前读取所有数据, 可以使用读取一条写入一条方式, 或者批量读取后返回
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

maven包依赖以及插件

依赖包 和 生成代码插件 可以在 https://github.com/grpc/grpc-java 看到, 将 proto 文件放入 src/main/proto 或 src/test/proto 目录下, 就可以编译

example 实例

构建 grpc 的步骤

  1. 定义 .proto 文件
  2. 生成服务端, 客户端 proto 代码
  3. 使用 grpc api 编写自己的客户端和服务端

生成代码: 这两货都要点, 1个是proto 生成, 一个是 grpc 的

image.png

生成后路径是

image.png

服务端

服务端创建步骤

  1. 使用自己的业务代码覆实现 grpc 中定义的服务
  2. 运行 grpc server 用于监听请求

可以看到生成的代码中包含抽象类 RouteGuideImplBase , 实现具体业务时就可以通过继承来实现, 具体实现

 private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
     ...
 }
  1. 简单调用实现模式
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  // 返回数据
  responseObserver.onNext(checkFeature(request));
  // 标识完成 rpc 连接
  responseObserver.onCompleted();
}
  1. 服务端 Stream 模式: 返回多个数据到客户端, 和 List<?> 发送区别在于 list 只能同一时间发送, Stream 可以在不同时间发送
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  ...

  for (Feature feature : features) {
    ...
    responseObserver.onNext(feature);
  }
  responseObserver.onCompleted();
}
  1. 客户端 Stream 模式: 用于收集一波数据, 汇总计算之类的
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    /** 处理每个业务数据逻辑 */
    @Override
    public void onNext(Point point) {
      ...
    }

    /** 出错处理 */
    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    /** 完成接收后通过 onNext 返回, 并用 onCompleted 完成连接*/
    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  }
}
  1. 双向 grpc 流模式
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

启动服务用于监听连接

// 创建 ServerBuilder
ServerBuilder.forPort(port)
// 注册服务
server = serverBuilder.addService(new RouteGuideService(features)).build();
// 启动服务,并注册钩子
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    // 等30s 处理完遗留问题后关闭
    try {
      if (server != null) {
        server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      e.printStackTrace(System.err);
    }
    System.err.println("*** server shut down");
  }
});
// 阻塞直到关闭
server.blockUntilShutdown();

客户端

调用服务端方法, 我们可以使用两种 stubs

  • 阻塞/同步 stub
  • 非阻塞/异步 stub

在创建 stub 之前需要先创建 grpc Channel, 通过指定 地址/端口 来创建

public RouteGuideClient(String host, int port) {
  this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}

public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  channel = channelBuilder.build();
  blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  asyncStub = RouteGuideGrpc.newStub(channel);
}

然后通过 Channel 创建 stub

//同步
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
//异步
asyncStub = RouteGuideGrpc.newStub(channel);

调用服务端方法

// 简单调用, StatusRuntimeException 包含 Status, 可用于错误消息处理
try {
  feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

// 服务端流, 和简单调用类似
Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

客户端流则稍微复杂一些: 这里客户端会发送一个 Point 的流到服务端, 服务端返回一个 RouteSummary(汇总消息), 这里需要用 异步Stub

// 服务端返回定义
 final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
    // 接收返回消息
    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }
    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }
    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

// 使用异步stub发送
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      // 发送信息
      requestObserver.onNext(point);
      // 稍后发送下一个
      Thread.sleep(rand.nextInt(1000) + 500);
      // 连接已关闭退出
      if (finishLatch.getCount() == 0) {
        return;
      }
    }
} catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
}
// Mark the end of requests
requestObserver.onCompleted();

双向流和客户端流类似, 只是说客户端流服务端返回值后就会执行 onCompleted 通知完成, 可以查看 routeChat方法

尽管每一方总是按照它们被写入的顺序获取另一方的消息,但客户机和服务器都可以以任何顺序读取和写入消息——流完全独立地操作

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容