Unity实践—多线程任务队列实现

Unity 已可使用 Thread、Task 等处理多线程任务,但缺少成熟的多线程任务队列工具,所以在此实现一个,代码已上传 Git 项目 GRUnityTools,可直接下载源码或通过 UPM 使用

本文原地址:Unity实践—多线程任务队列实现

实现目标

  1. 串行与并发队列

    队列是首要实现目标,且需要串行与并发两种队列,以覆盖不同需求

  2. 同步与异步执行

    因任务队列过多可能阻塞主线程,所以除同步执行外还需要多线程异步操作

  3. 主线程同步

    因为有多线程,但 Unity 部分操作只能在主线程执行,所以还需要线程同步到主线程

实现方式

  1. Task

    Task 为当前 .Net 提供的实用性最高的多线程接口,可实现任务的监控与操纵

  2. TaskScheduler

    Task 专用调度器,可更便捷地实现 Task 队列调度

  3. Loom

    Loom 为网络上广为流传的 Unity 中调用主线程的工具类,目前找不到源码最原始地址,代码拷贝自知乎

实现过程

方案选择

最初即决定使用 Task 作为队列基本单位,但完全没有考虑 TaskScheduler。原计划手动实现一个调度器,负责保存传入的 Task 放入队列,可设置同步异步,根据设置实现对队列的不同操作。后来再研究微软官方文档时发现在其 Task 文档的示例中有一个 LimitedConcurrencyLevelTaskScheduler 的演示代码,直接通过 TaskScheduler 实现了可控并发数量的调度器,且当设置并发数为1时队列中的任务会逐一按顺序执行即产生了串行队列效果

TaskScheduler 有两种使用方式

方式一:为 TaskFactory 配置 TaskScheduler,通过 TaksFactory 使用配置的调度器启动 Task

//创建并发数32的调度器
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(32); 
//方式1 
TaskFactory factory = new TaskFactory(scheduler);
factory.StartNew(()=>{
  //执行任务
});

方式二:直接使用 Task.Start(TaskFactory) 方法

//创建并发数1的调度器(此时即为串行队列效果)
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
//声明一个 Task 对象
Task task = new Task(()=>{
  //任务
});
//启动 Task 指定调度器
task.Start(scheduler);

编写源码

创建名为 TaskQueue 的类,添加变量

//根据需求设置默认并发数
private const int DefaultConcurrentCount = 32;
//线程锁
private static object _lock = new object();
//默认静态串行队列对象
private static TaskQueue _defaultSerial;
//默认静态并发队列对象
private static TaskQueue _defaultConcurrent;
//持有的调度器
private LimitedConcurrencyLevelTaskScheduler _scheduler;  

//提供默认串行队列
public static TaskQueue DefaultSerailQueue
{
    get
    {
        if (_defaultSerial == null)
        {
            lock (_lock)
            {
                if (_defaultSerial == null)
                {
                    _defaultSerial = new TaskQueue(1);
                }
            }
        }
        return _defaultSerial;
    }
}

//提供默认并发队列
public static TaskQueue DefaultConcurrentQueue
{
    get
    {
        if (_defaultConcurrent == null)
        {
            lock (_lock)
            {
                if (_defaultConcurrent == null)
                {
                    _defaultConcurrent = new TaskQueue(DefaultConcurrentCount);
                }
            }
        }
        return _defaultConcurrent;
    }
}

提供快捷构造方法

//默认构造方法,因 Loom 为 UnityEngine.Monobehaviour对象,所以必须执行初始化方法将其加入场景中
public TaskQueue(int concurrentCount)
{
    _scheduler = new LimitedConcurrencyLevelTaskScheduler(concurrentCount);
    Loom.Initialize();
}
//创建串行队列
public static TaskQueue CreateSerialQueue()
{
    return new TaskQueue(1);
}
//创建并发队列
public static TaskQueue CreateConcurrentQueue()
{
    return new TaskQueue(DefaultConcurrentCount);
}

下面是各种同步、异步、主线程执行方法,方法会将执行的 Task 返回,以便在实际使用中需要对 Task 有其他操作

需注意 RunSyncOnMainThreadRunAsyncOnMainThread 为独立的执行系统与队列无关,若在主线程执行方法直接在主线程同步执行即可

//异步执行,因Task本身会开启新线程所以直接调用即可
public Task RunAsync(Action action)
{
    Task t = new Task(action);
    t.Start(_scheduler);
    return t;
}

//同步执行,使用 Task 提供的 RunSynchronously 方法
public Task RunSync(Action action)
{
    Task t = new Task(action);
    t.RunSynchronously(_scheduler);
    return t;
}

//同步执行主线程方法
//为避免主线程调用该方法所以需要判断当前线程,若为主线程则直接执行,防止死锁  
//为保证线程同步,此处使用信号量,仅在主线程方法执行完成后才会释放信号
public static void RunSyncOnMainThread(Action action)
{
    if (Thread.CurrentThread.ManagedThreadId == 1)
    {
        action();
    }
    else
    {
        Semaphore sem = new Semaphore(0, 1);
        Loom.QueueOnMainThread((o => { 
            action();
            sem.Release();
        }), null);
        sem.WaitOne();
    }
}

