鸿蒙Next并发线程TaskPool使用

ArkTS提供了TaskPool与Worker两种多线程并发方案,当任务不需要长时间(3分钟)占据后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。
使用TaskPool可以实现:
1.创建一个任务线程,执行一个耗时任务,并返回结果
2.一次执行一组任务,执行完成后,按照添加任务的顺序统一返回结果
3.执行一组需要串行执行的任务,任务依次执行,后一个任务在前一个任务结束后执行
4.在任务执行过程中向宿主线程发送消息并触发回调
看一下简单任务的功能演示:


taskpool演示.gif

创建任务Task

参数名 类型 说明
name string 任务名称
func Function 执行的逻辑需要传入函数,必须使用@Concurrent装饰器装饰
args Object[] 任务执行传入函数的入参

例如,创建一个任务,执行从0加到200,每次加1,耗时10ms

@Concurrent
async function taskPool1(text:number): Promise<number>{
  for (let index = 0; index < 200; index++) {
    text++
    await new Promise<void>(resolve => setTimeout(resolve, 10)); // 每次循环增加10毫秒延时
  }
  return text
}
//创建任务
let task = new taskpool.Task(taskPool1,this.a)
//执行
taskpool.execute(task).then((value: Object)=>{
            this.a = value as number
          })

创建任务组TaskGroup

如果所有任务正常执行,异步执行完毕后返回所有任务结果的数组,数组中元素的顺序与addTask的顺序相同,任务组执行时间和耗时最多的任务执行时间差不多,因此同时执行3个任务可以节省时间

let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
let task1 = new taskpool.Task('task1',taskPool1,this.a)
let task2 = new taskpool.Task('task2',taskPool2,this.b)
let task3 = new taskpool.Task('task3',taskPool3,this.c)
taskGroup.addTask(task1)
taskGroup.addTask(task2)
taskGroup.addTask(task3)
let s = await taskpool.execute(taskGroup)

创建串行队列的任务SequenceRunner

串行队列的任务会依次执行,相当于同步任务

let task1 = new taskpool.Task( taskPool1,this.a)
let task2 = new taskpool.Task( taskPool2,this.b)
let task3 = new taskpool.Task( taskPool3,this.c)
let runner:taskpool.SequenceRunner = new taskpool.SequenceRunner();
runner.execute(task1).then((value: Object)=>{
})
runner.execute(task2).then((value: Object)=>{
})
runner.execute(task3).then((value: Object)=>{
})

任务依赖执行

例如有3个任务,创建任务时,设置1依赖2,2依赖3,虽然先execute任务1,但是结果是等任务3执行完之后,再执行任务2,最后执行任务1

let task1 = new taskpool.Task( taskPool1,this.a)
let task2 = new taskpool.Task( taskPool2,this.b)
let task3 = new taskpool.Task( taskPool3,this.c)
task1.addDependency(task2)
task2.addDependency(task3)
taskpool.execute(task1)
taskpool.execute(task2)
taskpool.execute(task3)

任务执行回调

当我们需要监听执行过程的某些参数,可以在任务中设置回调,发送到主线程中

@Concurrent
async function taskPool5(text:number): Promise<number>{
  for (let index = 0; index < 100; index++) {
    text++
//在任务执行过程中向宿主线程发送消息
    taskpool.Task.sendData(text);
    await new Promise<void>(resolve => setTimeout(resolve, 100)); // 每次循环增加10毫秒延时
  }
  return text
}
let task = new taskpool.Task(taskPool5,this.g)
taskpool.execute(task)
task.onReceiveData((current:number)=>{
  //接受回调结果
})

演示源码:

import { taskpool } from '@kit.ArkTS';

