前言
一些时候我们执行异步操作,如一个api请求,可直接使用async/await 完成。但有时候我们可能需要短时间内执行大量异步操作,如打印,写入文件等等,那么再用async/await 直接插入到event queue中势必会造成卡顿,为此我们应该将它们放在子线程中。
不过频繁创建/释放isolate也是对资源的浪费,所以我决定模仿java写一个线程池(基础版)。
基础版 略显稚嫩,还请海涵
结构图
主要分为三部分:
main isolate : flutter 主线程
proxy isolate : 子线程 负责接收主线程的指令,并下发给work isolate
work isolate : 实际执行任务的子线程
流程
主线程提交任务,proxy线程接收并缓存下来,同时向线程池(3个线程)内部的空闲线程进行任务派发
生命周期:线程创建后,便会一直存在,与APP一致
之后通过反射在工作线程中调用预先定义好的方法。
对于开发人员,无需关注线程内部原理,只需要定义一个方法,
然后通过WorkerMainProxy类的invokeWorker就可以实现子线程执行。
接下来我们分别实现它们,还是老规矩,代码较多时我会将说明写在注释里。
main_isolate
一些常量和数据结构
因为isolate之间只能通过sendPort.send互相通信,所以我们先定义一些常量
const int kSendPortKey = 6633;//第二个元素则为 sendPort
const int kTaskKey = 8844; // 第二个元素为task
const int kTaskParamsKey = 10055; // 第二个元素为 方法对应的 参数
const int kTaskResult = 15500;//任务返回结果
///
const String kMethodName = 'kMethodName';
const String kNameArgs = 'kNameArgs';
port发送的message可以是 null,num,double,bool,或者包含上述类型的List和Map,以及SendPort,所以我们需要设计一下数据结构。
我这里统一使用数组 :
[key,data] key 是上方的常量值
当data为task时,类型会是Map,然后用到上面的字符串常量
WorkerMainProxy
这个类内部与proxy_isolate联系,并通过invokeWorker方法,将任务提交给proxy_isolate,我们看一下他的代码,说明我写在注释里
class WorkerMainProxy{
static WorkerMainProxy _instance;
static WorkerMainProxy getInstance(){
if(_instance == null){
_instance = WorkerMainProxy._();
}
return _instance;
}
factory WorkerMainProxy()=>getInstance();
WorkerMainProxy._();
//主线程的收听端口
final ReceivePort receivePort = ReceivePort();
//proxy
Isolate isolate;
//向proxy发送信息的port
SendPort childPort;
//因为线程初始化为异步,确保下达的任务不丢失,创建这个缓存
List<TaskWrapper> taskCache = [];
//是否在初始化proxy线程
bool initializing = false;
///nameArgs key: params name
///value: params value .
///and type can only 'num,null,double,String'
void invokeWorker({String methodName,Map<String,dynamic> nameArgs})async{
taskCache.add(TaskWrapper(methodName, nameArgs));
if(isolate == null && !initializing){
initializing = true;
//初始化子线程,第一个参数为顶级函数,由子线程执行,
//第二个参数为当前线程的sendport,子线程可以用这个向当前线程发送消息
isolate = await Isolate.spawn(proxyHandler, receivePort.sendPort);
//收听子线程发送的消息
receivePort.listen((message) {
if(message[0] == kSendPortKey){
//子线程的sendport传递过来
childPort = message[1];
sendTask();
}else if(message[0]==kTaskResult){
//初版不支持返回结果
///要考虑多线程不同步的情况
//可能造成难以预料的结果,所以暂不考虑加入返回结果的功能
}
});
}else if(isolate != null && childPort != null){
sendTask();
}
}
//当childPort初始化成功后,开始发送任务
void sendTask(){
if(taskCache.length > 0){
taskCache.forEach((element) {
childPort.send([kTaskKey,element.methodName,element.nameArgs]);
});
taskCache.clear();
}
}
}
功能很简单,接下来我们看 proxy_isolate的实现
proxy_isolate
结构图
proxyHandler
这个方法是属于proxy 线程(是isolate,叫线程叫习惯了,别被我搞乱了)的,
切记dart线程是不能共享内存的,
同时不要在子线程里使用dart:ui 或者flutter的东西,否则会报错
代码如下:
//任务缓存
List<TaskWrapper> taskLog = [];
//收/发端口
final ReceivePort receiveMainPort = ReceivePort();
final SendPort sendPortOfProxy = receiveMainPort.sendPort;
//线程池
final Map<int,WorkIsolateWrapper> workers = {};
void proxyHandler(SendPort mainPort)async{
//监听主线程的信息
receiveMainPort.listen((message) {
if(message[0] == kTaskKey){
//对主线程的任务进行缓存
String name = message[1];
Map<String,dynamic> args = message[2];
TaskWrapper wrapper = TaskWrapper(name,args);
taskLog.add(wrapper);
}
});
//将proxy的sendport传给主线程
mainPort.send([kSendPortKey,sendPortOfProxy]);
/// create 3 work isolate
///创建三个工作线程
List.generate(3, (index) async{
//同理
final ReceivePort proxyPort = ReceivePort();
final SendPort proxySendPort = proxyPort.sendPort;
Isolate.spawn(_workerIsolate, proxySendPort,paused: true)
.then((isolate) {
//生成ID
int id = Random().nextInt(1000);
while(workers.containsKey(id)){
id = Random().nextInt(1000);
}
var worker = WorkIsolateWrapper(id,proxyPort, proxySendPort, isolate);
workers[id] = worker;
//启动工作线程
worker.init();
});
});
//启动代理
runProxy();
}
//创建了一个timer用于循环从任务缓存取出任务,并分发给空闲的工作线程
void runProxy(){
final timer = Timer.periodic(Duration(milliseconds: 1), (timer) {
if(taskLog.length>0){
workers.forEach((key, value) {
if(taskLog.length > 0){
if(value.isStandBy()){
//向空闲的线程发送任务
TaskWrapper task = taskLog.first;
value.setStatus(false);// not free
value.workSendPort.send([kTaskKey,{kMethodName:task.methodName,
kNameArgs:task.nameArgs}]);
//移除对应任务
taskLog.removeWhere((element) => element == task);
}
}
});
}
});
}
对工作线程进行了一下包装WorkIsolateWrapper,方便操作,我们来看一下他的内部结构
WorkIsolateWrapper
class WorkIsolateWrapper {
final int id;
final ReceivePort proxyPort;
final SendPort proxySendPort;
//work_isolate
final Isolate _isolate;
WorkIsolateWrapper(this.id,this.proxyPort, this.proxySendPort, this._isolate);
///是否空闲
bool _isFree = true;
bool isStandBy()=> _isFree&&initSuccess;
setStatus(bool status){
_isFree = status;
}
SendPort workSendPort;
bool initSuccess = false;
//这就是我们上面调用的init方法
//这个方法开始初始化 work isoalte,并把其sendPort保存下来。
init() {
_isolate.resume(_isolate.pauseCapability);
proxyPort.listen((message) {
//监听工作线程
if (message[0] == kSendPortKey) {
//保存工作线程的 sendPort
workSendPort = message[1];
initSuccess = true;
}else if(message[0] == kWorkDone){
///work done
setStatus(true);
print('isolate $id 完成了 :${message[1].toString()}');
}
});
}
}
至此proxy_isolate就实现了,我们接下来看一下work_isolate的实现
work_isolate
实现
const int kWorkDone = 98766; //处理完后的回复tag
void _workerIsolate(SendPort proxyPort){
//启动反射,这里我在后面介绍
initializeReflectable();
final ReceivePort receivePort = ReceivePort();
final SendPort sendPort = receivePort.sendPort;
receivePort.listen((message) {
if(message[0] == kTaskKey){
///执行任务
//对msg进行解构再组装
Map method = message[1];
String mn = method[kMethodName];
Map<Symbol,dynamic> nameArguments = {};
if(method[kNameArgs] is Map){
///为了避免顺序错误导致的参数异常,这里不使用positionalArguments
(method[kNameArgs] as Map).forEach((key, value) {
nameArguments[Symbol(key)] = value;
});
//功能模板类,里面定义了一些列方法,通过反射进行调用
final WorkList workerList = WorkList();
final InstanceMirror instanceMirror = myReflect.reflect(workerList);
//执行静态方法用的,这里不需要先注释掉
//final ClassMirror classMirror = myReflect.reflectType(WorkList);
//调用方法
instanceMirror.invoke(mn, [],nameArguments);
///work done
///结构暂定为 [order flag, result(Map)]
///个人认为这种多线程处理任务,最好不要有返回结果 ....待设计
proxyPort.send([kWorkDone,{'method':mn,'args':nameArguments.toString()}]);
}
}
});
//将工作线程的sendPort发送给proxy
proxyPort.send([kSendPortKey,sendPort]);
}
workList 反射模板
介绍
我们在worklist里定义好方法,子线程通过反射就可以调用这些方法,这样实际使用的时候,开发人员就不需要关心子线程的实现,只用在WorkList里面定义好自己的方法,然后通过WorkerMainProxy.invoerWorker()方法调用即可。
实现
首先我们需要两个插件:
reflectable: ^2.2.5
build_runner: ^1.7.0
之后我们在main.dart文件中增加如下代码:
///具体方法模板类
@myReflect
class WorkList{
test({String n,String m}){
print(' test method $n');
}
}
///插件反射对象
const myReflect = MyReflectable();
class MyReflectable extends Reflectable{
const MyReflectable():super(invokingCapability);
}
同时在根目录中增加一个build.yaml文件用来生成代码
targets:
$default:
builders:
reflectable:
generate_for:
- lib/main.dart
都弄好了后,我们在控制台执行
flutter packages pub run build_runner build
过一小会,你就会看到main.dart下方多了一个
main.refelctable.dart
这样我们就完成了反射模板workList的配置,每次编辑workList后都需要运行一下
flutter packages pub run build_runner build
测试
至此整个功能就开发完毕了,我们测试一下,点击按钮发起100次调用:
RaisedButton(onPressed: ()async{
List.generate(100, (index){
WorkerMainProxy.getInstance()
///参数一:方法名字,参数二:方法对应的命名参数,
///务必确保参数名与WorkList中的一致
.invokeWorker(methodName: 'test',nameArgs: {'n':'第$index次唤起','m':'第二个参数'});
});
},
child: Text('测试worker'),),
可以看到测试结果是符合我们预期的,初版功能就开发完毕了。
结语
以上只是初版功能,还有诸多需要完善的,后续我会不断增加,另外功能模块也可以进一步拓展,如 java中的cache线程池。
也欢迎大家补充,如有不足还请指出,
该功能已加入 Bedrock开发框架,希望大家多提意见 :)