Spring Reactor parallel并发与线程切换 实战

第一篇:Spring Reactor 操作符详解
第二篇:Spring Reactor map与flatMap操作符 详解

这一篇讲Reactor 的并发处理,线程切换,背压

parallel 操作符

描述:

  • 并行处理操作符,和我们CPU核数有关,核数越大,并行处理的线程数越多;
  • 需要注意的是,如果我们设置的并行数大于2*核数,其他线程不会被使用;
    • 从下面输出结果能看出,由于我电脑是4核8线程的,并行处理是8,即使我设置了10个线程,也不会进行线程之间切换,所以内容8和9,等上面线程处理完成之后才来处理的;
Flux.create(new Consumer<FluxSink<Integer>>() {
       @Override
       public void accept(FluxSink<Integer> fluxSink) {
           for (int i = 0; i < 10; i++) {
               fluxSink.next(i);
           }
           fluxSink.complete();
       }
   }).parallel(10).runOn(Schedulers.parallel()).doOnNext(new Consumer<Integer>() {
       @Override
       public void accept(Integer integer) {
           System.out.println(Thread.currentThread().getName()+"==内容"+integer);
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }).sequential().blockLast();
   // 输出结果
parallel-1==内容0
parallel-3==内容2
parallel-2==内容1
parallel-5==内容4
parallel-6==内容5
parallel-4==内容3
parallel-7==内容6
parallel-8==内容7
// 隔了两秒之后输出
parallel-1==内容8
parallel-2==内容9

parallel并行处理设置类型:

  1. Schedulers.parallel() :创建CPU内核数量一样多线程池;
  2. Schedulers.single():可重用单个线程
  3. Schedulers.elastic() 无限制的弹性线程池,可以一直创建线程
  4. Schedulers.boundedElastic() 有界的弹性线程池,它会回收闲置的线程,默认是60s;它对创建的线程数做了限制,默认值为CPU内核数x 10,达到上限后,最多可提交10万个任务;
  5. Schedulers.fromExecutorService() 根据我们自定义线程池进行引用;

需要注意

  • 在使用parallel操作符时,需要结合runOn一起使用,才能并行处理;
  • 之后parallel操作符之后的处理操作符才会,进入多线程并行处理,parallel之前的处理逻辑还是在之前的主线程中处理;
    • 我上面的例子中,Flux.create 在parallel操作符之前,所有发射事件操作都是在main主线程中完成;
    • 后面的doOnNext操作处在多线并行处理中,如果doOnNext之后还有操作,也是在多线程并行处理中处理;

subscribeOn

描述:
主要做线程的切换,让我们的事件处理进入指定线程池或者线程中处理;

Flux.create(new Consumer<FluxSink<Integer>>() {
      @Override
      public void accept(FluxSink<Integer> fluxSink) {
          for (int i = 0; i < 2; i++) {
              System.out.println(Thread.currentThread().getName()+"==发射="+i);
              fluxSink.next(i);
          }

          fluxSink.complete();
      }
  }).subscribeOn(Schedulers.single()).doOnNext(new Consumer<Integer>() {
      @Override
      public void accept(Integer integer) {
          System.out.println(Thread.currentThread().getName()+"==内容"+integer);
          
      }
  }).blockLast();
// 输出内容
single-1==发射=0
single-1==内容0
single-1==发射=1
single-1==内容1
  • subscribOn : 指定指定调度器,它会让整个事件处理流程进入切换线程处理;
  • 需要注意的是:不管怎么切换调度器,他都是单线程执行,需要多线程执行必须使用parallel+runOn

sequential()

描述:
如果在并行处理后想要恢复为“正常” 顺序流处理其余操作符链,则可以使用 sequential();

blockLast

描述:
阻塞IO流的操作,就是等多线程执行完成之后,才会返回;
block(),blockFirst()和blockLast()

publishOn

描述:
上游产生的事件往下游传递时,经过publishOn操作,publishOn会将下游的工作线程切换到指定调度器的线程处理;

Flux.create(new Consumer<FluxSink<Integer>>() {
      @Override
      public void accept(FluxSink<Integer> fluxSink) {
          for (int i = 0; i < 2; i++) {
              System.out.println(Thread.currentThread().getName()+"==发射="+i);
              fluxSink.next(i);
          }

          fluxSink.complete();
      }
  }).subscribeOn(Schedulers.single()).doOnNext(new Consumer<Integer>() {
      @Override
      public void accept(Integer integer) {
          System.out.println(Thread.currentThread().getName()+"==内容"+integer);
          
      }
  }).blockLast();
// 输出内容
main==发射=0
single-1==内容0
main==发射=1
single-1==内容1
  • publishOn : 它只会切换publishOn之后的工作线程,之前的发射线程它会不切换;
  • publishOn 在一条链路处理中可以多次使用
Flux.just("1","2")
      .publishOn(scheduler1)
      .map(i -> transform(i))
      .publishOn(scheduler2)
      .doOnNext(i -> processNext(i))
      .subscribe();

背压

onBackpressureBuffer()

描述:
我还是以生产香皂流水线生产来举例子:上游流水线生产香皂,下游流水线给香皂包上盒子;上游流水线一分钟流过来一个香皂,下游接受到一个香皂之后,给这个香皂包装上盒子,这一分钟就完事了,如果你只用了10秒就完成了,你可以休息50秒;上游的生产加速了,一分钟流过程10个香皂,下游这里一分钟最多包装6个盒子,导致4个香皂堆积;如果持续这样流过来,导致下游大量堆积,下游崩溃,甚至累死;
那我们如何处理呢
背压的出现就是为了解决这个问题;

Flux.create(new Consumer<FluxSink<Integer>>() {
       @Override
       public void accept(FluxSink<Integer> fluxSink) {
           // 无限次发送
           while (true){
               fluxSink.next(new Random().nextInt(100));
           }
       }
   }, FluxSink.OverflowStrategy.BUFFER).map(info->String.valueOf(info))
           .subscribe(new Consumer<String>() {
       @Override
       public void accept(String s) {
           // 下游处理
           try {
               TimeUnit.MILLISECONDS.sleep(100);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(s);
       }
   });

我们在发射事件的第二个参数,就是背压策略;当上游生产事件超过下游处理事件的阈值时,我们采取的措施:

  • OverflowStrategy.IGNORE:忽略下游的处理能力,上游该怎么发还是怎么发,不采取任何措施;

  • OverflowStrategy.ERROR : 当下游无法跟上上游时,抛异常

  • OverflowStrategy.DROP:丢失上游发送的事件,下游接受了6个香皂之后,后面4个香皂丢掉

  • OverflowStrategy.LATEST:下游一直获取最新的事件,丢失老事件

  • OverflowStrategy.BUFFER:用一个大桶先装着,下游从桶里取,如果大桶也满了,就会报OOM异常;


    图片描述
  • 当我们往漏斗里倒水时,漏斗的流速是均匀的;

  • 下游的处理也是均匀的,保证下游的安全;

我们接着看下这个例子:

Flux.create(new Consumer<FluxSink<Integer>>() {
            @Override
            public void accept(FluxSink<Integer> fluxSink) {
                for (int i = 0; i < 100; i++) {
                    fluxSink.next(i);
                }
                fluxSink.complete();
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("处理:"+integer);
            }
            @Override
            public void onError(Throwable t) {
                System.out.println("错误");
            }
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });
        // 输出结果
处理:0
处理:1
处理:2
处理:3
处理:4
处理:5
处理:6
处理:7
处理:8
处理:9
  • 从事件生成来看,我们发生了100事件,但是下游只处理了10个事件;
  • 其实在s.request(10);中做了处理,下游只请求了10个事件,所以上游即使发生再多的也是不会被处理的;
  • 下游可以根据自己的处理能力去上游获取事件;这是不是保护了下游,也就是起到了背压的效果了;
  • 那上游没有处理的90个事件哪里去了呢。其实reactor内部有一个集合保存了这90个事件,只是我们没有去取,如果上游生产事件太多,内部集合保存不下,也是会报OOM异常的;

我们能不能上游生产多少,下游就进行处理,处理完成之后,上游才再生产呢,而不是上游一次性全部生产?

我们在上游发射器FluxSink对象内部找了这样方法:

  • long requestedFromDownstream(); 返回是当前下游还有多少个事件未处理完成;
  • requestedFromDownstream()返回0,表示下游事件处理完成了

在onSubscribe方法返回一个Subscription订阅者对象,这个对象相当于上游与下游连接的开关总闸;

  • s.cancel(); 取消与上游的连接,事件就流不到下游
  • s.request(10); 与上游连接之后,从上游获取10个事件


    图片描述

我们将Subscription保存出去,在下游处理完成事件之后,上游在生产10个事件,下游在去请求10个事件;


Subscription sp;
    @Test
    public void back(){
        Flux.create(new Consumer<FluxSink<Integer>>() {
            @Override
            public void accept(FluxSink<Integer> fluxSink) {
                //初始10个事件
                for (int i = 0; i < 10; i++) {
                    fluxSink.next(i);
                }
                AtomicInteger data = new AtomicInteger(0);
                // 下游处理完成之后再生产
                while (true){
                    if(fluxSink.requestedFromDownstream() == 0){
                        System.out.println("====================下游处理完成"+fluxSink.requestedFromDownstream());
                        for (int i = 0; i < 6; i++) {
                            System.out.println("上游====发射:"+data.incrementAndGet());
                            fluxSink.next(data.get());
                        }
                        sp.request(6);
                    }
                }
            }
        }, FluxSink.OverflowStrategy.BUFFER).map(info->String.valueOf(info))
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(10);
                        sp = s;
                    }
                    @Override
                    public void onNext(String s) {
                        // 下游处理
                        try {
                            TimeUnit.MILLISECONDS.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("下游====处理:"+s);
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("完成");
                    }
                });
    }
    // 输出结果
====================下游处理完成0
上游====发射:1
上游====发射:2
上游====发射:3
上游====发射:4
上游====发射:5
上游====发射:6
下游====处理:1
下游====处理:2
下游====处理:3
下游====处理:4
下游====处理:5
下游====处理:6
====================下游处理完成0
。。。
  • 从结果来看,上游生产,下游消费,等下游处理完之后,上游才继续生产;只有就起到了背压的效果,保护了下游,同时不会造成上游不停的差生事件;

如果有需要,后期在进行源码的讲解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容