Reactor 基本流程分析

    Scheduler subscribeOn = Schedulers.newSingle("thread 1");
    Scheduler publishOn = Schedulers.newSingle("thread 2");

    Consumer<FluxSink<String>> eventEmitter = sink -> sink.next("12");

    Function<String, Integer> intValueConverter = value -> {
        Util.printlnThreadName(" map ");
        return Integer.valueOf(value);
    };

    Consumer<Integer> subscriber = value -> {
        Util.printlnThreadName(" subscriber ");
        System.out.println("the final subscriber value = " + value);
    };

// 主要的执行流程!
    Flux.create(eventEmitter)         // 1. FluxCreate
            .subscribeOn(subscribeOn) // 2. FluxSubscribeOn
            .map(intValueConverter)   // 3. FluxMap
            .publishOn(publishOn)     // 4. FluxPublishOn
            .subscribe(subscriber);   // 5. 执行流程
    
    LockSupport.park();

上面的例子中的几个操作,是我们在 使用 Reactor 中最常用的操作!

我们将会对上面的例子,进行简单的源码分析,目的是了解 Reactor 每个步骤的大体调用流程!有很多细节暂不详细介绍!

基本创建流程分析

我们从 Flux.create 方法开始分析。
第一步,我们使用 Flux.create 静态方法,创建了一个对象,这个对象是 FluxCreate(它是 Flux的子类)
第二步,我们使用 subscribeOn 操作符,指定了事件发生器的运行线程为 subscribeOn,我们得到的是FluxSubscribeOn(它是 Flux的子类)
第三步,我们使用 map 操作符,将next结果进行转换,我们得到的是FluxMap(它是Flux的子类)
第四步,我们使用 publishOn 操作符,指定事件监听者的运行线程为 publishOn , 我们得到的是 FluxPublishOn (它也是 Flux的子类)
最后一步,我们调用 subscribe 方法,提供事件消费者,并且开始流程!

在第5步,执行之前的设置步骤,都是我们预定的蓝图,都是计划!直到第5步执行的时候,这个流程才会启动,由自定义的 事件发射器发射事件监听者获取数据并消费

subscribe 方法

这个方法是 每个 Publisher 接口的子类,都必须实现的一个方法,表示给事件发生者,提供一个事件的监听者

我们是从 FluxPublishOn 类的对象调用进入这个方法的!这个方法由父类 Flux 提供的不可复写的方法!

Flux#subscribe.jpg

内部经过多个方法的重载,最终调用的是,这个有4个参数的方法!

// 4个参数的方法,分别是 onNext 的消费者 , onError 的消费者,onComplete 的消费者,以及 onSubscription 方法的消费者
public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer) {
        // 可以看到,最终将几个 消费者,合并成为了一个 LambdaSubscriber 对象
        // 也可以看做,用 LambdaSubscriber 包装了这几个消费者
        return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
                completeConsumer,
                subscriptionConsumer));
    }

上面的 LambdaSubscriberCoreSubscriber 的子类(Subscriber的子类),就是一个 Subscriber!它的主要功能就是包装,我们提供的消费者并且提供一些简单的错误机制(比如next发生了未捕获的异常,就默认去触发 error 事件)

我们最终会 调用 Flux 的抽象方法 subscribe(CoreSubscriber<? super T> actual) 注意这里的 入参是 CoreSubscriber类型的对象!

这个方法在 FluxPublishOn 中被复写!

FluxPublishOn 中的 subscribe

FluxPublishOn对象是我们通过在 Flux对象上调用 publish 方法创建的!

首先,我们回到它的创建过程:我们通过 上述的 第四步 创建这个对象!
最终创建它的方法是在 FluxpublishOn 重载方法中

final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
        if (this instanceof Callable) {
            // ... 
        }
        // 此处的 prefetch 和 lowTide 默认都是 256 
        return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}

它的构造方法为:

FluxPublishOn(Flux<? extends T> source,
            Scheduler scheduler,
            boolean delayError,
            int prefetch,
            int lowTide,
            Supplier<? extends Queue<T>> queueSupplier) {
        // 它的上游就是之前的 Flux 链 ,它的下游是 传入的 Subscriber 
        super(source);
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
        }
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
        this.delayError = delayError;
        this.prefetch = prefetch;
        this.lowTide = lowTide;
        this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
    }

它的 subscribe 方法写的很简单,主要的作用,就是使用 PublishOnSubscriber 包装了传入的 Subscriber

