Stream 是Flutter 中非常常用的一个概念,但是他并不是Flutter 所独有的,而是在Dart中默认封装的,他和Future 一样,都可以异步执行,但是相对于Future Stream 更加高级.
Stream代表着事件流,通过Stream可以快速的实现事件流驱动业务逻辑,界面通过订阅事件,针对事件转换最后通过响应事件完成页面布局,而在整个Stream流过程中,有四个角色分别扮演了不同的角色
1.StreamController
字面意思,管理调度整个事件流的流程,并保存整个事件流中所需要的对象,便于管理和使用
2.StreamSink
事件的开始入口,所有的同步和异步事件都是从这里开始的 ,提供了add 和 addStream 等方法
3.Stream
事件本身,可以被转换和监听,订阅后返回StreamSubscription 对象
4.StreamSubscription
订阅Stream 后得到的对象,可以管理订阅过的各种操作,例: cancel() pause();
整个事件流的流经过程就是,先创建Stream ,创建回调方法并订阅, 再通过StreamSink 添加事件源,在订阅的过程中可以使用StreamSubscription管理这个订阅状态,最后回调 Stream 订阅的回调方法,仔细想一下整个过程其实和 StreamController 并没有太大的关系, 但是整个过程如果离开了StreamController , 没有从中间调度的过程,整个代码的逻辑和代码量都是非常让人头疼的,
在这里再吐槽一下dart 源码的结构,虽然通过 with 拼接是一个很好的方式,但是这样对刚刚学习的来说是非常不友好的,我整个学习过程中在学习控件的时候并没有太多的感触,但是在接触Element 和RenderObject 后,心里已经产生了这个想法,今天上午在看Stream的过程更是让我再次认同了这个想法,不过话虽然这么说,还是需要看的,自己也算是看了一些源码了,总结了一下dart源码怎么看,正好利用我对Stream的学习,分享一下我是怎么看源码的. 废话不多说,上套路!!!
这里先贴一下我的环境,
D:\User\flutter_app1>flutter --version
Flutter 1.20.3 • channel stable • https://github.com/flutter/flutter.git
Framework • revision 216dee60c0 (6 weeks ago) • 2020-09-01 12:24:47 -0700
Engine • revision d1bc06f032
Tools • Dart 2.9.2
1.理清构架
想要学习源码,首先你要知道各个类在这个功能中所扮演的角色,不必清楚每个方法是如何实现的,但是你需要他在整个流程中存在的目的, 这些很简单,通过度娘就能获取到你想要知道的浅显的介绍,这里回想一下Stream 的几个角色,想想他们的用途,但是他们是如何关联起来的呢,带着这个疑问,我们就来从代码说起.
2.重点突破
StreamController 作为管理和调度中心,他的源码其实应该就是整个Stream实现的核心,但是你去看一下其实他的方法非常的乱,没有一个主线将它连接起来, 这里我们看一下上面我们对整个Stream 的流程总结,我们就按照总结的流程来看一下Stream 的工作过程, 先看Stream
1>Stream 的创建和订阅
我们首次获取 Stream 是通过StreamController 来获取的,
// Return a new stream every time. The streams are equal, but not identical.
///在获取Stream 的时候,每次都获取一个新的Stream , 流是相等的,但是并不完全相同,
Stream<T> get stream => _ControllerStream<T>(this);
根据创建的实体我们看一下他的订阅 也就是listen 方法,
abstract class _StreamImpl<T> extends Stream<T> {
////可以传入onError 和 onDone 也就是错误和完成回调
StreamSubscription<T> listen(void onData(T data)?, {Function? onError, void onDone()?, bool? cancelOnError}) {
cancelOnError ??= false;
////创建订阅 _createSubscription 这个方法被子类重写了
StreamSubscription<T> subscription =_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
}
在这里发现这个方法并不是_ControllerStream实现的,而是由他的父类_StreamImpl实现的,代码我已经放在上面了,
class _ControllerStream<T> extends _StreamImpl<T> {
_StreamControllerLifecycle<T> _controller;
_ControllerStream(this._controller);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) =>
_controller._subscribe(onData, onError, onDone, cancelOnError);
}
这里我们发现_createSubscription 这个方法又调用了 StreamController 的_subscribe 方法
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
///不能重复订阅
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
///创建订阅
_ControllerSubscription<T> subscription = _ControllerSubscription<T>( this, onData, onError, onDone, cancelOnError);
_PendingEvents<T>? pendingEvents = _pendingEvents;
///修改状态为已经订阅
_state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData as dynamic;
addState.varData = subscription;
addState.resume();
} else {
_varData = subscription;
}
...
return subscription;
}
_varData 就是订阅后的 subscription 在这里共享了他,以便于在后续接收到数据时找到回调 ,他只能被订阅一次,
我们发现在创建订阅过程中使用的是 _ControllerSubscription ,他继承了 _BufferingStreamSubscription 这个类来创建的订阅,
_BufferingStreamSubscription(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError)
: this.zoned(Zone.current, onData, onError, onDone, cancelOnError);
_BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError)
: _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
_onData = _registerDataHandler<T>(_zone, onData),
_onError = _registerErrorHandler(_zone, onError),
_onDone = _registerDoneHandler(_zone, onDone);
来到创建 StreamSubscription 的这个方法 ,收到了这个onData () 这个方法,并保存了起来,这里我们发现
StreamSubscription 的最终实现子类是 _BufferingStreamSubscription ,记录一下,这个很重要
controller.stream.listen() --> _ControllerStream.listen() --> _ControllerStream._createSubscription() --> controller._subscribe() --> 返回_ControllerSubscription(); 并执行 zone.runUnaryGuarded()
创建Stream 并订阅的整个流程大致就是这些方法,
这里我们就把创建 Stream 和 订阅的这个过程看完了,接下来我们再看 StreamSink 发送事件源的过程
2> StreamSink 发送事件源
说道这里我们就要说一下StreamController 他的创建方法, 这里有一个bool sync ,他控制着是同步执行还是异步执行,
如果 sync 为true 则同步执行,否则异步执行,他默认的是异步执行,也就是说 默认创建StreamController 发送这接收信息之间是异步的,这里先说一下同步和异步的区别,
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
abstract class _SyncStreamControllerDispatch<T>
implements _StreamController<T>, SynchronousStreamController<T> {
void _sendData(T data) {
_subscription._add(data);
}
}
abstract class _AsyncStreamControllerDispatch<T>
implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
}
开始说事件源分发
与 Stream 同样的道理,先在 StreamController 中找到sink ,
/**
* Returns a view of this object that only exposes the [StreamSink] interface.
*/
StreamSink<T> get sink => _StreamSinkWrapper<T>(this);
这里的sink 使用的是_StreamSinkWrapper 来实现,我们来看一下他的送法事件源 即 add 方法
class _StreamSinkWrapper<T> implements StreamSink<T> {
final StreamController _target;
_StreamSinkWrapper(this._target);
void add(T data) {
_target.add(data);
}
}
来到这里我们发现整个 _StreamSinkWrapper 就是一个中转类,在调用他的add 方法的时候,其实调用的是target.add()方法,即 StreamController 的add 方法
void _add(T value) {
///在 listen 方法中已经调用controller 的 sub
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {
_ensurePendingEvents().add(_DelayedData<T>(value));
}
}
在StreamController 的add方法中我们看到,如果已经_subscribe 方法将state 修改为 _STATE_SUBSCRIBED;即已订阅的状态,这里走事件的分发 _sendData(value); 这个方法
这里我们先说一下同步执行的方法
abstract class _SyncStreamControllerDispatch<T>
implements _StreamController<T>, SynchronousStreamController<T> {
void _sendData(T data) {
_subscription._add(data);
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addError(error, stackTrace);
}
void _sendDone() {
_subscription._close();
}
}
在同步方法里面,使用的_subscription 即上面所说的_varData 得_add 方法 也就是_BufferingStreamSubscription 的add 方法
///添加数据
void _add(T data) {
assert(!_isClosed);
if (_isCanceled) return;
if (_canFire) {
_sendData(data);
} else {
_addPending(new _DelayedData<T>(data));
}
}
///使用zone 来 调用回调
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
关于异步方法
abstract class _AsyncStreamControllerDispatch<T>
implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(_DelayedData<T>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
_subscription._addPending(_DelayedError(error, stackTrace));
}
void _sendDone() {
_subscription._addPending(const _DelayedDone());
}
}
这里使用的是 _subscription._addPending 的这个方法
void _addPending(_DelayedEvent event) {
_StreamImplEvents<T>? pending = _pending as dynamic;
pending ??= _StreamImplEvents<T>();
_pending = pending;
pending.add(event);
if (!_hasPending) {/// 没有开始循环
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {///没有被暂停
pending.schedule(this);
}
}
}
这里初始化了 _pending 并将event 添加到里这个_pending 里面,这里如果循环处理事件没有被开启,则开启循环事件处理消息,即 pending.schedule(this);
void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;///已经开启了,则返回
assert(!isEmpty);
if (_eventScheduled) {///事件已经被消费
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
return;
}
////这里开始异步,遍历 dispatch , 并开启了循环
scheduleMicrotask(() {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
}
////此时是异步执行,使用的是链表方式保存的数据 ,会存在异步耗时任务,第一个没有执行完第二个又进来了,则这个任务会被默认调价到队尾,
void handleNext(_EventDispatch<T> dispatch) {
assert(!isScheduled);
assert(!isEmpty);
_DelayedEvent event = firstPendingEvent!;
_DelayedEvent? nextEvent = event.next;
firstPendingEvent = nextEvent;
if (nextEvent == null) {
lastPendingEvent = null;
}
///最后再执行下一个事件
event.perform(dispatch);
}
///此时是异步执行,使用_BufferingStreamSubscription 的_sendData 方法 执行回调 ,回到同步方法onData
void perform(_EventDispatch<T> dispatch) {
dispatch._sendData(value);
}
如何调用异步的简单的说一下,由于本人了解的也不是很多,简单的说一下loop的这个概念
///在我的理解所有的 _AsyncRun 共享同一个loop ,在你添加任务进来的同时,如果其他的 StreamSink 添加进来的任务没有被消费完,则将你的添加到队尾, 否则开始新的 异步循环
void _scheduleAsyncCallback(_AsyncCallback callback) {
_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
_AsyncCallbackEntry? lastCallback = _lastCallback;
if (lastCallback == null) {
_nextCallback = _lastCallback = newEntry;
if (!_isInCallbackLoop) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
} else {
lastCallback.next = newEntry;
_lastCallback = newEntry;
}
}
///开始执行循环,如果你的循环执行完后,再次检查,如果队伍还有,则重新开始异步循环
void _startMicrotaskLoop() {
_isInCallbackLoop = true;
try {
// Moved to separate function because try-finally prevents
// good optimization.
_microtaskLoop();
} finally {
_lastPriorityCallback = null;
_isInCallbackLoop = false;
if (_nextCallback != null) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
}
}
////队列模式,一个一个取出,并调用callback 回调
void _microtaskLoop() {
for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
_lastPriorityCallback = null;
var next = entry.next;
_nextCallback = next;
if (next == null) _lastCallback = null;
(entry.callback)();
}
}
这个发送事件源的过程也结束了,总结一下
StreamSink.add() --> StreamController.sendData ()
--> 同步: 直接StreamSubscription.sendData()
-->异步: 添加到_pending 队列中StreamSubscription.addPending , -->如果loop 循环没有执行 ,则开始循环 即 scheduleMicrotask 异步处理数据, -->_StreamImplEvents.handleNext() -->最后调用 StreamSubscription.sendData() 给予回调
我学习flutter的整个过程都记录在里面了
https://www.jianshu.com/c/36554cb4c804
最后附上demo 地址