Reactor学习:二、生产者

声明:


一、Flux和Mono

Flux<T>Mono<T>Reactor中非常重要的两个生产者,它们都继承了Publisher<T>FluxMono都代表一个响应式序列,但是不同的是,Flux代表0—N个元素,Mono代表0—1个元素。其实称Mono为一个序列并不准确,称之为“结果”可能会更好。试想web中的应用场景,一个请求必然只生成一个响应,所以使用Mono<HttpResponse>比使用Flux<HttpResponse>语义上更为确切。


二、生产序列的方法
  • Flux#just(T...)Mono#just(T)Mono#justOrEmpty(T)
    just方法就是显式指定序列,类似Stream#of
  • Flux#from(Publisher)Flux#fromArrayFlux#fromIterableFlux#rangeFlux#fromStream(Supplier<Stream>)
    从Publisher、数组、集合、数据范围、流中生产序列
  • Mono#from(Publisher)Mono#fromSupplierMono#fromRunnableMono#fromCallableMono#fromFuture
    从其他地方获取结果,其中Mono#from(Publisher)是截取Publisher第一个元素
  • emptyerrornever
    1)empty指的是一个直接完成的序列
    2)error指的是一个直接报错的序列
    3)never值的是一个不做任何事情的序列,比如发出数据、报告完成、报告错误等等
  • Flux#generate
    Flux#generate是一个同步的简易的程序化生成序列的方法,你需要提供一个初始值以及一个控制sink的回调方法,其完整参数序列是generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
    多说无益,看段代码就懂了
Flux<String> stringFlux = Flux.generate(() -> 0 , (state,sink) -> {
    sink.next("str"+state);
    //sink.next("str"+state);
    LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
    if(state++ == 10) sink.complete();
    return state;
},System.out::println);

这段代码的作用生成一个从"str0"一直到"str10"的序列,且每间隔一秒生成一个。在sink#complete方法被调用之前或者出现异常之前,该回调方法(即第二个参数BiFunction那一段)会被一直调用。需要注意的是sink#next在一次回调中只能被调用一次,否则将会报错,导致序列生成的中止。第三个参数可以不要,其作用是在中止时(包括异常带来的中止或者Disposable#dispose取消任务带来的中止)将当前状态(也就是state)进行一定的操作,代码中是直接将其打印了出来

  • Flux#create
    Flux#create比之Flux#generate要高级一点,其并不需要一个初始值,而且它支持多线程同时触发回调,因此,与监听机制结合在一起会得到意想不到的效果。
Flux<String> stringFlux = Flux.create(sink -> {
    //onRequest在每次接收到request的时候都会被调用
    sink.onRequest(n -> {
        System.out.println("on request:"+n);
    }).onCancel(() -> {
        System.out.println("cancel");
        //onDispose在完成、错误、取消时候调用,取消的优先级低于onCancel
    }).onDispose(() -> {
        System.out.println("dispose");
    });
    myEventContext.addMyEventListener(new SinkEventListener() {
        @Override
        public void onCompleteEvent(SinkCompleteEvent event) {
            sink.complete();
        }

        @Override
        public void onNextEvent(SinkNextEvent event) {
            sink.next(event.getMessage());
        }
    });
}, FluxSink.OverflowStrategy.DROP);

在这段代码中每当有一个SinkNextEvent事件触发时,就会执行一次sink#next,当触发了onCompleteEvent事件时,就会执行sink#complete。第二个参数FluxSink.OverflowStrategy.xxx指定了背压(backpressure,即下游对上游的反馈控制,避免爆发“洪水”)的参数,背压参数有下列几种:
IGNORE 忽视反馈,我行我素,可能会抛出IllegalStateException
ERROR 超出下游接收能力时,抛出IllegalStateException
DROP 超出下游接收能力时(下游不准备接收时),丢弃掉上游数据
LATEST 不抛弃掉上游数据,但是只取最近的数据
BUFFER 默认值,缓存所有的超量数据,要注意超出内存限制的风险
注意:不论选择哪个,在下游第一次发出请求之前,上游的所有数据均会被丢弃。
额外需要注意的是,在sink#complete发生之后,如果还在使用sink#next来添加数据,该数据会被自动丢弃,并打印出类似[DEBUG] (pool-1-thread-1) onNextDropped: 0.7328237206959655的debug信息

  • handle
    该方法存在于Flux以及Mono中,比较特殊,相当于一个过滤机制,它可以承接上一个序列生成方法,将其生成的数据进行过滤后再输出,它使用的sinkSynchronousSink这表示sink#next在一次回调中只能被执行一次。具体示例代码如下:
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).handle((item , sink) -> {
    if(item % 2 != 0) sink.next(item);
});

该段代码会生成1、3、5...的奇数序列,每隔2秒生成一次。item指的是上游发出的数据


参考文档:
[1] Reactor api doc
[2] Reactor reference doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353