GRPC 介绍和使用示例

1. 简介

RPC 远程调用,可以调用另一个服务中的方法,或者做数据传输。
GRPC是Google 推出的RPC框架。并且支持多种语言。
本例已java为例。

2. proto files

Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。
在maven中,使用Protobuf插件可以将.proto文件编译成相应的java代码。

3. GRPC的几种模式

3.1 GRPC的几种模式

单项 RPC

客户端发出单个请求,获得单个响应。客户端调用服务端的某个方法。客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。

服务端流式 RPC

客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。个人认为当有客户端需要主动从服务端读取数据的时候可以用。

客户端流式 RPC

客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。个人认为应该是客户端需要把数据发送给服务端的时候使用。

双向流式 RPC

是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。

3.2 几种模式对应的proto写法

不同的模式对应的proto文件稍有不同。

单项 RPC

 这个表示调用服务端SayHello方法,客户端需要传递一个HelloRequest,然后服务端返回一个HelloReply,HelloRequest和HelloReply也需要在proto文件中定义出来,后文会有描述。
 rpc SayHello (HelloRequest) returns (HelloReply) {}

客户端流式 RPC

通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}

服务端流式 RPC

通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}

双向流式 RPC

你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 简单RPC helloworld示例

4.1 需要创建一个maven项目,导入依赖和maven插件

依赖:

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>1.21.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.7.1</version>
        </dependency>

插件:

<build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <!--
                      The version of protoc must match protobuf-java. If you don't depend on
                      protobuf-java directly, you will be transitively depending on the
                      protobuf-java version that grpc depends on.
                    -->
                    <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.21.0:exe:${os.detected.classifier}</pluginArtifact>
                    <!--<protocExecutable>G:\Users\zl\Desktop\protoc-3.0.0-win32\bin\protoc</protocExecutable>-->
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

ps:如果是一个空项目,只要网络正常,应该不会有什么问题,但是如果是在本身已经比较复杂的项目中添加这些依赖和插件,可能会出现版本冲突,导致最终运行的时候,报各种错误。如果出现了,可以查看grpc自己有依赖了什么,这些依赖在别的模块中是不是也有,以及他们的版本是什么。

4.2 编写.proto文件,并生成java代码

4.2.1 编写.proto文件

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
proto文件存放位置,不要乱放

4.2.2 并生成java代码

点击如图两个地方
生成代码位置

生成的代码是根据proto文件来的,不同的proto文件会生成不同的代码

4.3 编写服务端和客户端代码

4.3.1 服务端

package io.grpc.examples.helloworld;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * Server that manages startup/shutdown of a {@code Greeter} server.
 */
public class HelloWorldServer {
  private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());

  private Server server;

  private void start() throws IOException {
    /* The port on which the server should run */
    int port = 50051;
    server = ServerBuilder.forPort(port)
        .addService(new GreeterImpl())
        .build()
        .start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here since the logger may have been reset by its JVM shutdown hook.
        System.err.println("*** shutting down gRPC server since JVM is shutting down");
        HelloWorldServer.this.stop();
        System.err.println("*** server shut down");
      }
    });
  }

  private void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /**
   * Main launches the server from the command line.
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    final HelloWorldServer server = new HelloWorldServer();
    server.start();
    server.blockUntilShutdown();
  }

  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

    @Override
    public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
      HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }
}

服务端代码中有一个内部类继承GreeterGrpc.GreeterImplBase,在构建server对象时,将这个内部类addservice进去,它里面有个sayhello方法的回调,当服务端收到客户端的调用时,就会回调这个方法。
然后服务端就可以在sayhello方法里做需要的操作,完成之后构建一个返回的对象HelloReply,然后调用onNext将服务端的返回数据发给客户端,调用onCompleted,完成本次调用。

4.3.2 客户端

package io.grpc.examples.helloworld;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A simple client that requests a greeting from the {@link HelloWorldServer}.
 */
public class HelloWorldClient {
  private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());

  private final ManagedChannel channel;
  private final GreeterGrpc.GreeterBlockingStub blockingStub;

  /** Construct client connecting to HelloWorld server at {@code host:port}. */
  public HelloWorldClient(String host, int port) {
    this(ManagedChannelBuilder.forAddress(host, port)
        // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
        // needing certificates.
        .usePlaintext()
        .build());
  }

  /** Construct client for accessing HelloWorld server using the existing channel. */
  HelloWorldClient(ManagedChannel channel) {
    this.channel = channel;
    blockingStub = GreeterGrpc.newBlockingStub(channel);
  }

  public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  /** Say hello to server. */
  public void greet(String name) {
    logger.info("Will try to greet " + name + " ...");
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    HelloReply response;
    try {
      response = blockingStub.sayHello(request);
    } catch (StatusRuntimeException e) {
      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      return;
    }
    logger.info("Greeting: " + response.getMessage());
  }

  /**
   * Greet server. If provided, the first element of {@code args} is the name to use in the
   * greeting.
   */
  public static void main(String[] args) throws Exception {
    HelloWorldClient client = new HelloWorldClient("localhost", 50051);
    try {
      /* Access a service running on the local machine on port 50051 */
      String user = "world";
      if (args.length > 0) {
        user = args[0]; /* Use the arg as the name to greet if provided */
      }
      client.greet(user);
    } finally {
      client.shutdown();
    }
  }
}