//因 Loom 本身即为不会立即执行方法,所以直接调用即可
public static void RunAsyncOnMainThread(Action action)
{
    Loom.QueueOnMainThread((o => { action(); }), null);
}

扩展延迟执行方法,因延迟本身为异步操作,所以只提供异步执行方式

// 此处使用async、await 关键字实现延迟操作, delay 为秒,Task.Delay 参数为毫秒
public Task RunAsync(Action action, float delay)
{
    Task t = Task.Run(async () =>
    {
        await Task.Delay((int) (delay * 1000));
        return RunAsync(action);
    });
    return t;
}

实现效果

并发队列异步执行
并发队列同步执行
串行队列异步执行
串行队列同步执行
并发队列延迟执行
子线程异步执行主线程
子线程同步执行主线程

到此一个多线程任务队列工具就完成了,一般的需求基本可以满足,后续还可提供更多扩展功能,如传参、取消任务等

另外我个人想尽力将这套工具脱离 UnityEngine.Monobehaviour,但目前还没找到除 Loom 外其他 Unity 获取主线程的方法,当然 Loom 本身仍然是一个很巧妙的工具

若想了解 LimitedConcurrencyLevelTaskSchedulerLoom 可继续想下看

其他

LimitedConcurrencyLevelTaskScheduler

TaskScheduler 为抽象类,想自定义任务调度需继承该类,并复写部分内部调度方法

LimitedConcurrencyLevelTaskScheduler ,以下简称为 LCLTS,为微软官方文档提供的示例代码,用于调度任务,控制并发数

LCLTS 工作流程
  1. 将 Task 入队放入链表
  2. 判断当前已执行任务数量,若未达到最大值则通过 ThreadPool 分配工作线程执行,并计数+1
  3. 标记已分匹配的线程并死循环执行任务队列,将已执行的任务出队
  4. 任务队列为空时退出循环,并移除标记
  5. 若有任务想插队到线程执行,先检查当前线程标记,若无标记则无法执行插队操作,该操作为避免任务占用队列外繁忙线程
  6. 若插队成功则检查该 Task 是否已在队列中,若存在则出队执行,若不存在则直接执行
源码解释
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
   //ThreadStatic 线程变量特性,表明是当前线程是否正在处理任务
   [ThreadStatic]
   private static bool _currentThreadIsProcessingItems;

  // 任务队列,使用链表比 List 和 Array 更方便执行插队出队操作(队列中不会出现空位)
   private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // 该队列由 lock(_tasks) 锁定

   // 最大并发数
   private readonly int _maxDegreeOfParallelism;

   // 当前已分配入队的任务数量 
   private int _delegatesQueuedOrRunning = 0;

   // 带并发数的构造方法
   public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
   {
       if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
       _maxDegreeOfParallelism = maxDegreeOfParallelism;
   }

   // 将 Task 放入调度队列
   protected sealed override void QueueTask(Task task)
   {
      //将任务放入列表,检查当前执行数是否达到最大值,若未达到则分配线程执行,并计数+1
       lock (_tasks)
       {
           _tasks.AddLast(task);
           if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
           {
               ++_delegatesQueuedOrRunning;
               NotifyThreadPoolOfPendingWork();
           }
       }
   }

   // 使用 ThreadPool 将 Task 分配到工作线程
   private void NotifyThreadPoolOfPendingWork()
   {
       ThreadPool.UnsafeQueueUserWorkItem(_ =>
       {
             //标记当前线程正在执行任务,当有 Task 想插入此线程执行时会检查该状态
           _currentThreadIsProcessingItems = true;
           try
           {
               // 死循环处理所有队列中 Task
               while (true)
               {
                   Task item;
                   lock (_tasks)
                   {
                       // 任务队列执行完后退出循环,并将占用标记置为 false
                       if (_tasks.Count == 0)
                       {
                           --_delegatesQueuedOrRunning;
                           break;
                       }

                       // 若还有 Task 则获取第一个,并出队
                       item = _tasks.First.Value;
                       _tasks.RemoveFirst();
                   }

                   // 执行 Task
                   base.TryExecuteTask(item);
               }
           }
           // 线程占用标记置为 false
           finally { _currentThreadIsProcessingItems = false; }
       }, null);
   }

   // 尝试在当前线程执行指定任务
   protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
   {
       // 若当前线程没有在执行任务则无法执行插队操作
       if (!_currentThreadIsProcessingItems) return false;

       // 若该任务已在队列中,则出队
       if (taskWasPreviouslyQueued) 
          // 尝试执行 Task
          if (TryDequeue(task)) 
            return base.TryExecuteTask(task);
          else
             return false; 
       else 
          return base.TryExecuteTask(task);
   }

   // 尝试将已调度的 Task 移出调度队列
   protected sealed override bool TryDequeue(Task task)
   {
       lock (_tasks) return _tasks.Remove(task);
   }

   // 获取最大并发数
   public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

   // 获取已调度任务队列迭代器
   protected sealed override IEnumerable<Task> GetScheduledTasks()
   {
       bool lockTaken = false;
       try
       {
             // Monitor.TryEnter 作用为线程锁,其语法糖为 lock (_tasks)
           Monitor.TryEnter(_tasks, ref lockTaken);
           if (lockTaken) return _tasks;
           else throw new NotSupportedException();
       }
       finally
       {
           if (lockTaken) Monitor.Exit(_tasks);
       }
   }
}

