Flutter 学习之旅(四十一) Flutter 状态 Stream 学习(一)

Stream 是Flutter 中非常常用的一个概念,但是他并不是Flutter 所独有的,而是在Dart中默认封装的,他和Future 一样,都可以异步执行,但是相对于Future Stream 更加高级.

Stream代表着事件流,通过Stream可以快速的实现事件流驱动业务逻辑,界面通过订阅事件,针对事件转换最后通过响应事件完成页面布局,而在整个Stream流过程中,有四个角色分别扮演了不同的角色

1.StreamController

字面意思,管理调度整个事件流的流程,并保存整个事件流中所需要的对象,便于管理和使用

2.StreamSink

事件的开始入口,所有的同步和异步事件都是从这里开始的 ,提供了add 和 addStream 等方法

3.Stream

事件本身,可以被转换和监听,订阅后返回StreamSubscription 对象

4.StreamSubscription

订阅Stream 后得到的对象,可以管理订阅过的各种操作,例: cancel() pause();

整个事件流的流经过程就是,先创建Stream ,创建回调方法并订阅, 再通过StreamSink 添加事件源,在订阅的过程中可以使用StreamSubscription管理这个订阅状态,最后回调 Stream 订阅的回调方法,仔细想一下整个过程其实和 StreamController 并没有太大的关系, 但是整个过程如果离开了StreamController , 没有从中间调度的过程,整个代码的逻辑和代码量都是非常让人头疼的,

在这里再吐槽一下dart 源码的结构,虽然通过 with 拼接是一个很好的方式,但是这样对刚刚学习的来说是非常不友好的,我整个学习过程中在学习控件的时候并没有太多的感触,但是在接触Element 和RenderObject 后,心里已经产生了这个想法,今天上午在看Stream的过程更是让我再次认同了这个想法,不过话虽然这么说,还是需要看的,自己也算是看了一些源码了,总结了一下dart源码怎么看,正好利用我对Stream的学习,分享一下我是怎么看源码的. 废话不多说,上套路!!!

这里先贴一下我的环境,
D:\User\flutter_app1>flutter --version
Flutter 1.20.3 • channel stable • https://github.com/flutter/flutter.git
Framework • revision 216dee60c0 (6 weeks ago) • 2020-09-01 12:24:47 -0700
Engine • revision d1bc06f032
Tools • Dart 2.9.2

1.理清构架

想要学习源码,首先你要知道各个类在这个功能中所扮演的角色,不必清楚每个方法是如何实现的,但是你需要他在整个流程中存在的目的, 这些很简单,通过度娘就能获取到你想要知道的浅显的介绍,这里回想一下Stream 的几个角色,想想他们的用途,但是他们是如何关联起来的呢,带着这个疑问,我们就来从代码说起.

2.重点突破

StreamController 作为管理和调度中心,他的源码其实应该就是整个Stream实现的核心,但是你去看一下其实他的方法非常的乱,没有一个主线将它连接起来, 这里我们看一下上面我们对整个Stream 的流程总结,我们就按照总结的流程来看一下Stream 的工作过程, 先看Stream

1>Stream 的创建和订阅

我们首次获取 Stream 是通过StreamController 来获取的,

  // Return a new stream every time. The streams are equal, but not identical.
///在获取Stream 的时候,每次都获取一个新的Stream , 流是相等的,但是并不完全相同,
  Stream<T> get stream => _ControllerStream<T>(this);

根据创建的实体我们看一下他的订阅 也就是listen 方法,

abstract class _StreamImpl<T> extends Stream<T> {

  ////可以传入onError 和 onDone  也就是错误和完成回调
  StreamSubscription<T> listen(void onData(T data)?, {Function? onError, void onDone()?, bool? cancelOnError}) {
    cancelOnError ??= false;
    ////创建订阅 _createSubscription 这个方法被子类重写了
    StreamSubscription<T> subscription =_createSubscription(onData, onError, onDone, cancelOnError);
    _onListen(subscription);
    return subscription;
  }
}

在这里发现这个方法并不是_ControllerStream实现的,而是由他的父类_StreamImpl实现的,代码我已经放在上面了,

class _ControllerStream<T> extends _StreamImpl<T> {
  _StreamControllerLifecycle<T> _controller;
  _ControllerStream(this._controller);
  StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) =>
      _controller._subscribe(onData, onError, onDone, cancelOnError);
}

