1.proto文件的编写(gRPC基于proto3语法)
四种方式的数据传输:
- Unary RPCs where the client sends a single request to the server and gets a single response back, just like a normal function call.
- Server streaming RPCs where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. gRPC guarantees message ordering within an individual RPC call.
- Client streaming RPCs where the client writes a sequence of messages and sends them to the server, again using a provided stream. Once the client has finished writing the messages, it waits for the server to read them and return its response. Again gRPC guarantees message ordering within an individual RPC call.
- Bidirectional streaming RPCs where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each stream is preserved.
这四种方式对应的proto定义如下:
// This file uses the proto3 syntax.
syntax = "proto3";
// Protobuf namespace for the messages and the service declared in this file.
package com.liyuanfeng.proto;
// Java package of the generated classes.
option java_package = "com.liyuanfeng.proto";
// Name of the generated outer (wrapper) Java class.
option java_outer_classname = "StudentProto";
// Generate one top-level Java file per message/service instead of nesting them in the outer class.
option java_multiple_files = true;
// Demo service with one method for each of the four gRPC call styles.
service StudentService {
    // Unary: a single request is answered with a single response.
    rpc getRealNameByUsername (MyRequest) returns (MyResponse) {
    }

    // Server streaming: a single request is answered with a stream of responses.
    rpc GetStudentByAge (StudentRequest) returns (stream StudentResponse) {
    }

    // Client streaming: a stream of requests is answered with a single response.
    rpc GetStudentsWrapperByAges (stream StudentRequest) returns (StudentResponseList) {
    }

    // Bidirectional streaming: both the request and the response are streams.
    // The response must carry the `stream` keyword as well; without it this
    // would be a second client-streaming RPC and the server could only send
    // one reply for the whole call.
    rpc BiTalk (stream StreamRequest) returns (stream StreamResponse) {
    }
}
// Request of the unary call: the username to look up.
message MyRequest {
    string username = 1;
}

// Response of the unary call.
// The field number (2) is part of the wire format and is kept unchanged.
message MyResponse {
    string realname = 2;
}

// Request carrying an age; used by the server-streaming and client-streaming calls.
message StudentRequest {
    int32 age = 1;
}

// A single student record.
message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

// Wrapper holding several student records in one message.
message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

// One message sent by the client in the BiTalk call.
message StreamRequest {
    string request_info = 1;
}

// One message sent by the server in the BiTalk call.
message StreamResponse {
    string response_info = 1;
}
2.配置gradle文件,使其stub和server代码生成到合适的位置
// Java compilation plus the protobuf plugin (its classpath entry is declared in the buildscript block of this file).
apply plugin: 'java'
apply plugin: 'com.google.protobuf'
// Project coordinates.
group 'com.liyuanfeng'
version '1.0'
// Compile against Java 8 source and target levels.
sourceCompatibility = 1.8
targetCompatibility = 1.8
// Repositories used to resolve the project's dependencies.
repositories {
    // Aliyun mirrors of Maven Central and JCenter.
    maven { url 'https://maven.aliyun.com/repository/central' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
    // Spring milestone repository. It is accessed over HTTPS: a plain-HTTP
    // repository lets artifacts be tampered with in transit and is rejected
    // by recent Gradle versions unless explicitly allowed.
    maven {
        url 'https://repo.spring.io/milestone'
    }
}
// Libraries needed at compile time / test time.
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/io.netty/netty-all
    compile group: 'io.netty', name: 'netty-all', version: '4.1.6.Final'
    // Protobuf runtime. Kept at the same 3.5.1 line as the protoc compiler
    // configured in the protobuf block, so generated code and runtime agree.
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.5.1'
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.5.1'
    // https://mvnrepository.com/artifact/org.apache.thrift/libthrift
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.12.0'
    // gRPC transport, protobuf marshalling and stub support; all three share one version.
    compile 'io.grpc:grpc-netty-shaded:1.18.0'
    compile 'io.grpc:grpc-protobuf:1.18.0'
    compile 'io.grpc:grpc-stub:1.18.0'
}
// Classpath of the build script itself: makes the protobuf Gradle plugin available to 'apply plugin'.
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.5'
}
}
// Configuration of the protobuf plugin: which compiler and code generators to use and where output goes.
protobuf {
protoc {
// protoc compiler is resolved as a repository artifact rather than taken from the local machine.
artifact = "com.google.protobuf:protoc:3.5.1-1"
}
plugins {
// Registers the gRPC Java code generator under the name 'grpc'.
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.18.0'
}
}
generateProtoTasks {
// Enable the 'grpc' generator for every proto generation task.
all()*.plugins {
grpc {
// Write the gRPC output into the "java" sub-directory
// (presumably so stubs land next to the generated message classes — confirm).
setOutputSubDir "java"
}
}
}
// Base directory for generated files is "src".
// NOTE(review): this places generated code inside the hand-written source tree — confirm this is intended.
generateProtoTasks.generatedFilesBaseDir = "src"
}
// Compile all Java sources as UTF-8 (the sources contain non-ASCII string literals).
tasks.withType(JavaCompile){
options.encoding = "UTF-8"
}
// Directories that are searched for .proto files of the main source set.
sourceSets{
main{
proto{
srcDir 'src/main/proto'
// NOTE(review): 'src/main' also contains 'src/main/proto', so the two entries overlap — confirm the second one is needed.
srcDir 'src/main'
}
}
}
这里需要注意:
proto文件的存放位置有特殊要求:应放在 src/main/proto 目录下(即上面 sourceSets 中配置的 srcDir),具体存放位置截图如下
3.生成相应的proto的java代码,具体命令行是:
D:\Java\netty_lecture>gradle clean generateProto
4.Server端的编写
package com.liyuanfeng.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
/**
 * Minimal gRPC server that exposes {@link StudentServiceImpl} on port 8899.
 */
