Rxjs入门与初步应用

学习资料

https://cn.rx.js.org/
https://github.com/RxJS-CN/rxjs-articles-translation
https://www.jianshu.com/p/eaf28d5ce6c0

概念

Observable 和 Observer

Observable 可观察者对象,负责发出数据;Observer 观察者,负责接收数据
怎么这么抽象?试着把Observable对象理解为一种数据格式,类似于数组、链表等等
Observable实现了下⾯两种设计模式:

  • 观察者模式(Observer Pattern)
  • 迭代器模式(Iterator Pattern)

观察者模式

定义:观察者模式定义了对象之间的一对多依赖,这样一来,当一个对象改变状态时,它的所有依赖着都会收到通知并自动更新
观察者模式对“治”这个问题提的解决⽅法是这样,将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产⽣事件,它会通知所有注册挂上号的观察者,⽽不关⼼这些观察者如何处理这些事件,相对的,观察者可以被注册上某个发布者,只管接收到事件之后就处理,⽽不关⼼这些数据是如何产⽣的。


发布者和观察者的关系
发布者和观察者的关系
// Subject
function Observable() {
  this.observers = [];
  this.data = 'data update';
  // 用于关联observer
  this.addObserve = function(observe) {
    this.observers.push(observe)
  };
  // 用于取消关联observer
  this.removeObserve = function(observe) {
    const index = this.observers.findIndex(item => item == observe);
    this.observers.splice(index,1);
  }

  this.notifyObservers = function () {
    for (let i=0; i<this.observers.length; i++) {
      this.observers[i].update(this.data);
    }
  }
    // 用于发送更新数据到observer
  this.updateData = function(data) {
    this.data = data;
    this.notifyObservers();
  }
}

// 所有 observer 都必须实现update方法
function Observer() {
  this.update = function (data) {
    console.log(data);
  }
}

const observable = new Observable();
const observer1 = new Observer();
const observer2 = new Observer();
// 建立关联
observable.addObserve(observer1);
observable.addObserve(observer2);
observable.notifyObservers();
observable.updateData(123);

迭代器模式

定义:提供一种方法顺序访问一个聚合对象中的各个元素,而又不暴露其内部的表示
数据集合的实现⽅式很多,可以是⼀个数组,也可以是⼀个树形结构,也可以是⼀个单向链表……迭代器的作⽤就是提供⼀个通⽤的接⼜,让使⽤者完全不⽤关⼼这个数据集合的具体实现⽅式

function Iterator(list) {
  this.list = list;
  this.porsition = 0;
  this.hasNext = function() {
    if (this.porsition > this.list.length || this.list[porsition] === null) return false;
    return true;
  }
  this.next = function() {
    const item = this.list[this.porsition];
    this.porsition++;
    return item;
  }
  this.isDone = function() {
    return this.porsition >= this.list.length;
  }
}

在使⽤RxJS的过程中绝对看不到类似这样的代码,实际上,你都看不到上⾯所说的三个函数,因为,上⾯所说的是“拉”式的迭代器实现,⽽RxJS实现的是“推”式的迭代器实现

Subscription

订阅,表示建立关联关系

理解Observable、Observe 和 Subscribtions 之间的关系

Observable 是信号源、生产者,Observer 观察者、消费者
Observable 和 Observer 之间的关系:
如果比作报社和读者,报社是 Observable,是数据源,提供报纸,读者是 Observer,负责消费(处理)数据,阅读报纸,读者向报社订阅(subscribe)报纸后,报社将读者列入他们派送名单,定期派送报纸,报纸是数据。
可以比作前端(Observer)通过 登陆 owncloud 账号(subscribe)时时获取 UI(Observable)更新的 UI稿(数据)
也可以是爱奇艺会员(Observer)点击播放爱奇艺视频(subscribe)观看爱奇艺网站(Observable)提供的视频(数据)
Observable、Observe 和 Subscribtions核心就是解决分工与数据传递。

subscribe 扩展知识

可以通过add、remove 操作子subscription
父subscription取消订阅,子subscription也会一起取消订阅

var observ1 = Rx.Observable.interval(500)
var observ2 = Rx.Observable.interval(800)
var observ3 = Rx.Observable.interval(800)

var subsc1 = observ1.subscribe(x => console.log('first: ' + x))
var subsc2 = observ2.subscribe(x => console.log('second: ' + x))
var subsc3 = observ3.subscribe(x => console.log('three: ' + x))

