前言:
这是一个关于Rx的博文系列 ,也是本人第一次通过网络来记录、整理自己的学习历程。若有纰漏之处,尤其是错误的理解和观点,请各位大神拍砖。
干货在后头呢。
Rx——Reactive Extension,使用可观测的序列(流)来组成异步的、基于事件的程序的库。我们使用Observable(数据流)描述数据序列,通过Operator(操作符)查询、改变数据流,并且支持以参数的形式利用Scheduler(调度器)控制异步数据流的并发。简而言之:Rx = Observable + Operator + Scheduler。
若无特别说明,本系列文章中所出现的数据流和事件流均代表Observable。
我更愿意称之为一种编程方式——响应式编程。实际上Rx不止一个库,它包括了多个实现了Rx方式的各种编程语言的开源库。如非特别说明,本系列文章均参考Rx的Java实现——RxJava2——进行阐述。
Rx扩展了观察者模式,使得该模式从对一个数据或事件的观察升级为对一个序列的数据或事件的观察,更强大的是它可通过操作符将多个数据序列组合成一个序列。借助于Rx我们能够更加专注于程序的逻辑,而不用过多地关注诸如多线程、同步、线程安全、并发、非阻塞IO这些底层的东东。
看到这里有人会质疑了:没有Rx我照样能得到想要的数据序列啊,谈何“升级”?这里以吃饭来举例。
场景一和场景二:
来到公司楼下的食堂——没错,就是那种去晚了菜都凉了的地方:
一)中午胃口不错,于是点了个两荤一素:Iterable<Dish> lunch = getDish(); eat(lauch);
二)晚上想减肥,只要了一个素菜:Dish dinner = getDish(); eat(dinner);
食堂的菜已经提前准备好了,想吃多少任君挑选。点好菜就开吃不用等待,可见在食堂吃饭是一个同步的操作。
场景三:
某个下午堆代码上瘾错过食堂饭点,只好去楼下的拉面馆吃面:
三) 老板,来碗牛肉炒刀削:Future<Dish> dinner = waitDish(); eat(lauch);
给牛肉炒刀削点赞!接下来就是等面出锅,可见吃面一个异步的操作。
场景四和场景五:
周末好不容易约了女神一起吃中餐:
为了这顿饭我饿了3天,服务员菜单递过来我底气十足地点了5个硬菜,女神却说要减肥(不早说)。为了体现我的绅士风度,饿死也必须等女神先吃。
四) 然而女神说等菜全部上齐了再吃:Future<Iterable<Dish>> lunch = waitDishAll(); eat(lauch);
五) 如果第一道菜上桌就开始吃我就不用饿那么久了:Iterable<Future<Dish>> lunch = waitDishAny(); eat(lauch);
可见吃大餐是一个由多个异步过程组合的操作,组合的方式由具体业务决定。
毫无疑问,我们能从容面对场景一和场景二;对于场景三,不同语言提供了类似功能的工具,比如Java提供了Future(配合Callback)来应对这类单一异步操作。然而在场景四和场景五中,存在多个、甚至可能嵌套的异步操作,考虑到异步操作在时间上地不确定性,尽管Future仍然可以作为一种解决方案,但随之而来的复杂性也不容忽视,程序员需要提供一些额外的逻辑来判断异步操作何时结束以及后续的操作(对应本场景中的吃)何时开始。
吃瓜群众开始起哄:Rx行那Rx上啊。
Observable<Dish> lauch = getDish(); // 得到数据流
5道菜上齐再吃: lauch.buffer(5).subscribe(eat); // 转换数据流 、订阅
第一道菜上桌就吃: lauch.firstElement().subscribe(eat); // 过滤数据流 、订阅
如何获取和响应包含多个数据的异步数据流,Java的Future+Callback方案显得有点笨拙,程序员不得不分散精力去处理核心业务之外的逻辑,编写出的代码也容易产生臭名昭著的“回调地狱”。相比之下,Rx表现得游刃有余,它提供了丰富的操作符(此处暂且按下不表),原始数据流(上游数据流)经若干次操作符处理后变成目标数据流(下游数据流)。观察者订阅了目标数据流后,剩下的工作就是集中精力处理后续业务。
Rx不但可以方便地处理单个数据——更重要的是——它在需要处理多个、甚至无限多个数据的场景中表现优异。因此,Rx的准则是:(几乎)一切都可以成为一个数据流(哪怕只有单个数据、甚至没有任何数据)。
Rx相比经典的观察者模式增加了两个能力:
1) 被观察者向观察者发送“没有更多数据”通知的能力(调用观察者的onComplete方法),类比于Iterable因 !hasNext()正常结束遍历。
2) 被观察者向观察者发送“异常”通知的能力 (调用观察者的onError方法),类比于Iterable遍历过程中抛出异常 提前结束遍历。
观察者只能收到以上两个通知其中之一。一旦收到任何一个通知,之后观察者将不会收到任何数据或通知。
获得以上两个能力的加持后,Rx的Observable与Iterable看起来就像是孪生兄弟,我们可以像使用Iterable一样使用Observable。
通过Iterable处理数据序列,用的是pull(拉取)的方式,处理过程发生在当前线程:
getDataFromLocalMemory()
.skip(10)
.take(5)
.map({ s -> s + " transformed" })
.forEach({ println "next -> " + it })
通过Observable处理数据序列,用的是push(推送)的方式,可以灵活地选择同步或异步地发送处理结果:
getDataFromNetwork()
.skip(10)
.take(5)
.map({ s ->s + " transformed" })
.subscribe({ println "onNext -> " + it })
Observable比它的孪生兄弟Iterable多了一对隐形的翅膀——在处理异步的数据流时,我们便可以打开这对翅膀。
Rx使用非常灵活,尽管我们通常用它来处理异步数据流,事实上,Rx完全不关心产生数据的方式,无论是通过线程池、event loops、non-blocking I/O、 actors。即便你已经通过线程池来产生数据,仍然可以改造成其他方式,而完全不用改变现有观察者的工作方式。
数据是同步or异步获取的?
数据序列是否需要在多个不同线程中计算并依次返回给调用者?
是否通过异步的网络请求来获取数据?
是否通过callback线程获取数据?
Rx把一切与Observable的交互视为异步的。