RxJava源码(二)

使用时:

      Observable.just("xxxxx")
            .map(new Function<String, Object>() {
                @Override
                public Object apply(@NonNull String s) throws Exception {
                    return null;
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    
                }

                @Override
                public void onNext(@NonNull Object o) {

                }

                @Override
                public void onError(@NonNull Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

注意到

             .subscribeOn(Schedulers.io())//子线程执行
            .observeOn(AndroidSchedulers.mainThread()) //主线程

一、子线程执行

执行
代表线程切换,源码分析

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

进入ObservableSubscribeOn类,只需要分析

 @Override
 public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }

方法中

      parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

SubscribeTask是一个Runnable

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

scheduler.scheduleDirect()方法传入一个Runnable ,scheduler为
.subscribeOn(Schedulers.io())中Schedulers.io()看一下代码

  @NonNull
  public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

IO初始化为

IO = RxJavaPlugins.initIoScheduler(new IOTask());

进一步

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

IoHolder.DEFAULT

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

IoScheduler

public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

WORKER_THREAD_FACTORY为线程工厂

  static final RxThreadFactory WORKER_THREAD_FACTORY;

看一下 this(WORKER_THREAD_FACTORY);

   public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

/**
 * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
 *                      system properties for configuring new thread creation. Cannot be null.
 */
public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

pool 为线程池 , start();方法

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

CachedWorkerPool类

            private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;//线程安全的集合,联想到eventbus中保证集合如下,先复制,再添加
image.png

总之是封装一个线程池对象,回到

     parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

中scheduler.scheduleDirect()方法再Scheduler类

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

scheduleDirect具体实现

  @NonNull
 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

createWorker();方法在IoScheduler类实现

  @Override
  public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

w.schedule(task, delay, unit);方法IoScheduler类实现

    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }

threadWorker为

   EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

threadWorker.scheduleActual()为NewThreadWorker

      public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    try {
        Future<?> f;
        if (delayTime <= 0L) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delayTime, unit);
        }
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}

是一些线程池

二、 主线程执行

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn类中

 @Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

发现

        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

onNext中

   @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

schedule();方法

  void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

.observeOn(AndroidSchedulers.mainThread())方法

   private static final class MainHolder {

    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

HandlerScheduler类

@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

new Handler(Looper.getMainLooper()) 精华 new Handler() 和 new Handler(Looper.getMainLooper()) 的区别
new Handler() , 如果是在主线程中是没问题,但是有的时候可能会在子线程中调用,肯定就报错。
new Handler(Looper.getMainLooper()),确保创建的Handler永远在主线程中,Looper要是主线程的Looper。

三、简单实现子线程

Observable

public Observable<Bitmap> subscribeOn(Schedulers schedulers) {
    return onAssembly(new ObservableSchedulers(this,schedulers));
}

ObservableSchedulers

final class ObservableSchedulers<T> extends Observable<T> {
final Observable<T> source;
final Schedulers schedulers;

public ObservableSchedulers(Observable<T> source, Schedulers schedulers) {
    this.source = source;
    this.schedulers = schedulers;
}

@Override
protected void subscribeActual(Observer<T> observer) {
    schedulers.scheduleDirect(new SchedulerTask(observer));
}

private class SchedulerTask implements Runnable{
    final Observer<T> observer;
    public SchedulerTask(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void run() {
        // 线程池最终回来执行 Runnable -> 这行代码,会执行上游的 subscribe()
        // 而这个run方法在子线程中
        source.subscribe(observer);
    }
}

Schedulers 类

public abstract class Schedulers {
static Schedulers IO;
static {
    IO = new IOSchedulers();
}

public static Schedulers io() {
    return IO;
}

public abstract void scheduleDirect(Runnable runnable);

private static class IOSchedulers extends Schedulers {
    ExecutorService service;
    public IOSchedulers(){
        service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(@NonNull Runnable r) {
                return new Thread(r);
            }
        });
    }

    @Override
    public void scheduleDirect(Runnable runnable) {
        service.execute(runnable);
    }
}
}

四、主线程实现

Observable中

  public Observable<T> observerOn(Schedulers schedulers) {
    return onAssembly(new ObserverOnObservable(this,schedulers));
}

ObserverOnObservable

class ObserverOnObservable<T> extends Observable<T> {
final Observable<T> source;
final Schedulers schedulers;
public ObserverOnObservable(Observable<T> source, Schedulers schedulers) {
    this.source = source;
    this.schedulers = schedulers;
}

@Override
protected void subscribeActual(Observer<T> observer) {
    source.subscribe(new ObserverOnObserver(observer,schedulers));
}

private class ObserverOnObserver<T> implements Observer<T>,Runnable{
    final Observer<T> observer;
    final Schedulers schedulers;
    private T value;
    public ObserverOnObserver(Observer<T> observer, Schedulers schedulers) {
        this.observer = observer;
        this.schedulers = schedulers;
    }

    @Override
    public void onSubscribe() {
        observer.onSubscribe();
    }

    @Override
    public void onNext(@NonNull T item) {
        value = item;
        schedulers.scheduleDirect(this);

    }

    @Override
    public void onError(@NonNull Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onComplete() {
        observer.onComplete();
    }

    @Override
    public void run() {
        // 主线程 或者 其他
        observer.onNext(value);
    }
}

}
完善Schedulers

public abstract class Schedulers {
static Schedulers MAIN_THREAD;
static Schedulers IO;
static {
    IO = new IOSchedulers();
    MAIN_THREAD = new MainSchedulers(new Handler(Looper.getMainLooper()));
}

public static Schedulers io() {
    return IO;
}

public abstract void scheduleDirect(Runnable runnable);

public static Schedulers mainThread() {
    return MAIN_THREAD;
}

private static class IOSchedulers extends Schedulers {
    ExecutorService service;
    public IOSchedulers(){
        service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(@NonNull Runnable r) {
                return new Thread(r);
            }
        });
    }

    @Override
    public void scheduleDirect(Runnable runnable) {
        service.execute(runnable);
    }
}

private static class MainSchedulers extends Schedulers {
    private Handler handler;
    public MainSchedulers(Handler handler) {
        this.handler = handler;
    }

    @Override
    public void scheduleDirect(Runnable runnable) {
        Message message = Message.obtain(handler,runnable);
        handler.sendMessage(message);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容