RxJava之subscribeOn详细解析

本小节主要是我在学习了 subscribeOn 源码之后的记录,方便日后查阅。

本人能力有限,若内容有错误之处,请指出,谢谢。

示例代码

  • 示例代码,下面这段代码很简单,就是发送两个事件,并且指定在子线程中发送。接下来分析这个过程实现。
//定义事件源
Observable observable = Observable.create(new ObservableOnSubscribe<I
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Except
        //发送两个事件
        e.onNext(1);
        e.onNext(2);
    }
})
.subscribeOn(Schedulers.io());//将发送事件切换到子线程中去执行
Observer observer1 = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        //获取连接器 Disposable 对象
        Log.e("zeal", "onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        //接受事件
        Log.e("zeal", "onNext=" + integer);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
};
//observer1 订阅 observable 事件源
observable.subscribe(observer1);
  • 执行结果
    onSubscribe
    onNext=1
    onNext=2

开始分析

  • 从 subscribe 开始
  • observable.subscribe(observer1) 这段代码中,我们首先要明白这个 observable 对象实际上指的是哪个对象?我们使用 subscribeOn 内部会创建一个 Observable 的子类并返回,这样才能实现 rxjava 引以为傲的链式调用。那么 subscribeOn 返回的就是 ObservableSubscribeOn 对象,而我们之前分析过 Observable 的 subscribe 方法,我们知道最终它会调用 subscribeActual 方法,而这个方法是抽象的,因此我们去关注 ObservableSubscribeOn 的 subscribeActual 方法。
  • rxjava 中的 observer 是一层一层的往上包装的,也就是每一个 Observable 的 subscribeActual 方法内部都会将 observer 参数再进一层包装。例如下面就将 s 作为构造传入 SubscribeOnObserver 。
@Override
public void subscribeActual(final Observer<? super T> s) {
   //将 s 作为构造传入 SubscribeOnObserver 
  //SubscribeOnObserver 它即是 observer 类型,也是 disposable 类型。
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
 //将连接器parent传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}
  • s.onSubscribe(parent);

将连接器 parent 传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
因此 observer1 中的 onSubscribe 方法会被调用也就是这个日志被打印Log.e("zeal", "onSubscribe");

  • scheduler 是什么?

scheduler 在之前的章节已经分析过了,这里简单的提一下, 通过
Schedulers.io() 获取到的 scheduler 就是 IoScheduler 对象。

  • IoScheduler

这个类的注释还是有必要提一下,在下面的分析中会用到,它表示能在池中创建和缓存一些线程,并且在必要的时候可以重用这些线程。这个下面再具体分析。

/**
 * Scheduler that creates and caches a set of thread pools and reuses them if possible.
 */
public final class IoScheduler extends Scheduler {
  • scheduler.scheduleDirect(runnable)

Schedules the given task on this scheduler non-delayed execution.
在当前的调度器中去执行一个给定的任务 runnable。
从下面的 Runnable 的核心的 run 方法可以知道,它是负责去告知 parent 这个 observer 去订阅上一层的 Observable 这个事件源。

new Runnable() {
      @Override
      public void run() {
         source.subscribe(parent);
     }
}
  • SubscribeOnObserver.onSubscribe 方法内部实现

该方法内部通过 setOnce 给对象的 s 变量赋值为 d,而这个 d 就是上一级 ObservableCreate.subscribeActual 内部的 CreateEmitter 对象。也就是在 SubscribeOnObserver 内部保存有上一级的连接器,这样就可以控制事件的发送了。

@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.s, d);
}

  • parent.setDisposable(Disposable)

scheduler.schedulerdDirect(run) 会返回一个 Disposable 对象。并且会设置给 parent 对象。注意:也就是之后使用 parent.get() 就不会为 null 了,换句话说 parent 是作为 Disposable 传递给我们在外部创建的订阅者 observer1 的 onSubscribe(disposable) 的。

void setDisposable(Disposable d) {
         //给当前对象 SubscribeOnObserver 设置为 d
         DisposableHelper.setOnce(this, d);
}
  • DisposableHelper.setOnce(this, d);
  • 内部使用的 ActomicRefrence ,它可以保证对对象的操作是原子性的。
  • DisposableHelper.setOnce 中的第一个参数是 ActomicReference<Disposable>,而 SubscribeOnObserver 就是继承了 ActomicReference 。
  • 因为这里没有给初始值,因此这里传入给 setOnce 的 this 就是 null,因此它会被赋值为 scheduler.scheduleDirect 返回的 Disposable 对象。
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
    ObjectHelper.requireNonNull(d, "d is null");
    //判断 field 是否为 null,若是为null 那么就将 field 设置为 d,并且返回 true,否则返回false。
    if (!field.compareAndSet(null, d)) {
        d.dispose();
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }
    return true;
}
  • SubscribeOnObserver.dispose()

