twitter future 异步编程

  异步编程之RPC框架

在程序设计中为了应对复杂业务场景我们通常会讲系统设计模块化。不同模块之间免不了相互通信,如果是同一台机器,操作系统提供了整套的解决方案。

- 多线程之间:Mutex(ReentryLock),Read-writeLock,Condition,SpinLocks,Barriers等

- 多进程间:Pipes,MessageQueues,Samphores,SharedMemory,Sockets(UNIX Domain Sockets)

 使用同步IO方式实现rpc

主进程建立服务端监听,当有client连接时创建Socket并新建一个线程进行处理。这样做有很明显的弊端:

1. 线程数会随着client连接增加而增多,过多的线程额外增加了很多上下文切换时间。

2. 由于一个方法调用会经历读取request,计算,response回写,绝大多数时间都是阻塞在io操作上


ps 参考别人的代码

ServerSocket serverSocket = new ServerSocket(PORT);   

            while (true) {   

                // 主线程阻塞等待   

                Socket client = serverSocket.accept();   

                // 开启一个新的线程处理client连接   

                new HandlerThread(client);   

            }   


异步实现

使用nio我们可以将客户端连接数与线程解耦,nio的三大核心:Channel(通道),Buffer(缓冲区), Selector。nio操作的对象是缓冲区而不是传统的数据流,通过单一的selector来管理多个通道事件

> selector 通过单一线程来响应事件(connect,accept,read,write),当有事件完成时才会触发具体的操作,而不是阻塞在每一个事件上面。让事件来驱动程序执行,与下面要介绍的future编程模型有着共同的特性。

ps 参考别人的代码

handleAccept(SelectionKey key) {

       ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();

       SocketChannel sc = ssChannel.accept();

       sc.configureBlocking(false);

       sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(BUF_SIZE));

   }

