首先,我们使用 create-react-app 快速创建一个名为 demo 的项目
> create-react-app demo
安装 RxJS
> yarn add rxjs
或者
> npm install rxjs
完成初始化相关工作后,我们就可以开始编写代码了,我们将在 App.js 里面编写相关内容。
例子一:页面初始化显示 0,在 9 秒的时候显示 3 的平方,在 16 秒的时候显示 4 的平方,在 25 秒的时候显示 5 的平方。
App.js
import React, { useEffect, useState } from 'react';
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const numbersList$ = from([1,2,3,4,5]);
const squareNumbersList$ = numbersList$.pipe(
filter(val => val > 2),
map(val => val * val)
);
function App() {
const [currentNumber, setCurrentNumber] = useState(0);
useEffect(() => {
const subscription = squareNumbersList$.subscribe(val => setCurrentNumber(val));
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div>
<p>current number: {currentNumber}</p>
</div>
);
}
export default App;
命令行执行yarn start启动服务,在页面上可以看到这样的效果:

current number 直接显示成了 25,如果我们想看到逐个数字显示呐呐?(显示 0,9,16,25)。
首先说明为什么会只能看到 25 的结果。原因在于太快了。。。
我们修改点代码来查看下(observable 的管道里新加了个 tap 方法,observer 下面添加了一条 console 语句):
import React, { useEffect, useState } from 'react';
import { from } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
const numbersList$ = from([1,2,3,4,5]);
const squareNumbersList$ = numbersList$.pipe(
tap(_ => console.log('tap: ', new Date().getTime())),
filter(val => val > 2),
map(val => val * val)
);
function App() {
const [currentNumber, setCurrentNumber] = useState(0);
useEffect(() => {
const subscription = squareNumbersList$.subscribe(val => {
console.log('subscribe: ', new Date().getTime());
setCurrentNumber(val);
});
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div>
<p>current number: {currentNumber}</p>
</div>
);
}
export default App;
刷新网页后,我们看到了如下的效果:

5 条数据在 33 (第五个 tap - 第一个 tap) 多毫秒里面被 push 出来,有 3 条数据通过管道的过滤并被 observer 订阅。整个过程也就三四十毫秒,太快了。
为了实现逐个数据的查看,我们加个 delay 延迟功能。修改代码成如下:
import React, { useEffect, useState } from 'react';
import { from } from 'rxjs';
import { map, filter, tap, delay } from 'rxjs/operators';
const numbersList$ = from([1,2,3,4,5]);
const squareNumbersList$ = numbersList$.pipe(
tap(_ => console.log('tap: ', new Date().getTime())),
filter(val => val > 2),
map(val => val * val),
delay(2000)
);
function App() {
const [currentNumber, setCurrentNumber] = useState(0);
useEffect(() => {
const subscription = squareNumbersList$.subscribe(val => {
console.log('subscribe: ', new Date().getTime());
setCurrentNumber(val);
});
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div>
<p>current number: {currentNumber}</p>
</div>
);
}
export default App;

如图,两秒后显示出 subscribe 的结果
current number 一开始显示 0,两秒后变成了 25,这也不是我们完全想要的结果。原因在于数据
delay 的时间都是相同的。
所以我们需要实现对每个数据添加延迟功能并且要求大家的延迟时间不一样(数字 9 延迟 9 秒,数字 16 延迟 16 秒,数字 25 延迟 25 秒),这样我们就需要 mergeMap 来帮我们实现,对每条数据实现自己的延迟功能。
修改代码如下(observable 添加 mergeMap):
import React, { useEffect, useState } from 'react';
import { from } from 'rxjs';
import { map, filter, tap, delay, mergeMap } from 'rxjs/operators';
const numbersList$ = from([1,2,3,4,5]);
const squareNumbersList$ = numbersList$.pipe(
tap(_ => console.log('tap: ', new Date().getTime())),
filter(val => val > 2),
map(val => val * val),
mergeMap(val => from([val]).pipe(delay(1000 * val)))
);
function App() {
const [currentNumber, setCurrentNumber] = useState(0);
useEffect(() => {
const subscription = squareNumbersList$.subscribe(val => {
console.log('subscribe: ', new Date().getTime());
setCurrentNumber(val);
});
return () => {
subscription.unsubscribe();
};
}, []);
return (
<div>
<p>current number: {currentNumber}</p>
</div>
);
}
export default App;
页面显示结果如下:

效果成功显示
例子二:pokeman 搜索实现
App.js
import React, { useState, useEffect } from 'react';
import { BehaviorSubject, from } from 'rxjs';
import { filter, debounceTime, distinctUntilChanged, mergeMap } from 'rxjs/operators';
let searchSubject = new BehaviorSubject('');
let searchResultObservable = searchSubject.pipe(
filter(val => val.length > 1),
debounceTime(750),
distinctUntilChanged(),
mergeMap(val => from(getPokenmonByName(val)))
);
const getPokenmonByName = async name => {
const { results: allPokemons } = await fetch('https://pokeapi.co/api/v2/pokemon/?limit=1000').then(res => {
return res.json();
});
return allPokemons.filter(pokemon => pokemon.name.includes(name));
};
const useObservable = (observable, setter) => {
useEffect(() => {
let subscription = observable.subscribe(result => {
setter(result);
});
return () => subscription.unsubscribe();
}, [observable, setter]);
};
function App() {
const [search, setSearch] = useState("");
const [results, setResults] = useState([]);
useObservable(searchResultObservable, setResults);
const handleSearchChange = e => {
const newValue = e.target.value;
setSearch(newValue);
searchSubject.next(newValue);
};
return (
<div>
<input type="text" placeholder="search" value={search} onChange={handleSearchChange} />
<ul>{results.map(pokeman => (
<li key={pokeman.name}>{pokeman.name}</li>
))}</ul>
</div>
);
}
export default App;

页面搜索效果