响应式流处理为Java提供了一个通用的API接口。响应式流处理API有4个接口需要理解,分别是Publisher,Subscriber,Subscription和Processor。
Publisher:提供无边界的数据序列,可以依据订阅者要求发布给订阅者。
public interface Publisher {
public void subscribe(Subscriber s);
}
Subscriber:订阅数据和获得消息,需要实现订阅通知,数据通知,出错处理,完成处理。
public interface Subscriber {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscription:代表一个订阅,绑定订阅者和发布者,订阅者可以对订阅做控制(请求发送和取消)。
public interface Subscription {
public void request(long n);
public void cancel();
}
Processor:处理器,即是个订阅者又是个发布者,内部对数据进行处理后再次发布。
public interface Processor extends Subscriber, Publisher {
}
响应流标准已经有一些实现遵循或兼容标准,例如下面的一些库和框架:
- RxJava2.0,
- Reactor 3.0(需要Java 8),
- Akka Streams,
- Ratpack内置库(也可用RxJava或Reactor替代),
- Vert.x内置库(也可用RxJava替代),
- Spring Framework 5(内置Reactor 3.0)
Java 8中响应流接口是独立的包,可以用Maven获得。
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.0</version>
</dependency>
Java 9中已经内置的响应流接口,在Flow包下。Flow包还没有提供对背压的支持,如果需要,可以使用RxJava包(2.0以上)。
没有响应流标准以前,响应库不可以兼容,有了响应流标准,很容易使用和切换不同的响应流功能。例如MongoDB提供了符合标准的响应流驱动 ,你可以用最新的Reactor或RxJava消费来自MongoDB的数据。
参考资料:
https://dzone.com/articles/what-are-reactive-streams-in-java
https://community.oracle.com/docs/DOC-1006738