@Concurrent
async function taskPool1(text:number): Promise<number>{
  for (let index = 0; index < 200; index++) {
    text++
    await new Promise<void>(resolve => setTimeout(resolve, 10)); // 每次循环增加10毫秒延时
  }

  return text
}
@Concurrent
async function taskPool2(text:number): Promise<number>{
  for (let index = 0; index < 100; index++) {
    text++
    await new Promise<void>(resolve => setTimeout(resolve, 20)); // 每次循环增加20毫秒延时
  }

  return text
}
@Concurrent
async function taskPool3(text:number): Promise<number>{
  for (let index = 0; index < 50; index++) {
    text++
    await new Promise<void>(resolve => setTimeout(resolve, 30)); // 每次循环增加30毫秒延时
  }

  return text
}
@Concurrent
async function taskPool4(args1: number, args2: number): Promise<number>{
  await new Promise<void>(resolve => setTimeout(resolve, 100)); // 每次循环增加30毫秒延时
  return args1+args2
}
@Concurrent
async function taskPool5(text:number): Promise<number>{
  for (let index = 0; index < 100; index++) {
    text++
    taskpool.Task.sendData(text);
    await new Promise<void>(resolve => setTimeout(resolve, 100)); // 每次循环增加10毫秒延时
  }

  return text
}
@Entry
@ComponentV2
struct TaskPoolTest {
  @Local a:number=0
  @Local time1:number=0
  @Local b:number=0
  @Local time2:number=0
  @Local c:number=0
  @Local time3:number=0
  @Local d:string=''
  @Local time4:number=0
  @Local e:string=''
  @Local f:string=''
  @Local ac:string=''
  @Local g:number=0

