Dart 单线程模型
众所周知,在Java中使用多线程来处理并发任务,适量并合适地使用多线程,能够极大地提高资源的利用率和程序运行效率,但是缺点也比较明显,比如过度开启线程会带来额外的资源和性能消耗或多线程共享内存容易出现死锁等。实际上,在APP的使用过程中,多数处理空闲状态,并不需要进行密集或高并发的处理,因此从某些意义上来说,多线程显得有点多余。正是因为如此,Dart作为一种新的语言,通过引单线程模型很好地处理了并发任务对多线程的依赖。
1.1 单线程模型
Dart是一种单线程语言,因此Dart程序没有主线程和子线程之分,而在Dart中线程并不是指Thread,而是指Isolate。因为Dart没有线程的概念,只有Isolate,每个Isolate都是隔离的,并不会共享内存。所有的Dart代码都是在Isolate中运行,它就像机器上的一个小空间,具有自己的私有内存块和一个运行着事件循环模型的单线程。也就是说,一旦某个Dart函数开始执行,它将执行到这个函数的结束而不被其他Dart代码打断,这就是单线程的特性。
默认情况下,Dart程序只有一个Isolate(未自己创建的情况下),而这个Isolate就是Main Isolate。也就是说,一个Dart程序是从Main Isolate的main函数开始的,而在main函数结束后,Main isolate线程开始一个一个处理事件循环模型队列中的每一事件(Event)。上图描述的就是Main Isolate的消息循环模型。
1.2 事件循环模型
也许你会问,既然Dart是一种单线程语言,那么是不是就意味着Dart无法并发处理异步任务了?此言差矣。前面说到,所有的Dart程序都在Isolate中运行,每个Isolate拥有自己的私有内存块和一个事件循环模型,其中,事件循环模型就是用来处理各种事件,比如点输入/输出,点击,定时器以及异步任务等。下图描述了一个Isolate的事件循环模型的整个流程:
从上图可知,Dart事件循环机制由一个消息循环(event looper)和两个消息队列构成,其中,两个消息队列是指事件队列(event queue)和微任务队列(Microtask queue)。该机制运行原理为:
- 首先,Dart程序从main函数开始运行,待main函数执行完毕后,event looper开始工作;
- 然后,event looper优先遍历执行Microtask队列所有事件,直到Microtask队列为空;
- 接着,event looper才遍历执行Event队列中的所有事件,直到Event队列为空;
- 最后,视情况退出循环。
为了进一步理解,我们解释下上述三个概念:
(1)消息循环(Event Looper)
顾名思义,消息循环就是指一个永不停歇且不能阻塞的循环,它将不停的尝试从微任务队列和事件队列中获取事件(event)进行处理,而这些Event包括了用户输入,点击,Timer,文件IO等。
(2)事件队列(Event queue)
该队列的事件来源于外部事件和Future,其中,外部事件主要包括I/O,手势,绘制,计时器和isolate相互通信的message等,而Future主要是指用户自定义的异步任务,通过创建Future类实例来向事件队列添加事件。需要注意的是,当Event looper正在处理Microtask Queue时,Event queue会被阻塞,此时APP将无法进行UI绘制,响应用户输入和I/O等事件。下列示例演示了向Event queue中添加一个异步任务事件:
main(List<String> args) {
print('main start...')
var futureInstance = Future<String>(() => "12345");
futureInstance.then((res) {
print(res);
}).catchError((err) {
print(err);
});
print('main end...')
}
// 打印结果:
// main start...
// main end...
// 12345
(3)微任务队列(Microtask queue)
该队列的事件来源与当前isolate的内部或通过scheduleMicrotask函数创建,Microtask一般用于非常短的内部异步动作,并且任务量非常少,如果微任务非常多,就会造成Event queue排不上队,会阻塞Event queue的执行造成应用ANR,因为Microtask queue的优先级高于Event queue。因此,大多数情况下的任务优先考虑使用Event queue,不到万不得已不要使用Microtask queue。下列 示例演示了两个事件队列执行情况:
iimport 'dart:async';
void main(List<String> args) {
print('main start...');
/// 创建异步event queue
var futureInstance = Future<String>(() => "12345");
futureInstance.then((res) {
print(res);
}).catchError((err) {
print(err);
});
/// 执行函数
stark();
/// scheduleMicrotask函数创建微任务MircroTask queue
scheduleMicrotask(() => print('microtask #1 of 2'));
print('main end...');
/// scheduleMicrotask函数创建微任务MircroTask queue
scheduleMicrotask(() => print('microtask #2 of 2'));
}
void stark() async {
print('stark start...');
/// 创建异步event queue
var futureInstance = Future<String>(() => "900303");
/// 异步等待
var rep = await futureInstance;
print(rep);
/// 创建异步event queue
var futureInstance1 = Future<String>(() => "xxxxx");
futureInstance1.then((res) {
print(res);
}).catchError((err) {
print(err);
});
print('stark end...');
}
// 执行顺序:
// main start...
// stark start...
// main end...
// microtask #1 of 2
// microtask #2 of 2
// 12345
// 900303
// stark end...
// xxxxx
1.3 Isolate
大多数计算机中,甚至在移动平台上,都在使用多核CPU。 为了有效利用多核性能,开发者一般使用共享内存数据来保证多线程的正确执行。 然而多线程共享数据通常会导致很多潜在的问题,并导致代码运行出错。Dart作为一种新语言,为了缓解上述问题,提出了Isolate(隔离区)的概念,即Dart没有线程的概念,只有Isolate,所有的Dart代码都是在Isolate中运行,它就像是机器上的一个小空间,具有自己的私有内存堆和一个运行着Event Looper的单个线程。
通常,一个Dart应用对应着一个Main Isolate,且应用的入口即为该Isolate的main函数。当然,我们也可以创建其它的Isolate,由于Isolate的内存堆是私有的,因此这些Isolate的内存都不会被其它Isolate访问。假如不同的Isolate需要通信(单向/双向),就只能通过向对方的事件循环队列里写入任务,并且它们之间的通讯方式是通过port(端口)实现的,其中,Port又分为receivePort(接收端口)和sendPort(发送端口),它们是成对出现的。Isolate之间通信过程:
- 首先,当前Isolate创建一个ReceivePort对象,并获得对应的SendPort对象;
var receivePort = ReceivePort();
var sendPort = receivePort.sendPort;
- 其次,创建一个新的Isolate,并实现新Isolate要执行的异步任务,同时,将当前Isolate的SendPort对象传递给新的Isolate,以便新Isolate使用这个SendPort对象向原来的Isolate发送事件;
// 调用Isolate.spawn创建一个新的Isolate
// 这是一个异步操作,因此使用await等待执行完毕
var anotherIsolate = await Isolate.spawn(otherIsolateInit, receivePort.sendPort);
// 新Isolate要执行的异步任务
// 即调用当前Isolate的sendPort向其receivePort发送消息
void otherIsolateInit(SendPort sendPort) async {
value = "Other Thread!";
sendPort.send("BB");
}
- 第三,调用当前Isolate#receivePort的listen方法监听新的Isolate传递过来的数据。Isolate之间什么数据类型都可以传递,不必做任何标记。
receivePort.listen((date) {
print("Isolate 1 接受消息:data = $date");
});
- 最后,消息传递完毕,关闭新创建的Isolate。
import 'dart:isolate';
var anotherIsolate;
var value = "Now Thread!";
void startOtherIsolate() async {
value = " Isolate1 Thread!";
/// 获取当前 Isolate 接收对象
var receivePort = ReceivePort();
receivePort.listen((message) {
if (message is String) {
print("Isolate1 接受消息:data = $message; value = $value");
} else if (message is SendPort) {
message.send("我来自 isolate1 发送 消息");
}
});
/// 创建 Isolate 对象
anotherIsolate = await Isolate.spawn(otherIsolateInit, receivePort.sendPort);
}
void otherIsolateInit(SendPort sendPort) async {
value = " Isolate2 Thread!";
/// 获取当前 Isolate 接收对象
var receivePort2 = ReceivePort();
receivePort2.listen((message) {
print("Isolate2 接受消息:message = $message; value = $value ");
});
/// 发送消息
sendPort.send("加油");
sendPort.send(receivePort2.sendPort);
}
void main(List<String> args) {
startOtherIsolate();
}
==============执行结果===========
Isolate1 接受消息:data = 加油; value = Isolate1 Thread!
Isolate2 接受消息:message = 我来自 isolate1 发送 消息; value = Isolate2 Thread!
Isolate 处理耗时操作使用场景
import 'dart:isolate';
var anotherIsolate;
var value = "Now Thread!";
void startOtherIsolate() async {
value = " Isolate1 Thread!";
int count = 1000000000;
/// 获取当前 Isolate 接收对象
var receivePort = ReceivePort();
receivePort.listen((message) {
if (message is String) {
print("Isolate1 接收消息:message = $message; value = $value");
} else if (message is SendPort) {
message.send("我来自 isolate1 发送 消息");
} else if (message is int) {
print('Isolate1 接接int消息: $message');
}
});
/// 创建 Isolate 对象
anotherIsolate = await Isolate.spawn(
otherIsolateInit, {"sendPort": receivePort.sendPort, "params": count});
}
void otherIsolateInit(Map<String, dynamic> info) async {
SendPort sendPort = info['sendPort'];
int count = info['params'];
value = " Isolate2 Thread!";
print('otherIsolateInit start...');
/// 获取当前 Isolate 接收对象
var receivePort2 = ReceivePort();
receivePort2.listen((message) {
print("Isolate2 接受消息:message = $message; value = $value ");
});
/// 发送消息
sendPort.send(receivePort2.sendPort);
/// 处理耗时任务
/// 耗时任务会阻塞该isolate、不会阻塞main isolate
int _sum = 0;
int _index = 0;
while (_index <= count) {
_sum += _index;
_index++;
}
sendPort.send(_sum);
print('otherIsolateInit end...');
}
void main(List<String> args) {
startOtherIsolate();
}
============输入日志===============
otherIsolateInit start...
otherIsolateInit end...
Isolate1 接接int消息: 500000000500000000
Isolate2 接受消息:message = 我来自 isolate1 发送 消息; value = Isolate2 Thread!