一、RxSwift 操作符分类概览
| 类别 | 常见操作符 |
|---|---|
| 创建型 |
just, of, from, create, interval, timer
|
| 转换型 |
map, flatMap, flatMapLatest, switchMap, concatMap
|
| 过滤型 |
filter, take, skip, distinctUntilChanged, debounce, throttle
|
| 组合型 |
merge, concat, combineLatest, zip, withLatestFrom
|
| 错误处理 |
catch, retry
|
| 工具型 |
do, subscribe(on:), observeOn, share, share(replay:1)
|
注意:RxSwift 中没有
switchMap,它叫flatMapLatest;也没有concatMap,用flatMap { $0 }+concat实现。
二、核心操作符详解与对比
1. map vs flatMap vs flatMapLatest
✅ map
- 作用:一对一转换。
- 输入一个值 → 输出一个新值(非 Observable)。
Observable.of(1, 2, 3)
.map { "\($0)!" }
.subscribe { print($0) }
// next("1!")
// next("2!")
// next("3!")
✅ flatMap
- 作用:将每个元素转为一个 Observable,然后 合并所有内部 Observable 的事件流(并发)。
- 适用于多个异步任务同时进行。
let search = PublishSubject<String>()
search
.flatMap { query -> Observable<[String]> in
print("Searching for: $query)")
return simulateSearch(query) // 返回 Observable
}
.subscribe(onNext: { results in
print("Results: $results)")
})
⚠️ 问题:如果用户快速输入 "a" → "ab" → "abc",三个搜索请求会同时发出并可能乱序返回("abc" 的结果可能先到,"a" 的后到)。
✅ flatMapLatest(等价于其他语言中的 switchMap)
- 作用:只保留最新的内部 Observable,自动取消之前的。
- 非常适合搜索框、自动补全等场景。
search
.flatMapLatest { query in
print("Searching for: $query)")
return simulateSearch(query).delay(.milliseconds(500), scheduler: MainScheduler.instance)
}
.subscribe(onNext: { print("Got: $0)") })
✅ 效果:
- 输入 "a" → 开始搜索
- 立即输入 "ab" → 取消 "a" 的搜索,开始 "ab"
- 最终只显示最新查询的结果,避免竞态条件。
🔁 总结对比:
| 操作符 | 是否并发 | 是否取消旧流 | 适用场景 |
|--------|--------|------------|--------|
|flatMap| ✅ 是 | ❌ 否 | 多个独立异步任务(如批量上传) |
|flatMapLatest| ❌ 否 | ✅ 是 | 搜索、实时预览、最新请求优先 |
|concatMap(需手动实现) | ❌ 否 | ❌ 否(排队) | 严格顺序执行(如队列任务) |
💡 RxSwift 中没有内置
concatMap,但可通过flatMap { $0 }.concat()或使用enumerated().flatMap { index, value in ... }+ 队列控制实现。
2. debounce vs throttle
两者都用于限制高频事件,但策略不同。
✅ debounce(防抖)
- 含义:在事件停止触发 一段时间后 才发出最后一次值。
- 典型场景:搜索框输入完成后再请求。
textField.rx.text.orEmpty
.debounce(.milliseconds(300), scheduler: MainScheduler.instance)
.subscribe(onNext: { query in
print("Search after pause: $query)")
})
用户输入 "r", "rx", "rxs" —— 如果中间停顿超过 300ms,才触发。
✅ throttle(节流)
- 含义:固定时间窗口内最多发出一次(通常取窗口内的第一个或最后一个值)。
- 默认是 leading: true, trailing: false(取第一个)。
button.rx.tap
.throttle(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { _ in
print("Button tapped (at most once per second)")
})
即使用户狂点按钮,每秒最多响应一次。
🔁 对比总结:
| 特性 |debounce|throttle|
|------|-----------|-----------|
| 触发时机 | 事件静止后延迟触发 | 固定间隔触发 |
| 是否依赖“停止输入” | ✅ 是 | ❌ 否 |
| 适合场景 | 搜索、保存草稿 | 按钮防连点、滚动监听 |
⚠️ 注意:
throttle在 RxSwift 6+ 中已更名为throttle(_:_:latest:),可通过latest: true改为取窗口最后一个值(类似 debounce 但有最大频率限制)。
3. merge vs concat vs switchLatest(即 flatMapLatest)
假设我们有多个 Observable 流:
let stream1 = Observable.of("A1", "A2").delay(.seconds(1), scheduler: MainScheduler.instance)
let stream2 = Observable.of("B1", "B2").delay(.seconds(1), scheduler: MainScheduler.instance)
✅ merge
- 并发合并:谁先发出就先输出。
Observable.of(stream1, stream2)
.merge()
// 可能输出:A1, B1, A2, B2(顺序不确定)
✅ concat
- 顺序执行:等前一个完成,再订阅下一个。
Observable.of(stream1, stream2)
.concat()
// 输出:A1, A2, B1, B2(严格顺序)
✅ switchLatest(通过 flatMapLatest 实现)
- 只保留最新流,丢弃旧的。
searchInput
.map { query in
return api.search(query) // 返回 Observable
}
.switchLatest() // 等价于 .flatMapLatest { $0 }
适用于“只关心最新查询结果”的场景。
4. combineLatest vs zip
✅ combineLatest
- 只要任一源发出新值,就用各源的最新值组合发出。
- 不要求对齐,只要有初始值即可。
let a = BehaviorSubject(value: 1)
let b = BehaviorSubject(value: "X")
Observable.combineLatest(a, b) { ($0, $1) }
.subscribe { print($0) }
a.onNext(2) // 输出 (2, "X")
b.onNext("Y") // 输出 (2, "Y")
✅ zip
- 严格按顺序配对:第1个和第1个配,第2个和第2个配……
- 必须双方都有新值才发出。
let clicks = button.rx.tap.map { "click" }
let timer = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
Observable.zip(clicks, timer) { click, time in
return "\(click) at \(time)s"
}
.subscribe { print($0) }
只有当点击和计时器都至少发出一次,才会输出。且第 n 次点击对应第 n 秒。
🔁 对比:
| 特性 |combineLatest|zip|
|------|------------------|------|
| 触发条件 | 任一源更新 | 所有源都更新(同步) |
| 数据对齐 | 用最新值 | 严格按发射顺序 |
| 初始值要求 | 每个源至少有一个值 | 同上 |
| 典型用途 | 表单验证、UI 状态组合 | 动画同步、步骤引导 |
5. share() vs share(replay: 1)
用于将 冷 Observable 转为热 Observable,避免重复订阅导致重复执行。
let source = Observable.create { observer in
print("Executing network request...")
observer.onNext("Data")
observer.onCompleted()
return Disposables.create()
}
❌ 不加 share(冷 Observable)
source.subscribe { print("Sub1:", $0) }
source.subscribe { print("Sub2:", $0) }
// 输出两次 "Executing network request..."
✅ share()
- 多个订阅者共享同一个执行,但只转发后续事件(不缓存历史值)。
- 如果第一个订阅者已收到 completed,第二个订阅者将收不到任何值。
✅ share(replay: 1)
- 缓存最近 1 个值,新订阅者立即收到该值。
- 非常适合状态管理(如当前用户信息)。
let shared = source.share(replay: 1)
shared.subscribe { print("Sub1:", $0) } // 执行请求,收到 Data
shared.subscribe { print("Sub2:", $0) } // 不执行请求,直接收到 Data
💡 类似
BehaviorSubject的行为。
三、实战建议
| 场景 | 推荐操作符 |
|---|---|
| 搜索框 |
debounce + flatMapLatest
|
| 表单验证 |
combineLatest + map
|
| 按钮防连点 | throttle |
| 多个独立 API 请求 |
flatMap + merge
|
| 顺序任务(如引导页) | concat |
| 共享网络请求结果 | share(replay: 1) |
| 错误重试 |
.retry(3) 或 .catch { fallback }
|
四、小结:易混淆操作符速查表
| 对比组 | 关键区别 |
|---|---|
flatMap vs flatMapLatest
|
并发 vs 取消旧流 |
debounce vs throttle
|
静止后触发 vs 固定频率 |
merge vs concat
|
并发 vs 顺序 |
combineLatest vs zip
|
最新值组合 vs 严格配对 |
share() vs share(replay:1)
|
无缓存 vs 缓存最近值 |