FRP 与 RxSwift基础

本文主要尝试阐述

  • 什么是 FRP
  • RxSwift的几个基础概念
  • RxSwift的最简单使用流程
  • RxSwift是怎么通过基本流程来实现事件响应的

FRP

FRP全称为 Functional Reactive Programming, 即函数响应式编程。


image.png
函数式编程

强调每个程序都能够被反复分解为越来越小的模块单元,而所有这些块可以通过函数装配起来,以定义一个完整的程序。

响应式编程

在响应式编程当中,a:=b+c声明的是一种绑定关系。(a与b、c绑定起来了,所以b、c的变化会影响a,这也就是所谓【变化传播】)

RxSwift 的 基础概念

可观察序列 Observable

Observable 是RxSwift 框架的核心概念,它是RxSwift的数据源
请求接口的网络数据,可以定义成一个Observable a
请求数据库的数据,也可以定义成一个Observable b
而我们的界面显示的数据 c 可以绑定到 a + b的数据流上,即 c := a + b
我们可以想象到 c 也是一个 Observable ,
而 + 和 := 操作 在RxSwift中定义了相应的函数式
在RxSwift中只要实现了 ObservableType协议的类都可以认为是 Observable,
ObservableType的定义如下

public protocol ObservableType: ObservableConvertibleType {
  func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable 
  where Observer.Element == Element
}

从协议的定义可以认为,要成为一个可观察序列,只要实现subscribe方法即可,
而subscribe方法有一个Observer类型的参数,并且Observer需要实现ObserverType协议,下面我们会介绍观察者Observer

观察者 Observer

观察者,见名知意我们可以认为,它有一个被观察者,即Observable,通过普通的观察者模式,我们应该知道,一般观察者会定义一些行为来相应Observable的变化。我们来看看观察者的定义

public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    @available(*, deprecated, message: "Use `Element` instead.")
    typealias E = Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

与Observable类似,要成为一个观察者,只要实现ObserverType协议即可。

以上介绍了RxSwift的两个基础概念, Observable和Observer
那么Rxswift是怎么通过这两个对象来处理业务的呢?

RxSwift的基本使用流程

// 1. 创建序列
let observable = Observable<Int>.create { (observer: AnyObserver<Int>) -> Disposable in    
        
}

// 2. 订阅序列
let disposable = observable.subscribe(onNext: { (value) in
            
})

// 3. 资源回收
disposable.disposed(by: disposeBag)

RxSwift 处理事务的流程如上

  • 创建一个可观察序列
  • 通过观察者订阅可观察序列
  • 将订阅的资源放入资源回收包中统一回收

举个例子,使用RxSwift来处理button的点击事件

button.rx.tap.subscribe(onNext: {
            
}).disposed(by: disposeBag)

为了验证流程, 我们需要查看button.rx.tap是否创建了一个Observable,来带 UIControl + Rx.swift文件,查看到源码如下

public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
        let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
                MainScheduler.ensureRunningOnMainThread()

                guard let control = control else {
                    observer.on(.completed)
                    return Disposables.create()
                }

                let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
                    observer.on(.next(()))
                }

                return Disposables.create(with: controlTarget.dispose)
            }
            .takeUntil(deallocated)
        // ControlEvent其实也是一个特殊的可观察序列
        return ControlEvent(events: source)
    }

即button的点击事件处理的第一步,确实创建了一个Observable。同时我们要对点击事件处理的代码,也确实是写在观察者订阅序列的时候,并且也必须得通过disposed(by: disposeBag)方法回收资源。

RxSwift是怎么通过基本流程来实现事件响应的

首先我们来分析,在UIKit中点击事件的响应流程,即target-action模式

  1. 定义一个响应函数
  2. 通过 button.addTarget将响应函数作为button的事件处理函数

然后我们注意到,在rxswift的button.rx.tap的实现中,Observable的创建过程中,有下面这段代码块:

let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
    observer.on(.next(()))
}

在ControlTarget类文件中我们会看到UIKit的target-action模式,并且可以知道当点击事件发生时就会执行ControlTarget初始化时传进去的闭包,即执行 observer.on(.next(()))

通过以上分析我们知道,当点击事件发生时就会执行 observer.on(.next(())),但是我们处理事件的业务却是写在Observable对象的subscribe函数的参数闭包下面。
因此我们分析这两个闭包是否有关系呢?
或者我们猜测他们肯定有关系,才能保证流程是正确的。
同时我们会有一个疑问,observer是在哪里创建并且传入到Observable中去的呢?

分析RxSwift的源码,可以发现

extension ObservableType {
    // MARK: create
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}
  1. Observable create方法会创建一个内部私有类AnonymousObservable的对象
  2. AnonymousObservable继承自Producer类,而Producer实现了ObservableType协议要求的subscribe方法,subscribe方法会接收到一个观察者observer。
  3. 通过subscribe的参数,可以知道我们使用的订阅函数和Producer的subscribe并不是同一个函数

然后分析我们观察者订阅可观察序列时调用的subscribe函数

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            // 这里创建了一个observer
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

发现,原来观察者是通过订阅的时候创建的,并且在return语句时通过self.asObservable().subscribe(observer)传给Observable,这个self应该是AnonymousObservable的对象,这个subscribe即为Producer类定义的subscribe方法。

至此,我们可以确定,observer.on(.next(()))方法确实是调用的订阅时subscribe的onNext闭包。

总结
Observer: onNext()
Observable -> AnonymousObservable -> Producer : subscribe(observer)
ObservableType: subscribe(onNext: ()->()), create() -> AnyAnonymousObservable
  • Observable.create 可以拿到一个可观察序列,并且,可以通过闭包参数observer来发送数据
  • Observable.create 创建的Observable其实是AnonymousObservable类型的(Observable的创建方法还有其他的,它们其实是生成了不同的ObservableType的实现类),创建的各种Observable其实都是继承自Producer
  • 观察者订阅可观察序列,其实是ObservableType协议的默认实现,并且我们只要提供处理数据的闭包就行,observer在ObservableType默认实现的subscribe方法中就自动创建,并且会将我们提供的闭包给到创建的observer
  • ObservableType协议的默认实现会将创建的observer传递给自己的实现类的对象
  • Observable一旦发送数据,observer就会执行我们subscribe时提供的闭包

参考资料

什么是函数式编程
函数响应式编程(FRP)思想
RxSwift源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352