外部调用 dispose 方法的话,会通知上一级的 CreateEmitter 不要再发送事件,同时也会告诉当前类的 worker 不要去执行这个任务了。

@Override
public void dispose() {
    //s 指向的是上一级 ObservableCreate 连接器。关闭上一级的连接
    DisposableHelper.dispose(s);
    //这个 dispose(this) 表示的是通过 scheduler.scheduleDirect 返回的 Dispose.dispose() 操作。
    DisposableHelper.dispose(this);
}
  • scheduler.scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

   //创建一个 worker 
  final Worker w = createWorker();

  //包装 run 对象
  final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  DisposeTask task = new DisposeTask(decoratedRun, w);

  //3.由 worker 去执行这个任务
  w.schedule(task, delay, unit);

  return task;
}
  • IoScheduler.createWorker

这个方法实在 Scheduler 子类中实现的,看下面的代码出现一个 pool,我们猜测它可能就是用于缓存线程的池子。其内部返回的就是一个 EventLoopWorker 对象,这个对象封装了一个 pool 这个的池子。

@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
  • EventLoopWorker 父类 Worker

它是 IoScheduler 内部的静态类。从类注释来看它是在单个线程或事件循环上执行动作的顺序调度器。简单来讲就用于执行任务的。

/**
 * Sequential Scheduler for executing actions on a single thread or event loop.
 * <p>
 * Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
 */
public abstract static class Worker implements Disposable 
  • pool 是什么?

下面是对 pool 赋值的代码,给 pool 执行线程工厂,存活时间,单位。

final AtomicReference<CachedWorkerPool> pool;

//在 start 方法给该 pool 赋值
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
    update.shutdown();
}
  • CachedWorkerPool

上面讲的 pool 内部维护的就是 CachedWorkerPool 类。它内部有一个比较重要的属性 expiringWorkerQueue,它是一个队列,内部维护的元素是 ThreadWorker ,这个类就是真正去干活的类。这里先暂时不讨论,待会再分析。

  • pool.get()

上面的** "pool 是什么?"**这一小点中贴出 pool 赋值的代码,因此 pool.get() 获取到的对象就是 update ,它是 CachedWorkerPool 类型的。

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
  • new EventLoopWorker(pool.get());

在 EventLoopWoker 构造中传入一个 CachedWorkerPool 对象,给对应的属性赋值,这里要关注一个 this.threadWorker = pool.get();获取的。注意这的 pool 是CachedWorkerPool 类型的,上面的 pool 是 AtomicReference<CachedWorkerPool> 类型的,主要要区分开。

EventLoopWorker(CachedWorkerPool pool) {
    this.pool = pool;
    this.tasks = new CompositeDisposable();
    this.threadWorker = pool.get();
}
  • pool.get()

在 EventLoopWorker 构造方法内部调用了 pool.get() 方法。这个 get() 是 CachedWorkerPool 内部一个方法,它的功能就是从 expiringWorkerQueue 队列中去获取一个 ThreadWork 对象,当队列中没有找到,就创建一个新的 ThreadWorker 对象。注意这里并没有马上将其添加到 expiringWorkerQueue 队列中,至于为什么,待会再分析。

ThreadWorker get() {
    if (allWorkers.isDisposed()) {
        return SHUTDOWN_THREAD_WORKER;
    }
   //判断队列是否为空
    while (!expiringWorkerQueue.isEmpty()) {
        //从队列中取出一个 ThreadWorker 
        ThreadWorker threadWorker = expiringWorkerQueue.poll();
        if (threadWorker != null) {
            return threadWorker;
        }
    }
    //没有在队列里找到可用的 ThreadWorker 那么就创建一个。
    // No cached worker found, so create a new one.
    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
}

至此 scheduleDirect 方法内部成功通过 createWorker() 获取到对应的 Worker 对象。

  • DisposeTask

在 scheduler.scheduleDirect 方法内部会创建一个 DisposeTask 对象,DisposeTask task = new DisposeTask(decoratedRun, w);内部封装了 worker 和 要执行的任务 runnable 。关注一下内部实现可以发现该类是一个 Runnbale 和 Disposable 的实现类。在 run 方法内部真正去执行任务 decorateRun 的功能。这里为什么要封装 worker 呢?在上面分析了 Worker 的功能,它就是用执行任务的,因为实现了 Disposable 接口,因此具备暂停任务的功能。因为内部若是调用了 dispose 方法的话,就会回调 EventLoopWorker 方法的 dispose() 方法,停止任务。