subsc1.add(subsc2)
subsc1.add(subsc3)

setTimeout(() => {
  subsc1.remove(subsc2)
  subsc1.unsubscribe()
}, 1100)
// second: n将会一直执行下去,我们添加了 subsc1.add(subsc2) , 在1.1S后移除了它,所以在 unsubscribe() 时,我们并没有清除掉它

unsubscribe:释放资源和取消Observable执行的功能

Operators

操作符,类似于管道,对数据源发出的数据进行过滤或其他处理,使数据源发出的数据更加满足Observe 的需求

Subject

有一些场景,需要将Cold Observable 转成 Hot Observable,在这个场景下需要⼀个“中间⼈”做串接的事情,这个中间⼈有两个职责:

  • 中间⼈要提供subscribe⽅法,让其他⼈能够订阅⾃⼰的数据源。
  • 中间⼈要能够有办法接受推送的数据,包括Cold Observable推送的数据。

上⾯所说的第⼀个职责,相当于⼀个Observable,第⼆个⼯作,相当于⼀个Observer。在RxJS中,提供了名为Subject的类型,⼀个Subject既有Observable的接口,也具有Observer的接口,⼀个Subject就具备上述的两个职责。

Cold Observable 和 Hot Observable的区别

好比视频和电视频道的区别,视频没有时间限制,任何时候想看都可以看,电视有时间限制
视频是 Cold Observable,电视是 Hot Observable
Cold Observable:你见或者不见,我一直在,只要你愿意去取的话
Hot Observable:过了这个村,没有这个店

var interval$ = Rx.Observable.interval(500);

// Cold Observable
interval$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(()=>{
  interval$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 2000);

// Hot Observable
const subject$ = new Rx.Subject();
interval$.subscribe(subject$);
subject$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(() => {
  subject$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 1500);

回过头来讲subject,subject相当于一个转换器,它将 Cold Observable 转化成 Hot Observable。这就要求subject同时是observe又是 observable。subject 既能订阅数据源,同时本身又是数据源,能发出数据。
举个不十分恰当的例子,比如手机,它可以接收基站信号(Observe),同时也可以发出信号(Observable)

Schedulers

调度器
Scheduer是⼀种数据结构,可以根据优先级或者其他某种条件来安排任务执⾏队列
https://www.jianshu.com/p/5624c8a6bd2b

类型 执行类型 内部调用
queue Sync同步的方式 scheduler.schedule(task, delay) scheduler.flush()
asap Async(异步微任务) Promise.resolve().then(() => task)
async Async(异步宏任务) id = setInterval(task, delay) clearInterval(id)
animationFrame Async id = requestAnimationFrame(task) cancelAnimationFrame(id)

为什么要学

我们学习RxJS,并不是因为RxJS是⼀项炫酷的技术,也不是因为RxJS是⼀个最新的技术,是因为RxJS的的确确能够帮助我们解决问题,⽽且这些问题长期以来⼀直在困扰我们,没有好的解决办法,这些问题包括:

  • 如何控制⼤量代码的复杂度;
  • 如何保持代码可读;
  • 如何处理异步操作。

Rxjs 引用了两个重要的编程思想,让代码更加清爽,更加容易维护:
函数式
响应式

函数式

  • 声明式

声明式区别于命令式,命令式强调的是告诉机器怎么去做(how),一步步的告诉计算机如何完成一项工作
声明式强调的是告诉机器你想要什么(what),不关注内部实现,声明式把通用的共性抽离出来,避免重复代码
应用声明式的困难点:归纳和提取完备的what,是件很困难、很技术化的工作,令人望而却步声明式能够应用在特定领域如SQL中,是工具的编写者,已经归纳和提取what,替你完成了

// 命令式
// how:写for循环一一处理
function double(arr) {
  const results = []
  for (let i = 0; i < arr.length; i++){
  results.push(arr[i] * 2)
  }
  return results
}
function addOne(arr) {
  const results = []
  for (let i = 0; i < arr.length; i++){
  results.push(arr[i] + 1)
  }
  return results
}
// 声明式
// what:通过map把每一项加倍或+1,不关注内部实现
function double(arr) {
  return arr.map(function(item) {return item * 2});
} function addOne(arr) {
  return arr.map(function(item) {return item + 1});
}
  • 纯函数:

函数的执⾏过程完全由输⼊参数决定,不会受除参数之外的任何数据影响,只要入参不变,返回的参数也不会变
函数不会修改任何外部状态,⽐如修改全局变量或传⼊的参数对象
纯函数没有副作用,是稳定的,可以和其他纯函数像搭积木一样一起组合使用,获得更强的处理能力

  • 数据不可变性

有数据,替换⽅法是通过产⽣新的数据,来实现这种"变化",也就是说,当我们需要数据状态发⽣改变时,保持原有数据不变,产⽣⼀个新的数据来体现这种变化

JavaScript中数组的push、pop、sort函数都会改变⼀个数组的内容,由此引发的bug可不少。这些不纯的函数导致JavaScript天⽣不是⼀个纯粹意义上的函数式编程语⾔

响应式

EXCEL 中的公式就是典型的响应式,数据改变了公式计算结果也会跟着变
类似于MVVM中的M->V

体验两个小例子:

测试鼠标按住时间

const buttonDom = document.querySelector('#button');
const mouseDown$ = Rx.Observable.fromEvent(buttonDom, 'mousedown');
const mouseUp$ = Rx.Observable.fromEvent(buttonDom, 'mouseup');
const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent)=>{
    return mouseUpEvent.timestamp - mouseDownEvent.timestamp;
});
holdTime$.subscribe((ms)=>{
    document.querySelector('#holdTime').innerText = ms;
});