public void subscribe(CoreSubscriber<? super T> actual) {
        Worker worker;
        try {
          // 从 scheduler 中创建 worker
            worker = Objects.requireNonNull(scheduler.createWorker(),
                    "The scheduler returned a null worker");
        }
        catch (Throwable e) {
            Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
            return;
        }
// ...
// 使用 PublishOnSubscriber 包装下 监听者 actual 
source.subscribe(new PublishOnSubscriber<>(actual,
                scheduler,
                worker,
                delayError,
                prefetch,
                lowTide,
                queueSupplier));
    }

我们知道,publishOn 操作符的最大的作用就是切换监听者方法调用的运行线程!所以在 PublishOnSubscriber 其实主要的作用就是 用一个指定的线程去 call 监听者 actual 的方法!实现线程的切换功能!实际的操作比较的复杂,暂不分析!

我们发现 FluxPublishOn 的功能很简单,就是对 监听者进行了一次包装,它的主要功能实现在 PublishOnSubscriber 这个包装的监听者中!

subscribe 方法的最后一步,还是调用了 source.subscribe,执行的流程再次进入 subscribe 方法中,此时我们的 source 是 FluxMap 类型的对象

FluxMap 的 subscribe 方法

这个对象对应于 map 操作符的功能,和上面的 FluxPublishOn 类似,他们的功能都是 作用于下游!它的核心功能实现也是包装一个监听者 Subscriber实现的!

@Override
    @SuppressWarnings("unchecked")
    public void subscribe(CoreSubscriber<? super R> actual) {
       // ...
        source.subscribe(new MapSubscriber<>(actual, mapper));
    }

最后一步同样调用了上游的 subscribe方法!它的上游对象是 FluxSubscribeOn

FluxSubscribeOn 的 subscribe

我们的调用链路到了 FluxSubscribeOnsubscribe
这个对象对应于 subscribeOn 操作符的作用,主要是 指定事件发生器的运行线程
它作用于它的上游 source,在一个新的线程中调度 source 的方法!

@Override
    @SuppressWarnings("unchecked")
    public void subscribe(CoreSubscriber<? super T> actual) {
        Worker worker;
        
        try {
        // 创建 运行的 Work
            worker = Objects.requireNonNull(scheduler.createWorker(),
                    "The scheduler returned a null Function");
        } catch (Throwable e) {
            Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
            return;
        }
        // 包装 source 和 actual 
        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
                actual, worker, requestOnSeparateThread);
        
        // 在这里调用了下游的 onSubscribe ,是创建蓝图的线程中!而不是 onSubscribe 的线程中
        actual.onSubscribe(parent);

        try {
          // parent 实现了 Runnable 接口
            worker.schedule(parent);
        }
        catch (RejectedExecutionException ree) {
            if (parent.s != Operators.cancelledSubscription()) {
                actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
                        actual.currentContext()));
            }
        }
    }

后面就是等待 Worker 运行 SubscribeOnSubscriber 这个 Runnable
它的 run 方法是

@Override
public void run() {
    THREAD.lazySet(this, Thread.currentThread());
    // 调用 Source 的 subscribe 方法!注意此时的运行线程是 我们指定的 Worker里!
    // 所以实现了在指定的线程中调度 上游
    source.subscribe(this);
}

在 run 中调用了上游的 subscribe ,此时的上游对象是 FluxCreate

FluxCreate 的 subscribe

这里是顶层的事件发射层!

顶层没有上层的 Source 了,顶层负责创建 BaseSink ,它是Subscription的子类!表示一次关联的关系

这里再次将 监听者actual 包装为 BaseSink

BaseSink 它可以表示一个关联关系,它实现了 Subscription 接口!
BaseSink 也可以表示下游的监听者( 因为他包装了 actual ),后面我们的事件发射都会走过 BaseSink !它的作用就是实现一些背压策略(默认的是 BUFFER 所有的元素)

@Override
public void subscribe(CoreSubscriber<? super T> actual) {

// 创建 Sink ,可以使用它发送事件给 下游的 actual,它有自己的背压策略
    BaseSink<T> sink = createSink(actual, backpressure);
// 调用下游的 onSubscribe 方法,注意此处的运行线程是我们 subscribeOn 提供的线程
    actual.onSubscribe(sink);
    try {
// 调用我们 初始化 时候设置的 sink 的消费者
// 进行事件的发射!(事件发射,只需要调用 sink 的 相应的方法即可,在内部 sink 会向底层的 actual 发射事件!)
// 是否用一个 SerializedSink 包装一下 Sink 
        source.accept(
                createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
                        sink);
    }
    catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        sink.error(Operators.onOperatorError(ex, actual.currentContext()));
    }
}