static final class DisposeTask implements Runnable, Disposable {
    final Runnable decoratedRun;
    final Worker w;
    Thread runner;
    DisposeTask(Runnable decoratedRun, Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }
    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            //真正任务 decoratedRun 是在 run 方法中被执行。
            decoratedRun.run();
        } finally {
          //任务执行完毕之后调用关闭连接器。
            dispose();
            runner = null;
        }
    }
    @Override
    public void dispose() {
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            ((NewThreadWorker)w).shutdown();
        } else {
            //这里也会通过 w.dispose() 进行关闭 worker 的执行
            w.dispose();
        }
    }
    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }
}
  • EvenLoopWorker.dispose() 方法内部实现

上面分析了在 DisposeTask 内部为什么要封装了对应的 Worker 对象,因为在 DisposeTask 内部的 decoreRun 执行完毕之后,告诉 worker ,这样就可以通过 pool.release(threadWorker) 将 threadWorker添加到对应的 expiringWorkerQueue 队列中。这里就是上面说的在 expiringWorkerQueue 没有找对应的 ThreadWorker的情况下,会创建了 ThreadWorker 之后,但却没有立即添加到 expiringWorkerQueue 队列的原因了,只有执行完任务的 ThreadWorker 才能添加到队列中去。

@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        tasks.dispose();
        // releasing the pool should be the last action
        //释放一个 threadWorker ,将其添加到 expiringWorkerQueue 队列中
        pool.release(threadWorker);
    }
}

void release(ThreadWorker threadWorker) {
    // Refresh expire time before putting worker back in pool
    threadWorker.setExpirationTime(now() + keepAliveTime);
   //添加到 expiringWorkerQueue 队列中
    expiringWorkerQueue.offer(threadWorker);
}
  • 开始执行任务:w.schedule(runnable)

分析完 DisposeWorker 之后现在就开始分析 worker 是怎么去执行任务了。
w 表示对应的 EventLoopWorker 对象。w.schedule(runnable
) 表示去执行的是一个任务 runnable,下面分析一下内部是怎么执行的?

@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
   //真正去执行任务的就是 threadWorker 
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
  • threadWorker.scheduleActual(action, delayTime, unit, tasks)

threadWorker 是继承至 NewThreadWorker,最后通过 executor.submit 去执行这个任务。

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
   
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
     //包装
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        //线程池去执行这个任务。
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

CachedWorkerPool

当一个事件源被多个订阅者订阅的情况, rxjava 内部是怎么处理的?

//伪代码
Observable observable
//两个订阅者
Observer observer1,observer2
//分别订阅 observable
obserable.subscribe(observer1);
obserable.subscribe(observer2);
  • subscribeActual 方法会被调用两次

核心代码就是scheduler.scheduleDirect 会被执行两次,就是通知 observable 去往两个 observer 去发送事件。

@Override
public void subscribeActual(final Observer<? super T> s) {
   //将 s 作为构造传入 SubscribeOnObserver 
  //SubscribeOnObserver 它即是 observer 类型,也是 disposable 类型。
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
 //将连接器parent传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}
  • IoScheduler.EventLoopWorker 对象被创建两次

IoScheduler.EventLoopWorker 对象被创建两次,但是由于 subscribeOn(Schedulers.io() ) 只是调用了一次,因此 IoScheduler 对象只有一个,也就是说内部的 CachedWorkerPool 只有一个。当多个任务需要执行时,会创建两个 ThreadWorker 去执行,因为两个任务会在不同的线程去执行,因此就会有快有慢之分,任务完成之后,就会释放这个 ThreadWorker ,并且将其添加到 CachedWorkerPool 的 expiringWorkerQueue 队列中,那么下次有其他订阅者订阅该 Observable 时,就可以直接从该队列中取出 ThreadWorker 。这就是对 IoScheduler 类的注释的验证。

@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

线程池

在 NewThreadWorker 中内部有一个 executor ,它就是线程池,关注这个线程池的特点,它是通过 SchedulerPoolFactory 去创建的。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    volatile boolean disposed;
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ...
}
  • SchedulerPoolFactory.create

该方法负责去创建一个 ScheduledExecutorService 线程池。内部的核心线程数为 1,最大线程数 maximumPoolSize 为 Integer.MAX_VALUE。也就是说若是通过同时有多个线程去订阅该事件源,那么会创建多个线程。

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容