takeUntil,统计5秒内用户点击数

const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferWhen(()=>Rx.Observable.interval(5000)).subscribe(arr=>console.log(arr.length))

弹珠图

弹珠图可以用来表示数据流,例如:

--a---b-c---d---X---|->
a, b, c, d 表示发出的数据
X 表示错误
|表示 '结束' 信号
---> 是时间轴

弹珠图在线演示:https://rxviz.com/

操作符

创建

功能需求 适用的操作符
直接操作观察者 create
根据有限的数据产生同步数据流 of
产生一个数值范围内的数据 range
以循环方式产生数据 generate
重复产生数据流中的数据 repeat 和 repeatWhen
产生空数据流 empty
产生直接出错的数据流 throw
产生永远不完结的数据流 never
间隔给定时间持续产生数据 interval 和 timer
从数组等枚举类型数据产生数据流 from
从Promise 对象产生数据流 fromPromise
从外部事件对象产生数据流 fromEvent 和 fromEventPattern
从Ajax 请求结果产生数据流 ajax
延迟产生数据流 defer

from 和 toArray
from:数组转 Observable
toArray:Observable 转数组
fromPromise 和 toPromise
fromPromise:promise转 Observable
toPromise:Observable 转promise

合并

功能需求 使用的操作符
把多个数据流以首位相连的方式合并 concat 和 concatAll
把多个数据流中数据以先到先得方式合并 merge 和 mergeAll
把多个数据流中的数据以一一对应的方式合并 zip 和 zipAll
持续合并多个数据流中最新产生的数据 combineLatest、combineAll 和 withLatestFrom
从多个数据流中选取第一个产生内容的数据流 race
在数据流前面添加一个指定数据 startWith
只获取多个数据流最后产生的那个数据 forkJoin
从高阶数据流中切换数据源 switch 和 exhaust

对of产生的数据进行concat和merge操作哦产生的不同结果

例1:merge:事件的合并处理

startWith和concat的关联关系
zip
拉链,一对一咬合

QQ截图20190417140650.png
QQ截图20190417140650.png
QQ图片20190417141214.png
QQ图片20190417141214.png

例2:zip应用

  • 让of产生的数据流交叉输出
  • 实现异步队列

forkJoin
forkJoin就是RxJS界的Promise.all,Promise.all等待所有输⼊的Promise对象成功之后把结果合并,forkJoin等待所有输⼊的Observable对象完结之后把最后⼀个数据合并

const testList = [
  this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:1},params)),
  this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:2},params)),
];
Observable.create(observer => {
  forkJoin(testList).subscribe((data)=>{
    observer.next(data);
  });
})subscribe((res)=>{
  if(res.some(data=>data==false)) return;
  // 1-入门测成绩, 2-出门测成绩
  this.scoreTestType = {
    1: { scoreTestDetail: res[0] },
    2: { scoreTestDetail: res[1] }
  };
})

辅助

功能需求 使用的操作符
统计数据流中产生的所有数据个数 count
获得数据流中最大或最小的数据 max 和 min
对数据流中所有数据进行规约操作 reduce
判断是否所有数据满足某个判定条件 every
找到第一个满足判定条件的数据 find 和 findIndex
判断一个数据流是否不包含任何数据 isEmpty
如果一个数据流为空就默认产生一个指定数据 defaultEmpty