  build() {
    Column({space:10}){
      Row({space:10}){
        Text('a:'+this.a+'耗时:'+this.time1)
        Button('task1: a加到200 累加延时10ms').onClick(()=>{
          this.a=0
          let task = new taskpool.Task(taskPool1,this.a)
          let start = new Date().getTime()
          taskpool.execute(task).then((value: Object)=>{
            console.info("taskpool result: " + value);
            this.a = value as number
            this.time1 =  new Date().getTime() -start
          })
        })
      }
      Row({space:10}){
        Text('b:'+this.b+'耗时:'+this.time2)
        Button('task2: b加到100 累加延时20ms').onClick(()=>{
          this.b=0
          let task = new taskpool.Task(taskPool2,this.b)
          let start = new Date().getTime()
          taskpool.execute(task).then((value: Object)=>{
            console.info("taskpool result: " + value);
            this.b = value as number
            this.time2 =  new Date().getTime() -start
          })
        })
      }
      Row({space:10}){
        Text('c:'+this.c+'耗时:'+this.time3)
        Button('task3: c加到50 累加延时30ms').onClick(()=>{
          this.c=0
          let task = new taskpool.Task(taskPool3,this.c)
          let start = new Date().getTime()
          taskpool.execute(task).then((value: Object)=>{
            console.info("taskpool result: " + value);
            this.c = value as number
            this.time3 =  new Date().getTime() -start
          })
        })
      }
      Text('任务组执行:'+this.d+'耗时:'+this.time4)
      Row({space:10}){

        Button('组内顺序:123').onClick(async ()=>{
          this.a=0
          this.b=0
          this.c=0
          let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
          let start = new Date().getTime()
          let task1 = new taskpool.Task('task1',taskPool1,this.a)
          let task2 = new taskpool.Task('task2',taskPool2,this.b)
          let task3 = new taskpool.Task('task3',taskPool3,this.c)
          taskGroup.addTask(task1)
          taskGroup.addTask(task2)
          taskGroup.addTask(task3)
          let s = await taskpool.execute(taskGroup)
          this.d = s.toString()
          this.time4 =  new Date().getTime() -start
        })
        Button('组内顺序:321').onClick(async ()=>{
          this.a=0
          this.b=0
          this.c=0
          let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
          let start = new Date().getTime()
          let task1 = new taskpool.Task('task1',taskPool1,this.a)
          let task2 = new taskpool.Task('task2',taskPool2,this.b)
          let task3 = new taskpool.Task('task3',taskPool3,this.c)
          taskGroup.addTask(task3)
          taskGroup.addTask(task2)
          taskGroup.addTask(task1)
          let s = await taskpool.execute(taskGroup)
          this.d = s.toString()
          this.time4 =  new Date().getTime() -start
        })
      }

      Text('串行队列:\n'+this.e)
      Row({space:10}){
        Button('串行执行123').onClick(()=>{
          this.a=0
          this.b=0
          this.c=0
          let start = new Date().getTime()
          let task1 = new taskpool.Task( taskPool1,this.a)
          let task2 = new taskpool.Task( taskPool2,this.b)
          let task3 = new taskpool.Task( taskPool3,this.c)
          let runner:taskpool.SequenceRunner = new taskpool.SequenceRunner();
          runner.execute(task1).then((value: Object)=>{
            this.e =this.e+ 'a:'+value+'耗时:'+(new Date().getTime() -start+'\n')
          })
          runner.execute(task2).then((value: Object)=>{
            this.e = this.e+'b:'+value+'耗时:'+(new Date().getTime() -start+'\n')
          })
          runner.execute(task3).then((value: Object)=>{
            this.e = this.e+'c:'+value+'耗时:'+(new Date().getTime() -start)
          })
        })
        Button('串行执行321').onClick(()=>{
          this.a=0
          this.b=0
          this.c=0
          let start = new Date().getTime()
          let task1 = new taskpool.Task( taskPool1,this.a)
          let task2 = new taskpool.Task( taskPool2,this.b)
          let task3 = new taskpool.Task( taskPool3,this.c)
          let runner:taskpool.SequenceRunner = new taskpool.SequenceRunner();
          runner.execute(task3).then((value: Object)=>{
            this.e = this.e+'c:'+value+'耗时:'+(new Date().getTime() -start+'\n')
          })
          runner.execute(task2).then((value: Object)=>{
            this.e = this.e+'b:'+value+'耗时:'+(new Date().getTime() -start+'\n')
          })
          runner.execute(task1).then((value: Object)=>{
            this.e =this.e+ 'a:'+value+'耗时:'+(new Date().getTime() -start)
          })
        })
      }

      Text('依赖执行')
      Button('依赖执行123').onClick(()=>{
        this.a=0
        this.b=0
        this.c=0
        let start = new Date().getTime()
        let task1 = new taskpool.Task( taskPool1,this.a)
        let task2 = new taskpool.Task( taskPool2,this.b)
        let task3 = new taskpool.Task( taskPool3,this.c)
        task1.addDependency(task2)
        task2.addDependency(task3)
        taskpool.execute(task1).then((value) => {
          this.a = value as number
          this.time1 =  new Date().getTime() -start
        })
        taskpool.execute(task2).then((value) => {
          this.b = value as number
          this.time2 =  new Date().getTime() -start
        })
        taskpool.execute(task3).then((value) => {
          this.c = value as number
          this.time3 =  new Date().getTime() -start
        })
      })

      Text('执行中回调'+this.g)
      Button('task5: g加到100 累加延时100ms').onClick(()=>{
        this.g=0
        let task = new taskpool.Task(taskPool5,this.g)
        taskpool.execute(task).then((value: Object)=>{
          this.g = value as number
        })
        task.onReceiveData((current:number)=>{
          this.g=current
        })
      })

      Text('任务1、3、a+c:'+ this.ac)
      Row({space:10}){
        Button('串行执行').onClick(async ()=>{
          let start = new Date().getTime()
          this.a=0
          this.c=0
          let task1 = new taskpool.Task( taskPool1,this.a)
          let task2 = new taskpool.Task( taskPool3,this.c)
          this.a = await taskpool.execute(task1) as  number
          this.c = await taskpool.execute(task2) as  number
          let task3 = new taskpool.Task( taskPool4,this.a,this.c)
          this.ac = await taskpool.execute(task3) +'耗时:'+(new Date().getTime() -start)
        })
        Button('分组执行').onClick(async ()=>{
          let start = new Date().getTime()
          this.a=0
          this.c=0
          let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
          let task1 = new taskpool.Task( taskPool1,this.a)
          let task2 = new taskpool.Task( taskPool3,this.c)
          taskGroup.addTask(task1)
          taskGroup.addTask(task2)
          let s= await taskpool.execute(taskGroup)
          let res  =  s.toString().split(',').map((item) => Number(item.trim()) as number);
          this.a =  res[0]
          this.c =  res[1]
          let task3 = new taskpool.Task( taskPool4,this.a,this.c)
          this.ac = await taskpool.execute(task3) +'耗时:'+(new Date().getTime() -start)
        })

      }
      Row({space:10}){
        Text('异步队列:AsyncRunner 5.1.0支持')
      }
    }
    .alignItems(HorizontalAlign.Start)
    .width('100%')
  }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容