基本概念
基本组件
- Stub (打桩, 定义接口层): 起到了客户端的作用,用来发起proto请求和接收proto响应
- Channel: 在传输层上的抽象, 适合用于做切面(用于拦截, 装饰), 一般用于日志, 监控, 鉴权
- Transport: 传输层, grpc 有三种实现
- netty-based: 用于 client 和 server
- okhttp-based: 一般用于安卓, 且只有 client端
- in-process: 线程内通信, 用于client 和 server 在同一进程内情况
打桩有2类意思。(来自知乎)
- 容易扩展的块。可以是一个宏,一个函数调用,或者数个宏或函数调用的组合。这里的桩是起着辅助的作用,与一般的代码块的不同是这个桩具体做什么不是那么固定,而只是一个块放在这里。类似面向方面编程的横切点,只不过是显式插入的。
- 另一个就是做PRC时的一个代理的点,它不真正做事,而是通过PRC或者类似的机制由外部完成真正的工作。
grpc 提供了4中类型的 service 方法
// 简单的rpc调用, 调用完后等待服务端返回
rpc GetFeature(Point) returns (Feature) {}
// 服务端流 : 客户端发送请求后, 服务端返回 Stream, 客户端用它来读取消息序列, 直到没有消息读取完
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// 客户端流: 客户端通过流写入消息序列发送到服务端, 当客户端完成写入, 会等待服务端读取完成并返回
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// 双向流: 客户端和服务端都通过 Stream 发送消息序列, 两端的操作完成独立, 因此客户端和服务端可以读取或写入而不用管是否读取完成, 消息序列在stream中是有序的
// 例如: 服务端可以选择在返回前读取所有数据, 可以使用读取一条写入一条方式, 或者批量读取后返回
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
maven包依赖以及插件
依赖包 和 生成代码插件 可以在 https://github.com/grpc/grpc-java 看到, 将 proto 文件放入 src/main/proto 或 src/test/proto 目录下, 就可以编译
example 实例
构建 grpc 的步骤
- 定义 .proto 文件
- 生成服务端, 客户端 proto 代码
- 使用 grpc api 编写自己的客户端和服务端
生成代码: 这两货都要点, 1个是proto 生成, 一个是 grpc 的
生成后路径是
服务端
服务端创建步骤
- 使用自己的业务代码覆实现 grpc 中定义的服务
- 运行 grpc server 用于监听请求
可以看到生成的代码中包含抽象类 RouteGuideImplBase , 实现具体业务时就可以通过继承来实现, 具体实现
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
- 简单调用实现模式
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
// 返回数据
responseObserver.onNext(checkFeature(request));
// 标识完成 rpc 连接
responseObserver.onCompleted();
}
- 服务端 Stream 模式: 返回多个数据到客户端, 和 List<?> 发送区别在于 list 只能同一时间发送, Stream 可以在不同时间发送
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
for (Feature feature : features) {
...
responseObserver.onNext(feature);
}
responseObserver.onCompleted();
}
- 客户端 Stream 模式: 用于收集一波数据, 汇总计算之类的
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
/** 处理每个业务数据逻辑 */
@Override
public void onNext(Point point) {
...
}
/** 出错处理 */
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
/** 完成接收后通过 onNext 返回, 并用 onCompleted 完成连接*/
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
}
}
- 双向 grpc 流模式
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
启动服务用于监听连接
// 创建 ServerBuilder
ServerBuilder.forPort(port)
// 注册服务
server = serverBuilder.addService(new RouteGuideService(features)).build();
// 启动服务,并注册钩子
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// 等30s 处理完遗留问题后关闭
try {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
// 阻塞直到关闭
server.blockUntilShutdown();
客户端
调用服务端方法, 我们可以使用两种 stubs
- 阻塞/同步 stub
- 非阻塞/异步 stub
在创建 stub 之前需要先创建 grpc Channel, 通过指定 地址/端口 来创建
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
然后通过 Channel 创建 stub
//同步
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
//异步
asyncStub = RouteGuideGrpc.newStub(channel);
调用服务端方法
// 简单调用, StatusRuntimeException 包含 Status, 可用于错误消息处理
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
// 服务端流, 和简单调用类似
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
客户端流则稍微复杂一些: 这里客户端会发送一个 Point 的流到服务端, 服务端返回一个 RouteSummary(汇总消息), 这里需要用 异步Stub
// 服务端返回定义
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
// 接收返回消息
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
// 使用异步stub发送
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
// 发送信息
requestObserver.onNext(point);
// 稍后发送下一个
Thread.sleep(rand.nextInt(1000) + 500);
// 连接已关闭退出
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
双向流和客户端流类似, 只是说客户端流服务端返回值后就会执行 onCompleted 通知完成, 可以查看 routeChat方法
尽管每一方总是按照它们被写入的顺序获取另一方的消息,但客户机和服务器都可以以任何顺序读取和写入消息——流完全独立地操作