译自 RxSwift to Combine: The Complete Transition Guide
不逐字翻译了,只翻主要的知识点信息。
简介
Combine 是swift新推出的一种面向 响应式编程的框架。
很多开发者都想从RxSwift 切换到Combine,两者之间有很多的相似之处。
但细节方面还是有很多区别的,如果你想切换过来,这篇文章会列出 RxSwift和Combine之间的 操作符,函数,类型等等的映射。
响应式编程可以让 数据状态在对象之间,工程之间,甚至app之间互相同步。
在ios开发中,最常见的就是在UI和Model之间进行状态同步。在之前很长一段时间里,RxSwift都是最好的选择。但apple推出了 跟SwiftUI 更契合的Combine 作为响应式编程的框架,基本可以完全替代RxSwift的所有功能了。
此文章将分为 Combine是怎么工作的 切换到Combine是好的选择吗 怎么更简单地从RxSwift切换到Combine 三个部分。
RxSwift 和 Combine的主要区别
| RxSwift | Combine | |
|---|---|---|
| 支持的iOS版本 | iOS 8.0+ | iOS 13.0+ |
| 支持的平台 | iOS, macOS, tvOS, watchOS, Linux | iOS, macOS, tvOS, watchOS, UIKit for Mac |
| 框架所属 | 第三方 | Apple第一方,SDK内置 |
| 谁来维护? | 开源社区 | Apple技术团队 |
| 协作的UI框架 | RxCocoa | SwiftUI |
Combine是怎么工作的
考虑到本文章是介绍如何从RxSwift迁移到Combine, 假设读者已经对RxSwift有一定的了解了。 如果需要了解RxSwift,可以参考 RxSwift repository on GitHub.
Combine在很多方面跟RxSwift都有相似之处以及对等的映射概念映射,比如 方法,类型声明等。
如果要找出两者之间更深层次的差异,需要对Combine进行更深地挖掘,看看它的底层机制。
Publisher
跟RxSwift的Observable 相对应的是Combine里的Publisher。
在RxSwift里是一个类,而在Combine里是一个协议。
protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func receive<S: Subscriber>(subscriber: S)
where Self.Failure == S.Failure,
Self.Output == S.Input
}
Combine 不会指定 Wrapper types来描述它的唯一特征,比如RxSwift里的Infallible, Maybe or Single。
但每个Publisher也有自己的自定义类型,借由该类型,Publisher的特征也能被推导出来。
RxSwift的Observable 需要实现subscribe 函数,对应的Publisher需要实现 receive函数,它们的功能基本一致。相比于RxSwift而言,Publisher指定了一个 Failure类型,可以表明该publisher 是否/如何 失败。对于那些不用处理失败的Publisher,我们设定Failure类型为Never 即可。
Combine的Publisher 可以是 值类型(比如struct)或是 引用类型(比如类)。大多数的Publisher都是值类型的。
在RxSwift里,操作符一般就是简单地返回一个Observable 类型,而Combine里返回的是一个混合类型,比较复杂;比如Timer.TimerPublisher, CombineLatest<Just<Int>, Just<Int>>. 鉴于自己定义一个混合类型比较麻烦,我们可以就用 AnyPublisher 来简化Publisher的使用。
举个例子:AnyPublisher<Int, Never>,可以作为一个 不会失败的抛出Int值的Publisher。

