1. 简介
RPC 远程调用,可以调用另一个服务中的方法,或者做数据传输。
GRPC是Google 推出的RPC框架。并且支持多种语言。
本例已java为例。
2. proto files
Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。
在maven中,使用Protobuf插件可以将.proto文件编译成相应的java代码。
3. GRPC的几种模式
3.1 GRPC的几种模式
单项 RPC
客户端发出单个请求,获得单个响应。客户端调用服务端的某个方法。客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
服务端流式 RPC
客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。个人认为当有客户端需要主动从服务端读取数据的时候可以用。
客户端流式 RPC
客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。个人认为应该是客户端需要把数据发送给服务端的时候使用。
双向流式 RPC
是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。
3.2 几种模式对应的proto写法
不同的模式对应的proto文件稍有不同。
单项 RPC
这个表示调用服务端SayHello方法,客户端需要传递一个HelloRequest,然后服务端返回一个HelloReply,HelloRequest和HelloReply也需要在proto文件中定义出来,后文会有描述。
rpc SayHello (HelloRequest) returns (HelloReply) {}
客户端流式 RPC
通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
服务端流式 RPC
通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
双向流式 RPC
你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 简单RPC helloworld示例
4.1 需要创建一个maven项目,导入依赖和maven插件
依赖:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.7.1</version>
</dependency>
插件:
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.21.0:exe:${os.detected.classifier}</pluginArtifact>
<!--<protocExecutable>G:\Users\zl\Desktop\protoc-3.0.0-win32\bin\protoc</protocExecutable>-->
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
ps:如果是一个空项目,只要网络正常,应该不会有什么问题,但是如果是在本身已经比较复杂的项目中添加这些依赖和插件,可能会出现版本冲突,导致最终运行的时候,报各种错误。如果出现了,可以查看grpc自己有依赖了什么,这些依赖在别的模块中是不是也有,以及他们的版本是什么。
4.2 编写.proto文件,并生成java代码
4.2.1 编写.proto文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
4.2.2 并生成java代码
生成的代码是根据proto文件来的,不同的proto文件会生成不同的代码
4.3 编写服务端和客户端代码
4.3.1 服务端
package io.grpc.examples.helloworld;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.logging.Logger;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/
public class HelloWorldServer {
private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());
private Server server;
private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServer.this.stop();
System.err.println("*** server shut down");
}
});
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer server = new HelloWorldServer();
server.start();
server.blockUntilShutdown();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
服务端代码中有一个内部类继承GreeterGrpc.GreeterImplBase,在构建server对象时,将这个内部类addservice进去,它里面有个sayhello方法的回调,当服务端收到客户端的调用时,就会回调这个方法。
然后服务端就可以在sayhello方法里做需要的操作,完成之后构建一个返回的对象HelloReply,然后调用onNext将服务端的返回数据发给客户端,调用onCompleted,完成本次调用。
4.3.2 客户端
package io.grpc.examples.helloworld;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A simple client that requests a greeting from the {@link HelloWorldServer}.
*/
public class HelloWorldClient {
private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
/** Construct client connecting to HelloWorld server at {@code host:port}. */
public HelloWorldClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext()
.build());
}
/** Construct client for accessing HelloWorld server using the existing channel. */
HelloWorldClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/** Say hello to server. */
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeting: " + response.getMessage());
}
/**
* Greet server. If provided, the first element of {@code args} is the name to use in the
* greeting.
*/
public static void main(String[] args) throws Exception {
HelloWorldClient client = new HelloWorldClient("localhost", 50051);
try {
/* Access a service running on the local machine on port 50051 */
String user = "world";
if (args.length > 0) {
user = args[0]; /* Use the arg as the name to greet if provided */
}
client.greet(user);
} finally {
client.shutdown();
}
}
}
客户端只需要调用blockingStub的sayHello方法就可以了,客户端会一直等待服务端返回,这里是同步的。
blockingStub这里会有sayHello这个方法也是因为在proto里面有相关代码。
5.客户端流式 RPC 实战
5.1 导包和依赖同上
5.2 编辑proto文件,生成java代码
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.transport";
option java_outer_classname = "MessageProto";
option objc_class_prefix = "RTG";
package transport;
service MessageGuide {
rpc RecordRoute(stream Message) returns (BackMessage) {}
}
message Message {
string data = 1;
string type = 2;
}
message BackMessage {
string data = 1;
}
5.3 编辑客户端和服务段代码
5.3.1 客户端代码
package io.grpc.transport;
import com.ahhx.jhpt.server.base.receiver.SynFilesReceiver;
import com.ahhx.jhpt.server.base.receiver.SynOriginReceiver;
import com.ahhx.jhpt.server.base.rpc.SjtbClient;
import com.ahhx.jhpt.server.base.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @author zhanglei
* @email ah.zhanglei4@aisino.com
* @date 2019/7/8
*/
public class TransportClient {
private static final Logger logger = Logger.getLogger(TransportClient.class.getName());
private final ManagedChannel channel;
private final MessageGuideGrpc.MessageGuideBlockingStub blockingStub;
private final MessageGuideGrpc.MessageGuideStub asyncStub;
private static TransportClient instance = null;
public static String grpcServerIp;
public static int grpcServerPort;
/** Issues several different requests and then exits. */
// @Test
// public void test() throws InterruptedException {
// List<Message> messages = new ArrayList<Message>();
// for (int i = 0; i < 1; i++) {
// Message m = Message.newBuilder().setData("111"+i).setType("type"+i).build();
// messages.add(m);
// }
//
// TransportClient client = new TransportClient();
// try {
// client.recordRoute(messages, null, null, null, 0, null);
// } finally {
// client.shutdown();
// }
// }
public void send(String methodName, String body, SynFilesReceiver synFilesReceiver, SynOriginReceiver synOriginReceiver, Channel mqchannel, long tag, org.slf4j.Logger logger) throws Exception {
List<Message> messages = new ArrayList<Message>();
////String body2 = EncodeUtils.unCompressAndDecode(body);
Message m = Message.newBuilder().setData(body).setType(methodName).build();
messages.add(m);
try {
instance.recordRoute(messages,synFilesReceiver,synOriginReceiver,mqchannel,tag,logger);
} finally {
////instance.shutdown();
////instance = null;
}
}
public static TransportClient newInstance() {
if (instance == null) {
synchronized (SjtbClient.class){
if (instance == null) {
instance = new TransportClient();
}
}
}
return instance;
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
* features} with a variable delay in between. Prints the statistics when they are sent from the
* server.
*/
public void recordRoute(List<Message> messages, SynFilesReceiver synFilesReceiver, SynOriginReceiver synOriginReceiver, Channel mqchannel, long tag, org.slf4j.Logger logger) throws InterruptedException {
info("*** RecordRoute");
StreamObserver<BackMessage> responseObserver = new StreamObserver<BackMessage>() {
@Override
public void onNext(BackMessage backMessage) {
TransportClient.logger.info("backMessage:"+backMessage);
if (null != synFilesReceiver){
synFilesReceiver.replyToSender(backMessage.getData());
TransportClient.logger.info("处理数据包到hdfs接口服务完成...");
}
if (null != synOriginReceiver){
synOriginReceiver.replyToSender(backMessage.getData());
TransportClient.logger.info("处理写表操作调用WriteTable接口服务完成...");
}
}
@Override
public void onError(Throwable throwable) {
RabbitMqUtils.rejectAndBackMQ(mqchannel, tag, logger);
warning("RecordRoute Failed: {0}", Status.fromThrowable(throwable));
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
if (null != synFilesReceiver){
RabbitMqUtils.askMessage(mqchannel, tag, logger);
}
}
};
StreamObserver<Message> requestObserver = asyncStub.recordRoute(responseObserver);
try {
for (int i = 0; i < messages.size(); i++) {
requestObserver.onNext(messages.get(i));
}
} catch (RuntimeException e) {
//// Cancel RPC
requestObserver.onError(e);
throw e;
}
//// Mark the end of requests
requestObserver.onCompleted();
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public TransportClient() {
channel = ManagedChannelBuilder.forAddress(grpcServerIp, grpcServerPort).usePlaintext().build();
blockingStub = MessageGuideGrpc.newBlockingStub(channel);
asyncStub = MessageGuideGrpc.newStub(channel);
}
private void info(String msg, Object... params) {
logger.log(Level.INFO, msg, params);
}
private void warning(String msg, Object... params) {
logger.log(Level.WARNING, msg, params);
}
}
调用asyncStub.recordRoute可以获得一个requestObserver对象,requestObserver的onNext就可以将数据发送出去。
此外,构建一个StreamObserver,这个类中的回掉方法会在服务段返回数据回来的时候被回掉。
5.3.2 服务端代码
package io.grpc.transport;
import com.ahhx.jhpt.server.base.service.SynDataService;
import com.ahhx.jhpt.server.base.utils.SpringContextUtil;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zhanglei
* @email ah.zhanglei4@aisino.com
* @date 2019/7/8
*/
public class TransportServer {
private SynDataService synDataService = SpringContextUtil.getBean(SynDataService.class);
private static final Logger logger = LoggerFactory.getLogger(TransportServer.class.getName());
private Server server;
private int grpcServerPort;
public TransportServer(int grpcServerPort) {
this.grpcServerPort = grpcServerPort;
server = ServerBuilder.forPort(grpcServerPort).addService(new TransportService())
.build();
}
@Test
public void test() throws Exception {
TransportServer server = new TransportServer(20002);
server.start();
server.blockUntilShutdown();
}
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/** Start serving requests. */
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + grpcServerPort);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
TransportServer.this.stop();
System.err.println("*** server shut down");
}
});
}
private class TransportService extends MessageGuideGrpc.MessageGuideImplBase {
final long startTime = System.nanoTime();
@Override
public StreamObserver<Message> recordRoute(StreamObserver<BackMessage> responseObserver) {
return new StreamObserver<Message>() {
@Override
public void onNext(Message message) {
logger.info("message:"+message);
logger.info("开始处理业务逻辑.");
try {
Method method = SynDataService.class.getMethod(message.getType(), String.class);
String backMessageData = (String) method.invoke(synDataService, message.getData());
responseObserver.onNext(BackMessage.newBuilder().setData(backMessageData)
.build());
} catch (Exception e) {
logger.info("调用方法异常:" + e.getMessage());
}finally {
logger.info("处理业务逻辑完成.");
}
}
@Override
public void onError(Throwable throwable) {
responseObserver.onError(throwable);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
/** Stop serving requests and shutdown resources. */
public void stop() {
if (server != null) {
server.shutdown();
}
}
}
构建server对象的时候addservice,这个service类中的回掉方法,会在接收到客户端请求时被回掉。
6. 结束
在idea中,通过点击下面,可以生成java代码
实测,在linux中执行maven打包的时候会自动生成java代码,生成jar包。
更过关于GRPC的内容,推荐链接:
http://doc.oschina.net/grpc?t=56831
https://github.com/grpc/grpc
https://github.com/grpc/grpc-java