数学类操作符有四个:count、max、min、reduce
遍历上游Observable对象中吐出的所有数据才给下游传递数据、只有在上游完结的时候,才给下游传递唯⼀数据

过滤

功能需求 使用的操作符
过滤掉不满足判定条件的数据 filter
获得满足判定条件的第一个数据 first
获得满足判定条件的最后一个数据 last
从数据流中选取最先出现的若干数据 take
从数据流中选取最后出现的若干数据 takeLast
从数据流中选取数据直到某种情况发生 takeWhile 和 takeUntil
从数据流中忽略最先出现的若干数据 skip
从数据流中忽略数据直到某种情况发生 skipWhile 和 skipUntil
基于时间的数据流量筛选 throttleTime、debounceTime 和 auditTime
基于数据内容的数据流量筛选 throttle、debounce 和 audit
基于采样方式的数据流量筛选 sample 和 sampleTime
删除重复的数据 distinct
删除重复的连续数据 distinctUntil 和 distinctUntilKeyChange
忽略数据流中的所有数据 ignoreElement
只选取指定出现位置的数据 elementAt
判断是否只有一个数据满足判定条件 single

takeUntil让我们可以⽤Observable对象作为notifier来控制另⼀个Observable对象的数据产⽣,使用起来非常灵活
有损回压控制:throttle、debounce、audit、sample、throttleTime、debounceTime、auditTime、sampleTime
对比:https://www.jianshu.com/p/a176d28c9eb5

例3:防抖和节流

debounce,去抖动。策略是当事件被触发时,设定一个周期延迟执行动作,若期间又被触发,则重新设定周期,直到周期结束,执行动作。 这是debounce的基本思想,在后期又扩展了前缘debounce,即执行动作在前,然后设定周期,周期内有事件被触发,不执行动作,且周期重新设定。

// 暴力版
var debounce = (fn, wait) => {
    let timer, timeStamp=0;
    let context, args;
 
    let run = ()=>{
        timer= setTimeout(()=>{
            fn.apply(context,args);
        },wait);
    }
    let clean = () => {
        clearTimeout(timer);
    }
 
    return function(){
        context=this;
        args = arguments;
        let now = (new Date()).getTime();
 
        if(now - timeStamp < wait){
            console.log('reset',now);
            clean();  // clear running timer 
            run();    // reset new timer from current time
        } else{
            console.log('set',now);
            run();    // last timer alreay executed, set a new timer
        }
        timeStamp = now;
    }
}

// rxls
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.debounceTime(2000).subscribe(
  console.log,
  null,
  () => console.log('complete')
);

throttling,节流的策略是,固定周期内,只执行一次动作,若有新事件触发,不执行。周期结束后,又有事件触发,开始新的周期。 节流策略也分前缘和延迟两种。与debounce类似,延迟是指 周期结束后执行动作,前缘是指执行动作后再开始周期。
throttling的特点在连续高频触发事件时,动作会被定期执行,响应平滑。

// 简单版: 定时器期间,只执行最后一次操作
var throttling = (fn, wait) => {
    let timer;
    let context, args;
 
    let run = () => {
        timer=setTimeout(()=>{
            fn.apply(context,args);
            clearTimeout(timer);
            timer=null;
        },wait);
    }
 
    return function () {
        context=this;
        args=arguments;
        if(!timer){
            console.log("throttle, set");
            run();
        }else{
            console.log("throttle, ignore");
        }
    }
}

// rxjs
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.throttleTime(2000).subscribe(
  console.log,
  null,
  () => console.log('complete')
);

转化

功能需求 使用的操作符
将每个元素用映射函数产生新的数据 map
将数据流中每个元素映射为同一数据 mapTo
提取数据流中每个数据的某个字段 pluck
产生高阶 Observable 对象 windowTime、windowCount、windowToggle 和window
产生数组构成的数据流 bufferTime、BufferCount、bufferWhen、bufferToggle 和 buffer
映射产生高阶 Observable 对象然后合并 concatMap、mergeMap(flatMap)、switchMap、exhaustMap
产生规约运算结果组成的数据流 scan 和 mergeScan