Subscriber
Subscriber (RxSwift: Observer)可以接收订阅信息(subscriptions), 输入数据(inputs),以及结束回调(completions). 这点跟RxSwift不同,没有声明包含了数据回调以及 结束回调的event的枚举,但提供了单独的方法来分别处理对应的事件。
protocol Subscriber: CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
}
在上面的代码片段里,你可以看到Subscriber 也声明了一个Failure 类型,决定了它能失败,以及失败的相关类型。对应的RxSwift也声明了独立的Error事件。Combine非常明显的表明了 失败会导致Publisher 终结(RxSwift里,error事件也表明了 Observable的终结)。
Subscribers 必须是类,因为 subscription要引用它来做数据传递,而不能在订阅期间重新创建多次Subscribers(值类型比如struct 会在数据变化时会重新创建实例)。
Subscription
每当你订阅时,就会创建一个 Subscription,它会持有所有必要的资源的引用,而且可以随时cancel(对应RxSwift里的DisposeBag)。这也是为什么Subscription 必须是类。
protocol Subscription: Cancellable, ... {
func request(_ demand: Subscribers.Demand)
}
protocol Cancellable {
func cancel()
}
与RxSwift相比,Combine允许订阅者随时可以去获取更多数据,并以 backpressure support 的形式返回;因此 实际上是订阅者在主动请求数据而不是 Publisher来决定什么时候将数据传出去。
这能让订阅者不会被大量数据堵塞而导致无法及时处理,这看上去更像是可控制的数据流。
如果想更深入地了解 backpressure support ,可以参考 Processing Published Elements with Subscribers
Subject
Subject 跟RxSwift的差不多,可以看作是 Subscriber 和 Publisher 的组合,既可以发送数据也能接收数据。
protocol Subject: AnyObject, Publisher {
func send(_ value: Self.Output)
func send(completion: Subscribers.Completion<Self.Failure>)
func send(subscription: Subscription)
}
一方面你可以将数据传给 Subject,另一方面由于 Subject 遵从了 Publisher协议,它也可以被监听来获取数据。
在Combine里,订阅者 receive 而 Subject send 事件。尽管如此,在每个subject里,每当value 遵从 Subscriber协议时,你都必须创建一个 AnySubscriber。
综上所述,RxSwift和Combine 大部分功能都是一样的,但在接口方面有一些很小的区别。 那么哪个更适合我们呢?
切换到Combine是好的选择吗
最大的争议点在于,Combine是apple的第一方框架,使用它你就不用做一些多余的framework 依赖配置工作,app也体积也会小一些。
当然,RxSwift也没有明确表明是否会有错误发生。一般情况下,直接就用 Observable 然后等着数据过来就行了。 但在Combine里,就明确表明了Publisher 可能会失败,而且失败后产生的错误是啥。
但这些可失败和不会失败的publisher的交互可能会非常复杂,你得在每个使用处都加上 eraseToAnyPublisher(),但帮助也不大。
在 backpressure support 的帮助下,Combine 能考虑得更加周到。订阅者可以随时获取数据,而不用等待Publisher 来发送数据。这更符合我们的直觉,特别是在一些很耗时的操作发生时,大量的数据同时在publisher 进出。
简单来说,Combine 更适应整个swift的环境。很多扩展方法已经在常用的类型里实现了(比如 Timer.publisher, (0...3).publisher, Optional<String>.none.publisher),这些方法让Combine的功能更加的全面。
不过Combine不像RxSwift那样全面和强大,是么?
RxSwift 没有像Combine 那么多的编译问题。RxSwift不用管 失败的错误类型,而且单个泛型类型的检查比两个要简单得多。
RxSwift支持iOS9以及以上版本,但Combine只能用在iOS13 以及以上版本。它俩在所有的apple 平台上都能work,但Combine对于linux的支持还是有缺失。OpenCombine 可以帮助解决这个问题,它用的是跟Combine一样的接口,且能支持更多平台。
Combine的命名规则更契合iOS平台,而RxSwift 更契合跨平台模式的规则。
RxSwift的技术社区已经存在很长一段时间了,在维护和扩展框架方面已经做了很多改进了。社区成员们为RxSwift 扩展了很多关于特定场景的功能,包括很多自定义的操作符和类型等。而Combine是一个封闭的框架,而且只推出了三年而已,只在少部分的项目里被使用到,如果你需要自己写一些字定义的操作符,还不如直接使用现有的知识点来实现。
现在你要写一个字定义的Publisher比自定义的Observable要麻烦许多。在RxSwift里,你只要用 Observable.create 来发送数据,发送错误,以及完成事件即可。在Combine里却没有对应的操作,你可以用Future (基本上就是一个带completion回调的Publisher,且completion只会调用一次) 或是写一个自定义的Publisher 来模拟 Observable.create 的行为(我们接下来就会讲到这点)
总结一下
回到我们之前的 该用RxSwift还是Combine的问题,这个要看情况的。
如果你想要干掉之前那些“多余的”依赖关系,而且你的app支持iOS13 以及以上版本,且不用支持iOS 以外的平台,那Combine就是比较好的选择了。
怎么更简单地从RxSwift切换到Combine
我们搜集了下面的切换引导信息,你可以直接搜索RxSwift的方法,操作符来找到Combine里的对应替换方案。
一些简单的说明
一般来说,Combine在处理错误时,相关接口看上去比较复杂。给个小提示, setFailureType(to: Failure.self) 更容易理解,而不是当作 mapError { _ -> Failure in } 来看待。
下面的这些链接在我们从RxSwift切换到Combine时提供了很大的帮助。
- CombineExt 里是Combine技术社区提供的一些 当前系统Combine里没有支持的操作符。
- cheat sheet 也提供了不少帮助。
类型
Disposable
| RxSwift | Combine |
|---|---|
| Disposable | Cancellable Use AnyCancellable to create them like RxSwift’s Disposables.create |
| DisposeBag | Set<AnyCancellable> any RandomAccessCollection of AnyCancellable of course, you can also create references one by one |
ℹ️ 注意,subscriptions 会在 Cancellable 析构后直接cancel掉。
Publishers
| RxSwift | Combine |
|---|---|
| Observable<Element> |
AnyPublisher<Element, Error>或者 其他的 Publisher 类型 同样根据抛出的错误类型,Combine也能创建对应的Failure类型 |
| Single<Element> |
AnyPublisher<Element, Error> Future<Element, Error> 可惜不能保证只抛出一个值,除非使用Future publisher |
| ConnectableObservable<Element> | ConnectablePublisher |
| Infallible<Element> |
AnyPublisher<Element, Never> 注意:Failure在这里的类型是 Never |
| Maybe<Element> | 在Combine里没有对应的东东 |
Subjects
| RxSwift | Combine |
|---|---|
| BehaviorSubject | CurrentValueSubject |
| PublishSubject | PassthroughSubject |
| ReplaySubject | Combine里没有 👉 CombineExt里的替代方案 |
| AsyncSubject | Combine 里没有 |
Relays
Relay是Subject的一个变种,他跟Subject的不同之处在于 无法发送/接收 完成事件。
class Relay<SubjectType: Subject>: Publisher,
CustomCombineIdentifierConvertible where SubjectType.Failure == Never {
typealias Output = SubjectType.Output
typealias Failure = SubjectType.Failure
let subject: SubjectType
init(subject: SubjectType) {
self.subject = subject
}
func send(_ value: Output) {
subject
.send(value)
}
func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure, Output == S.Input {
subject
.subscribe(on: DispatchQueue.main)
.receive(subscriber: subscriber)
}
}
typealias CurrentValueRelay<Output> = Relay<CurrentValueSubject<Output, Never>>
typealias PassthroughRelay<Output> = Relay<PassthroughSubject<Output, Never>>
extension Relay {
convenience init<O>(_ value: O)
where SubjectType == CurrentValueSubject<O, Never> {
self.init(subject: CurrentValueSubject(value))
}
convenience init<O>()
where SubjectType == PassthroughSubject<O, Never> {
self.init(subject: PassthroughSubject())
}
}
Observer Operators
| RxSwift | Combine |
|---|---|
| asObserver() | AnySubscriber |
| on(_:) |
Subscriber ↳ receive(_:) ↳ receive(completion:) Subject ↳ send(_:) ↳ send(completion:) |
| onNext(_:) |
Subscriber ↳ receive(_:) Subject ↳ send(_:) |
| onError(_:) |
Subscriber ↳receive(completion: .failure(<error>)) Subject ↳ send(completion: .failure(<error>)) |
| onCompleted() |
Subscriber ↳ receive(completion: .finished) Subject ↳ send(completion: .finished) |
| mapObserver(_:) | Combine里没有对应的,不过实现起来很简单 👉replacement in CombineExt |
如果要修改subscriber的 Input或 Failure 类型,可以使用下面这个扩展方式
extension Subscriber {
func map<Input>(
_ map: @escaping (Input) -> Self.Input
) -> AnySubscriber<Input, Failure> {
.init(
receiveSubscription: receive,
receiveValue: { self.receive(map($0)) },
receiveCompletion: receive
)
}
func mapError<Failure>(
_ map: @escaping (Failure) -> Self.Failure
) -> AnySubscriber<Input, Failure> {
.init(
receiveSubscription: receive,
receiveValue: receive,
receiveCompletion: { completion in
switch completion {
case let .failure(error):
self.receive(completion: .failure(map(error)))
case .finished:
self.receive(completion: .finished)
}
}
)
}
}
Scheduling
RxSwift 有很多不同的Scheduler 类型,比如:MainScheduler,ConcurrentDispatchQueueScheduler, SerialDispatchQueueScheduler, CurrentThreadScheduler, HistoricalScheduler, OperationQueueScheduler,以及其他更多类型。
而在Combine里,这个就很简单了。可以直接使用GCD(Grand Central Dispatc)的相关类型就可以了。比如 DispatchQueue, OperationQueue 以及 RunLoop。
Combine里唯一的Scheduler类型只有 ImmediateScheduler,它能让任务直接同步执行(跟CurrentThreadScheduler 差不多)。
在这篇文章里可以获取更多关于Combine里的scheduling 的信息。
Factory Functions
Publisher / Observable Operators
| RxSwift | Combine |
|---|---|
| amb(_:) | Combine里没有 👉 replacement in CombineExt |
| asCompletable() | Combine里没有 |
| asObservable() | eraseToAnyPublisher() |
| buffer( timeSpan: count: scheduler: ) |
collect( .byTimeOrCount(::_:), options: ) |
| catch(:) catchError(:) |
catch(_:) tryCatch(_:) |
| catchAndReturn(:) catchErrorJustReturn(:) |
replaceError(with:) |
| compactMap(_:) |
compactMap(_:) tryCompactMap(_:) |
| concat(...) |
append Publishers.Concatenate |
| concatMap(_:) | Combine里没有 可以用 reduce(::)和 append(_:)来实现 |
| debounce(_:scheduler:) | debounce( for: scheduler: options: ) |
| debug( _: trimOutput: file: line: function: ) |
print(_:to:) |
| delay(_:scheduler:) | delay( for: tolerance: scheduler: options: ) |
| delaySubscription( _: scheduler: ) |
Combine里没有 可以用 Deferred和 delay(for:scheduler:options:)的组合来替代 |
| dematerialize() | Combine里没有 👉 replacement in CombineExt |
| distinctUntilChanged(...) |
removeDuplicates() 当Output是Equatable时用 removeDuplicates(by:) tryRemoveDuplicates(by:) |
| do( onNext: afterNext: onError: afterError: onCompleted: afterCompleted: onSubscribe: onSubscribed: onDispose: ) |
handleEvents( receiveSubscription: receiveOutput: receiveCompletion: receiveCancel: receiveRequest: ) 没有onDispose: 在publisher complete时,cancel不会被调用。 |
| element(at:) | output(at:) |
| enumerated() | Combine里没有 建议可以用 scan(::)魔改一下 |
| filter(_:) |
filter(_:) tryFilter(_:) |
| first() | first() |
| flatMapFirst(_:) | Combine里没有 |
| flatMap(_:) |
flatMap(_:) tryFlatMap(_:) |
| flatMapLatest(_:) |
map(_:)接上 switchToLatest() 👉 replacement in CombineExt |
| groupBy(keySelector:) | 没有 |
| ifEmpty(default:) | replaceEmpty(with:) |
| ifEmpty(switchTo:) | 没有 |
| ignoreElements() | ignoreOutput() |
| map(_:) |
map(_:) tryMap(_:) |
| materialize() | 没有 👉 replacement in CombineExt |
| multicast(_:) | multicast(subject:) |
| multicast(makeSubject:) | multicast(_:) |
| observe(on:) |
observe(on:) Combine用的是不同的scheduler类型! |
| publish() |
multicast { PassthroughSubject() } makeConnectable() |
| reduce(into:_:) | 没有 不过可以用 reduce 来实现 |
| reduce(::) |
reduce(::) tryReduce(::) |
| reduce( _: accumulator: mapResult: ) |
reduce(::)接上 map(_:) 或 tryMap(_:) |
| refCount() | autoconnect() |
| replay(_:) | 没有 可以用 multicast(_:)和 ReplaySubject 来实现 |
| replayAll() | 没有 |
| retry() |
retry(.max) 可能有微小的意义上的差别,但实际使用中一般没区别 |
| retry(_:) | retry(_:) |
| retry(when:) | 没有 |
| sample(_:defaultValue:) | 没有 |
| scan(into:_:) | 没有 可以用一般的 scan(::)来实现 |
| scan(::) |
scan(::) tryScan(::) |
| share(replay:scope:) |
share() 或者用 multicast(_:)加上 PassthroughSubject和 autoconnect() 或者用 multicast(_:)加上 ReplaySubject 👉 extension in CombineExt |
| single() | 没有 当有且只有一个值时,用 first(),否则会报错 |
| single(_:) | 没有 可以用 filter(_:)接上 single()的替代方案 |
| skip(_:) | dropFirst(_:) |
| skip(while:) | drop(while:) |
| skip(until:) | drop(untilOutputFrom:) |
| startWith(...) | prepend(...) |
| subscribe(_:) | 最类似的接口: sink( receiveValue: receiveCompletion: ) |
| subscribe(on:) |
subscribe(on:options:) Combine用的是不同的scheduler类型! |
| subscribe( onNext: onError: onCompleted: onDisposed: ) |
最类似的接口: sink( receiveValue: receiveCompletion: ) |
| subscribe( with: onNext: onError: onCompleted: onDisposed: ) |
最类似的接口: sink( receiveValue: receiveCompletion: ) 或 [weak object]
|
| switchLatest() | switchToLatest() |
| take(_:) | prefix(_:) |
| take(for:scheduler:) | 没有 可以用以下方式解决: prefix(untilOutputFrom: Timer.publish( every: <time>, on: <scheduler> ) .autoconnect() .prefix(1) ) 👉 replacement in CombineExt |
| take(until:) | prefix(untilOutputFrom:) |
| take(until:behavior:) | 用 prefix(while:)的相反条件来替代,但没有 behavior参数功能了 |
| take(while:behavior:) | prefix(while:),但没有 behavior参数功能了 |
| takeLast(_:) | 没有 最接近的方案: reduce([]) { .flatMap { $0.suffix(<count>).publisher } |
| throttle( _: latest: scheduler: ) |
throttle( for: scheduler: latest: ) |
| timeout( _: other: scheduler: ) |
最接近的方案: timeout( _: scheduler: options: customError: ) 然后再 catch(_:), 用map转到其他publisher上 |
| timeout(_:scheduler:) | timeout( _: scheduler: options: customError: ) |
| toArray() | collect() |
| window( timeSpan: count: scheduler: ) |
collect( .byTime(<scheduler>, <time>) ) collect( .byTimeOrCount( <scheduler>, <time>, <count> ) ) |
| withLatestFrom(_:) | 没有 👉 replacement in CombineExt |
| withLatestFrom( _: resultSelector: ) |
没有 👉 replacement in CombineExt |
| withUnretained(_:) | 最接近的方案: compactMap { [weak object] value in object.map { ($0, value) } } |
| withUnretained( _: resultSelector: ) |
最接近的方案: compactMap { [weak object] value in object.map { resultSelector($0, value) } } |
总结一下
对我们来说,总是要调用 setFailureType(to:) 和 eraseToAnyPublisher() 感觉不舒坦,除此之外,我们还是觉得Combine 很棒的。