Loom

Loom 通过继承 UnityEngine.MonoBehaviour,使用 Unity 主线程生命周期 Update 在主线程执行方法,同时 Loom 也支持简单的多线程异步执行

Loom 结构和流程

Loom 包含两个队列有延迟方法队列和无延迟方法队列,两条队列方法都可执行传参方法

  1. Actionparam 以及延迟时间打包入结构体放入延迟或无延迟队列
  2. 若为延迟任务,则使用 Time.time 获取添加任务的时间加上延迟时间得到预定执行时间打包入延迟任务结构体并入队
  3. 待一个 Update 周期执行,清空执行队列旧任务,取出无延迟队列所有对象,放入执行队列,清空无延迟队列,遍历执行执行队列任务
  4. 同一个 Update 周期,清空延迟执行队列旧任务,取出预计执行时间小于等于当前时间的任务,放入延迟执行队列,将取出的任务移出延迟队列,遍历执行延迟执行队列任务
Loom 的使用

用户可将 Loom 脚本挂载在已有对象上,也可直接代码调用方法,Loom 会自动在场景中添加一个不会销毁的 Loom 单例对象

代码中使用 QueueOnMainThread 将延迟和无延迟方法加入主线程队列, RunAsync 执行异步方法

public class Loom :MonoBehaviour
{
    public static int maxThreads = 8;
    static int numThreads;

    private static Loom _current;
    //private int _count;
    public static Loom Current
    {
        get
        {
            Initialize();
            return _current;
        }
    }

    void Awake()
    {
        _current = this;
        initialized = true;
    }

    static bool initialized;

    [RuntimeInitializeOnLoadMethod]
    public static void Initialize()
    {
        if (!initialized)
        {

            if (!Application.isPlaying)
                return;
            initialized = true;
            var g = new GameObject("Loom");
            _current = g.AddComponent<Loom>();
#if !ARTIST_BUILD
            UnityEngine.Object.DontDestroyOnLoad(g);
#endif
        }
    }
    public struct NoDelayedQueueItem
    {
        public Action<object> action;
        public object param;
    }

    private List<NoDelayedQueueItem> _actions = new List<NoDelayedQueueItem>();
    public struct DelayedQueueItem
    {
        public float time;
        public Action<object> action;
        public object param;
    }
    private List<DelayedQueueItem> _delayed = new List<DelayedQueueItem>();

    List<DelayedQueueItem> _currentDelayed = new List<DelayedQueueItem>();

    public static void QueueOnMainThread(Action<object> taction, object tparam)
    {
        QueueOnMainThread(taction, tparam, 0f);
    }
    public static void QueueOnMainThread(Action<object> taction, object tparam, float time)
    {
        if (time != 0)
        {
            lock (Current._delayed)
            {
                Current._delayed.Add(new DelayedQueueItem { time = Time.time + time, action = taction, param = tparam });
            }
        }
        else
        {
            lock (Current._actions)
            {
                Current._actions.Add(new NoDelayedQueueItem { action = taction, param = tparam });
            }
        }
    }

    public static Thread RunAsync(Action a)
    {
        Initialize();
        while (numThreads >= maxThreads)
        {
            Thread.Sleep(100);
        }
        Interlocked.Increment(ref numThreads);
        ThreadPool.QueueUserWorkItem(RunAction, a);
        return null;
    }

    private static void RunAction(object action)
    {
        try
        {
            ((Action)action)();
        }
        catch
        {
        }
        finally
        {
            Interlocked.Decrement(ref numThreads);
        }

    }


    void OnDisable()
    {
        if (_current == this)
        {

            _current = null;
        }
    }



    // Use this for initialization
    void Start()
    {

    }

    List<NoDelayedQueueItem> _currentActions = new List<NoDelayedQueueItem>();

    // Update is called once per frame
    void Update()
    {
        if (_actions.Count > 0)
        {
            lock (_actions)
            {
                _currentActions.Clear();
                _currentActions.AddRange(_actions);
                _actions.Clear();
            }
            for (int i = 0; i < _currentActions.Count; i++)
            {
                _currentActions[i].action(_currentActions[i].param);
            }
        }

        if (_delayed.Count > 0)
        {
            lock (_delayed)
            {
                _currentDelayed.Clear();
                _currentDelayed.AddRange(_delayed.Where(d => d.time <= Time.time));
                for (int i = 0; i < _currentDelayed.Count; i++)
                {
                    _delayed.Remove(_currentDelayed[i]);
                }
            }

            for (int i = 0; i < _currentDelayed.Count; i++)
            {
                _currentDelayed[i].action(_currentDelayed[i].param);
            }
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容