1 grpc的四种通信模式
四种通信模式
- 一元模式
- 客户端流模式
- 服务的流模式
- 双向流模式
2 一元模式
前面已经实现了,这里不再赘述
3 流模式
在grpc里面引入了流的概念,其实和rxjava很像哦
3.1 proto 定义
syntax = "proto3";
option java_package = "cn.beckbi.pb";
option java_outer_classname = "AdInfo";
message Ad {
int32 id = 1;
string name = 2;
string description = 3;
float price = 4;
}
message AdList {
string createAt = 1;
string traceId = 2;
repeated Ad ads = 3;
}
message AdIdList{
repeated int32 id =1;
}
service AdStream {
//stream
rpc addAdList(stream AdList) returns(AdIdList);
rpc getAdList(AdIdList) returns(stream AdList);
}
3.2 服务端处理
syntax = "proto3";
option java_package = "cn.beckbi.pb";
option java_outer_classname = "AdInfo";
message Ad {
int32 id = 1;
string name = 2;
string description = 3;
float price = 4;
}
message AdList {
string createAt = 1;
string traceId = 2;
repeated Ad ads = 3;
}
message AdIdList{
repeated int32 id =1;
}
service AdStream {
//stream
rpc addAdList(stream AdList) returns(AdIdList);
rpc getAdList(AdIdList) returns(stream AdList);
}
package cn.beckbi.server;
import cn.beckbi.pb.AdInfo;
import cn.beckbi.pb.AdStreamGrpc;
import java.util.*;
import java.util.logging.Logger;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
/**
* @program: kgrpc
* @description:
* @author: bikang
* @create: 2022-08-28 08:02
*/
public class AdStreamGrpcImpl extends AdStreamGrpc.AdStreamImplBase {
private static final Logger logger = Logger.getLogger(AdStreamGrpcImpl.class.getName());
private final Random random = new Random();
private final Map<Integer, AdInfo.Ad> adInfoMap = new HashMap<>(50);
@Override
public io.grpc.stub.StreamObserver<AdInfo.AdList> addAdList(
io.grpc.stub.StreamObserver<AdInfo.AdIdList> responseObserver) {
return new StreamObserver<AdInfo.AdList>() {
private AdInfo.AdIdList adIdList = AdInfo.AdIdList.newBuilder().build();
@Override
public void onNext(AdInfo.AdList value) {
if (value != null) {
logger.info("time:"+value.getCreateAt());
logger.info("traceId:"+value.getTraceId());
List<AdInfo.Ad> adList = value.getAdsList();
adList.stream().forEach( adInfo -> {
int id = random.nextInt(100000000);
AdInfo.Ad ad = AdInfo.Ad.newBuilder()
.setId(id)
.setName(adInfo.getName())
.setPrice(adInfo.getPrice())
.setDescription(adInfo.getDescription())
.build();
adInfoMap.put(id, ad);
adIdList = adIdList.toBuilder().addId(id).build();
logger.info("create ad success : " + id );
});
}
}
@Override
public void onError(Throwable t) {
logger.info("ad create error " + t.getMessage());
}
@Override
public void onCompleted() {
logger.info("create ad success");
responseObserver.onNext(adIdList);
responseObserver.onCompleted();
}
};
}
@Override
public void getAdList(AdInfo.AdIdList request,
io.grpc.stub.StreamObserver<AdInfo.AdList> responseObserver) {
List<AdInfo.Ad> adList = new ArrayList<>(30);
logger.info("request:"+request.getIdList().toString());
request.getIdList().forEach( id ->
{
if (adInfoMap.containsKey(id)) {
adList.add(adInfoMap.get(id));
logger.info( "result:"+adInfoMap.get(id).toString());
}
}
);
logger.info("ad list"+adList.toString());
responseObserver.onNext(AdInfo.AdList.newBuilder().addAllAds(adList).build());
responseObserver.onCompleted();
}
}
3.3 客户端代码
package cn.beckbi.client;
import cn.beckbi.pb.AdInfo;
import cn.beckbi.pb.AdStreamGrpc;
import com.alibaba.fastjson2.JSON;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* @program: kgrpc
* @description:
* @author: bikang
* @create: 2022-08-27 23:56
*/
public class AdClient {
private static final Logger logger = Logger.getLogger(AdClient.class.getName());
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11112)
.usePlaintext()
.build();
AdStreamGrpc.AdStreamBlockingStub stub = AdStreamGrpc.newBlockingStub(channel);
AdStreamGrpc.AdStreamStub asyncStub = AdStreamGrpc.newStub(channel);
//create ad in list
AdInfo.AdIdList adIdList = createAdList(asyncStub);
logger.info("result:"+ adIdList.toString());
List<AdInfo.AdList> adListList = getAdList(stub, adIdList);
logger.info("adListList:"+ adListList.toString());
channel.shutdown();
}
public static List<AdInfo.AdList> getAdList(AdStreamGrpc.AdStreamBlockingStub stub, AdInfo.AdIdList adIdList) {
List<AdInfo.AdList> adListList = new ArrayList<>();
Iterator<AdInfo.AdList> iterator = stub.getAdList(adIdList);
while (iterator.hasNext()) {
AdInfo.AdList adList = iterator.next();
adListList.add(adList);
}
return adListList;
}
public static AdInfo.AdIdList createAdList(AdStreamGrpc.AdStreamStub asyncStub ) {
List<Integer> allIdList = new ArrayList<>(30);
AdInfo.Ad ad1 = AdInfo.Ad.newBuilder()
.setName("cpl1")
.setDescription("cpl ad")
.setPrice(11.21f)
.build();
AdInfo.Ad ad2 = AdInfo.Ad.newBuilder()
.setName("cp21")
.setDescription("cpl ad")
.setPrice(21.21f)
.build();
AdInfo.Ad ad3 = AdInfo.Ad.newBuilder()
.setName("cpl3")
.setDescription("cpl ad")
.setPrice(31.21f)
.build();
AdInfo.Ad ad4 = AdInfo.Ad.newBuilder()
.setName("cpl1")
.setDescription("cpl ad")
.setPrice(41.21f)
.build();
AdInfo.Ad ad5 = AdInfo.Ad.newBuilder()
.setName("cpl1")
.setDescription("cpl ad")
.setPrice(51.21f)
.build();
final CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<AdInfo.AdIdList> adIdListStreamObserver = new StreamObserver<AdInfo.AdIdList>() {
@Override
public void onNext(AdInfo.AdIdList adIdList) {
adIdList.getIdList().stream().forEach(
id -> {
logger.info("create ad success : " + id);
allIdList.add(id);
});
}
@Override
public void onError(Throwable t) {
logger.info("error:"+t.getMessage());
}
@Override
public void onCompleted() {
logger.info("create success");
countDownLatch.countDown();
}
};
StreamObserver<AdInfo.AdList> createAdObserver = asyncStub.addAdList(adIdListStreamObserver);
AdInfo.AdList adList1 = AdInfo.AdList.newBuilder()
.addAds(ad1)
.addAds(ad2)
.addAds(ad3)
.addAds(ad4)
.addAds(ad5)
.setTraceId("a")
.setCreateAt("2022-01-01 01:01:01")
.build();
AdInfo.AdList adList2 = AdInfo.AdList.newBuilder()
.addAds(ad1)
.addAds(ad2)
.addAds(ad3)
.addAds(ad4)
.addAds(ad5)
.setTraceId("b")
.setCreateAt("2022-02-02 01:01:01")
.build();
AdInfo.AdList adList3 = AdInfo.AdList.newBuilder()
.addAds(ad1)
.addAds(ad2)
.addAds(ad3)
.addAds(ad4)
.addAds(ad5)
.setTraceId("c")
.setCreateAt("2022-03-03 01:01:01")
.build();
createAdObserver.onNext(adList1);
createAdObserver.onNext(adList2);
createAdObserver.onNext(adList3);
if (countDownLatch.getCount() == 0) {
logger.warning("rpc error");
return null;
}
createAdObserver.onCompleted();
try {
if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
logger.warning("failed after 10 seconds");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return AdInfo.AdIdList.newBuilder().addAllId(allIdList).build();
}
}
4 附录
项目源码: https://github.com/beckbikang/kgrpc
本地试过可以运行