这里我们发现_createSubscription 这个方法又调用了 StreamController 的_subscribe 方法

  StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
      void onDone()?, bool cancelOnError) {
    ///不能重复订阅
    if (!_isInitialState) {
      throw StateError("Stream has already been listened to.");
    }
///创建订阅
    _ControllerSubscription<T> subscription = _ControllerSubscription<T>( this, onData, onError, onDone, cancelOnError);
    _PendingEvents<T>? pendingEvents = _pendingEvents;
///修改状态为已经订阅  
    _state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData as dynamic;
      addState.varData = subscription;
      addState.resume();
    } else {
      _varData = subscription;
    }
     ...
    return subscription;
  }

_varData 就是订阅后的 subscription 在这里共享了他,以便于在后续接收到数据时找到回调 ,他只能被订阅一次,

我们发现在创建订阅过程中使用的是 _ControllerSubscription ,他继承了 _BufferingStreamSubscription 这个类来创建的订阅,

  _BufferingStreamSubscription(void onData(T data)?, Function? onError,
      void onDone()?, bool cancelOnError)
      : this.zoned(Zone.current, onData, onError, onDone, cancelOnError);

  _BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
        _onData = _registerDataHandler<T>(_zone, onData),
        _onError = _registerErrorHandler(_zone, onError),
        _onDone = _registerDoneHandler(_zone, onDone);

来到创建 StreamSubscription 的这个方法 ,收到了这个onData () 这个方法,并保存了起来,这里我们发现
StreamSubscription 的最终实现子类是 _BufferingStreamSubscription ,记录一下,这个很重要

controller.stream.listen() --> _ControllerStream.listen() --> _ControllerStream._createSubscription() --> controller._subscribe() --> 返回_ControllerSubscription(); 并执行 zone.runUnaryGuarded()

创建Stream 并订阅的整个流程大致就是这些方法,

这里我们就把创建 Stream 和 订阅的这个过程看完了,接下来我们再看 StreamSink 发送事件源的过程

2> StreamSink 发送事件源

说道这里我们就要说一下StreamController 他的创建方法, 这里有一个bool sync ,他控制着是同步执行还是异步执行,
如果 sync 为true 则同步执行,否则异步执行,他默认的是异步执行,也就是说 默认创建StreamController 发送这接收信息之间是异步的,这里先说一下同步和异步的区别,

  factory StreamController(
      {void onListen()?,
      void onPause()?,
      void onResume()?,
      FutureOr<void> onCancel()?,
      bool sync = false}) {
    return sync
        ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
        : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  }

abstract class _SyncStreamControllerDispatch<T>
    implements _StreamController<T>, SynchronousStreamController<T> {
  void _sendData(T data) {
    _subscription._add(data);
  }
}

abstract class _AsyncStreamControllerDispatch<T>
    implements _StreamController<T> {
  void _sendData(T data) {
    _subscription._addPending(_DelayedData<T>(data));
  }
}

开始说事件源分发
与 Stream 同样的道理,先在 StreamController 中找到sink ,

  /**
   * Returns a view of this object that only exposes the [StreamSink] interface.
   */
  StreamSink<T> get sink => _StreamSinkWrapper<T>(this);

这里的sink 使用的是_StreamSinkWrapper 来实现,我们来看一下他的送法事件源 即 add 方法

class _StreamSinkWrapper<T> implements StreamSink<T> {
  final StreamController _target;
  _StreamSinkWrapper(this._target);
  void add(T data) {
    _target.add(data);
  }
}

来到这里我们发现整个 _StreamSinkWrapper 就是一个中转类,在调用他的add 方法的时候,其实调用的是target.add()方法,即 StreamController 的add 方法

 void _add(T value) {
  ///在 listen 方法中已经调用controller 的 sub
    if (hasListener) {
      _sendData(value);
    } else if (_isInitialState) {
      _ensurePendingEvents().add(_DelayedData<T>(value));
    }
  }

在StreamController 的add方法中我们看到,如果已经_subscribe 方法将state 修改为 _STATE_SUBSCRIBED;即已订阅的状态,这里走事件的分发 _sendData(value); 这个方法

这里我们先说一下同步执行的方法

abstract class _SyncStreamControllerDispatch<T>
    implements _StreamController<T>, SynchronousStreamController<T> {
  void _sendData(T data) {
    _subscription._add(data);
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addError(error, stackTrace);
  }

  void _sendDone() {
    _subscription._close();
  }
}

在同步方法里面,使用的_subscription 即上面所说的_varData 得_add 方法 也就是_BufferingStreamSubscription 的add 方法

