声明:
一、subscribe方法
注意:所有的操作只有在订阅的那一刻才开始进行!!!
subscribe方法有两种常用的形式:
-
Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
这四个参数的意义如下:
1)consumer
:收到一个数据时的回调
2)errorConsumer
:上游报告出现错误信号时的回调
3)completeConsumer
:上游报告完成信号时的回调
4)subscriptionConsumer
:提供一个Subscription
类型的对象,你可以使用他对上游流量进行反馈控制
Disposable
返回类型:使用Disposable#dispose
方法可以取消订阅行为,在Flux
和Mono
中,取消意味着告知数据源不再生产数据,但这种取消行为并不一定是及时的,也许数据源产生数据非常快,在接收到取消信号之前就完成所有数据的生成。
Disposable
有一个工具类Disposables
,它主要提供了Disposable.Swap
和Disposable.Composite
两种包装类的工厂方法,前者允许你对一个Disposable
进行替换(不取消容器中当前的Disposable
,仅作替换)以及更新(取消容器中当前的Disposable
,并替换为新的Dsiposable
)操作,后者允许你一次性控制多个Disposable
-
void subscribe(Subscriber<? super T> actual)
Subscriber
是一个订阅器,里面包含了onSubscribe
、onNext
、onError
、onError
四种方法,使用它们可以更加方便的控制订阅的操作。当然实际中使用的还是BaseSubscriber
,这个类继承了Subscriber
,但是它是抽象的,无法直接实例化。使用时直接使用匿名类进行实例化就好,这样可以很好避免一个BaseSubscriber
实例同时作为两个不同订阅操作的订阅器所带来的异常,因为根据Reactive Stream
规则来说,一个订阅器中的onNext
方法只能被时序调用,而不同同时调用。
你可以看下面这个例子:
public class CustomSubsriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("subscribed");
request(1);
}
@Override
protected void hookOnNext(T value) {
System.out.println("get value:" + value);
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("completed");
}
}
二、背压——流控
在Reactor
中,下游想要控制上游的流速,是通过request
来实现的,request
的总数代表着下游当前的需求量,比如subscriber.request(1)
就代表下游需要一个数据,多了不要。如果你把request
的值设为Long.MAX_VALUE
,则它意味着下游可以接收无限制的数据,比如Mono#block
、Flux#blockFirst
、Flux#blockLast
就默认无限制接收数据。对了,这三个方法也是一种订阅操作,使用它们会激活整个订阅过程,Mono#block
和Flux#blockFirst
均表示接收流中第一个数据并返回,如果在等待中接收到了完成信号则返回null,同理Flux#blockLast
表示只接收最后一个数据,其会一直等待完成信号的到来。
值得注意的是:request
并不一定恒定的,它可能会被整个上游中的某个操作修改,比如stringFlux.buffer(3).subscriber(null,null,null,sub -> {sub.request(2)})
中,buffer(3)
会将request
进一步修改为6,因为sub.request(2)
的2个请求是请求buffer(3)
的输出结果的,而一个buffer(3)
结果需要其上游的3个数据,故request
被更改为了6
-
limitRequest
也是一种流控操作,它接收一个参数作为“最大请求量”,如果下游总的请求量并没有到达上限,那么一切照常,否则,在到达那一刻时,改操作将会给上游发出取消信号,并给下游发出完成信号 -
limitRate
将请求分组,比如下游请求100个request(100)
,那么limitRate(10)
将会把请求分为10个request(10)
,每完成一个request(10)
则自动发出下一个request(10)
三、异步
在Reactor
中,如果不特别指定异步操作的话,那么整个流的发生到订阅过程全部会执行在subscribe
那个线程中。所以最简单的异步使用Reactor
的方法就是新建一个线程,并在其中执行subscribe
,比如:
public static void main(String[] args) throws InterruptedException {
final Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
)
t.start();
t.join();
}
其结果为:
hello thread Thread-0
当然Reactor
其实提供更加简便的异步操作方式,其中比较常用的就是publishOn
和SubscribeOn
两个方法了,这两个方法都需要一个Scheduler
类型的参数,它控制着操作的执行模式以及执行执行位置,单从表现上来看,倒是有点像ExecutorService
。创建Scheduler
你需要使用到Schedulers
工厂类,里面定义了许多不同类型的Scheduler
:
1)Schedulers.immediate()
:直接在当前线程中立刻执行
2)Schedulers.single()/newSingle()
:提供一个单线程线程池以供操作,前者是一个固定的定义好的单线程线程池,后者你可以使用它来创建新的单线程线程池
3)Schedulers.elastic()
:相当于提供了一个CachedThreadPool
4)Schedulers.parallel()
:相当于提供了一个和Cpu核心数一样多的核心线程数的线程池
5)Schedulers.fromExecutorService()
:从现有的ExecutorService
中引入
-
subscribeOn
subscribeOn
会将整个流(包括数据源生成)放置在指定线程上执行(具体由哪个线程执行由前面所说的Scheduler
来控制),注意,无论subscribeOn
放在哪,它都将影响整个流,可以有多个subscribeOn
,但是只有第一个会生效 -
publishOn
publishOn
会将其后的操作搬移到指定线程上执行,注意,publishOn
优先级比subscribeOn
高,而且多个publishOn
都各自对其后的操作会有影响。
看个例子:
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> {
System.out.println("map1:"+Thread.currentThread().getName());
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
return 10 + i;
})
.subscribeOn(s)
.map(i -> {
System.out.println("map2:"+Thread.currentThread().getName());
LockSupport.parkNanos(Duration.ofSeconds(2).toNanos());
return "value " + i;
}).publishOn(Schedulers.single())
.map(i -> {
System.out.println("map3:"+Thread.currentThread().getName());
return "mtk:"+i;
});
new Thread(() -> flux.subscribe(System.out::println)).start();
结果如下:
map1:parallel-scheduler-1
map2:parallel-scheduler-1
map1:parallel-scheduler-1
map3:single-1
mtk:value 11
map2:parallel-scheduler-1
map3:single-1
mtk:value 12
从结果中我们可以发现整个流在没有运行在
subscribe
方法调用时所在的线程中,因为有subscribeOn
的缘故,整个流运行在parallel-scheduler-1
线程中,但是在map3操作前,有个publishOn
,其使得其后的操作运行在了single-1
线程中
参考文档:
[1] Reactor api doc
[2] Reactor reference doc