系统底层源码分析(4)——NSOperation

在日常使用中,特别是在第三方框架中,总会看见NSOperation,今天分别从OC和Swift底层探究一下它:

(一)NSOperation —— OC

看看下面例子:

NSBlockOperation *op1 = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"1 - %@", [NSThread currentThread]);
}];
NSBlockOperation *op2 = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"2 - %@", [NSThread currentThread]);
}];
NSBlockOperation *op3 = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"3 - %@", [NSThread currentThread]);
}];
//添加依赖
[op3 addDependency: op1];
[op3 addDependency: op2];

NSOperationQueue *queue = [[NSOperationQueue alloc] init];
queue.maxConcurrentOperationCount = 3;
//添加后自动调用
[queue addOperation: op1];
[queue addOperation: op2];
[queue addOperation: op3];

结果:op3一定会在op1op2完成后执行。

  1. GNUstep源码探究NSOperation的底层,首先看添加依赖:
@implementation NSOperation
...
- (void) addDependency: (NSOperation *)op
{
  ...
  NS_DURING
    {
      if (NSNotFound == [internal->dependencies indexOfObjectIdenticalTo: op])
    {
      [self willChangeValueForKey: @"dependencies"];
          [internal->dependencies addObject: op];//添加到依赖队列
      if (NO == [op isFinished]
        && NO == [self isCancelled]
        && NO == [self isExecuting]
        && NO == [self isFinished])
        {
          [op addObserver: self
           forKeyPath: @"isFinished"
              options: NSKeyValueObservingOptionNew
              context: isFinishedCtxt];//监听-完成
          if (internal->ready == YES)//默认是yes
        {
          [self willChangeValueForKey: @"isReady"];
          internal->ready = NO;//变为no
          [self didChangeValueForKey: @"isReady"];
        }
        }
      [self didChangeValueForKey: @"dependencies"];
    }
    }
    ...
}

op1,2添加到dependencies中,并且对它的isFinished进行监听,然后op3修改readyNO

  1. 接着把所有NSOperation添加到NSOperationQueue中:
@implementation NSOperationQueue
...
- (void) addOperation: (NSOperation *)op
{
  ...
  if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]
    && NO == [op isFinished])
    {
      [op addObserver: self
       forKeyPath: @"isReady"
          options: NSKeyValueObservingOptionNew
          context: isReadyCtxt];//监听-准备就绪
      [self willChangeValueForKey: @"operations"];
      [self willChangeValueForKey: @"operationCount"];
      [internal->operations addObject: op];//添加到要执行的队列
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
      if (YES == [op isReady])
    {
      [self observeValueForKeyPath: @"isReady"
                  ofObject: op
                change: nil
                   context: isReadyCtxt];//即时更新状态
    }
    }
  [internal->lock unlock];
}

首先监听NSOperationisReady,并且添加到operations中,然后手动调用KVO的监听回调。

  1. 这时候会先移除对isReady的监听,因为只监听一次。然后把NSOperation添加到waiting中, 最终调用_execute时会取出来:
@implementation NSOperationQueue (Private)

- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  /* We observe three properties in sequence ...
   * isReady (while we wait for an operation to be ready)
   * queuePriority (when priority of a ready operation may change)
   * isFinished (to see if an executing operation is over).
   */
  if (context == isFinishedCtxt)
    { ... }
  else if (context == queuePriorityCtxt || context == isReadyCtxt)
    {
      ...
      if (context == isReadyCtxt)//如果任务准备就绪
        {
          [object removeObserver: self forKeyPath: @"isReady"];//移除监听-准备
          [object addObserver: self
                   forKeyPath: @"queuePriority"
                      options: NSKeyValueObservingOptionNew
                      context: queuePriorityCtxt];//监听-队列优先级
        }
      pos = [internal->waiting insertionPosition: object
                                   usingFunction: sortFunc
                                         context: 0];
      [internal->waiting insertObject: object atIndex: pos];//插入
      [internal->lock unlock];
    }
  [self _execute];//执行
}
  1. NSOperationwaiting取出来后,就被移除了。然后监听NSOperationisFinished,便开始start(所以NSOperation添加到NSOperationQueue中会自动调用start)。
@implementation NSOperationQueue (Private)
...
- (void) _execute
{
  ...
  while (NO == [self isSuspended]
    && max > internal->executing
    && [internal->waiting count] > 0)
    {
      NSOperation   *op;

      op = [internal->waiting objectAtIndex: 0];//取出
      [internal->waiting removeObjectAtIndex: 0];//移除
      [op removeObserver: self forKeyPath: @"queuePriority"];//移除队列优先级监听
      [op addObserver: self
       forKeyPath: @"isFinished"
          options: NSKeyValueObservingOptionNew
          context: isFinishedCtxt];//监听完成
      internal->executing++;//计数加1
      if (YES == [op isConcurrent])//是否并发
    {
          [op start];//执行
    }
      else { ... }
    }
  [internal->lock unlock];
}
  1. start其实就是修改一下executing,然后调用_finish
