本小节主要是我在学习了 subscribeOn 源码之后的记录,方便日后查阅。
本人能力有限,若内容有错误之处,请指出,谢谢。
示例代码
- 示例代码,下面这段代码很简单,就是发送两个事件,并且指定在子线程中发送。接下来分析这个过程实现。
//定义事件源
Observable observable = Observable.create(new ObservableOnSubscribe<I
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Except
//发送两个事件
e.onNext(1);
e.onNext(2);
}
})
.subscribeOn(Schedulers.io());//将发送事件切换到子线程中去执行
Observer observer1 = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
//获取连接器 Disposable 对象
Log.e("zeal", "onSubscribe");
}
@Override
public void onNext(Integer integer) {
//接受事件
Log.e("zeal", "onNext=" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//observer1 订阅 observable 事件源
observable.subscribe(observer1);
- 执行结果
onSubscribe
onNext=1
onNext=2
开始分析
- 从 subscribe 开始
- observable.subscribe(observer1) 这段代码中,我们首先要明白这个 observable 对象实际上指的是哪个对象?我们使用 subscribeOn 内部会创建一个 Observable 的子类并返回,这样才能实现 rxjava 引以为傲的链式调用。那么 subscribeOn 返回的就是 ObservableSubscribeOn 对象,而我们之前分析过 Observable 的 subscribe 方法,我们知道最终它会调用 subscribeActual 方法,而这个方法是抽象的,因此我们去关注 ObservableSubscribeOn 的 subscribeActual 方法。
- rxjava 中的 observer 是一层一层的往上包装的,也就是每一个 Observable 的 subscribeActual 方法内部都会将 observer 参数再进一层包装。例如下面就将 s 作为构造传入 SubscribeOnObserver 。
@Override
public void subscribeActual(final Observer<? super T> s) {
//将 s 作为构造传入 SubscribeOnObserver
//SubscribeOnObserver 它即是 observer 类型,也是 disposable 类型。
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//将连接器parent传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
- s.onSubscribe(parent);
将连接器 parent 传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
因此 observer1 中的 onSubscribe 方法会被调用也就是这个日志被打印Log.e("zeal", "onSubscribe");
- scheduler 是什么?
scheduler 在之前的章节已经分析过了,这里简单的提一下, 通过
Schedulers.io() 获取到的 scheduler 就是 IoScheduler 对象。
- IoScheduler
这个类的注释还是有必要提一下,在下面的分析中会用到,它表示能在池中创建和缓存一些线程,并且在必要的时候可以重用这些线程。这个下面再具体分析。
/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
public final class IoScheduler extends Scheduler {
- scheduler.scheduleDirect(runnable)
Schedules the given task on this scheduler non-delayed execution.
在当前的调度器中去执行一个给定的任务 runnable。
从下面的 Runnable 的核心的 run 方法可以知道,它是负责去告知 parent 这个 observer 去订阅上一层的 Observable 这个事件源。
new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}
- SubscribeOnObserver.onSubscribe 方法内部实现
该方法内部通过 setOnce 给对象的 s 变量赋值为 d,而这个 d 就是上一级 ObservableCreate.subscribeActual 内部的 CreateEmitter 对象。也就是在 SubscribeOnObserver 内部保存有上一级的连接器,这样就可以控制事件的发送了。
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.s, d);
}
- parent.setDisposable(Disposable)
scheduler.schedulerdDirect(run) 会返回一个 Disposable 对象。并且会设置给 parent 对象。注意:也就是之后使用 parent.get() 就不会为 null 了,换句话说 parent 是作为 Disposable 传递给我们在外部创建的订阅者 observer1 的 onSubscribe(disposable) 的。
void setDisposable(Disposable d) {
//给当前对象 SubscribeOnObserver 设置为 d
DisposableHelper.setOnce(this, d);
}
- DisposableHelper.setOnce(this, d);
- 内部使用的 ActomicRefrence ,它可以保证对对象的操作是原子性的。
- DisposableHelper.setOnce 中的第一个参数是 ActomicReference<Disposable>,而 SubscribeOnObserver 就是继承了 ActomicReference 。
- 因为这里没有给初始值,因此这里传入给 setOnce 的 this 就是 null,因此它会被赋值为 scheduler.scheduleDirect 返回的 Disposable 对象。
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
//判断 field 是否为 null,若是为null 那么就将 field 设置为 d,并且返回 true,否则返回false。
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
- SubscribeOnObserver.dispose()
外部调用 dispose 方法的话,会通知上一级的 CreateEmitter 不要再发送事件,同时也会告诉当前类的 worker 不要去执行这个任务了。
@Override
public void dispose() {
//s 指向的是上一级 ObservableCreate 连接器。关闭上一级的连接
DisposableHelper.dispose(s);
//这个 dispose(this) 表示的是通过 scheduler.scheduleDirect 返回的 Dispose.dispose() 操作。
DisposableHelper.dispose(this);
}
- scheduler.scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//创建一个 worker
final Worker w = createWorker();
//包装 run 对象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//3.由 worker 去执行这个任务
w.schedule(task, delay, unit);
return task;
}
- IoScheduler.createWorker
这个方法实在 Scheduler 子类中实现的,看下面的代码出现一个 pool,我们猜测它可能就是用于缓存线程的池子。其内部返回的就是一个 EventLoopWorker 对象,这个对象封装了一个 pool 这个的池子。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
- EventLoopWorker 父类 Worker
它是 IoScheduler 内部的静态类。从类注释来看它是在单个线程或事件循环上执行动作的顺序调度器。简单来讲就用于执行任务的。
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public abstract static class Worker implements Disposable
- pool 是什么?
下面是对 pool 赋值的代码,给 pool 执行线程工厂,存活时间,单位。
final AtomicReference<CachedWorkerPool> pool;
//在 start 方法给该 pool 赋值
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
- CachedWorkerPool
上面讲的 pool 内部维护的就是 CachedWorkerPool 类。它内部有一个比较重要的属性 expiringWorkerQueue,它是一个队列,内部维护的元素是 ThreadWorker ,这个类就是真正去干活的类。这里先暂时不讨论,待会再分析。
- pool.get()
上面的** "pool 是什么?"**这一小点中贴出 pool 赋值的代码,因此 pool.get() 获取到的对象就是 update ,它是 CachedWorkerPool 类型的。
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
- new EventLoopWorker(pool.get());
在 EventLoopWoker 构造中传入一个 CachedWorkerPool 对象,给对应的属性赋值,这里要关注一个 this.threadWorker = pool.get();获取的。注意这的 pool 是CachedWorkerPool 类型的,上面的 pool 是 AtomicReference<CachedWorkerPool> 类型的,主要要区分开。
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
- pool.get()
在 EventLoopWorker 构造方法内部调用了 pool.get() 方法。这个 get() 是 CachedWorkerPool 内部一个方法,它的功能就是从 expiringWorkerQueue 队列中去获取一个 ThreadWork 对象,当队列中没有找到,就创建一个新的 ThreadWorker 对象。注意这里并没有马上将其添加到 expiringWorkerQueue 队列中,至于为什么,待会再分析。
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
//判断队列是否为空
while (!expiringWorkerQueue.isEmpty()) {
//从队列中取出一个 ThreadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//没有在队列里找到可用的 ThreadWorker 那么就创建一个。
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
至此 scheduleDirect 方法内部成功通过 createWorker() 获取到对应的 Worker 对象。
- DisposeTask
在 scheduler.scheduleDirect 方法内部会创建一个 DisposeTask 对象,DisposeTask task = new DisposeTask(decoratedRun, w);内部封装了 worker 和 要执行的任务 runnable 。关注一下内部实现可以发现该类是一个 Runnbale 和 Disposable 的实现类。在 run 方法内部真正去执行任务 decorateRun 的功能。这里为什么要封装 worker 呢?在上面分析了 Worker 的功能,它就是用执行任务的,因为实现了 Disposable 接口,因此具备暂停任务的功能。因为内部若是调用了 dispose 方法的话,就会回调 EventLoopWorker 方法的 dispose() 方法,停止任务。
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
//真正任务 decoratedRun 是在 run 方法中被执行。
decoratedRun.run();
} finally {
//任务执行完毕之后调用关闭连接器。
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
//这里也会通过 w.dispose() 进行关闭 worker 的执行
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
- EvenLoopWorker.dispose() 方法内部实现
上面分析了在 DisposeTask 内部为什么要封装了对应的 Worker 对象,因为在 DisposeTask 内部的 decoreRun 执行完毕之后,告诉 worker ,这样就可以通过 pool.release(threadWorker) 将 threadWorker添加到对应的 expiringWorkerQueue 队列中。这里就是上面说的在 expiringWorkerQueue 没有找对应的 ThreadWorker的情况下,会创建了 ThreadWorker 之后,但却没有立即添加到 expiringWorkerQueue 队列的原因了,只有执行完任务的 ThreadWorker 才能添加到队列中去。
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
//释放一个 threadWorker ,将其添加到 expiringWorkerQueue 队列中
pool.release(threadWorker);
}
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
//添加到 expiringWorkerQueue 队列中
expiringWorkerQueue.offer(threadWorker);
}
- 开始执行任务:w.schedule(runnable)
分析完 DisposeWorker 之后现在就开始分析 worker 是怎么去执行任务了。
w 表示对应的 EventLoopWorker 对象。w.schedule(runnable
) 表示去执行的是一个任务 runnable,下面分析一下内部是怎么执行的?
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//真正去执行任务的就是 threadWorker
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
- threadWorker.scheduleActual(action, delayTime, unit, tasks)
threadWorker 是继承至 NewThreadWorker,最后通过 executor.submit 去执行这个任务。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包装
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//线程池去执行这个任务。
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
CachedWorkerPool
当一个事件源被多个订阅者订阅的情况, rxjava 内部是怎么处理的?
//伪代码
Observable observable
//两个订阅者
Observer observer1,observer2
//分别订阅 observable
obserable.subscribe(observer1);
obserable.subscribe(observer2);
- subscribeActual 方法会被调用两次
核心代码就是scheduler.scheduleDirect 会被执行两次,就是通知 observable 去往两个 observer 去发送事件。
@Override
public void subscribeActual(final Observer<? super T> s) {
//将 s 作为构造传入 SubscribeOnObserver
//SubscribeOnObserver 它即是 observer 类型,也是 disposable 类型。
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//将连接器parent传递给当前 s.onSubscribe ,s 就是我们刚才创建的 observer1 对象。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
- IoScheduler.EventLoopWorker 对象被创建两次
IoScheduler.EventLoopWorker 对象被创建两次,但是由于 subscribeOn(Schedulers.io() ) 只是调用了一次,因此 IoScheduler 对象只有一个,也就是说内部的 CachedWorkerPool 只有一个。当多个任务需要执行时,会创建两个 ThreadWorker 去执行,因为两个任务会在不同的线程去执行,因此就会有快有慢之分,任务完成之后,就会释放这个 ThreadWorker ,并且将其添加到 CachedWorkerPool 的 expiringWorkerQueue 队列中,那么下次有其他订阅者订阅该 Observable 时,就可以直接从该队列中取出 ThreadWorker 。这就是对 IoScheduler 类的注释的验证。
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
线程池
在 NewThreadWorker 中内部有一个 executor ,它就是线程池,关注这个线程池的特点,它是通过 SchedulerPoolFactory 去创建的。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...
}
- SchedulerPoolFactory.create
该方法负责去创建一个 ScheduledExecutorService 线程池。内部的核心线程数为 1,最大线程数 maximumPoolSize 为 Integer.MAX_VALUE。也就是说若是通过同时有多个线程去订阅该事件源,那么会创建多个线程。
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;