浅入深出,RxJava实现原理

rxJava就是观察者模式的变形增强,具体怎么变形增强网上的文章有各种讲解,各种比喻,最普遍的是上下游的说法,上游的水通过水管流到下游,源码里确实有upStream,downStream的命名,不过水是什么,水管是什么,上下游又怎么连接,有时候交代不清楚,或者生硬造词,反倒是对理解其原理增加了难度,我也不知道怎么精准的用词,索性从最简单的代码一步步扩展来理解。
从0到1我设计不出来,从1到0我分析一下总可以吧,好的,先看最简单的写法如下:

class Observable {
    fun subscribe(observer:Observer) {
         observer.onNext()
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main(args: Array<String>) {
    Observable().subscribe(Observer())
}

rxJava最基础的形式是这样的:

  Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(emitter: ObservableEmitter<Int>) {
                emitter.onNext(10)
            }
        }).subscribe(object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
            }

            override fun onError(e: Throwable) {
            }

            override fun onComplete() {
            }

            override fun onNext(t: Int) {
                println(t)
            }
        })

下面我们朝着这个目标去变形扩展,将Observable的构造方法私有化,增加create静态方法,返回一个Observable,如下:

class Observable private constructor() {
    
    fun subscribe(observer:Observer) {
         observer.onNext()
    }
    
    companion object {
        fun create() : Observable {
           return Observable()
        }
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create().subscribe(Observer())
}

再近一步,create方法里面包含了一个匿名的实现了ObservableOnSubscribe接口的object,同时它还是一个Observable,所以ObservableCreate应该继承Observable,同时实现ObservableOnSubscribe接口,或者让Observable实现ObservableOnSubscribe接口,ObservableCreate的类继承Observable后自然也实现了ObservableOnSubscribe接口。翻看源码是让Observable实现ObservableOnSubscribe接口,所以这里新增一个ObservableOnSubscribe接口,里面有一个subscribe方法,把Observable改为抽象类并实现它,同时新增一个ObservableCreate的类继承Observable。那么既然传过来这个object是个Observable,create方法是直接return这个object吗,好像也行,但是翻看源码是声明了成员变量,接住他,为什么多此一举我们后面再说。修改下ObservableCreate的构造方法,声明一个变量接住它。

interface ObservableOnSubscribe {
    fun subscribe(observer : Observer)
}

abstract class Observable : ObservableOnSubscribe {
    
    override fun subscribe(observer : Observer) {
         observer.onNext()
    }
    
    companion object {
        fun create(source : ObservableOnSubscribe) : Observable {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            
        }
    }).subscribe(Observer())
}

相对于目标代码,就差在subscribe方法中调用emitter.onNext(10)了,这里ObservableCreate没有重写Observable的subscribe方法,导致抽象类Observable调用具体方法执行了具体的通知操作,好像不太符合面向对象的设计思想。所以这里增加一个抽象方法,命名subscribeActual,让子类ObservableCreate重写这个方法,Observable只是起到承上启下的作用,不做具体的逻辑。

interface ObservableOnSubscribe {
    fun subscribe(observer : Observer)
}

abstract class Observable  : ObservableOnSubscribe {
    
    fun subscribe(observer : Observer) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer)
    
    companion object {
        fun create(source : ObservableOnSubscribe) : Observable {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    override fun subscribeActual(observer : Observer) {
        observer.onNext(10)
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            
        }
    }).subscribe(Observer())
}

到这我们发现还是差一步啊,再分析下,这里observer.onNext(10)其实还是在ObservableCreate的subscribe方法执行的,而目标代码其实要在实现了ObservableOnSubscribe的那个object的subscribe中执行,在subscribeActual里面调用source即object的subscribe方法,object的subscribe方法执行observer.onNext(10)

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    override fun subscribeActual(observer : Observer) {
        source.subscribe(observer)
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            emitter.onNext(10)
        }
    }).subscribe(Observer())
}

是不是差不多了,再给Observable加上泛型,支持发送任意类型的数据,把类Observer改为接口,增加剩下的方法

interface ObservableOnSubscribe<T> {
    fun subscribe(observer : Observer<T>)
}

abstract class Observable<T> : ObservableOnSubscribe<T>{
    
    fun subscribe(observer : Observer<T>) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer<T>)
    
    companion object {
        fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate<T> (private val source:ObservableOnSubscribe<T>) : Observable<T>() {
    override fun subscribeActual(observer : Observer<T>) {
        source.subscribe(observer)
    }
}

interface Observer<T> {
    fun onSubscribe()
    fun onNext(t : T)
    fun onError(t:Throwable)
    fun onComplete()
}

fun main() {
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: Observer<Int>) {
            emitter.onNext(10)
        }
    }).subscribe(object: Observer<Int> {
        
        override fun onSubscribe() {
            
        }
        
        override fun onNext(t:Int) {
            println("value:$t")
        }
        
        override fun onError(t:Throwable) {
            
        }
        
        override fun onComplete() {
            
        }
    })
}

