Flutter——手撸线程(Isolate)池和子线程工作模块的设计

前言

一些时候我们执行异步操作,如一个api请求,可直接使用async/await 完成。但有时候我们可能需要短时间内执行大量异步操作,如打印,写入文件等等,那么再用async/await 直接插入到event queue中势必会造成卡顿,为此我们应该将它们放在子线程中。

不过频繁创建/释放isolate也是对资源的浪费,所以我决定模仿java写一个线程池(基础版)。

基础版 略显稚嫩,还请海涵

结构图

image

主要分为三部分:

main isolate : flutter  主线程
proxy isolate :  子线程  负责接收主线程的指令,并下发给work isolate
work isolate :  实际执行任务的子线程

流程

主线程提交任务,proxy线程接收并缓存下来,同时向线程池(3个线程)内部的空闲线程进行任务派发

生命周期:线程创建后,便会一直存在,与APP一致

之后通过反射在工作线程中调用预先定义好的方法。

对于开发人员,无需关注线程内部原理,只需要定义一个方法,
然后通过WorkerMainProxy类的invokeWorker就可以实现子线程执行。

接下来我们分别实现它们,还是老规矩,代码较多时我会将说明写在注释里。

main_isolate

一些常量和数据结构

因为isolate之间只能通过sendPort.send互相通信,所以我们先定义一些常量

const int kSendPortKey = 6633;//第二个元素则为 sendPort

const int kTaskKey = 8844; // 第二个元素为task

const int kTaskParamsKey = 10055; // 第二个元素为 方法对应的 参数

const int kTaskResult = 15500;//任务返回结果

///
const String kMethodName = 'kMethodName';
const String kNameArgs = 'kNameArgs';

port发送的message可以是 null,num,double,bool,或者包含上述类型的List和Map,以及SendPort,所以我们需要设计一下数据结构。

我这里统一使用数组 :

[key,data] key 是上方的常量值
当data为task时,类型会是Map,然后用到上面的字符串常量

WorkerMainProxy

这个类内部与proxy_isolate联系,并通过invokeWorker方法,将任务提交给proxy_isolate,我们看一下他的代码,说明我写在注释里

class WorkerMainProxy{
  static WorkerMainProxy _instance;
  static WorkerMainProxy getInstance(){
    if(_instance == null){
      _instance = WorkerMainProxy._();
    }
    return _instance;
  }

  factory WorkerMainProxy()=>getInstance();


  WorkerMainProxy._();
    //主线程的收听端口
  final ReceivePort receivePort = ReceivePort();
  //proxy
  Isolate isolate;
  //向proxy发送信息的port
  SendPort childPort;
    //因为线程初始化为异步,确保下达的任务不丢失,创建这个缓存
  List<TaskWrapper> taskCache = [];
    //是否在初始化proxy线程
  bool initializing = false;

  ///nameArgs  key: params name
  ///value: params value .
  ///and type can only 'num,null,double,String'
  void invokeWorker({String methodName,Map<String,dynamic> nameArgs})async{
    taskCache.add(TaskWrapper(methodName, nameArgs));
    if(isolate == null && !initializing){
      initializing = true;
      //初始化子线程,第一个参数为顶级函数,由子线程执行,
      //第二个参数为当前线程的sendport,子线程可以用这个向当前线程发送消息
      isolate  = await Isolate.spawn(proxyHandler, receivePort.sendPort);
      //收听子线程发送的消息
      receivePort.listen((message) {
        if(message[0] == kSendPortKey){
          //子线程的sendport传递过来
          childPort = message[1];
          sendTask();
        }else if(message[0]==kTaskResult){
           //初版不支持返回结果
          ///要考虑多线程不同步的情况
          //可能造成难以预料的结果,所以暂不考虑加入返回结果的功能
        }
      });
    }else if(isolate != null && childPort != null){
      sendTask();
    }

  }
    //当childPort初始化成功后,开始发送任务
  void sendTask(){
    if(taskCache.length > 0){
      taskCache.forEach((element) {
        childPort.send([kTaskKey,element.methodName,element.nameArgs]);
      });
      taskCache.clear();
    }
  }

}