客户端只需要调用blockingStub的sayHello方法就可以了,客户端会一直等待服务端返回,这里是同步的。
blockingStub这里会有sayHello这个方法也是因为在proto里面有相关代码。

5.客户端流式 RPC 实战

5.1 导包和依赖同上

5.2 编辑proto文件,生成java代码


syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.transport";
option java_outer_classname = "MessageProto";
option objc_class_prefix = "RTG";

package transport;

service MessageGuide {
  rpc RecordRoute(stream Message) returns (BackMessage) {}
}

message Message {
  string data = 1;

  string type = 2;
}

message BackMessage {
  string data = 1;
}

5.3 编辑客户端和服务段代码

5.3.1 客户端代码

package io.grpc.transport;

import com.ahhx.jhpt.server.base.receiver.SynFilesReceiver;
import com.ahhx.jhpt.server.base.receiver.SynOriginReceiver;
import com.ahhx.jhpt.server.base.rpc.SjtbClient;
import com.ahhx.jhpt.server.base.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @author zhanglei
 * @email ah.zhanglei4@aisino.com
 * @date 2019/7/8
 */
public class TransportClient {
    private static final Logger logger = Logger.getLogger(TransportClient.class.getName());

    private final ManagedChannel channel;
    private final MessageGuideGrpc.MessageGuideBlockingStub blockingStub;
    private final MessageGuideGrpc.MessageGuideStub asyncStub;

    private static TransportClient instance = null;

    public static String grpcServerIp;
    public static int grpcServerPort;


    /** Issues several different requests and then exits. */
//    @Test
//    public void test() throws InterruptedException {
//        List<Message> messages = new ArrayList<Message>();
//        for (int i = 0; i < 1; i++) {
//            Message m = Message.newBuilder().setData("111"+i).setType("type"+i).build();
//            messages.add(m);
//        }
//
//        TransportClient client = new TransportClient();
//        try {
//            client.recordRoute(messages, null, null, null, 0, null);
//        } finally {
//            client.shutdown();
//        }
//    }


    public void send(String methodName, String body, SynFilesReceiver synFilesReceiver, SynOriginReceiver synOriginReceiver, Channel mqchannel, long tag, org.slf4j.Logger logger) throws Exception {
        List<Message> messages = new ArrayList<Message>();
        ////String body2 = EncodeUtils.unCompressAndDecode(body);
        Message m = Message.newBuilder().setData(body).setType(methodName).build();
        messages.add(m);
        try {
            instance.recordRoute(messages,synFilesReceiver,synOriginReceiver,mqchannel,tag,logger);
        } finally {
            ////instance.shutdown();
            ////instance = null;
        }
    }

