rxJava就是观察者模式的变形增强,具体怎么变形增强网上的文章有各种讲解,各种比喻,最普遍的是上下游的说法,上游的水通过水管流到下游,源码里确实有upStream,downStream的命名,不过水是什么,水管是什么,上下游又怎么连接,有时候交代不清楚,或者生硬造词,反倒是对理解其原理增加了难度,我也不知道怎么精准的用词,索性从最简单的代码一步步扩展来理解。
从0到1我设计不出来,从1到0我分析一下总可以吧,好的,先看最简单的写法如下:
class Observable {
fun subscribe(observer:Observer) {
observer.onNext()
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main(args: Array<String>) {
Observable().subscribe(Observer())
}
rxJava最基础的形式是这样的:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(10)
}
}).subscribe(object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
override fun onNext(t: Int) {
println(t)
}
})
下面我们朝着这个目标去变形扩展,将Observable的构造方法私有化,增加create静态方法,返回一个Observable,如下:
class Observable private constructor() {
fun subscribe(observer:Observer) {
observer.onNext()
}
companion object {
fun create() : Observable {
return Observable()
}
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create().subscribe(Observer())
}
再近一步,create方法里面包含了一个匿名的实现了ObservableOnSubscribe接口的object,同时它还是一个Observable,所以ObservableCreate应该继承Observable,同时实现ObservableOnSubscribe接口,或者让Observable实现ObservableOnSubscribe接口,ObservableCreate的类继承Observable后自然也实现了ObservableOnSubscribe接口。翻看源码是让Observable实现ObservableOnSubscribe接口,所以这里新增一个ObservableOnSubscribe接口,里面有一个subscribe方法,把Observable改为抽象类并实现它,同时新增一个ObservableCreate的类继承Observable。那么既然传过来这个object是个Observable,create方法是直接return这个object吗,好像也行,但是翻看源码是声明了成员变量,接住他,为什么多此一举我们后面再说。修改下ObservableCreate的构造方法,声明一个变量接住它。
interface ObservableOnSubscribe {
fun subscribe(observer : Observer)
}
abstract class Observable : ObservableOnSubscribe {
override fun subscribe(observer : Observer) {
observer.onNext()
}
companion object {
fun create(source : ObservableOnSubscribe) : Observable {
return ObservableCreate(source)
}
}
}
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
}
}).subscribe(Observer())
}
相对于目标代码,就差在subscribe方法中调用emitter.onNext(10)了,这里ObservableCreate没有重写Observable的subscribe方法,导致抽象类Observable调用具体方法执行了具体的通知操作,好像不太符合面向对象的设计思想。所以这里增加一个抽象方法,命名subscribeActual,让子类ObservableCreate重写这个方法,Observable只是起到承上启下的作用,不做具体的逻辑。
interface ObservableOnSubscribe {
fun subscribe(observer : Observer)
}
abstract class Observable : ObservableOnSubscribe {
fun subscribe(observer : Observer) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer)
companion object {
fun create(source : ObservableOnSubscribe) : Observable {
return ObservableCreate(source)
}
}
}
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
override fun subscribeActual(observer : Observer) {
observer.onNext(10)
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
}
}).subscribe(Observer())
}
到这我们发现还是差一步啊,再分析下,这里observer.onNext(10)其实还是在ObservableCreate的subscribe方法执行的,而目标代码其实要在实现了ObservableOnSubscribe的那个object的subscribe中执行,在subscribeActual里面调用source即object的subscribe方法,object的subscribe方法执行observer.onNext(10)
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
override fun subscribeActual(observer : Observer) {
source.subscribe(observer)
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
emitter.onNext(10)
}
}).subscribe(Observer())
}
是不是差不多了,再给Observable加上泛型,支持发送任意类型的数据,把类Observer改为接口,增加剩下的方法
interface ObservableOnSubscribe<T> {
fun subscribe(observer : Observer<T>)
}
abstract class Observable<T> : ObservableOnSubscribe<T>{
fun subscribe(observer : Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
return ObservableCreate(source)
}
}
}
class ObservableCreate<T> (private val source:ObservableOnSubscribe<T>) : Observable<T>() {
override fun subscribeActual(observer : Observer<T>) {
source.subscribe(observer)
}
}
interface Observer<T> {
fun onSubscribe()
fun onNext(t : T)
fun onError(t:Throwable)
fun onComplete()
}
fun main() {
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: Observer<Int>) {
emitter.onNext(10)
}
}).subscribe(object: Observer<Int> {
override fun onSubscribe() {
}
override fun onNext(t:Int) {
println("value:$t")
}
override fun onError(t:Throwable) {
}
override fun onComplete() {
}
})
}
这样第一个基础的扩展就完成了,那如果实现一个操作符呢,这里选取最基础的map操作符,我们知道map的作用是映射,将输入的某种类型的值通过map函数转化为另外一种类型的值,也有可能类型是一样的,要看这个map函数怎么写了
实现map操作符
在Observable里面增加一个map函数
abstract class Observable<T> : ObservableOnSubscribe<T> {
override fun subscribe(observer : Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
return ObservableCreate(source)
}
}
fun <R> map(func: (T) -> R): Observable<R> {
return ObservableMap<T,R>(this, func)
}
}
可以看到这里返回了一个ObservableMap,接收两个参数,一个是调用该函数的Observable,一个是map函数,这里是将T类型转为R类型,看返回值ObservableMap是一个R类型的Observable,说明这个操作符将上游发送的T类型经过map转化为R类型后接着向下游发送数据,最终到达Observer,所以Observer收到的数据为R类型,但是我们的source发送的数据是T类型,所以subscribeActual里面source.subscribe的observer类型是T类型,这里新增一个MapObserver内部类,包裹了Observer<R>,MapObserver的onNext调用时,执行observer.onNext(func(t)),最终发给下游转化为R类型的数据。
新增一个ObservableMap类
class ObservableMap<T, R>(
private val source: ObservableOnSubscribe<T>, private val func: (T) -> R
) : Observable<R>() {
override fun subscribeActual(observer: Observer<R>) {
source.subscribe(MapObserver(observer, func))
}
private class MapObserver<T, R>(private val observer: Observer<R>, private val func: (T) -> R) : Observer<T> {
override fun onSubscribe() {
observer.onSubscribe()
}
override fun onNext(t: T) {
observer.onNext(func(t))
}
override fun onError(t:Throwable) {
observer.onError(t)
}
override fun onComplete() {
observer.onComplete()
}
}
}
最终调用如下:
fun main() {
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: Observer<Int>) {
emitter.onNext(10)
}
}).map {
"value:$it"
}.subscribe(object: Observer<String> {
override fun onSubscribe() {
}
override fun onNext(t:String) {
println("value:$t")
}
override fun onError(t:Throwable) {
}
override fun onComplete() {
}
})
}
到此,map操作符也实现了,那么再添加其他操作符是不是也是一个套路呢
实现线程切换
可以发现实现其他操作符也是同一个套路,下面看下切换线程subscribeOn的实现过程:
在Observable里面增加一个subscribeOn函数:
abstract class Observable<T> : ObservableOnSubscribe<T> {
override fun subscribe(observer: Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source:ObservableOnSubscribe<T>): Observable<T> {
return ObservableCreate(source)
}
}
fun <R> map(func: (T) -> R): Observable<R> {
return MapObservable(this, func)
}
fun subscribeOn(scheduler: Scheduler): Observable<T> {
return ObservableSubscribeOn(this, scheduler)
}
}
可以看到这里返回了一个ObservableSubscribeOn,接收两个参数,一个是调用该函数的Observable,一个是Scheduler类型的变量,这个变量就是用于切换线程的。新建ObservableSubscribeOn如下:
class ObservableSubscribeOn<T>(
private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
scheduler.scheduleDirect {
source.subscribe(SubscribeOnObserver(observer))
}
}
private class SubscribeOnObserver<T>(private val observer: Observer<T>) : Observer<T> {
override fun onNext(t: T) {
observer.onNext(t)
}
}
}
新建Scheduler抽象类,包含scheduleDirect方法,抽象静态内部类Worker表示工作线程,抽象方法createWorker:
abstract class Scheduler {
fun scheduleDirect(runnable: Runnable) {
val worker = createWorker()
worker.schedule(runnable)
}
abstract fun createWorker(): Worker
abstract class Worker {
abstract fun schedule(runnable: Runnable)
}
}
接下来可以实现一个具体的Scheduler,命名IoScheduler:
import java.util.concurrent.Executors
class IoScheduler : Scheduler() {
companion object {
private val executor = Executors.newCachedThreadPool()
}
override fun createWorker(): Worker {
return EventLoopWorker()
}
private inner class EventLoopWorker : Worker() {
override fun schedule(runnable: Runnable) {
executor.submit(runnable)
}
}
}
回头看下所谓线程切换,就是让订阅的代码执行在一个线程里面,如下
scheduler.scheduleDirect {
source.subscribe(SubscribeOnObserver(observer))
}
调用一下看看呢,这里为了保持和源码基本一致的写法,在Scheduler类里面添加一个静态方法,用于生成IoScheduler,如下:
abstract class Scheduler {
companion object {
fun io(): IoScheduler {
return IoScheduler()
}
}
... //后面的代码同上
}
调用一下:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(observer: Observer<Int>) {
observer.onNext(10)
}
}).map {
"value:$it"
}.subscribeOn(Scheduler.io()).subscribe(object : Observer<String> {
override fun onNext(t: String) {
Log.d(tag, "$t,thread:${Thread.currentThread().name}")
}
})
结果如下:
D value:10,thread:pool-2-thread-1
这里是线程池里面一个线程的名字,我们也可以直接使用线程Thread类,执行Runnable,给Thread设置一个名字,修改IoScheduler,线程命名worker-thread看下:
class IoScheduler : Scheduler() {
override fun createWorker(): Worker {
return EventLoopWorker()
}
private inner class EventLoopWorker : Worker() {
override fun schedule(runnable: Runnable) {
val t = Thread(runnable)
t.name = "worker-thread"
t.start()
}
}
}
执行结果:
D value:10,thread:worker-thread
好的,现在已经实现了subscribeOn阶段的线程切换,我们知道还有observerOn的线程切换,那又是怎么写呢?
新增observerOn操作符:
abstract class Observable<T> : ObservableOnSubscribe<T> {
... // 以上代码省略
fun observerOn(scheduler: Scheduler): Observable<T> {
return ObservableObserverOn(this, scheduler)
}
}
新建ObservableObserverOn:
class ObservableObserverOn<T>(
private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
val w = scheduler.createWorker()
source.subscribe(ObserverOnObserver(observer, w))
}
private class ObserverOnObserver<T>(
private val observer: Observer<T>, val w: Scheduler.Worker
) : Observer<T> {
override fun onNext(t: T) {
w.schedule {
observer.onNext(t)
}
}
}
}
这里和ObservableSubscribeOn不同的是,订阅过程没有在新的线程执行,而是把发送消息放到了worker的schedule方法里面,那这时候我想切换到主线程怎么办呢,继续仿写,新增AndroidSchedulers,里面返回一个HandlerScheduler,而这个类就是借助主线程的handler,把消息发送到主线程的消息队列的:
import android.os.Handler
import android.os.Looper
class AndroidSchedulers {
companion object {
private val DEFAULT =
HandlerScheduler(Handler(Looper.getMainLooper()))
fun mainThread(): Scheduler {
return DEFAULT
}
}
}
再看HandlerScheduler怎么写?
import android.os.Handler
import android.os.Message
class HandlerScheduler(private val handler: Handler) : Scheduler() {
private class HandlerWorker(private val handler: Handler) : Worker() {
override fun schedule(runnable: Runnable) {
handler.sendMessage(Message.obtain(handler, runnable))
}
}
override fun createWorker(): Worker {
return HandlerWorker(handler)
}
}
可以看到就是利用handler,把runnable包装成Message对象发送到了主线程的消息队列,runnable里面的代码observer.onNext(t)自然运行在主线程,进而完成了切换的过程。
调用一下试试:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(observer: Observer<Int>) {
observer.onNext(10)
}
}).map {
Log.d(tag, "map in,thread:${Thread.currentThread().name}")
"value:$it"
}.subscribeOn(Scheduler.io()).observerOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<String> {
override fun onNext(t: String) {
Log.d(tag, "$t,thread:${Thread.currentThread().name}")
}
})
这里map操作符在subscribeOn之前,里面的打印应该在IoScheduler获取的线程执行,而onNext里面的打印应该在主线程执行。
结果:
D map in,thread:pool-2-thread-1
D value:10,thread:main