针对异步数据流的编程,简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你得以同步方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能
学习
https://rxjs.dev/guide/overview
可通过以上官网进行学习,还可F12进行联系测试
const { fromEvent } = rxjs;
const { mapTo } = rxjs.operators;
const clicks = fromEvent(document, 'click');
const greetings = clicks.pipe(mapTo('Hi'));
greetings.subscribe(x => console.log(x));
概念
Observable
observable 被称为可观察的序列,简单来说数据在observable中流动,可以使用各种operator对流进行处理
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
Observer
观察者,一个回调函数集合,他知道如何去监听由observable提供的值,通过三个可选参数,监听Observable的行为:
- 成功处理函数 next,每次流发出值时调用该函数
- 错误处理函数 error,只有在发生错误时才调用该函数,此处理函数本身接收一个错误
- 完成处理函数 complete,仅当流完成时才调用该函数
如下所示:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
Operator
采用函数式编程风格的函数,使用如map, filter, concat, flatmap来处理集合
常用操作符
创建序列: of, from, fromEvent, fromPromise, ajax, throw
https://rxjs.dev/operator-decision-tree
如果不知如何创建序列,可通过以上网址根据自己的需求得出参考
合并序列: forkJoin, merge, concat
const ob1 = Observable.ajax('api/1')
const ob2 = Observable.ajax('api/2')
const obs = [ob1, ob2]
// 串行
Observable.concat(...obs).subscribe(detail => console.log('每个请求都会触发回调'))
// 并行
Observable.merge(...obs).subscribe(detail => console.log('每个请求都会触发回调'))
// 并行,并且合并结果
Observable.forkJoin(...obs).subscribe(detailArray => console.log('触发一次回调'))
操作符: map, filter, swirthMap, to Promise, catch, take Until, timeout, debounceTime, distinctUtilChanged, pluck
https://reactive.how/rxjs/, 可通过此网址学习操作符
应用
- 重试机制,递增延时重试
import { timer, from } from "rxjs";
import { retryWhen, delayWhen, tap, scan } from 'rxjs/operators';
const MAX_RETRY = 10
const ATTEMPT_DELAY_FACTOR = 3000
const promise = Promise.reject('bad');
// const promise = Promise.resolve('success');
promise.pipe(
retryWhen(errors =>
{
return errors.pipe(
scan((errCount, err) => {
if(errCount >= MAX_RETRY) {
throw new Error(err)
}
return errCount + 1
}, 0),
delayWhen((errCount) => timer(ATTEMPT_DELAY_FACTOR * errCount)),
tap((errCount) => console.log(`运行失败,将进行第${errCount}次重试 ` ))
)
}
)
)
.subscribe(
// 监听数据流
res => {},
// 监听错误
err => {
console.log('error:', err);
},
// 监听结束
() => {
console.log('运行成功');
}
)
}