一、流 与 流变换
1. 流(stream)
2. 流变换(transform)
二、实战(React Hook + Rx.js + Observable Hooks)
1. useState
import React, { useState } from "react";
import { Button } from "antd";
export function StateExample() {
debugger; // 1、5
const [output, setOutput] = useState(initOutput);
debugger; // 2、6
const onClickButton = () => {
debugger; // 4
setOutput(output + 1);
};
debugger; // 3、7
return (
<div>
<Button onClick={onClickButton}>Click</Button>
<div>{output}</div>
</div>
);
}
const initOutput = 1;
2. useObservableState
import React from "react";
import { Button } from "antd";
import { useObservableState } from "observable-hooks";
import { from, Observable } from "rxjs";
import { map, switchMap } from "rxjs/operators";
export function ObservableStateExample() {
debugger; // 1、8
const [output, onInput] = useObservableState(transform, initOutput);
debugger; // 3、9
const onClickButton = () => {
debugger; // 5
onInput(output + 1);
};
debugger; // 4、10
return (
<div>
<Button onClick={onClickButton}>Click</Button>
<div>{output}</div>
</div>
);
}
const initOutput = 0;
const transform = (stream$: Observable<number>) => {
debugger; // 2
return stream$.pipe(
switchMap((input) => {
debugger; // 6
return from(
new Promise<number>((res) => setTimeout(() => res(input), 500))
);
}),
map((input) => {
debugger; // 7
return input + 1;
})
);
};
注:
- 当流中出现错误,可以使用
catchError
处理,那么map
这一步会被跳过,直接进行catchError
处理【见绿色箭头】。 -
catchError
处理后,流就终止了,下一轮onInput
调用,switchMap
这一步也不会再执行了【见黄色箭头】。
为了能继续响应 onInput
事件,我们可以在 from
后面 pipe
一个 catchError
const transform = (stream$: Observable<number>) => {
debugger;
return stream$.pipe(
switchMap((input) => {
debugger;
return (
from(
new Promise<number>((res, rej) => setTimeout(() => rej(input), 500))
)
// 在这里处理 catchError
.pipe(
catchError((error) => {
debugger;
return of(-1);
})
)
);
}),
map((input) => {
debugger; // 可以进入了
return input + 1;
})
);
};
3. useObservable
import React, { useState } from "react";
import { Button } from "antd";
import { useObservable, useObservableState } from "observable-hooks";
import { from, Observable } from "rxjs";
import { map, switchMap } from "rxjs/operators";
export function ObservableExample() {
debugger; // 1、9、15、22
const [value, setValue] = useState(1);
debugger; // 2、10、16、23
const stream$ = useObservable(transform, [value]);
debugger; // 4、11、17、24
const output = useObservableState(stream$, initOutput);
debugger; // 5、12、18、25
const onClickButton = () => {
debugger; // 14
setValue(output + 1);
};
debugger; // 6、13、19、26
return (
<div>
<Button onClick={onClickButton}>Click</Button>
<div>{output}</div>
</div>
);
}
const initOutput = 0;
const transform = (stream$: Observable<number[]>) => {
debugger; // 3
return stream$.pipe(
switchMap(([input]) => {
debugger; // 7、20
return from(
new Promise<number>((res) => setTimeout(() => res(input), 500))
);
}),
map((input) => {
debugger; // 8、21
return input + 1;
})
);
};