异步编程之RPC框架
在程序设计中为了应对复杂业务场景我们通常会讲系统设计模块化。不同模块之间免不了相互通信,如果是同一台机器,操作系统提供了整套的解决方案。
- 多线程之间:Mutex(ReentryLock),Read-writeLock,Condition,SpinLocks,Barriers等
- 多进程间:Pipes,MessageQueues,Samphores,SharedMemory,Sockets(UNIX Domain Sockets)
使用同步IO方式实现rpc
主进程建立服务端监听,当有client连接时创建Socket并新建一个线程进行处理。这样做有很明显的弊端:
1. 线程数会随着client连接增加而增多,过多的线程额外增加了很多上下文切换时间。
2. 由于一个方法调用会经历读取request,计算,response回写,绝大多数时间都是阻塞在io操作上
ps 参考别人的代码
ServerSocket serverSocket = new ServerSocket(PORT);
while (true) {
// 主线程阻塞等待
Socket client = serverSocket.accept();
// 开启一个新的线程处理client连接
new HandlerThread(client);
}
异步实现
使用nio我们可以将客户端连接数与线程解耦,nio的三大核心:Channel(通道),Buffer(缓冲区), Selector。nio操作的对象是缓冲区而不是传统的数据流,通过单一的selector来管理多个通道事件
> selector 通过单一线程来响应事件(connect,accept,read,write),当有事件完成时才会触发具体的操作,而不是阻塞在每一个事件上面。让事件来驱动程序执行,与下面要介绍的future编程模型有着共同的特性。
ps 参考别人的代码
handleAccept(SelectionKey key) {
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(BUF_SIZE));
}
main() {
Selector selector = null;
ServerSocketChannel ssc = null;
try{
selector = Selector.open();
ssc= ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
selector.select(TIMEOUT)
while(selector.selectedKeys().hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
handleAccept(key);
}
if(key.isReadable()){
handleRead(key);
}
if(key.isWritable() && key.isValid()){
handleWrite(key);
}
iter.remove();
}
}
> 下图是 thrift基于Nio实现的服务端的一种实现方式:TThreadedSelectorServer,它也是种Half-sync/Half-async的服务模型。
> 要实现全异步模式,需要针对workThread中的任务也进行异步化。GRPC 提供了这种机制,它允许为每个计算过程添加回调,实现全异步编程。下节讲述的future就可以我们的业务编程异步化、事件化。
future简介
future提供了一套高效便捷的非阻塞并行操作管理方案。其基本思想很简单,所谓Future,指的是一类占位符对象,用于指代某些尚未完成的计算的结果。一般来说,由Future指代的计算都是并行执行的,计算完毕后可另行获取相关计算结果。以这种方式组织并行任务,便可以写出高效、异步、非阻塞的并行代码。
所谓Future,是一种用于指代某个尚未就绪的值的对象。而这个值,往往是某个计算过程的结果
- 若该计算过程尚未完成,我们就说该Future未就位;
- 若该计算过程正常结束,或中途抛出异常,我们就说该Future已就位
- Future的一个重要属性在于它只能被赋值一次。一旦给定了某个值或某个异常,future对象就变成了不可变对象——无法再被改写
使用场景
- 远程rpc调用
- 异步task,主线程无需等待
- 阻塞IO,disk reading、mysql等
future实现
twitter-future
> 1.Finagle是Twitter基于Netty开发的支持容错的、协议无关的RPC框架,该框架支撑了Twitter的核心服务。
> 2.基于scala编写,并自己实现一套future框架。该future对java使用同时提供了支持
scala-future
> 1.在finagle一年之后发布,具有类似twitter future类似语法
> 2.应用于scala本身,对java兼容性不好
基本语法
future构建
1. 构建一个异步future
1 long startTime = System.currentTimeMillis();
2 Future<String> future = FuturePools.unboundedPool().apply(() -> {
3 sleep(1);
4 return "sleep 1";
5 }).onSuccess(v->{
6 printTimeUse("success return", startTime);
7 return BoxedUnit.UNIT;
8 }).onFailure(t->{
9 t.printStackTrace();
10 return BoxedUnit.UNIT;
11 });;
12 sleep(5);
13 printTimeUse(startTime);
在主进程起一个future,该future会异步进行处理。同时可以为该future添加回调函数。
onSuccess(v-{}) 在future执行完成后进行调用;onFailure(Throwable t->{}) 在执行过程中发生异常会执行改回调函数,其中t是执行中抛出的异常。
2. 构建一个同步future,阻塞主进程
1 Future<String> future2 = Future.apply(() -> {
2 sleep(3);
3 return "sleep 3";
4 });
因为编程中难免会有阻塞操作,所以要避免此类方式生成future。
3. 并发执行future,互相无影响,不影响主线程
1 Future<String> future1 = FuturePools.unboundedPool().apply(() -> {
2 sleep(2);
3 System.out.println("return future1");
4 return "suc";
5 });
6 Future<String> future2 = FuturePools.unboundedPool().apply(() -> {
7 sleep(1);
8 System.out.println("return future2");
9 return "suc";
10 });
11 //do something else
可以理解为向线程池提交了任务,但是没有执行get操作。
future组合
future通过map,flatmap将不同的future连接。可以很的组成了一个执行链条,这也是future如此流行的基础。
1. future通过map映射为一个新的future,新future运行依赖原有future
1 public Future<String> getA() {
2 System.out.println("get A from testDaoRx begin");
3 Future<Integer> future = testDaoRx.getA();
4 return future.map(x->{
5 System.out.println("get A from testDaoRx sucss");
6 return String.valueOf(x) + " Str";
7 });
8 }
其中getA方法获取一个future,返回值是int。调用该future的map方法会返回另外一个新的future。return的类型也就是新future的类型,这个例子将getA返回的int转换成了String并返回。也就是getA的返回类型。
```
graph RL
future1:int-->future2:String
```
2. 多future并行执行,全部完成之后返回新的future。
方式1:使用join
1 public Future<String> getSumAB(){
2 System.out.println("get sumAB begin ");
3 Future<Integer> futureA = testDaoRx.getA();
4 Future<Integer> futureB = testDaoRx.getB();
5 return futureA.join(futureB).map(x->{
6 System.out.println("get a " + x._1 + " get b " + x._2);
7 return "sum:" + (x._1 + x._2);
8 });
9 }
graph RL
future1:int-->future3:String
future2:int-->future3:String
方式2:使用collect
1 public Future<String> getSumABCollect() {
2 System.out.println("get sumAB collect begin ");
3 Future<Integer> futureA = testDaoRx.getA();
4 Future<Integer> futureB = testDaoRx.getB();
5 Future<List<Integer>> listFuture = Future.collect(Lists.newArrayList(futureA, futureB));
6 return listFuture.map(x->{
7 System.out.println("get list" + x);
8 return "sum:" + (x.stream().reduce((sum, n) -> sum + n).orElse(0));
9 });
10 }
其中collect方式要求子future返回类型相同
3. future串行执行,futureA执行之后再执行futureB
1 public Future<String> getSumABSeq() throws Exception {
2 System.out.println("get sumAB begin ");
3 Future<Integer> futureA = testDaoRx.getA();
4 Future<String> futureStr = futureA.flatMap(x -> {
5 return testDaoRx.getB().map(y -> {
6 return x + y;
7 });
8 }).map(sum->{
9 return "str:"+sum;
10 });
11 return futureStr;
12 }
graph RL
future1:int-->future2:int
future2:int-->future3:String
```
flatMap之后返回的类型是Future<Integer>,之后使用map转换结果类型成String。下面是lambda简写方式:
1 public Future<String> getSumABSeqSimplified() throws Exception {
2 System.out.println("get sumAB begin ");
3 Future<Integer> futureA = testDaoRx.getA();
4 return futureA.flatMap(x -> testDaoRx.getB().map(y -> x + y)).map(sum-> "str:"+sum);
5 }
>通过并行和串行两种组织方式就可组织成适用于复杂业务场景的调用链
```
graph RL
F_db1-->F_bus1
F_db2-->F_bus1
F_rpc1-->F_bus2
F_bus1-->F_bus2
F_rpc3-->F_rpc2
F_rpc2-->F_bus2
F_bus2-->F_Service1
```
future使用
1. block等待结果
future支持同步等待future运行结果。针对一个同步返回的rpc接口,使用future首先要构建调用链条。然后同步等待结果返回。
1 public void testSumAB() throws Exception {
2 Long startTime = System.currentTimeMillis();
3 Future<String> futureStr = testBizRx.getSumAB();
4 // Future<String> futureStr = testBizRx.getSumABCollect();
5 String str = Await.result(futureStr);
6 System.out.println(str + " timeUse " + (System.currentTimeMillis() - startTime) / 1000 + "s");
7
8 }
2. block等待,增加超时限制
1 public void testGetAWithTimeoutAwait() throws Exception {
2 Long startTime = System.currentTimeMillis();
3 Future<String> futureStr = testBizRx.getA();
4
5 String str = Await.result(futureStr, Duration.fromMilliseconds(3000));
6
7 System.out.println(str + " timeUse " + (System.currentTimeMillis() - startTime) / 1000 + "s");
8
9 }
使用Await方式设置超时时间,Duration.fromMilliseconds(3000) 代表3s超时
3. Future单独设置超时限制
将普通函数调用包装成future时进行超时的指定
方式1:使用Promise,超时之后返回null但是不抛出异常
1 public <R> Future<R> get(Function0<R> function, int milliseconds) {
2 Future<R> future = futurePool.getWrapperFuturePool().apply(function);
3 Promise<R> p = new Promise<>();
4 future.within(timer, Duration.fromMilliseconds(milliseconds))
5 .onSuccess(v->{
6 p.setValue(v);
7 return BoxedUnit.UNIT;
8 })
9 .onFailure(v1 -> {
10 p.setValue(null);
11 return BoxedUnit.UNIT;
12 });
13
14 return p;
15 }
方式2:超时后返回timeoutException
1 public <R> Future<R> getWithException(Function0<R> function, int milliseconds) {
2 Future<R> future = futurePool.getWrapperFuturePool().apply(function);
3 return future.within(timer, Duration.fromMilliseconds(milliseconds));
4 }
4. Future方法内部抛出异常
1 public Future<Integer> getException() {
2 if(hasError)
3 return Future.exception(new RuntimeException(" throw exception"));
4 //something else
5 ......
6 }
#### 参考资料
> [scala-future SIP-14](https://docs.scala-lang.org/sips/completed/futures-promises.html)
> [finagle future](https://twitter.github.io/finagle/guide/Futures.html#futures)