Rx ObservableSequence 递归调度


        DispatchQueue.global().async {
            assert(!Thread.current.isMainThread)
            Observable.from(["🐶", "🐱", "🐭", "🐹"], scheduler: MainScheduler.instance)
                .subscribe { event in
                    assert(Thread.current.isMainThread)
                    print(event)
            }
        }


先看下简单的例子:
上面例子的订阅事件会被调度到主线程中去,那么具体是怎么调度,以及细节是什么样的呢?


  public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: array, scheduler: scheduler)
    }


// ObservableSequenceSink run
    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }


轻车熟路的找到入口位置

// scheduleRecursive method
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }


这个scheduleRecursive的入参非常复杂,要看懂它的结合上下文看,从方法名来看可以理解为递归调度,暂且先了解到这,继续看。

    func schedule(_ state: State) {
        
        
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 
                
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


暂且不管它的生命周期和线程安全 ,去掉这些细节代码就变成如下模样:


    func schedule(_ state: State) {
        
        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
 
            action(state, self.schedule)

            return Disposables.create()
        }
 
        

    }


去掉这些边角料代码就清晰多了, schedule(state) { } 这个是一个线程调度,确保closure在指定的线程运行,详情参见:CurrentThreadScheduler

去掉这个线程调度,再来看一下:


    func schedule(_ state: State) {
         
        action(state, self.schedule)

    }


再结合着看一下:

// 代码片段1

    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    
    
// 代码片段2

 recursiveScheduler.schedule(state)
 
// 代码片段3
     func schedule(_ state: State) {
         
        action(state, self.schedule)

    }



----> 表示触发的话,那么就是:
ObservableSequenceSink.run ----> recursiveScheduler.schedule ----> action, run函数的执行最终会触发action闭包的执行。好了现在来分析一下:
action(state, self.schedule), 将对象自身schedule函数作为入参传入闭包,那么action闭包干了啥呢? 首先使用迭代器迭代下一个元素,如果不为空的话调用recurse(mutableIterator), 也就是 recursiveScheduler.schedule方法,recursiveScheduler.schedule又会调用action,如此往复直到遍历所有元素。对迭代器不太理解的可以参考下面这个简单的例子:

         let numbers = [2, 3, 5, 7]
         var numbersIterator = numbers.makeIterator()
   
         while let num = numbersIterator.next() {
             print(num)
         }
         // Prints "2"
         // Prints "3"
         // Prints "5"
         // Prints "7"


再来复习一下递归,什么时候可以使用递归:

  1. 一个问题总可以分解为若干个规模更小的相似的问题
  2. 最小的子问题可以直接求解

显然序列的遍历问题满足这个条件,对一个序列的遍历总可以分解为:

  • 求解序列的首部元素
  • 遍历除开首部元素的子序列

回头再分析下 scheduleRecursive 方法

    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable

  • state: 初始值,action在执行的时候必须要有个实参,这个值是由state决定
  • action:重点介绍这个, 第一个入参: state 即当前的值
    第二个入参: 即 (State) -> (), 这个闭包用来做一些额外的处理, 比如在上面的例子当中,就成功的将每一次迭代都放在指定的线程执行。

了解这些我们可以写个非递归的run方法,看起来如下:


        return _parent._scheduler.schedule(_parent._elements.makeIterator(), action: { iterator  -> Disposable in
            var mutableIterator = iterator
            
            while let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                 print("unScheduleRecursive: \(next)")
            }
            
            self.forwardOn(.completed)
            self.dispose()
            return Disposables.create()
        })


你看多好啊,线程调度也实现了,而且简洁明了为什么非要用递归调度呢? 我一次调度就完事了。我想了很久终于想到了, 想象一个应用场景假设这个序列元素非常多,就比如有100万个,当迭代器迭代到第100个的时候突然客户说后面的我都不要了,那你怎么让这个迭代器停止避免不必要的运算呢?为了验证这个想法了写了如下代码:

         let range = 0..<100000
        let source = range.map {"\($0)" }
         fromArrayDisposables =  Observable.from(source)
            .elementAt(1000)
            .subscribe(onNext: {[weak self] element in
                print(element)       
            })


