gRPC java 示例 - client 端 stream

gRPC 服务类型

一共有4中服务方法:

  • 一元RPC

这种最简单,就是客户端发一个请求,服务端返回结果。

  • 服务器端流RPC

客户端发一个请求,服务端返回一个流,客户端从流中读取每条信息。

  • 客户端流RPC

客户端向服务端发送一个流,服务端读取完数据返回处理结果。

  • 双向流RPC

客户端和服务端都可以独立的向对方发送、接收流数据。

gRPC 支持同步和异步调用,同步调用只支持一元RPC服务器端流RPC异步调用对这4中方式都支持。

客户端流RPC示例

(1)proto 定义

StreamingExample.proto

syntax = "proto3";

import "google/protobuf/timestamp.proto";
package com.example.server.streaming;

message Metric {
    google.protobuf.Timestamp timestamp = 1;
    int64 metric = 2;
}

message Average {
    double val = 1;
}

service MetricsService {
    rpc collect(stream Metric) returns (Average);
}

(2)服务实现

MetricsServiceImpl.java

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
  @Override
  public StreamObserver<StreamingExample.Metric> collect(StreamObserver<StreamingExample.Average> responseObserver) {
    return new StreamObserver<StreamingExample.Metric>() {
      private long sum = 0;
      private long count = 0;

      @Override
      public void onNext(StreamingExample.Metric value) {
        System.out.println("value: " + value);
        sum += value.getMetric();
        count++;
      }

      @Override
      public void onError(Throwable t) {
        responseObserver.onError(t);
      }

      @Override
      public void onCompleted() {
        responseObserver.onNext(StreamingExample.Average.newBuilder()
            .setVal(sum / count)
            .build());
      }
    };
  }
}

(3)server

MetricsServer.java

public class MetricsServer {
  public static void main(String[] args) throws InterruptedException, IOException {
    Server server = ServerBuilder.forPort(8081).addService(new MetricsServiceImpl()).build();

    server.start();
    
    System.out.println("listening 8081 ...");

    server.awaitTermination();
  }
}

(4)client

MetricsClient.java

public class MetricsClient {
  public static void main(String[] args) throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081).usePlaintext(true).build();
    MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);

    StreamObserver<StreamingExample.Metric> collect = stub.collect(new StreamObserver<StreamingExample.Average>() {
      @Override
      public void onNext(StreamingExample.Average value) {
        System.out.println("Average: " + value.getVal());
      }

      @Override
      public void onError(Throwable t) {

      }

      @Override
      public void onCompleted() {

      }
    });

    Stream.of(1L, 2L, 3L, 4L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build())
        .forEach(collect::onNext);
    
    collect.onCompleted();
  }
}

小结

仔细看代码会发现一个问题:

我们定义service 时是这么定义的:

rpc collect(stream Metric) returns (Average);

但service 实现中是这么定义的:

StreamObserver<StreamingExample.Metric> collect(StreamObserver<StreamingExample.Average> responseObserver)
...

和service定义是相反的,service定义时,传入参数是 Metric,返回值是 Average,在实现中正好反了。

为了便于理解,可以想象成甩管子,负责接收数据的人把自己的管子甩给发送数据的人。

例如,service 定义的 collect 方法,client 把 Metric 传给 server,就是 server 负责接收数据,那么 server 就要把自己的管子甩给 client,让 client 向管子中放数据,所以,server 创建了一个 StreamObserver<StreamingExample.Metric> 对象返回给 client,client 就可以向其中发送 Metric,也就是实现了 proto 中定义的效果。

同样的,返回值的处理也是一个道理,client 需要接收结果数据,就要把自己的管子甩给 server,所以,client 创建了 StreamObserver<StreamingExample.Average>,传给了 server,等待 server 往里放结果数据。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容