上面的代码中有选择是否使用 SerializedSink 包装 sink, 两者的区别就是:

  1. 普通的BaseSink 是不能并发插入的,会出现消息丢失的情况!它只适用于,一个生产者,一个消费者的场景!
  2. SerializedSink 是可以并发插入的,它适用于多个生产者,一个消费者的场景。

如果存在 并发的发射事件的情况,那么就需要使用 SerializedSink 包装。

总结

从上面我们看到,每一个操作符其实都是对上游或者下游的一次包装,subscribeOn 操作符对上游起作用,所以他包装了上游,指定上游的运行环境!map/publishOn 操作符对下游起作用,所以他们包装了下游!

启动过程的核心方法是 Publisher 接口的 subscribe 方法!这个方法的执行过程,从下游一直贯穿到顶层!期间操作符的对上游或者下游的包装,都是在这个方法的执行过程中,完成的!

onSubscribe 方法

这个方法是,事件发送之前就必须调用的方法,这个方法是来自于 Reactive-Stream 规范的,用于获取下游需要的数据量。想要从上游获取数据,下游必须调用 Subscription#request(long) 来表明自己需要多少数据!

只有这个方法被调用完毕,以后,数据才会被发射出来!如果 下游 没有 request 数据,那么数据就不会下放到下游!这个方法的调用是在,构建蓝图的线程中执行的!

onSubscribe 都是在构建蓝图的线程中,调用的!并且这个方法,仅仅会调用一次!

需要注意的是:为了让 onSubscribe 不在我们指定的 subscribeOn 或者 publishOn 线程中执行,而是在构建蓝图的线程中执行的!

onSubscribe 最开始的执行,是在 FluxSubscribeOn 类的 subscribe 方法中

FluxSubscribeOn中调用onSubscribe

我们知道这个类的作用就是,切换事件发射器的运行线程,但是我们的目的是让 onSuscribe 方法在构建的线程运行,所以我们在 subscribe 方法调用时就调用下游的 onSubscribe 方法!此时传入的 Subscription 是一个 SubscribeOnSubscriber 对象!

// 我们构造的包装类对象!它包装了上游和下游,并且实现了 Subscription 接口
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
                actual, worker, requestOnSeparateThread);
        actual.onSubscribe(parent);

此处的 actual 是个 MapSubscriber 对象

MapSubscriber 的 onSubscribe 方法

@Override
public void onSubscribe(Subscription s) {
    if (Operators.validate(this.s, s)) {
        this.s = s;
    // 直接调用 下游的 onSubscribe 方法
        actual.onSubscribe(this);
    }
}

PublishOnSubscriber 的 onSubscribe 方法

@Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
            // ...
                queue = queueSupplier.get();
                // 调用下游的 onSubscribe 方法
                // 需要注意的是,这个方法的运行,是在构建蓝图的线程中
                actual.onSubscribe(this);
                // 这里进行了预取!默认是预取 256个元素
                s.request(Operators.unboundedOrPrefetch(prefetch));
            }
    }

此处的 actual 很明显就是 我们的 LambdaSubscriber

LambdaSubscriber 的 onSubscribe

我们上面的例子,没有传入 subscriptionConsumer ,所以我们默认 requestLong.MAX_VALUE ,当然我们也可以自定义自己的 subscriptionConsumer

@Override
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
    this.subscription = s;
    if (subscriptionConsumer != null) {
        try {
            subscriptionConsumer.accept(s);
        }
        catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            s.cancel();
            onError(t);
        }
    }
    else {
        s.request(Long.MAX_VALUE);
    }
}
}

事件发射的流程

事件的发射,由我们初始化时候提供的 Consumer<FluxSink<String>> 对象完成!
我们这里很简单,只是使用 sink.next 发送了一个 字符串 "12"

Consumer<FluxSink<String>> eventEmitter = sink -> sink.next("12");

此处的 sink 是什么呢?是我们上面分析过程最后的产物 一个 SerializedSink 类的对象。我们事件发送的过程是,从上向下,依次执行的!

SerializedSink 的 next