然后我在run函数加了一日志语句, 果然迭代器迭代到1000的时候,就停止了不再迭代,换上我自己的写的run函数, 迭代器一口气迭代完了100000个元素,这里不讨论elementAt具体是怎么实现的,只是想说明这个递归调度不是为了炫耀技术是,而是出于可扩展性需要而实现的。

线程安全和生命周期管理


  func schedule(_ state: State) {
        
        //FIX ME
   // ConcurrentMainScheduler
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 

        
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


这里主要是要保证scheduleState的线程安全, schedule {}的代码块不一定立马执行,如果调度的线程正忙的时候,只会简单的将closure存储起来。


//代码块1
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }



// 代码块2
    _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }

也就是说这两个代码块不一定按照顺序执行,而这两块代码都涉及到scheduleState的状态,所以必须加锁。 先假设按12执行,那么皆大欢喜。

  • 代码块1 会执行 initial case
  • 代码块2 会执行 done case

这两个case什么都没干

那么按21执行呢?

  • 代码块2 会执行 initial case
  • 代码块1 会执行 added case

那么先将Disposable 加入到 _group中进行管理,然后在闭包执行完毕之后,执行dispose,这个主要是为了管理其生命周期.如果没有这个dispose操作,有可能造成内存泄漏

Simple Demo


/// 实现阶加, 比如 输入5的话 ,就是 1 + 2 + 3 + 4 + 5

typealias Action =  (_ state: Int, _ recurse: (Int) -> Void) -> Void


struct RecursiveAdder {
    
    // 最简单的递归,递归的缺陷在于容易造成爆栈,当然这种递归也是尾递归,很多编译器能够对其进行优化
    static func recursiveAdd(_ number: Int, result: Int = 0) -> Int {
        
        if number <= 0 {
            return result
        } else {
            let updateResult = result + number
            return recursiveAdd(number - 1, result: updateResult)
        }
    }
    
    // 非递归
    static func unRecursiveAdd(_ number: Int) -> Int {
        var m = number
        var result = 0
        while m > 0 {
            result = addNumber(m, number2: result)
            m -= 1
        }
        return result
    }
    
    static func addNumber(_ number1: Int, number2: Int) -> Int {
        return number1 + number2
    }
    
    // 重现Rx 递归调度原理
    static func functinalRecursiveAdd(_ number: Int, result: Int = 0) -> Int {
        var _result = result
        // number 当前状态
        // recursive 递归调度函数
        // number - 1 模拟迭代器迭代
        add(number) { (number, recursive) in
            _result += number
            if number <= 0 {
                
                print("recursive end")
            } else {
                recursive(number - 1)
            }
        }
        
        return _result
        
    }
    
    static func add(_ state: Int, action: @escaping Action)  {
        
        let item = RecursiveAdderItem(action: action)
        item.add(state)
        
    }
}

struct RecursiveAdderItem {
    let action: Action
    
    // 这里什么也没有做,在rx中这一步是做了线程调度的
    func add(_ state: Int)  {
        action(state, self.add)
    }
}



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容

  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,632评论 18 399
  • Java继承关系初始化顺序 父类的静态变量-->父类的静态代码块-->子类的静态变量-->子类的静态代码快-->父...
    第六象限阅读 2,157评论 0 9
  • 孩子撒谎一定是父母给孩子制造了一个不撒谎活不下去的理由。—曾奇峰 01、奇怪的谎言 最近相继有几位妈妈找我,说起孩...
    临界冰阅读 7,576评论 35 156
  • 猫,是怀中虎。 蟋蟀,是掌中牛。 鸡,是庭前鹰鹫。 盆栽,是屋里山水。 珠玉,是指尖上的天空、海洋、大地。
    中浮阅读 230评论 0 0
  • 我个人很喜欢,但是不推荐。 这部作品真没什么必要用太长篇幅去分析具体剧情,对它的评价不用搞那么复杂。 我个人喜欢是...
    风格里哦阅读 1,121评论 0 3