@implementation NSOperation
...
- (void) start
{
  ...
  if (NO == internal->executing)//是否在执行
    {
      [self willChangeValueForKey: @"isExecuting"];//手动通知-kvo
      internal->executing = YES;//start的作用主要是修改状态
      [self didChangeValueForKey: @"isExecuting"];
    }
  ...
  [self _finish];//也只是修改状态
  [pool release];
}
  1. _finish也只是修改executingfinished
@implementation NSOperation (Private)
- (void) _finish
{
  [self retain];
  [internal->lock lock];
  if (NO == internal->finished)
    {
      if (YES == internal->executing)
        {
      [self willChangeValueForKey: @"isExecuting"];
      [self willChangeValueForKey: @"isFinished"];
      internal->executing = NO;
      internal->finished = YES;
      [self didChangeValueForKey: @"isFinished"];
      [self didChangeValueForKey: @"isExecuting"];
    }
      else
    { ... }
    ...
}
  1. finished被修改后,便会监听回调:
@implementation NSOperationQueue (Private)

- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  if (context == isFinishedCtxt)
    {
      [internal->lock lock];
      internal->executing--;//正在执行的任务计数减一
      [object removeObserver: self forKeyPath: @"isFinished"];//移除监听-完成
      ...
      [internal->operations removeObjectIdenticalTo: object];//移除任务
      ...
    }
  else if (context == queuePriorityCtxt || context == isReadyCtxt)
  { ... }
  [self _execute];//执行
}

同时,NSOperation初始化时自身也添加了监听,所以也会回调:

@implementation NSOperation
...
- (id) init
{
  if ((self = [super init]) != nil)
    {
      ...
      internal->ready = YES;//默认
      ...
      [self addObserver: self
             forKeyPath: @"isFinished"
                options: NSKeyValueObservingOptionNew
                context: isFinishedCtxt];//添加监听-完成
    }
  return self;
}
@implementation NSOperation
...
- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  [internal->lock lock];

  [object removeObserver: self forKeyPath: @"isFinished"];//移除监听-完成
  ...
  if (NO == internal->ready)
    {
      NSEnumerator  *en;
      NSOperation   *op;

      en = [internal->dependencies objectEnumerator];
      while ((op = [en nextObject]) != nil)//遍历判断依赖的op是否全部完成
        {
          if (NO == [op isFinished])
        break;
        }
      if (op == nil)
    {
          [self willChangeValueForKey: @"isReady"];
      internal->ready = YES;//设为yes;可以开始启动
          [self didChangeValueForKey: @"isReady"];
    }
    }
  [internal->lock unlock];
}

同样是会先移除对isFinished的监听,然后遍历循环判断是否全部op1,2都完成。如果都完成,当前op将为nil,然后进入判断修改op3readyYES

  1. op3ready被修改后,便会重复第3步,最终实现调用。也就是在op1,2执行完成后才执行的op3

总的来说,在OC底层中,使用KVONSOperation的各个状态进行监听,从而进行优先级调用。

(二)NSOperation —— Swift

看看下面例子:

let op1 = BlockOperation.init {
    print("1 - \(Thread.current)")
}
let op2 = BlockOperation.init {
    print("2 - \(Thread.current)")
}
let op3 = BlockOperation.init {
    print("3 - \(Thread.current)")
}
//添加依赖
op3.addDependency(op1)
op3.addDependency(op2)

let queue = OperationQueue.init()
queue.maxConcurrentOperationCount = 3
//添加后自动调用
queue.addOperation(op1)
queue.addOperation(op2)
queue.addOperation(op3)

结果:op3一定会在op1op2完成后执行。

  1. swift-corelibs-foundation源码探究Operation的底层,首先看添加依赖:
open class Operation : NSObject {
    ...
    internal func _addDependency(_ op: Operation) {
        withExtendedLifetime(self) {
            withExtendedLifetime(op) {
                var up: Operation?
                _lock()
                if __dependencies.first(where: { $0 === op }) == nil {
                    __dependencies.append(op)//添加到依赖序列
                    up = op
                }
                _unlock()
                
                if let upwards = up {
                    upwards._lock()
                    _lock()
                    let upIsFinished = upwards._state == __NSOperationState.finished
                    if !upIsFinished && !_isCancelled {
                        assert(_unfinishedDependencyCount >= 0)
                        _incrementUnfinishedDependencyCount()//计数加1
                        upwards._addParent(self)//反过来被添加
                    }
                    _unlock()
                    upwards._unlock()
                }
                Operation.observeValue(forKeyPath: _NSOperationIsReady, ofObject: self)//手动通知-准备就绪
            }
        }
    }
    