scan可能是RxJS中对构建交互式应⽤程序最重要的⼀个操作符,因为它能够维持应⽤的当前状态,⼀⽅⾯可以根据数据流持续更新这些状态,另⼀⽅⾯可以持续把更新的状态传给另⼀个数据流⽤来做必要处理。
定义:public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值index 是赋给 acc 的初始值

let foo$ = Rx.Observable.interval(1000);
// acc 为上次返回值
// cur 更新的值,此处由foo$提供
foo$.scan((acc, cur) => {
    return cur
}, 0).subscribe((data)=>console.log(data));

acc 是上一个 scan 的返回值
subscript data 显示的是当前值

scan 和 reduce 的区别

reduce需要数据结束才能输出结果
scan可以输出中间状态

无损回压控制

数据组合成数组:bufferTime、bufferCount、bufferWhen、bufferToggle、buffer
数据组合成Observable:windowTime、windowCount、windowToggle 和window

bufferCount
支持两个参数 bufferSize 和 startBufferEvery
bufferSize 表示 缓存区长度,缓存区长度达到bufferSize的时候传新的数据给下游
startBufferEvery 可选,表示 新的缓存区长度,即新数据个数,从上次bufferCount触发以后,上游每发出startBufferEvery个数据后向下游传出数据,数组中旧数据个数为bufferSize- startBufferEvery
如果不填startBufferEvery,则默认值为 bufferSize,都是新数据
如果startBufferEvery大于bufferSize,则会丢失startBufferEvery-bufferSize个数据

例4: 判断连续输入是否正确

召唤隐藏英雄

const code = [
  "ArrowUp",
  "ArrowUp",
  "ArrowDown",
  "ArrowDown",
  "ArrowLeft",
  "ArrowRight",
  "ArrowLeft",
  "ArrowRight",
  "KeyB",
  "KeyA",
  "KeyB",
  "KeyA"
]

Rx.Observable.fromEvent(document, 'keyup')
  .map(e => e.code)
  .bufferCount(12, 1)
  .subscribe(last12key => {
    if (_.isEqual(last12key, code)) {
      console.log('隐藏的彩蛋 \(^o^)/~')
    }
  });

bufferToggle
利⽤Observable来控制缓冲窗口的开和关
有两个参数openings 和 closingSelector,openings 是一个Observable,控制每个缓冲窗口的开始时间,closingSelector是一个返回Observable的函数(这样能够灵活控制取值范围),控制每个缓冲窗口的结束时间(相对于开始时间而言)

clipboard.png
clipboard.png

对例4进行优化:限定3s时间内连续输入正确

 const code = [
   "ArrowUp",
   "ArrowUp",
   "ArrowDown",
   "ArrowDown",
   "ArrowLeft",
   "ArrowRight",
   "ArrowLeft",
   "ArrowRight",
   "KeyB",
   "KeyA",
   "KeyB",
   "KeyA"
 ]

Rx.Observable.fromEvent(document, 'keyup')
.map(e => e.code)
.bufferToggle(Rx.Observable.timer(0, 3000), i=>Rx.Observable.interval(3000))
.subscribe(last12key => {
console.log(last12key);
if (_.isEqual(last12key, code)) {
console.log('隐藏的彩蛋 \(^o^)/~')
}
});

高阶Observable

QQ图片20190417115019.png
QQ图片20190417115019.png

高阶Observable和一阶Observable的关系
正如二维数组和一维数组的关系

相关的操作符

打平:concatAll、mergeAll、zipAll、combineAll、forkJoin、switch、exhaust
组合:windowTime、windowCount、windowToggle 和window、groupBy分组
(前四个是按顺序分组,最后一个打乱了顺序)
转化:concatMap、mergeMap(flatMap)、switchMap、exhaustMap、mergeScan
cancatMap=一对多的map+concatAll
映射:concatMapTo、mergeMapTo、switchMapTo
生成高阶函数

const ho$ = Rx.Observable.interval(1000)
  .take(2)
  .concat(Rx.Observable.never())  // 添加了一个never数据流
  .map(x => Rx.Observable.interval(1500).map(y => x+':'+y).take(3));
ho$.subscribe(
  console.log,
  null,
  () => console.log('complete')
);

// 打平
ho$.zipAll().subscribe(
  console.log,
  null,
  () => console.log('complete')
);

例4:拖拽

const box = document.querySelector('.box');
const mousedown$ = Rx.Observable.fromEvent(box, 'mousedown');
const mousemove$ = Rx.Observable.fromEvent(box, 'mousemove');
const mouseup$ = Rx.Observable.fromEvent(box, 'mouseup');
const mouseout$ = Rx.Observable.fromEvent(box, 'mouseout$');