main() {

       Selector selector = null;

       ServerSocketChannel ssc = null;

       try{

           selector = Selector.open();

           ssc= ServerSocketChannel.open();

           ssc.socket().bind(new InetSocketAddress(PORT));

           ssc.configureBlocking(false);

           ssc.register(selector, SelectionKey.OP_ACCEPT);

           while(true){

              selector.select(TIMEOUT)

               while(selector.selectedKeys().hasNext()){

                   SelectionKey key = iter.next();

                   if(key.isAcceptable()){

                       handleAccept(key);

                   }

                   if(key.isReadable()){

                       handleRead(key);

                   }

                   if(key.isWritable() && key.isValid()){

                       handleWrite(key);

                   }

                   iter.remove();

               }

           }



> 下图是 thrift基于Nio实现的服务端的一种实现方式:TThreadedSelectorServer,它也是种Half-sync/Half-async的服务模型。

> 要实现全异步模式,需要针对workThread中的任务也进行异步化。GRPC 提供了这种机制,它允许为每个计算过程添加回调,实现全异步编程。下节讲述的future就可以我们的业务编程异步化、事件化。


 future简介

future提供了一套高效便捷的非阻塞并行操作管理方案。其基本思想很简单,所谓Future,指的是一类占位符对象,用于指代某些尚未完成的计算的结果。一般来说,由Future指代的计算都是并行执行的,计算完毕后可另行获取相关计算结果。以这种方式组织并行任务,便可以写出高效、异步、非阻塞的并行代码。

  所谓Future,是一种用于指代某个尚未就绪的值的对象。而这个值,往往是某个计算过程的结果

- 若该计算过程尚未完成,我们就说该Future未就位;

- 若该计算过程正常结束,或中途抛出异常,我们就说该Future已就位

- Future的一个重要属性在于它只能被赋值一次。一旦给定了某个值或某个异常,future对象就变成了不可变对象——无法再被改写

 使用场景

- 远程rpc调用

- 异步task,主线程无需等待

- 阻塞IO,disk reading、mysql等

 future实现

twitter-future

> 1.Finagle是Twitter基于Netty开发的支持容错的、协议无关的RPC框架,该框架支撑了Twitter的核心服务。

> 2.基于scala编写,并自己实现一套future框架。该future对java使用同时提供了支持

scala-future

> 1.在finagle一年之后发布,具有类似twitter future类似语法

> 2.应用于scala本身,对java兼容性不好



 基本语法

future构建

1. 构建一个异步future

1      long startTime = System.currentTimeMillis();

2      Future<String> future = FuturePools.unboundedPool().apply(() -> {

3        sleep(1);

4        return "sleep 1";

5      }).onSuccess(v->{

6          printTimeUse("success return", startTime);

7          return BoxedUnit.UNIT;

8      }).onFailure(t->{

9          t.printStackTrace();

10          return BoxedUnit.UNIT;

11      });;

12      sleep(5);

13      printTimeUse(startTime);

在主进程起一个future,该future会异步进行处理。同时可以为该future添加回调函数。

onSuccess(v-{}) 在future执行完成后进行调用;onFailure(Throwable t->{}) 在执行过程中发生异常会执行改回调函数,其中t是执行中抛出的异常。

2. 构建一个同步future,阻塞主进程

1  Future<String> future2 = Future.apply(() -> {

2          sleep(3);

3          return "sleep 3";

4      });

因为编程中难免会有阻塞操作,所以要避免此类方式生成future。

3. 并发执行future,互相无影响,不影响主线程


1      Future<String> future1 = FuturePools.unboundedPool().apply(() -> {

2          sleep(2);

3          System.out.println("return future1");

4          return "suc";

5      });

6      Future<String> future2 = FuturePools.unboundedPool().apply(() -> {

7          sleep(1);

8          System.out.println("return future2");

9          return "suc";

10      });

11    //do something else


可以理解为向线程池提交了任务,但是没有执行get操作。

 future组合

future通过map,flatmap将不同的future连接。可以很的组成了一个执行链条,这也是future如此流行的基础。

1. future通过map映射为一个新的future,新future运行依赖原有future


1  public Future<String> getA() {

2        System.out.println("get A from testDaoRx begin");

3        Future<Integer> future = testDaoRx.getA();

4        return future.map(x->{

5            System.out.println("get A from testDaoRx sucss");

6            return String.valueOf(x) + " Str";

7        });

8    }

其中getA方法获取一个future,返回值是int。调用该future的map方法会返回另外一个新的future。return的类型也就是新future的类型,这个例子将getA返回的int转换成了String并返回。也就是getA的返回类型。

```

graph RL

future1:int-->future2:String

```

2. 多future并行执行,全部完成之后返回新的future。

方式1:使用join


1    public Future<String> getSumAB(){

2        System.out.println("get sumAB begin ");

3        Future<Integer> futureA = testDaoRx.getA();

4        Future<Integer> futureB = testDaoRx.getB();

5        return futureA.join(futureB).map(x->{

6            System.out.println("get a " + x._1 + "  get b " + x._2);

7            return "sum:" + (x._1 + x._2);

8        });

9    }



graph RL

future1:int-->future3:String

future2:int-->future3:String


方式2:使用collect


1    public Future<String> getSumABCollect() {

2        System.out.println("get sumAB collect begin ");

3        Future<Integer> futureA = testDaoRx.getA();

4        Future<Integer> futureB = testDaoRx.getB();

5        Future<List<Integer>> listFuture = Future.collect(Lists.newArrayList(futureA, futureB));

6        return listFuture.map(x->{

7            System.out.println("get list" + x);

8            return "sum:" + (x.stream().reduce((sum, n) -> sum + n).orElse(0));

9        });

10  }


其中collect方式要求子future返回类型相同

3. future串行执行,futureA执行之后再执行futureB


1    public Future<String> getSumABSeq() throws Exception {

2        System.out.println("get sumAB begin ");

3        Future<Integer> futureA = testDaoRx.getA();

4        Future<String> futureStr = futureA.flatMap(x -> {

5          return testDaoRx.getB().map(y -> {

6                return x + y;

7            });

8        }).map(sum->{

9            return "str:"+sum;

10      });

11      return futureStr;

12  }



graph RL

future1:int-->future2:int

future2:int-->future3:String

```

flatMap之后返回的类型是Future<Integer>,之后使用map转换结果类型成String。下面是lambda简写方式:


1    public Future<String> getSumABSeqSimplified() throws Exception {

2        System.out.println("get sumAB begin ");

3        Future<Integer> futureA = testDaoRx.getA();

4        return futureA.flatMap(x -> testDaoRx.getB().map(y -> x + y)).map(sum-> "str:"+sum);

5    }


>通过并行和串行两种组织方式就可组织成适用于复杂业务场景的调用链

```

graph RL

F_db1-->F_bus1

F_db2-->F_bus1

F_rpc1-->F_bus2

F_bus1-->F_bus2

F_rpc3-->F_rpc2

F_rpc2-->F_bus2

F_bus2-->F_Service1

```

 future使用

1. block等待结果

    future支持同步等待future运行结果。针对一个同步返回的rpc接口,使用future首先要构建调用链条。然后同步等待结果返回。


1    public void testSumAB() throws Exception {

2        Long startTime = System.currentTimeMillis();

3        Future<String> futureStr = testBizRx.getSumAB();

4  //        Future<String> futureStr = testBizRx.getSumABCollect();

5        String str = Await.result(futureStr);

6        System.out.println(str + " timeUse " + (System.currentTimeMillis() - startTime) / 1000 + "s");

7

8    }


2. block等待,增加超时限制



1    public void testGetAWithTimeoutAwait() throws Exception {

2        Long startTime = System.currentTimeMillis();

3        Future<String> futureStr = testBizRx.getA();

4

5        String str = Await.result(futureStr, Duration.fromMilliseconds(3000));

6

7        System.out.println(str + " timeUse " + (System.currentTimeMillis() - startTime) / 1000 + "s");

8

9    }


使用Await方式设置超时时间,Duration.fromMilliseconds(3000) 代表3s超时

3. Future单独设置超时限制

将普通函数调用包装成future时进行超时的指定

方式1:使用Promise,超时之后返回null但是不抛出异常


1    public <R> Future<R> get(Function0<R> function, int milliseconds) {

2        Future<R> future = futurePool.getWrapperFuturePool().apply(function);

3        Promise<R> p = new Promise<>();

4        future.within(timer, Duration.fromMilliseconds(milliseconds))

5              .onSuccess(v->{

6                  p.setValue(v);

7                  return BoxedUnit.UNIT;

8              })

9              .onFailure(v1 -> {

10                p.setValue(null);

11                return BoxedUnit.UNIT;

12      });

13

14      return p;

15  }


方式2:超时后返回timeoutException


1    public <R> Future<R> getWithException(Function0<R> function, int milliseconds) {

2        Future<R> future = futurePool.getWrapperFuturePool().apply(function);

3        return future.within(timer, Duration.fromMilliseconds(milliseconds));

4    }


4. Future方法内部抛出异常


1    public Future<Integer> getException() {

2        if(hasError)

3            return Future.exception(new RuntimeException(" throw exception"));

4        //something else

5        ......

6    }


#### 参考资料

>  [scala-future SIP-14](https://docs.scala-lang.org/sips/completed/futures-promises.html)

>  [finagle future](https://twitter.github.io/finagle/guide/Futures.html#futures)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352