Subscriber(这篇文章主要讲assign
和sink
)
-
Publisher
中末尾提到了sink
和assign
的调用,现在具体看一下具体做了什么
sink操作符的代码调用
```
let arr: [Int] = [1, 2, 100]
/// `Sink` 将数组转换成一个数据流
arr.publisher
/// 过滤数据流中大于2的元素
.filter{$0 > 2}
/// 进行一次转换,转成String类型
.compactMap{"\($0)"}
/// 订阅数据源
.sink { value in
debugPrint("数据流: \(value)")
}.store(in: &cancel)
```
Sink
之前讲解了
Publisher
的数据流发布的过程,sink
相当于接收publisher
发送的数据,然后执行receiveValue
闭包,调用者可以进行自己的业务处理-
先看一下
sink
操作符public func sink( receiveValue: @escaping (Output) -> Void ) -> AnyCancellable { /// 包装类`Sink`处理数据流,然后回调给receiveValue闭包 let subscriber = Subscribers.Sink<Output, Failure>( receiveCompletion: { _ in }, receiveValue: receiveValue ) /// 订阅数据流 subscribe(subscriber) /// GC 包装类 return AnyCancellable(subscriber) }
Sink包装类实现
/// A simple subscriber that requests an unlimited number of values upon subscription. public final class Sink<Input, Failure: Error> : Subscriber, Cancellable { /// 持有外部的闭包 public var receiveValue: (Input) -> Void /// 完成闭包 public var receiveCompletion: (Subscribers.Completion<Failure>) -> Void /// 状态 private var status = SubscriptionStatus.awaitingSubscription /// 初始化持有完成回调闭包和数据流回调闭包 public init( receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void, receiveValue: @escaping ((Input) -> Void) ) { self.receiveCompletion = receiveCompletion self.receiveValue = receiveValue } /// `Subscription`协议方法实现 public func receive(subscription: Subscription) { guard case .awaitingSubscription = status else { subscription.cancel() return } subscription.request(.unlimited) } /// `Subscribers` 协议实现,执行数据流回调 public func receive(_ value: Input) -> Subscribers.Demand { /// 记录当前的数据流回调 let receiveValue = self.receiveValue /// 触发数据流回调 receiveValue(value) return .none } /// `Subscribers` 完成回调 public func receive(completion: Subscribers.Completion<Failure>) { let receiveCompletion = self.receiveCompletion self.receiveCompletion = { _ in } withExtendedLifetime(receiveValue) { receiveValue = { _ in } } /// 执行完成回调 receiveCompletion(completion) } /// 数据流取消(`cancel`) public func cancel() { guard case let .subscribed(subscription) = status else { return } withExtendedLifetime((receiveValue, receiveCompletion)) { receiveCompletion = { _ in } receiveValue = { _ in } } subscription.cancel() } }
assign操作符代码调用
```
let arr: [Int] = [1, 2, 100]
/// `Assign`使用keypath进行赋值
arr.publisher
.filter{$0 > 2}
.compactMap{"\($0)"}
/// keypath 赋值
.assign(to: \.name, on: root).store(in: &cancel)
debugPrint("root name: \(root.name)")
```
Assign
使用
KeyPath
来实现具体类的赋值操作,仅支持Class
类型的具体类-
assign
操作符public func assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Output>, on object: Root) -> AnyCancellable { /// 订阅类,`Assign`进行了包装,数据流流转到这里进行keypath赋值 let subscriber = Subscribers.Assign(object: object, keyPath: keyPath) /// 订阅数据流 subscribe(subscriber) /// GC 处理 return AnyCancellable(subscriber) }
Assign
包装类public final class Assign<Root, Input>: Subscriber, Cancellable { public typealias Failure = Never public private(set) var object: Root? /// The key path that indicates the property to assign. public let keyPath: ReferenceWritableKeyPath<Root, Input> private var status = SubscriptionStatus.awaitingSubscription /// 持有keypath的关联类和keypath的键 public init(object: Root, keyPath: ReferenceWritableKeyPath<Root, Input>) { self.object = object self.keyPath = keyPath } public func receive(subscription: Subscription) { guard case .awaitingSubscription = status else { subscription.cancel() return } status = .subscribed(subscription) subscription.request(.unlimited) } /// 数据流流转到这里,进行keypath赋值 public func receive(_ value: Input) -> Subscribers.Demand { guard case .subscribed = status, let object = self.object else { return .none } object[keyPath: keyPath] = value return .none } /// 完成回调 public func receive(completion: Subscribers.Completion<Never>) { guard case .subscribed = status else { return } terminateAndConsumeLock() } public func cancel() { guard case let .subscribed(subscription) = status else { return } terminateAndConsumeLock() subscription.cancel() } private func terminateAndConsumeLock() { withExtendedLifetime(object) { object = nil } } }