mousedown$.mergeMap((md) => {
  const stop$ = mouseup$.merge(mouseout$);
  return mousemove$.takeUntil(stop$).map((mm) =>{
    return {
      target: md.target,
      x: mm.clientX - md.offsetX,
      y: mm.clientY - md.offsetY
    }
  });
}).subscribe((obj) => {
  console.log(obj);
  obj.target.style.top = obj.y + 'px';
  obj.target.style.left = obj.x + 'px';
});

综上:在RxJS中,创建类操作符是数据流的源头,其余所有操作符最重要的三类就是合并类、过滤类和转化类。不夸张地说,使⽤RxJS解决问题绝大部分时间就是在使⽤这三种操作符

多播

Observable和Observer的关系,就是前者在播放内容,后者在收听内容。播放内容的⽅式分为三种:

  • 单播(unicast):微信发给朋友,只有一个接收者
  • ⼴播(broadcast):朋友圈广告,所有人都能看得见
  • 多播(multicast):群聊天,发给一群人,只有选中的朋友才能看见
clipboard.png
clipboard.png

前面的例⼦⼤都是单播
RxJS是⽀持⼀个Observable被多次subscribe的,所以,RxJS⽀持多播,但是,表⾯上看到的是多播,实质上还是单播

const tick$ = Rx.Observable.interval(1000).take(3);

tick$.subscribe(value => console.log('observer 1: ' + value));

setTimeout(() => {
    tick$.subscribe(value => console.log('observer 2: ' + value));
}, 2000);

第⼆个Observer依然接收到了0、1、2总共三个数据。为什么会是这样的结果?因为interval这个操作符产⽣的是⼀个Cold Observable对象。
Cold Observable,就是每次被subscribe都产⽣⼀个全新的数据序列的数据流,例如对interval产⽣的Observable对象每subscribe⼀次,都会产⽣⼀个全新的递增整数序列,从0开始产⽣Hot Observable:fromPromise、fromEvent、fromEventPattern就是异步的创建操作符真正的多播,必定是⽆论有多少Observer来subscribe,推给Observer的都是⼀样的数据源把Cold Observable变成Hot Observable,用的是Subject

Subject

clipboard.png
clipboard.png
var interval$ = Rx.Observable.interval(500);

const subject$ = new Rx.Subject();
    interval$.subscribe(subject$);
    subject$.map(val=>'a'+val).subscribe(x => console.log(x));
    setTimeout(() => {
      subject$.map(val=>'b'+val).subscribe(x => console.log(x));
    }, 1500);

Subject不能重复使⽤
Subject可以有多个上游

例5:scan管理react状态

class Counter extends React.Component {
  state = {count: 0}
  onIncrement() {
    this.setState({count: this.state.count + 1});
  }
  onDecrement() {
    this.setState({count: this.state.count - 1});
  }
  render() {
    return (
      <CounterView
        count={this.state.count}
        onIncrement={this.onIncrement.bind(this)}
        onDecrement={this.onDecrement.bind(this)}
      />
    );
  }
}
export default Counter;

// subject作为桥梁进行状态维护
class RxCounter extends React.Component {
  constructor() {
    super(...arguments);
    this.state = {count: 0};
    this.counter = new Subject();
    const observer = value => this.setState({count: value});
    this.counter.scan((result, inc) => result + inc, 0)
    .subscribe(observer);
  }
  render() {
    return <CounterView
      count={this.state.count}
      onIncrement={()=> this.counter.next(1)}
      onDecrement={()=> this.counter.next(-1)}
    />
  }
}
export default RxCounter;

例6:买房放租

const house$ = new Rx.Subject();
const housecount$ = house$.scan((has, one) => has = has+one, 0).startWith(0);

const month$ = Rx.Observable.interval(1000);
const salary$ = month$.mapTo(1);
const rent$ = month$.withLatestFrom(housecount$).map(arr=>arr[1]*0.5);

// 月收入累加
const income$ = salary$.merge(rent$);

const cash$ = income$.scan((has, one)=>{
  has = has + one;
  if (has >= 100) {
    has -= 100;
    console.log('买房啦');
    house$.next(1);
  }
  return has;
}, 0)

cash$.subscribe(
  (data)=>{
    console.log('进账,余额:',data)
  },
  null,
  ()=>{
    console.log('complete');
  }
)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335