public class GrpcServer {
    private Server server;

    /**
     * Builds and starts the server and registers a JVM shutdown hook that stops it.
     *
     * @throws IOException if the server cannot be started
     */
    private void start() throws IOException {
        ServerBuilder<?> builder = ServerBuilder.forPort(8899);
        builder.addService(new StudentServiceImpl());
        server = builder.build().start();
        System.out.println("server started!");

        Runnable onJvmExit = () -> {
            System.out.println("关闭JVM");
            stop();
        };
        Runtime.getRuntime().addShutdownHook(new Thread(onJvmExit));
        System.out.println("执行到这里");
    }

    /** Initiates shutdown of the server if it was started. */
    private void stop() {
        if (server == null) {
            return;
        }
        server.shutdown();
    }

    /**
     * Blocks the calling thread until the server has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitTermination() throws InterruptedException {
        if (server == null) {
            return;
        }
        server.awaitTermination();
    }

    /** Starts the server and keeps the main thread alive until it terminates. */
    public static void main(String[] args) throws IOException, InterruptedException {
        GrpcServer instance = new GrpcServer();
        instance.start();
        instance.awaitTermination();
    }
}
5.Service部分的代码编写
package com.liyuanfeng.grpc;
import com.liyuanfeng.proto.*;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
@Override
public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
System.out.println("接收到客户端信息:" + request.getUsername());
responseObserver.onNext(MyResponse.newBuilder().setRealname("李远锋").build());
responseObserver.onCompleted();
}
@Override
public void getStudentByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接收到了客户端信息:" + request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("周杰伦").setAge(35).setCity("中国").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("林俊杰").setAge(35).setCity("台湾").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("李连杰").setAge(35).setCity("香港").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("林志颖").setAge(35).setCity("中国").build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentRequest>() {
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext" + value.getAge());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
StudentResponse studentResponse1 = StudentResponse.newBuilder().setName("zhangsan").setAge(20).setCity("Beijing").build();
StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("lisi").setAge(20).setCity("Beijing").build();
StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse1).addStudentResponse(studentResponse2).build();
responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
@Override
public void onNext(StreamRequest value) {
System.out.println(value.getRequestInfo());
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted!");
responseObserver.onCompleted();
}
};
}
}
6.客户端代码的编写
package com.liyuanfeng.grpc;
import com.liyuanfeng.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Demo client that calls every method of StudentService on a server
 * listening on localhost:8899: one unary call, one server-streaming call
 * (both through the blocking stub) and two calls through the async stub.
 */
public class GrpcClient {
    /**
     * Runs the four demo calls, waits for the asynchronous ones to finish and
     * then shuts the channel down.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the asynchronous calls or for the channel to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // usePlaintext: the connection is not encrypted.
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                .usePlaintext().build();
        // One count per asynchronous call; released when that call ends,
        // successfully or with an error.
        final CountDownLatch asyncCalls = new CountDownLatch(2);
        try {
            StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);
            StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);

            // Unary call.
            MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("秦子豪").build());
            System.out.println(myResponse.getRealname());
            System.out.println("--------------------------------------");

            // Server-streaming call: the blocking stub exposes the stream as an iterator.
            Iterator<StudentResponse> studentByAge = blockingStub.getStudentByAge(StudentRequest.newBuilder().setAge(20).build());
            while (studentByAge.hasNext()) {
                StudentResponse next = studentByAge.next();
                System.out.println(next.getName() + "," + next.getCity() + "," + next.getAge());
            }
            System.out.println("------------------------------------------------------------------------------");

            // Client-streaming call: requests are pushed through the returned
            // observer, the single response arrives on the observer below.
            StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
                @Override
                public void onNext(StudentResponseList value) {
                    value.getStudentResponseList().forEach(studentResponse -> {
                        System.out.println(studentResponse.getName());
                        System.out.println(studentResponse.getAge());
                        System.out.println(studentResponse.getCity());
                        System.out.println("********************");
                    });
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t.getMessage());
                    asyncCalls.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted!");
                    asyncCalls.countDown();
                }
            };
            StreamObserver<StudentRequest> studentsWrapperByAges = studentServiceStub.getStudentsWrapperByAges(studentResponseListStreamObserver);
            studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(10).build());
            studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(20).build());
            studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(30).build());
            studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(40).build());
            studentsWrapperByAges.onCompleted();

            // BiTalk call: ten timestamped messages are sent.
            StreamObserver<StreamRequest> streamRequestStreamObserver = studentServiceStub.biTalk(new StreamObserver<StreamResponse>() {
                @Override
                public void onNext(StreamResponse value) {
                    System.out.println(value.getResponseInfo());
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println("lalala");
                    System.out.println(t.getMessage());
                    asyncCalls.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted!");
                    asyncCalls.countDown();
                }
            });
            for (int i = 0; i < 10; i++) {
                streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            }
            // Half-close the request stream; without this the server never
            // learns that the client is done and the call never completes.
            streamRequestStreamObserver.onCompleted();

            // Wait for both asynchronous calls instead of sleeping for a fixed
            // time; the former 100 s sleep is kept as the upper bound.
            asyncCalls.await(100, TimeUnit.SECONDS);
        } finally {
            // Release the channel's resources even if one of the calls failed.
            managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}