@Override
public FluxSink<T> next(T t) {
    Objects.requireNonNull(t, "t is null in sink.next(t)");
    // 如果下游的已经是处于 结束的状态,那么直接就 返回
    if (sink.isCancelled() || done) {
        Operators.onNextDropped(t, sink.currentContext());
        return this;
    }
    // 每次只能一个 线程能够进入到下面的代码段
    if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
        try {
        // 因为这个 sink 内部的 Queue是一个单生产者,单消费者的模型,所以,要从外部限制进入的个数
            sink.next(t);
        }
        catch (Throwable ex) {
            Operators.onOperatorError(sink, ex, t, sink.currentContext());
        }
        if (WIP.decrementAndGet(this) == 0) {
            return this;
        }
    }
    else {
       // 如果 不能够直接调用 sink 的 next 的,先放在 queue 中等待
        this.mpscQueue.offer(t);
        // 如果 WIP 不为 0 ,那么 说明当前 至少有一个线程正在 发送数据
        // 那么我们只用 WIP 记下个数
        if (WIP.getAndIncrement(this) != 0) {
            return this;
        }
    }
    // 这个方法,死循环从 队列中取出元素执行 sink.next
    // 这个方法,只有在 WIP 为0 的时候,才会退出
    // 我们可以分析得出,每次调用这个轮询方法的,有且只会有一个线程
    // 所以它是在单线程的 执行 sink.next
    drainLoop();
    return this;
}

这里在不断的调用 sink.next 消费数据,这里的 Sink 是什么对象呢?
没错就是我们之前包装的,BaseSink类的对象(其实是 BufferAsyncSink 因为我们选择的是缓冲全部的背压策略),BaseSink可以理解成一个适配器,为了将 FluxSink 的功能和下游的 Subscriber 进行适配,抽象出来的类

BufferAsyncSink 的 next

这个方法的 next方法比较的简单。

只是简单的,将元素放入 queue 中,然后调用 drain 方法,消费数据即可!

之所以能写的这么简单,主要是这个方法的next不会被并发的调用

@Override
public FluxSink<T> next(T t) {
    queue.offer(t);
    // 消费的主要方法 调用 包装的监听者的 onNext 方法!
    drain();
    return this;
}

被包装的监听者 actual对象是我们之前构造 BaseSink 传入的监听者,它是从 FluxSubscribeOn被传入的,是一个 SubscribeOnSubscriber类的对象,这个对象的 onNext 很简单,就是直接调用的 被包装的监听者的 onNext

@Override
// 因为这个 对象的主要作用是,修改 Source 的运行环境,对下层操作,并没有什么改变
// 但是需要注意的是,这个方法的调用的线程还是我们提供的上层的运行线程,
// 因为我们是通过上层回调这个方法的
public void onNext(T t) {
    actual.onNext(t);
}

这里的 actual 实际上 FluxMap 传入的监听者,是一个 MapSubscriber 类的对象

MapSubscriber onNext 方法

@Override
public void onNext(T t) {
// 如果已经结束,就直接返回
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    R v;

// 调用 mapper 转化方法,此处 try 了异常
    try {
        v = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    // 如果失败,直接会执行 onError 方法
    catch (Throwable e) {
        Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
        if (e_ != null) {
            onError(e_);
        }
        else {
            s.request(1);
        }
        return;
    }
    // 如果没有失败,直接回调下层的 actual 监听者
    actual.onNext(v);
}

此处的 actual 方法是 FluxPublishOn 传入的监听者,是一个 PublishOnSubscriber 对象

FluxPublishOn 的 onNext 方法

@Override
public void onNext(T t) {
    // ...
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }
    // 首先向 预期队列总插入
    // 预期队列是有界队列的,如果插入失败,则表示有太多的元素没有被消费了
    if (!queue.offer(t)) {
    // 太多元素没有被消费,这个返回失败!
        error = Operators.onOperatorError(s,
Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL),
                t, actual.currentContext());
        done = true;
    }
    // 在我们指定的线程中 调度下游的
    trySchedule(this, null, t);
}

我们都知道 publishOn 操作符的作用是 指定监听者的运行线程!所以它只会作用于 这里包装的 下游 actual

这里的 actual 实际上就是一个 LambdaSubscriber 类的对象,它内部包装了,我们传入的事件消费者

LambdaSubscriber 的 onNext

@Override
public final void onNext(T x) {
    try {
        if (consumer != null) {
            consumer.accept(x);
        }
    }
    catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        this.subscription.cancel();
        onError(t);
    }
}

这个方法很简单,就是简单的 try 调用 我们传入的 consumer 方法,如果发生错误,直接停止流。

总结

我们简单分析了,Reactor调用的过程,我们可以发现,Reactor中最常见的就是 代理模式,每个操作符,为了实现自己的功能,要么对上游的调用进行了包装,要么对下游的调用进行了包装!

我们最常用的切换运行线程的功能的实现,其实也很简单,我们通过指定的 Scheduler,然后创建 Worker,然后在指定的 Worker 中调用我们的方法即可!这里还涉及到 SchedulerWorker 之间的关系,我们后面再解释。

Reactive中事件的发送,从上游开始,一步步的走过,我们创建的蓝图,直到被我们的消费者消费!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350