gRPC 服务类型
一共有4中服务方法:
- 一元RPC
这种最简单,就是客户端发一个请求,服务端返回结果。
- 服务器端流RPC
客户端发一个请求,服务端返回一个流,客户端从流中读取每条信息。
- 客户端流RPC
客户端向服务端发送一个流,服务端读取完数据返回处理结果。
- 双向流RPC
客户端和服务端都可以独立的向对方发送、接收流数据。
gRPC 支持同步和异步调用,同步调用只支持一元RPC、服务器端流RPC,异步调用对这4中方式都支持。
客户端流RPC示例
(1)proto 定义
StreamingExample.proto
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package com.example.server.streaming;
message Metric {
google.protobuf.Timestamp timestamp = 1;
int64 metric = 2;
}
message Average {
double val = 1;
}
service MetricsService {
rpc collect(stream Metric) returns (Average);
}
(2)服务实现
MetricsServiceImpl.java
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
@Override
public StreamObserver<StreamingExample.Metric> collect(StreamObserver<StreamingExample.Average> responseObserver) {
return new StreamObserver<StreamingExample.Metric>() {
private long sum = 0;
private long count = 0;
@Override
public void onNext(StreamingExample.Metric value) {
System.out.println("value: " + value);
sum += value.getMetric();
count++;
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onNext(StreamingExample.Average.newBuilder()
.setVal(sum / count)
.build());
}
};
}
}
(3)server
MetricsServer.java
public class MetricsServer {
public static void main(String[] args) throws InterruptedException, IOException {
Server server = ServerBuilder.forPort(8081).addService(new MetricsServiceImpl()).build();
server.start();
System.out.println("listening 8081 ...");
server.awaitTermination();
}
}
(4)client
MetricsClient.java
public class MetricsClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081).usePlaintext(true).build();
MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
StreamObserver<StreamingExample.Metric> collect = stub.collect(new StreamObserver<StreamingExample.Average>() {
@Override
public void onNext(StreamingExample.Average value) {
System.out.println("Average: " + value.getVal());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
Stream.of(1L, 2L, 3L, 4L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build())
.forEach(collect::onNext);
collect.onCompleted();
}
}
小结
仔细看代码会发现一个问题:
我们定义service 时是这么定义的:
rpc collect(stream Metric) returns (Average);
但service 实现中是这么定义的:
StreamObserver<StreamingExample.Metric> collect(StreamObserver<StreamingExample.Average> responseObserver)
...
和service定义是相反的,service定义时,传入参数是 Metric,返回值是 Average,在实现中正好反了。
为了便于理解,可以想象成甩管子,负责接收数据的人把自己的管子甩给发送数据的人。
例如,service 定义的 collect 方法,client 把 Metric 传给 server,就是 server 负责接收数据,那么 server 就要把自己的管子甩给 client,让 client 向管子中放数据,所以,server 创建了一个 StreamObserver<StreamingExample.Metric>
对象返回给 client,client 就可以向其中发送 Metric,也就是实现了 proto 中定义的效果。
同样的,返回值的处理也是一个道理,client 需要接收结果数据,就要把自己的管子甩给 server,所以,client 创建了 StreamObserver<StreamingExample.Average>
,传给了 server,等待 server 往里放结果数据。