    public static TransportClient newInstance() {
        if (instance == null) {
            synchronized (SjtbClient.class){
                if (instance == null) {
                    instance = new TransportClient();
                }
            }
        }
        return instance;
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
     * features} with a variable delay in between. Prints the statistics when they are sent from the
     * server.
     */
    public void recordRoute(List<Message> messages, SynFilesReceiver synFilesReceiver, SynOriginReceiver synOriginReceiver, Channel mqchannel, long tag, org.slf4j.Logger logger) throws InterruptedException {
        info("*** RecordRoute");

        StreamObserver<BackMessage> responseObserver = new StreamObserver<BackMessage>() {

            @Override
            public void onNext(BackMessage backMessage) {
                TransportClient.logger.info("backMessage:"+backMessage);
                if (null != synFilesReceiver){
                    synFilesReceiver.replyToSender(backMessage.getData());
                    TransportClient.logger.info("处理数据包到hdfs接口服务完成...");
                }
                if (null != synOriginReceiver){
                    synOriginReceiver.replyToSender(backMessage.getData());
                    TransportClient.logger.info("处理写表操作调用WriteTable接口服务完成...");
                }
            }

            @Override
            public void onError(Throwable throwable) {
                RabbitMqUtils.rejectAndBackMQ(mqchannel, tag, logger);
                warning("RecordRoute Failed: {0}", Status.fromThrowable(throwable));
            }

            @Override
            public void onCompleted() {
                info("Finished RecordRoute");
                if (null != synFilesReceiver){
                    RabbitMqUtils.askMessage(mqchannel, tag, logger);
                }
            }
        };

        StreamObserver<Message> requestObserver = asyncStub.recordRoute(responseObserver);
        try {
            for (int i = 0; i < messages.size(); i++) {
                requestObserver.onNext(messages.get(i));
            }
        } catch (RuntimeException e) {
            //// Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        //// Mark the end of requests
        requestObserver.onCompleted();
    }

    /** Construct client for accessing RouteGuide server using the existing channel. */
    public TransportClient() {
        channel =  ManagedChannelBuilder.forAddress(grpcServerIp, grpcServerPort).usePlaintext().build();
        blockingStub = MessageGuideGrpc.newBlockingStub(channel);
        asyncStub = MessageGuideGrpc.newStub(channel);
    }

    private void info(String msg, Object... params) {
        logger.log(Level.INFO, msg, params);
    }

    private void warning(String msg, Object... params) {
        logger.log(Level.WARNING, msg, params);
    }

}

调用asyncStub.recordRoute可以获得一个requestObserver对象,requestObserver的onNext就可以将数据发送出去。
此外,构建一个StreamObserver,这个类中的回掉方法会在服务段返回数据回来的时候被回掉。

5.3.2 服务端代码

package io.grpc.transport;

import com.ahhx.jhpt.server.base.service.SynDataService;
import com.ahhx.jhpt.server.base.utils.SpringContextUtil;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Test;

import java.io.IOException;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zhanglei
 * @email ah.zhanglei4@aisino.com
 * @date 2019/7/8
 */
public class TransportServer {

    private SynDataService synDataService = SpringContextUtil.getBean(SynDataService.class);
    private static final Logger logger = LoggerFactory.getLogger(TransportServer.class.getName());
    private Server server;

    private int grpcServerPort;

    public TransportServer(int grpcServerPort) {
        this.grpcServerPort = grpcServerPort;
        server = ServerBuilder.forPort(grpcServerPort).addService(new TransportService())
                .build();
    }

    @Test
    public void test() throws Exception {
        TransportServer server = new TransportServer(20002);
        server.start();
        server.blockUntilShutdown();
    }

    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /** Start serving requests. */
    public void start() throws IOException {
        server.start();
        logger.info("Server started, listening on " + grpcServerPort);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                TransportServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }


    private class TransportService extends MessageGuideGrpc.MessageGuideImplBase {
        final long startTime = System.nanoTime();
        @Override
        public StreamObserver<Message> recordRoute(StreamObserver<BackMessage> responseObserver) {

            return new StreamObserver<Message>() {
                @Override
                public void onNext(Message message) {
                    logger.info("message:"+message);
                    logger.info("开始处理业务逻辑.");
                    try {
                        Method method = SynDataService.class.getMethod(message.getType(), String.class);
                        String backMessageData = (String) method.invoke(synDataService, message.getData());
                        responseObserver.onNext(BackMessage.newBuilder().setData(backMessageData)
                                .build());
                    } catch (Exception e) {
                        logger.info("调用方法异常:" + e.getMessage());
                    }finally {
                        logger.info("处理业务逻辑完成.");
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    responseObserver.onError(throwable);
                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    }

    /** Stop serving requests and shutdown resources. */
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

}

构建server对象的时候addservice,这个service类中的回掉方法,会在接收到客户端请求时被回掉。

6. 结束

在idea中,通过点击下面,可以生成java代码



实测,在linux中执行maven打包的时候会自动生成java代码,生成jar包。

更过关于GRPC的内容,推荐链接:
http://doc.oschina.net/grpc?t=56831
https://github.com/grpc/grpc
https://github.com/grpc/grpc-java

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • gRPC 是一个高性能、通用的开源RPC框架,基于HTTP/2协议标准和Protobuf序列化协议开发,支持众多的...
    小波同学阅读 19,490评论 6 19
  • 1.简介 在gRPC中,客户端应用程序可以直接调用不同计算机上的服务器应用程序上的方法,就像它是本地对象一样,使您...
    第八共同体阅读 1,875评论 0 6
  • 原文出处:gRPC gRPC分享 概述 gRPC 一开始由 google 开发,是一款语言中立、平台中立、开源的远...
    小波同学阅读 7,212评论 0 18
  • GRPC是基于protocol buffers3.0协议的. 本文将向您介绍gRPC和protocol buffe...
    二月_春风阅读 17,987评论 2 28
  •  gRPC 学习笔记,记录gprc一些基本概念.  gRPC正如其他 RPC 系统,gRPC 基于如下思想:定义一...
    Jancd阅读 1,972评论 1 7