这样第一个基础的扩展就完成了,那如果实现一个操作符呢,这里选取最基础的map操作符,我们知道map的作用是映射,将输入的某种类型的值通过map函数转化为另外一种类型的值,也有可能类型是一样的,要看这个map函数怎么写了

实现map操作符

在Observable里面增加一个map函数

abstract class Observable<T> : ObservableOnSubscribe<T> {
    
    override fun subscribe(observer : Observer<T>) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer<T>)
    
    companion object {
        fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
           return ObservableCreate(source)
        }
    }
    
    fun <R> map(func: (T) -> R): Observable<R> {
        return ObservableMap<T,R>(this, func)
    }
}

可以看到这里返回了一个ObservableMap,接收两个参数,一个是调用该函数的Observable,一个是map函数,这里是将T类型转为R类型,看返回值ObservableMap是一个R类型的Observable,说明这个操作符将上游发送的T类型经过map转化为R类型后接着向下游发送数据,最终到达Observer,所以Observer收到的数据为R类型,但是我们的source发送的数据是T类型,所以subscribeActual里面source.subscribe的observer类型是T类型,这里新增一个MapObserver内部类,包裹了Observer<R>,MapObserver的onNext调用时,执行observer.onNext(func(t)),最终发给下游转化为R类型的数据。
新增一个ObservableMap类

class ObservableMap<T, R>(
    private val source: ObservableOnSubscribe<T>, private val func: (T) -> R
) : Observable<R>() {

    override fun subscribeActual(observer: Observer<R>) {
        source.subscribe(MapObserver(observer, func))
    }

    private class MapObserver<T, R>(private val observer: Observer<R>, private val func: (T) -> R) : Observer<T> {
        
         override fun onSubscribe() {
            observer.onSubscribe()
         }
        
        override fun onNext(t: T) {
            observer.onNext(func(t))
        }

        override fun onError(t:Throwable) {
            observer.onError(t)
        }
        
        override fun onComplete() {
            observer.onComplete()
        }
    }
}

最终调用如下:

fun main() {
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: Observer<Int>) {
            emitter.onNext(10)
        }
    }).map {
        "value:$it"
    }.subscribe(object: Observer<String> {
        
        override fun onSubscribe() {
            
        }
        
        override fun onNext(t:String) {
            println("value:$t")
        }
        
        override fun onError(t:Throwable) {
            
        }
        
        override fun onComplete() {
            
        }
    })
}

到此,map操作符也实现了,那么再添加其他操作符是不是也是一个套路呢

实现线程切换

可以发现实现其他操作符也是同一个套路,下面看下切换线程subscribeOn的实现过程:
在Observable里面增加一个subscribeOn函数:

abstract class Observable<T> : ObservableOnSubscribe<T> {

    override fun subscribe(observer: Observer<T>) {
        subscribeActual(observer)
    }

    abstract fun subscribeActual(observer: Observer<T>)


    companion object {
        fun <T> create(source:ObservableOnSubscribe<T>): Observable<T> {
            return ObservableCreate(source)
        }
    }

    fun <R> map(func: (T) -> R): Observable<R> {
        return MapObservable(this, func)
    }

    fun subscribeOn(scheduler: Scheduler): Observable<T> {
        return ObservableSubscribeOn(this, scheduler)
    }
}

可以看到这里返回了一个ObservableSubscribeOn,接收两个参数,一个是调用该函数的Observable,一个是Scheduler类型的变量,这个变量就是用于切换线程的。新建ObservableSubscribeOn如下:

class ObservableSubscribeOn<T>(
    private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {

    override fun subscribeActual(observer: Observer<T>) {
        scheduler.scheduleDirect {
            source.subscribe(SubscribeOnObserver(observer))
        }
    }

    private class SubscribeOnObserver<T>(private val observer: Observer<T>) : Observer<T> {

        override fun onNext(t: T) {
            observer.onNext(t)
        }
    }
}

新建Scheduler抽象类,包含scheduleDirect方法,抽象静态内部类Worker表示工作线程,抽象方法createWorker:

abstract class Scheduler {

    fun scheduleDirect(runnable: Runnable) {
        val worker = createWorker()
        worker.schedule(runnable)
    }

    abstract fun createWorker(): Worker

    abstract class Worker {
        abstract fun schedule(runnable: Runnable)
    }
}

接下来可以实现一个具体的Scheduler,命名IoScheduler:

import java.util.concurrent.Executors

class IoScheduler : Scheduler() {

    companion object {
        private val executor = Executors.newCachedThreadPool()
    }

    override fun createWorker(): Worker {
        return EventLoopWorker()
    }

    private inner class EventLoopWorker : Worker() {
        override fun schedule(runnable: Runnable) {
            executor.submit(runnable)
        }
    }
}

回头看下所谓线程切换,就是让订阅的代码执行在一个线程里面,如下

 scheduler.scheduleDirect {
        source.subscribe(SubscribeOnObserver(observer))
 }

调用一下看看呢,这里为了保持和源码基本一致的写法,在Scheduler类里面添加一个静态方法,用于生成IoScheduler,如下:

abstract class Scheduler {

    companion object {
        fun io(): IoScheduler {
            return IoScheduler()
        }
    }
    ...  //后面的代码同上
}

调用一下:

 Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(observer: Observer<Int>) {
                observer.onNext(10)
            }
        }).map {
            "value:$it"
        }.subscribeOn(Scheduler.io()).subscribe(object : Observer<String> {
            override fun onNext(t: String) {
                Log.d(tag, "$t,thread:${Thread.currentThread().name}")
            }
        })

结果如下:

D  value:10,thread:pool-2-thread-1

这里是线程池里面一个线程的名字,我们也可以直接使用线程Thread类,执行Runnable,给Thread设置一个名字,修改IoScheduler,线程命名worker-thread看下:

class IoScheduler : Scheduler() {

    override fun createWorker(): Worker {
        return EventLoopWorker()
    }

    private inner class EventLoopWorker : Worker() {
        override fun schedule(runnable: Runnable) {
             val t = Thread(runnable)
             t.name = "worker-thread"
             t.start()
        }
    }
}

执行结果:

 D  value:10,thread:worker-thread

好的,现在已经实现了subscribeOn阶段的线程切换,我们知道还有observerOn的线程切换,那又是怎么写呢?
新增observerOn操作符:

abstract class Observable<T> : ObservableOnSubscribe<T> {
     
    ... // 以上代码省略

    fun observerOn(scheduler: Scheduler): Observable<T> {
        return ObservableObserverOn(this, scheduler)
    }
}

新建ObservableObserverOn:

class ObservableObserverOn<T>(
    private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
    override fun subscribeActual(observer: Observer<T>) {
        val w = scheduler.createWorker()
        source.subscribe(ObserverOnObserver(observer, w))
    }

    private class ObserverOnObserver<T>(
        private val observer: Observer<T>, val w: Scheduler.Worker
    ) : Observer<T> {

        override fun onNext(t: T) {
            w.schedule {
                observer.onNext(t)
            }
        }
    }
}

这里和ObservableSubscribeOn不同的是,订阅过程没有在新的线程执行,而是把发送消息放到了worker的schedule方法里面,那这时候我想切换到主线程怎么办呢,继续仿写,新增AndroidSchedulers,里面返回一个HandlerScheduler,而这个类就是借助主线程的handler,把消息发送到主线程的消息队列的:

import android.os.Handler
import android.os.Looper

class AndroidSchedulers {

    companion object {
        private val DEFAULT = 
        HandlerScheduler(Handler(Looper.getMainLooper()))

        fun mainThread(): Scheduler {
            return DEFAULT
        }
    }
}

再看HandlerScheduler怎么写?

import android.os.Handler
import android.os.Message

class HandlerScheduler(private val handler: Handler) : Scheduler() {

    private class HandlerWorker(private val handler: Handler) : Worker() {
        override fun schedule(runnable: Runnable) {
            handler.sendMessage(Message.obtain(handler, runnable))
        }
    }

    override fun createWorker(): Worker {
        return HandlerWorker(handler)
    }
}

可以看到就是利用handler,把runnable包装成Message对象发送到了主线程的消息队列,runnable里面的代码observer.onNext(t)自然运行在主线程,进而完成了切换的过程。
调用一下试试:

Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(observer: Observer<Int>) {
                observer.onNext(10)
            }
        }).map {
            Log.d(tag, "map in,thread:${Thread.currentThread().name}")
            "value:$it"
     }.subscribeOn(Scheduler.io()).observerOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<String> {
            override fun onNext(t: String) {
                Log.d(tag, "$t,thread:${Thread.currentThread().name}")
            }
        })

这里map操作符在subscribeOn之前,里面的打印应该在IoScheduler获取的线程执行,而onNext里面的打印应该在主线程执行。
结果:

 D  map in,thread:pool-2-thread-1
 D  value:10,thread:main
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容