RxJava1.x源码解析

带着疑问分析RxJava1.x原理:
事件流源头(observable)怎么发出数据
响应者(subscriber)怎么收到数据
怎么对事件流进行操作(operator/transformer)
整个过程的调度(scheduler)

响应式编程

响应式编程是一种基于异步数据流概念的编程模式。响应式编程依赖事件,事件可以被等待,可以触发过程,也可以触发其他事件。
Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。

操作符

  • 转换类操作符
    map flatMap concatMap flatMapIterable switchMap scan groupBy
flatMap
concatMap
  • 过滤类操作符
    filter take takeLast taskUntil debounce distinct distinctUntilchanged skip skipLast
debounce

如果在最后一个事件的等待时间内重新发出了事件,则以该事件作为最后一个事件。直到最后一个事件过了等待时间后才返回。具体例子可看下幅图:


debounce
distinct
  • 组合类操作符
    merge zip join combineLatest and/when/then switch startSwitch
merge,无序
concat,有序
zip

源码解析

Observable/Subscriber

Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。

Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe。

Subscriber 继承了 Subscription,用于取消订阅。

public abstract class Subscriber<T> implements Observer<T>, Subscription

事件传递流程

如果传入的是Action,则先封装成Subscriber。对传入的Subscriber进行包装,包装为 SafeSubscriber,SafeSubscribersubscriber的一个代理,对subscriber的一系列方法做了严格的安全校验。保证onCompleted()和onError()只会有一个被执行且只执行一次,一旦它们其中方法被调用过后onNext()就不再执行了。

onStart() 就是在我们调用 subscribe() 的线程执行的。

obsevable.subscribe(observer)的显式调用流程

显式调用流程

obsevable.subscribe(observer)的内部调用流程

subscribe内部调用流程

操作符流程

map操作符流程

Schedulers执行线程

执行线程

线程执行的内部调用过程

subscribeOn 影响它上面的调用执行时所在的线程。

observeOn 影响它下面的调用执行时所在的线程。

subscribeOn与操作符的原理一致,创造一个新的Observable用于进行干预操作,并通过线程池executor最终实现了线程切换。当不指定observeOn时,SubscriberOn()对上下游的线程都有影响。

observeOn切换线程是通过lift来实现。Lift的功能是做包装,将上游对下游的on***()事件传给包装好的Operator。
Operator继承了Function,主要是控制上下游事件发送的速率,最终将上游的事件发送给内部静态类ObserveOnSubscriber(继承了Action)。具体的处理操作会将封装好的Action发送到线程池中。每个observeOn都会对它所管辖的下游Observalbe生效。

通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件换到了recursiveScheduler所对应的线程。subscriberOne的onNext()/onCompleted()/onError()方法丢到了recursiveScheduler对应的线程中执行。recursiveScheduler是一个Worker,在执行schedule()时创建了一个Runnable,在run()方法中调用了observeOnSubscriber.call()。

整个流程传递

ScheduledAction是Runnable,将上游Observable.call()事件和Subscriber.onNext/onError/onCompleted事件都封装成Action事件放入ScheduledAction这个实际的Runnable方法中,并交由Worker的schedule()方法处理。

由于指定了Thread(io/newThread/mainThread),内部会先将ThreadFactory创建的线程放入只有一个核心进程的ScheduledExecutorService线程池中。在scheduler()方法被调用时执行该Runnable。

Scheduler管理Work,Work内部通过线程池ScheduledExecutorService执行call()方法中封装的Runnable对象。

backpressure

backpressure主要通过Producer实现。原理是让subscriber向observable主动请求数据,通过producer成为observable和subscriber的数据通信的协调桥梁。

大多数异步操作符,比如observeOn会有一个限定大小的Buffer,

在内部,Observable通过给Subscriber调用setProducer方法,方便Subscriber之后通过记录onNext()调用频率(即上游下发事件速率),调用Observable.request(n)方法,控制上游Observable发送事件的速率。

hook

在众多节点(创建Observable,获取Scheduler等)时,通过hook可进行任意想要的操作,记录、修饰、甚至抛出异常。
通过RxJavaPlugins及RxJavaHook类对关心的节点(hook point)插桩,让我们可以控制(manipulate)程序在这些节点的行为。

为什么subscribeOn 只有第一次调用生效?

subscribeOn 的作用域就是调用前序列中所有的 Todo List 任务清单(Observable.OnSubscribe),当我们执行 subscribe() 时,这些任务清单就会执行在 subscribeOn 指定的工作线程,而第二个 subscribeOn 早就没有任务可做了,所以无法生效。

参考

RxJava系列6-张磊
拆轮子系列:拆 RxJava
RxJava 线程切换源码的一些体会和思考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容