功能很简单,接下来我们看 proxy_isolate的实现

proxy_isolate

结构图

image

proxyHandler

这个方法是属于proxy 线程(是isolate,叫线程叫习惯了,别被我搞乱了)的,

切记dart线程是不能共享内存的,
同时不要在子线程里使用dart:ui 或者flutter的东西,否则会报错

代码如下:

//任务缓存
List<TaskWrapper> taskLog = [];
//收/发端口
final ReceivePort receiveMainPort = ReceivePort();
final SendPort sendPortOfProxy = receiveMainPort.sendPort;
//线程池
final Map<int,WorkIsolateWrapper> workers = {};

void proxyHandler(SendPort mainPort)async{
  //监听主线程的信息
  receiveMainPort.listen((message) {
    if(message[0] == kTaskKey){
    //对主线程的任务进行缓存
      String name = message[1];
      Map<String,dynamic> args = message[2];
      TaskWrapper wrapper = TaskWrapper(name,args);
      taskLog.add(wrapper);
    }

  });
 //将proxy的sendport传给主线程
  mainPort.send([kSendPortKey,sendPortOfProxy]);


  /// create 3 work isolate
    ///创建三个工作线程
  List.generate(3, (index) async{
   //同理
    final ReceivePort proxyPort = ReceivePort();
    final SendPort proxySendPort = proxyPort.sendPort;
    Isolate.spawn(_workerIsolate, proxySendPort,paused: true)
      .then((isolate) {
      //生成ID
      int id = Random().nextInt(1000);
      while(workers.containsKey(id)){
        id = Random().nextInt(1000);
      }
      var worker = WorkIsolateWrapper(id,proxyPort, proxySendPort, isolate);
      workers[id] = worker;
      //启动工作线程
      worker.init();
    });


  });
  //启动代理
  runProxy();

}
//创建了一个timer用于循环从任务缓存取出任务,并分发给空闲的工作线程
void runProxy(){
  final timer = Timer.periodic(Duration(milliseconds: 1), (timer) {
    if(taskLog.length>0){
      workers.forEach((key, value) {
        if(taskLog.length > 0){
          if(value.isStandBy()){
          //向空闲的线程发送任务
            TaskWrapper task = taskLog.first;
            value.setStatus(false);// not free
            value.workSendPort.send([kTaskKey,{kMethodName:task.methodName,
              kNameArgs:task.nameArgs}]);
              //移除对应任务
            taskLog.removeWhere((element) => element == task);
          }

        }

      });

    }
  });
}

对工作线程进行了一下包装WorkIsolateWrapper,方便操作,我们来看一下他的内部结构

WorkIsolateWrapper

class WorkIsolateWrapper {
  final int id;
  final ReceivePort proxyPort;

  final SendPort proxySendPort;
  //work_isolate
  final Isolate _isolate;

  WorkIsolateWrapper(this.id,this.proxyPort, this.proxySendPort, this._isolate);

  ///是否空闲
  bool _isFree = true;
  bool isStandBy()=> _isFree&&initSuccess;
  setStatus(bool status){
    _isFree = status;
  }

  SendPort workSendPort;
  bool initSuccess = false;
  //这就是我们上面调用的init方法
  //这个方法开始初始化 work isoalte,并把其sendPort保存下来。
  init() {
    _isolate.resume(_isolate.pauseCapability);
    proxyPort.listen((message) {
    //监听工作线程
      if (message[0] == kSendPortKey) {
       //保存工作线程的 sendPort
        workSendPort = message[1];
        initSuccess = true;
      }else if(message[0] == kWorkDone){
        ///work done
        setStatus(true);
        print('isolate $id  完成了 :${message[1].toString()}');
      }
    });
  }
}

