java9发布以来一直备受关注的是带来最大变化的模块化。模块化可以说是java9的核心思想。前面也大概介绍了java9的模块化。这次来介绍下java9的另一个新特性,异步非阻塞的反向控制流。
反向控制流
下面就是Java9的反向控制流的大概处理过程。
Flow介绍
java9中提供了java.util.concurrent.Flow类,里面有很多内部接口类。比如:
- Publisher 发布者。
- Subscriber 订阅者。
- Subscription 处理发布者/订阅者的关系类。
- Processor 包含了发布者&订阅者。
|方法名|参数|描述|
|--------|:--:|:------------:|----|
|Publisher.subscribe|Subscriber|给当前发布者绑定一个订阅者。|
|Subscriber.onSubscribe|Subscription|当订阅者订阅了一个发布者的时候,需要执行的后续操作。
|Subscriber.onNext|Object|订阅者主动向发布者请求数据后,发布者把数据发布出来,然后Subscription按次将数据丢到onNext中。订阅者这时可以按照自己的业务处理数据。
|Subscriber.onError|Throwable|当订阅者请求的数据长度<=0时,或者其它异常的时候,Subscription会调用onError方法,订阅者需要进行错误处理。
|Subscriber.onComplete|-|在数据处理完成后执行onComplete方法,订阅者可以进行完成的后续动作。
|Subscription.request|long|通过主动发起请求数据的方式,获取N个数据在当前的订阅中,并且顺次的调用订阅者的onNext方法。其中N是控制数据流的流速。如果传入Long.MAX_VALUE则表示无限制的流速。发布者将会一次把所有的数据都取到当次订阅Subscription中,由Subscription通知到订阅者。
|Subscription.cancle|-|取消接受消息,该方法不会触发onComplete或者onError|
SubmissionPublisher
java9已经实现了一套发布,即SubmissionPublisher,我们只需要实现自己的订阅业务逻辑即可。demo代码如下:
public class MySubscribe<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(T item) {
System.out.println("onNext get: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done.");
}
public void start(int n){
subscription.request(n);
}
}
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MySubscribe<String> subscribe = new MySubscribe<>();
publisher.subscribe(subscribe);//产生发布订阅关系
List.of("1","2","3").forEach(publisher::submit);//将数据添加到发布者中。
subscribe.start(1);//手动开始订阅。
//需要关闭publisher,否则无论如何都不会执行onComplete方法
//关闭不分先后,关闭后仍然可以获取数据,但是不能往发布者中submit数据了。
publisher.close();
//由于是异步非阻塞的,这边把线程睡眠2s等待异步执行完成。
Thread.sleep(2000);
}
最后执行的结果如下:
onNext get: 1
onNext get: 2
onNext get: 3
Done.
上面示例是手动开始请求数据,也可以在产生订阅关系的时候就开始请求获取数据。在onSubscribe中添加subscription.request(1);
。
Processor
如果遇到多个数据处理流的话,就可以用Processor。它即使一个发布者也是一个订阅者(它订阅上一个发布者,并且可以作为下一个订阅者的发布者)。下面分别是一个类型转换处理和偶数过滤器:
public class EvenFilterProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer>{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(item % 2 == 0) {
submit(item);//将数据提交到下一个发布者中。
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
//do something
}
@Override
public void onComplete() {
close();//注意多个发布订阅链需要在完成的时候调用。通知下一个发布者完成。
}
}
public class StringToIntegerProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<String, Integer>{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
submit(Integer.valueOf(item));
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
//do something
}
@Override
public void onComplete() {
close();
}
}
具体调用方法:
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
StringToIntegerProcessor convert = new StringToIntegerProcessor();
publisher.subscribe(convert);
EvenFilterProcessor filter = new EvenFilterProcessor();
convert.subscribe(filter);
MySubscribe<Integer> subscribe = new MySubscribe<>();
filter.subscribe(subscribe);
List.of("1","2","3","4","5","6","8").forEach(publisher::submit);
publisher.close();
subscribe.start(1);
Thread.sleep(2000);
}
输出结果:
onNext get: 2
onNext get: 4
onNext get: 6
onNext get: 8
Done.
小结
java9的这个新特性使代码写起来更简洁,更易懂,而且维护起来更方便。至于性能方面,这里面没有去测试,但是是异步非阻塞的线程,应该性能不错。