///添加数据
  void _add(T data) {
    assert(!_isClosed);
    if (_isCanceled) return;
    if (_canFire) {
      _sendData(data);
    } else {
      _addPending(new _DelayedData<T>(data));
    }
  }
  
///使用zone 来 调用回调
  void _sendData(T data) {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);
    bool wasInputPaused = _isInputPaused;
    _state |= _STATE_IN_CALLBACK;
    _zone.runUnaryGuarded(_onData, data);
    _state &= ~_STATE_IN_CALLBACK;
    _checkState(wasInputPaused);
  }

关于异步方法

abstract class _AsyncStreamControllerDispatch<T>
    implements _StreamController<T> {
  void _sendData(T data) {
    _subscription._addPending(_DelayedData<T>(data));
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addPending(_DelayedError(error, stackTrace));
  }

  void _sendDone() {
    _subscription._addPending(const _DelayedDone());
  }
}

这里使用的是 _subscription._addPending 的这个方法

  void _addPending(_DelayedEvent event) {
    _StreamImplEvents<T>? pending = _pending as dynamic;
    pending ??= _StreamImplEvents<T>();
    _pending = pending;
    pending.add(event);
    if (!_hasPending) {/// 没有开始循环
      _state |= _STATE_HAS_PENDING;
      if (!_isPaused) {///没有被暂停
        pending.schedule(this);
      }
    }
  }

这里初始化了 _pending 并将event 添加到里这个_pending 里面,这里如果循环处理事件没有被开启,则开启循环事件处理消息,即 pending.schedule(this);

  void schedule(_EventDispatch<T> dispatch) {
    if (isScheduled) return;///已经开启了,则返回
    assert(!isEmpty);
    if (_eventScheduled) {///事件已经被消费
      assert(_state == _STATE_CANCELED);
      _state = _STATE_SCHEDULED;
      return;
    }
////这里开始异步,遍历 dispatch , 并开启了循环
    scheduleMicrotask(() {
      int oldState = _state;
      _state = _STATE_UNSCHEDULED;
      if (oldState == _STATE_CANCELED) return;
      handleNext(dispatch);
    });
    _state = _STATE_SCHEDULED;
  }
  
////此时是异步执行,使用的是链表方式保存的数据  ,会存在异步耗时任务,第一个没有执行完第二个又进来了,则这个任务会被默认调价到队尾,
  void handleNext(_EventDispatch<T> dispatch) {
    assert(!isScheduled);
    assert(!isEmpty);
    _DelayedEvent event = firstPendingEvent!;
    _DelayedEvent? nextEvent = event.next;
    firstPendingEvent = nextEvent;
    if (nextEvent == null) {
      lastPendingEvent = null;
    }
    ///最后再执行下一个事件
    event.perform(dispatch);
  }
  ///此时是异步执行,使用_BufferingStreamSubscription   的_sendData 方法 执行回调 ,回到同步方法onData 
 void perform(_EventDispatch<T> dispatch) {
    dispatch._sendData(value);
  }

如何调用异步的简单的说一下,由于本人了解的也不是很多,简单的说一下loop的这个概念

///在我的理解所有的  _AsyncRun 共享同一个loop ,在你添加任务进来的同时,如果其他的 StreamSink 添加进来的任务没有被消费完,则将你的添加到队尾, 否则开始新的 异步循环
void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  _AsyncCallbackEntry? lastCallback = _lastCallback;
  if (lastCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}

///开始执行循环,如果你的循环执行完后,再次检查,如果队伍还有,则重新开始异步循环
void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    // Moved to separate function because try-finally prevents
    // good optimization.
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}
 ////队列模式,一个一个取出,并调用callback 回调
void _microtaskLoop() {
  for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
    _lastPriorityCallback = null;
    var next = entry.next;
    _nextCallback = next;
    if (next == null) _lastCallback = null;
    (entry.callback)();
  }
}

这个发送事件源的过程也结束了,总结一下
StreamSink.add() --> StreamController.sendData ()
--> 同步: 直接StreamSubscription.sendData()
-->异步: 添加到_pending 队列中StreamSubscription.addPending , -->如果loop 循环没有执行 ,则开始循环 即 scheduleMicrotask 异步处理数据, -->_StreamImplEvents.handleNext() -->最后调用 StreamSubscription.sendData() 给予回调

我学习flutter的整个过程都记录在里面了
https://www.jianshu.com/c/36554cb4c804

最后附上demo 地址

https://github.com/tsm19911014/tsm_flutter

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343