    open func addDependency(_ op: Operation) {
        _addDependency(op)
    }
    ...

同样地,op1,2会被添加到__dependencies,然后计数,计数会影响isReady的判断:

open class Operation : NSObject {
    ...
    open var isReady: Bool {
        _lock()
        defer { _unlock() }
        return __unfinishedDependencyCount == 0
    }
    ...
}

最终通过_addParent(self)op1,2会把op3添加到__downDependencies中:

open class Operation : NSObject {
    ...
    internal func _addParent(_ parent: Operation) {
        __downDependencies.insert(PointerHashedUnmanagedBox(contents: .passUnretained(parent)))//添加
    }
    ...
}
  1. 最终会手动通知isReady状态,但是计数不为0所以不会调用_schedule()
open class Operation : NSObject {
    ...
    internal static func observeValue(forKeyPath keyPath: String, ofObject op: Operation) {
        ...
        if let transition = kind {
            switch transition {
            case .toFinished: ...
            case .toExecuting: ...
            case .toReady:
                let r = op.isReady//通过计数判断ready
                op._cachedIsReady = r
                let q = op._queue
                if r {
                    q?._schedule()
                }
            }
        }
    }
    ...
}
  1. 接着把所有Operation添加到OperationQueue中:
open class OperationQueue : NSObject, ProgressReporting {
    ...
    internal func _addOperations(_ ops: [Operation], barrier: Bool = false) {
        ...
        for op in ops {
            if op._compareAndSwapState(.initialized, .enqueuing) {
                successes += 1
                if 0 == failures {
                    let retained = Unmanaged.passRetained(op)
                    op._cachedIsReady = op.isReady
                    let schedule: DispatchWorkItem
                    
                    if let qos = op.__propertyQoS?.qosClass {
                        schedule = DispatchWorkItem.init(qos: qos, flags: .enforceQoS, block: {
                            self._schedule(op)//执行任务
                        })
                    } else { ... }
                    op._adopt(queue: self, schedule: schedule)//保存任务
                    ...
                } else { ... }
            } else { ... }
        }
        ...
        if !barrier {
            _schedule()//执行
        }
    }
    
    open func addOperation(_ op: Operation) {
        _addOperations([op], barrier: false)
    }
    ...
}

然后先把任务包装成DispatchWorkItem保存:

open class Operation : NSObject {
    ...
    internal func _adopt(queue: OperationQueue, schedule: DispatchWorkItem) {
        _lock()
        defer { _unlock() }
        __queue = Unmanaged.passRetained(queue)
        __schedule = schedule//保存任务
    }
    ...
}
  1. 最后会调用_schedule()开始执行任务,并在完成后修改isFinished,当然op3并不会执行这步:
open class OperationQueue : NSObject, ProgressReporting {
    ...
    internal func _schedule() {
        ...
        for prio in Operation.QueuePriority.priorities {
            ...
            while let operation = op?.takeUnretainedValue() {
                ...
                if Operation.__NSOperationState.enqueued == operation._state && operation._fetchCachedIsReady(&retest) {
                    ...
                    if let schedule = operation.__schedule {
                        if operation is _BarrierOperation {
                            queue.async(flags: .barrier, execute: {
                                schedule.perform()
                            })
                        } else {
                            queue.async(execute: schedule)//执行任务
                        }
                    }
                    
                    op = next
                } else { ... }
            }
        }
        ...
    }
    ...
}
open class OperationQueue : NSObject, ProgressReporting {
    ...
    internal func _schedule(_ op: Operation) {
        op._state = .starting
        OperationQueue._currentQueue.set(self)
        op.start()//执行
        OperationQueue._currentQueue.clear()
        Unmanaged.passUnretained(self).release()
        
        if op.isFinished && op._state.rawValue < Operation.__NSOperationState.finishing.rawValue {
            Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: op)//手动通知-完成
        }
    }
    ...
}
open class Operation : NSObject {
    ...
    open func start() {
        ...
        if !isCanc {
            _state = .executing
            Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)//手动通知-Executing
            
            _queue?._execute(self) ?? main() //调用main
        }
        
        if __NSOperationState.executing == _state {
            _state = .finishing
            Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)//手动通知-Executing
            Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: self)//手动通知-完成
        } else { ... }
    }
    ...
}
open class Operation : NSObject {
    ...
    internal static func observeValue(forKeyPath keyPath: String, ofObject op: Operation) {
        ...
        if let transition = kind {
            switch transition {
            case .toFinished: // we only care about NO -> YES
                ...
                let down_deps = op.__downDependencies//取出
                op.__downDependencies.removeAll()//移除
                ...
                if 0 < ready_deps.count {
                    for down in ready_deps {
                        down._lock()
                        if down._unfinishedDependencyCount >= 1 {
                            down._decrementUnfinishedDependencyCount()//计数减1
                        }
                        down._unlock()
                        Operation.observeValue(forKeyPath: _NSOperationIsReady, ofObject: down)//手动通知-准备就绪
                    }
                }
                ...
            case .toExecuting:
                let isExecuting = op.isExecuting
                op._lock()
                if op._state.rawValue < __NSOperationState.executing.rawValue && isExecuting {
                    op._state = .executing//只是修改状态
                }
                op._unlock()
            case .toReady:
                let r = op.isReady//通过计数判断ready
                op._cachedIsReady = r
                let q = op._queue
                if r {
                    q?._schedule()//执行
                }
            }
        }
    }
    ...
}

最后会手动通知downisReady的变化,而down就是从__downDependencies取出的op3

  1. op1,2都执行完成,计数为0时,op3isReady便为true,开始调用_schedule()执行任务,重复第4步。

总的来说,Swift和OC不一样,没有KVO,只是通过手动通知状态变化,从而进行优先级调用。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容