JAVA9 初探(三)

java9发布以来一直备受关注的是带来最大变化的模块化。模块化可以说是java9的核心思想。前面也大概介绍了java9的模块化。这次来介绍下java9的另一个新特性,异步非阻塞的反向控制流。

反向控制流

下面就是Java9的反向控制流的大概处理过程。

反向控制

Flow介绍

java9中提供了java.util.concurrent.Flow类,里面有很多内部接口类。比如:

  • Publisher 发布者。
  • Subscriber 订阅者。
  • Subscription 处理发布者/订阅者的关系类。
  • Processor 包含了发布者&订阅者。

|方法名|参数|描述|
|--------|:--:|:------------:|----|
|Publisher.subscribe|Subscriber|给当前发布者绑定一个订阅者。|
|Subscriber.onSubscribe|Subscription|当订阅者订阅了一个发布者的时候,需要执行的后续操作。
|Subscriber.onNext|Object|订阅者主动向发布者请求数据后,发布者把数据发布出来,然后Subscription按次将数据丢到onNext中。订阅者这时可以按照自己的业务处理数据。
|Subscriber.onError|Throwable|当订阅者请求的数据长度<=0时,或者其它异常的时候,Subscription会调用onError方法,订阅者需要进行错误处理。
|Subscriber.onComplete|-|在数据处理完成后执行onComplete方法,订阅者可以进行完成的后续动作。
|Subscription.request|long|通过主动发起请求数据的方式,获取N个数据在当前的订阅中,并且顺次的调用订阅者的onNext方法。其中N是控制数据流的流速。如果传入Long.MAX_VALUE则表示无限制的流速。发布者将会一次把所有的数据都取到当次订阅Subscription中,由Subscription通知到订阅者。
|Subscription.cancle|-|取消接受消息,该方法不会触发onComplete或者onError|

SubmissionPublisher

java9已经实现了一套发布,即SubmissionPublisher,我们只需要实现自己的订阅业务逻辑即可。demo代码如下:

public class MySubscribe<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(T item) {
        System.out.println("onNext get: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done.");
    }

    public void start(int n){
        subscription.request(n);
    }
}

public static void main(String[] args) throws InterruptedException {
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    MySubscribe<String> subscribe = new MySubscribe<>();
    publisher.subscribe(subscribe);//产生发布订阅关系
    List.of("1","2","3").forEach(publisher::submit);//将数据添加到发布者中。
    subscribe.start(1);//手动开始订阅。
    //需要关闭publisher,否则无论如何都不会执行onComplete方法
    //关闭不分先后,关闭后仍然可以获取数据,但是不能往发布者中submit数据了。
    publisher.close();
    //由于是异步非阻塞的,这边把线程睡眠2s等待异步执行完成。
    Thread.sleep(2000);
}

最后执行的结果如下:

onNext get: 1
onNext get: 2
onNext get: 3
Done.

上面示例是手动开始请求数据,也可以在产生订阅关系的时候就开始请求获取数据。在onSubscribe中添加subscription.request(1);

Processor

如果遇到多个数据处理流的话,就可以用Processor。它即使一个发布者也是一个订阅者(它订阅上一个发布者,并且可以作为下一个订阅者的发布者)。下面分别是一个类型转换处理和偶数过滤器:

public class EvenFilterProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer>{

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        if(item % 2 == 0) {
            submit(item);//将数据提交到下一个发布者中。
        }
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        //do something
    }

    @Override
    public void onComplete() {
        close();//注意多个发布订阅链需要在完成的时候调用。通知下一个发布者完成。
    }
}

public class StringToIntegerProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<String, Integer>{

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(Integer.valueOf(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        //do something
    }

    @Override
    public void onComplete() {
        close();
    }
}

具体调用方法:

public static void main(String[] args) throws InterruptedException {
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    StringToIntegerProcessor convert = new StringToIntegerProcessor();
    publisher.subscribe(convert);
    EvenFilterProcessor filter = new EvenFilterProcessor();
    convert.subscribe(filter);
    MySubscribe<Integer> subscribe = new MySubscribe<>();
    filter.subscribe(subscribe);
    List.of("1","2","3","4","5","6","8").forEach(publisher::submit);
    publisher.close();
    subscribe.start(1);
    Thread.sleep(2000);
}

输出结果:

onNext get: 2
onNext get: 4
onNext get: 6
onNext get: 8
Done.

小结

java9的这个新特性使代码写起来更简洁,更易懂,而且维护起来更方便。至于性能方面,这里面没有去测试,但是是异步非阻塞的线程,应该性能不错。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容