至此proxy_isolate就实现了,我们接下来看一下work_isolate的实现

work_isolate

实现

const int kWorkDone = 98766; //处理完后的回复tag

void _workerIsolate(SendPort proxyPort){
  //启动反射,这里我在后面介绍
  initializeReflectable();
  final ReceivePort receivePort = ReceivePort();
  final SendPort sendPort = receivePort.sendPort;

  receivePort.listen((message) {
    if(message[0] == kTaskKey){
      ///执行任务
      //对msg进行解构再组装
      Map method = message[1];
      String mn = method[kMethodName];
      Map<Symbol,dynamic> nameArguments = {};
      if(method[kNameArgs] is Map){
        ///为了避免顺序错误导致的参数异常,这里不使用positionalArguments
        (method[kNameArgs] as Map).forEach((key, value) {
          nameArguments[Symbol(key)] = value;
        });
        //功能模板类,里面定义了一些列方法,通过反射进行调用
        final WorkList workerList = WorkList();
        final InstanceMirror instanceMirror = myReflect.reflect(workerList);
        //执行静态方法用的,这里不需要先注释掉
        //final ClassMirror classMirror = myReflect.reflectType(WorkList);
        //调用方法
        instanceMirror.invoke(mn, [],nameArguments);
        ///work done
        ///结构暂定为 [order flag, result(Map)]
        ///个人认为这种多线程处理任务,最好不要有返回结果 ....待设计
        proxyPort.send([kWorkDone,{'method':mn,'args':nameArguments.toString()}]);
      }
    }
  });
    //将工作线程的sendPort发送给proxy
  proxyPort.send([kSendPortKey,sendPort]);

}

workList 反射模板

介绍

我们在worklist里定义好方法,子线程通过反射就可以调用这些方法,这样实际使用的时候,开发人员就不需要关心子线程的实现,只用在WorkList里面定义好自己的方法,然后通过WorkerMainProxy.invoerWorker()方法调用即可。

实现

首先我们需要两个插件:

reflectable: ^2.2.5
build_runner: ^1.7.0

之后我们在main.dart文件中增加如下代码:

///具体方法模板类
@myReflect
class WorkList{

  test({String n,String m}){
    print('  test method   $n');
  }


}

///插件反射对象
const myReflect = MyReflectable();

class MyReflectable extends Reflectable{
  const MyReflectable():super(invokingCapability);
}

同时在根目录中增加一个build.yaml文件用来生成代码

targets:
  $default:
    builders:
      reflectable:
        generate_for:
          - lib/main.dart

都弄好了后,我们在控制台执行

flutter packages pub run build_runner build

过一小会,你就会看到main.dart下方多了一个

main.refelctable.dart 

这样我们就完成了反射模板workList的配置,每次编辑workList后都需要运行一下

flutter packages pub run build_runner build

测试

至此整个功能就开发完毕了,我们测试一下,点击按钮发起100次调用:

              RaisedButton(onPressed: ()async{
                List.generate(100, (index){
                  WorkerMainProxy.getInstance()
                  ///参数一:方法名字,参数二:方法对应的命名参数,
                  ///务必确保参数名与WorkList中的一致
                      .invokeWorker(methodName: 'test',nameArgs: {'n':'第$index次唤起','m':'第二个参数'});

                });

              },
                child: Text('测试worker'),),
image

可以看到测试结果是符合我们预期的,初版功能就开发完毕了。

结语

以上只是初版功能,还有诸多需要完善的,后续我会不断增加,另外功能模块也可以进一步拓展,如 java中的cache线程池。

也欢迎大家补充,如有不足还请指出,

该功能已加入 Bedrock开发框架,希望大家多提意见 :)

Bedrock开发框架

我的其它文章

Bedrock——基于MVVM+Provider的Flutter快速开发框架

Flutter自定义View——仿高德三级联动Drawer

Flutter 自定义View——仿同花顺自选股列表

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342