【UE】 UE 多线程框架

最近有同学问到在UE中要想处理一个耗时20+ms的任务,要怎么做?考虑到这个任务是运行时执行的,为了避免导致卡顿,两个思路是:

  1. 分帧执行

  2. 异步调用

分帧执行的目的是化大为小,分成多帧来执行,由于这个任务不太方便拆解,因此这个思路可行性不高,唯一的方式就是异步执行了,然而由于对UE的多线程逻辑不是非常清晰,因此无法给出非常具体的实现方案,只能提供思路,这里为了补足对这一块的理解,抽空对UE的多线程框架做了学习,并梳理出这篇文章,后续会随着工作中的深入来对这篇文章进行不断迭代与完善。

1. 基本介绍

UE自己实现了一套多线程机制,这套机制包含了众多的异步方法实现(参考后面的介绍),此外,由于UE是支持C++11的,所以使用std::thread也是支持的。

2. 使用方式

UE的异步(多线程)执行有两种主要的方式,一种是基于异步类型的,一种则是基于全局异步方法的。

2.1 异步类型

基于类型的异步执行又可以分成三种:

  1. 继承FRunnable接口创建单个线程

    使用案例可以参考FAsyncWriter,FUdpSocketReceiver以及FTcpListener,这里也给出一个大致的实现代码:

class FRunnableTest : public FRunnable
    {
     virtual uint32 Run() override
     {
     UE_LOG(LogTemp, Log, TEXT("[FRunnableTest::Run] Start"));
     FPlatformProcess::Sleep(30);
     UE_LOG(LogTemp, Log, TEXT("[FRunnableTest::Run] End"));
     return 0;
     }
    
    };
    
    FRunnable* RunnablePtr = new FRunnableTest();
    TestThread = FRunnableThread::Create(RunnablePtr, TEXT("Just For Test"));

简单解释一下实现逻辑,UE抽象了一种基于可执行体 + 执行线程载体的组合方式,其中可执行体指的是FRunnable,而执行线程载体则是FRunnableThread,FRunnableThread的工作有:

  • 调用各个平台内部的 API 创建线程
  • 调用可执行体的 Init()、Run()、Exit()
  • 提供管理线程生命周期的各种方法
  • 兼容不支持多线程的平台:不过这个得在实现自定义 FRunnable 的时候,同时继承 FSingleThreadRunnable 并重载 Tick() 方法,使用 Tick 来调用可执行体

在上面的代码里:我们直接继承自FRunnable创建了一个新的可执行体FRunnableTest,并对Run方法做了重载(实际上还有其他的函数也需要做重载,不过这里就不展示了)。

在需要进行异步执行的时候,会通过FRunnableThread的静态方法Create创建一个执行线程载体FRunnableThread,这个线程中执行的任务就是刚刚定义的执行体的Run方法。

  1. 创建AsyncTask来调用线程池里面空闲的线程
    这是一种基于线程池的异步任务处理系统,这套系统同样是基于Runnable实现的,在实际工作中,我们经常会遇到需要将部分代码放在特定的线程中进行执行,而这种问题就可以通过这种方式来解决:
if(IsInGameThread())
    {
     //….一些操作
    }
    else
    {
     AsyncTask(ENamedThreads::GameThread, [=]()
     {
     //….一些操作
     });
    }
  1. 通过TaskGraph系统来异步完成一些自定义任务
    这是一套抽象的异步任务处理系统,通过这套系统,我们可以创建多个多线程任务,并且指定各个任务之间的依赖关系,并按照该关系来依次处理任务,所有任务依赖关系形成一张有向无环图。

2.2 全局异步方法

UE还提供了若干个用于实现异步(多线程)执行的全局函数:

  1. AsyncTask
    这个方法调用GraphTask创建了一个立刻执行的任务,可以看成是是TaskGraph的简单版本。
    这个需要执行的任务可以指定执行的线程,需要注意的是,如果某个任务从AnyThread改成GameThread执行,AsyncTask下面的代码也是不会阻塞的。这个时候还是单线程,只是传入的Lambda方法会在主线程一帧里的其他地方调用,GameThread执行由于没有线程切换,因此整体时间消耗会少于AnyThread,不过问题则在于加长了整体的单帧时间消耗。

  2. Async
    当我们在进行异步调用的时候需要有返回值和回调函数的时候通常会使用Async方法,不过这个方法的性能也较差,因此如果不是十分必要,不要考虑这个方法。

    这个方法的第一个参数是EAsyncExecution,指定了任务执行的方式:

/**
     * Enumerates available asynchronous execution methods.
     */
    enum class EAsyncExecution
    {
     /** Execute in Task Graph (for short running tasks). */
     TaskGraph,
    
     /** Execute in Task Graph on the main thread (for short running tasks). */
     TaskGraphMainThread,
    
     /** Execute in separate thread if supported (for long running tasks). */
     Thread,
    
     /** Execute in separate thread if supported or supported post fork (see FForkProcessHelper::CreateThreadIfForkSafe) (for long running tasks). */
     ThreadIfForkSafe,
    
     /** Execute in global queued thread pool. */
     ThreadPool,
    
    #if WITH_EDITOR
     /** Execute in large global queued thread pool. */
     LargeThreadPool
    #endif
    };

进入到这个方法的内部实现:

/**
     * Execute a given function asynchronously.
     *
     * Usage examples:
     *
     *  // using global function
     *  int TestFunc()
     *  {
     *      return 123;
     *  }
     *
     *  TUniqueFunction<int()> Task = TestFunc();
     *  auto Result = Async(EAsyncExecution::Thread, Task);
     *
     *  // using lambda
     *  TUniqueFunction<int()> Task = []()
     *  {
     *      return 123;
     *  }
     *
     *  auto Result = Async(EAsyncExecution::Thread, Task);
     *
     *
     *  // using inline lambda
     *  auto Result = Async(EAsyncExecution::Thread, []() {
     *      return 123;
     *  }
     *
     * @param CallableType The type of callable object.
     * @param Execution The execution method to use, i.e. on Task Graph or in a separate thread.
     * @param Function The function to execute.
     * @param CompletionCallback An optional callback function that is executed when the function completed execution.
     * @return A TFuture object that will receive the return value from the function.
     */
    template<typename CallableType>
    auto Async(EAsyncExecution Execution, CallableType&& Callable, TUniqueFunction<void()> CompletionCallback = nullptr) -> TFuture<decltype(Forward<CallableType>(Callable)())>
    {
     using ResultType = decltype(Forward<CallableType>(Callable)());
     TUniqueFunction<ResultType()> Function(Forward<CallableType>(Callable));
     TPromise<ResultType> Promise(MoveTemp(CompletionCallback));
     TFuture<ResultType> Future = Promise.GetFuture();
    
     switch (Execution)
     {
     case EAsyncExecution::TaskGraphMainThread:
     // fallthrough
     case EAsyncExecution::TaskGraph:
     {
     TGraphTask<TAsyncGraphTask<ResultType>>::CreateTask().ConstructAndDispatchWhenReady(MoveTemp(Function), MoveTemp(Promise), Execution == EAsyncExecution::TaskGraph ? ENamedThreads::AnyThread : ENamedThreads::GameThread);
     }
     break;

     case EAsyncExecution::Thread:
     if (FPlatformProcess::SupportsMultithreading())
     {
     TPromise<FRunnableThread*> ThreadPromise;
     TAsyncRunnable<ResultType>* Runnable = new TAsyncRunnable<ResultType>(MoveTemp(Function), MoveTemp(Promise), ThreadPromise.GetFuture());

     const FString TAsyncThreadName = FString::Printf(TEXT("TAsync %d"), FAsyncThreadIndex::GetNext());
     FRunnableThread* RunnableThread = FRunnableThread::Create(Runnable, *TAsyncThreadName);
    
     check(RunnableThread != nullptr);
     check(RunnableThread->GetThreadType() == FRunnableThread::ThreadType::Real);
    
     ThreadPromise.SetValue(RunnableThread);
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
     case EAsyncExecution::ThreadIfForkSafe:
     if (FPlatformProcess::SupportsMultithreading() || FForkProcessHelper::IsForkedMultithreadInstance())
     {
     TPromise<FRunnableThread*> ThreadPromise;
     TAsyncRunnable<ResultType>* Runnable = new TAsyncRunnable<ResultType>(MoveTemp(Function), MoveTemp(Promise), ThreadPromise.GetFuture());
    
     const FString TAsyncThreadName = FString::Printf(TEXT("TAsync %d"), FAsyncThreadIndex::GetNext());
     FRunnableThread* RunnableThread = FForkProcessHelper::CreateForkableThread(Runnable, *TAsyncThreadName);
    
     check(RunnableThread != nullptr);
     check(RunnableThread->GetThreadType() == FRunnableThread::ThreadType::Real);
    
     ThreadPromise.SetValue(RunnableThread);
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
     case EAsyncExecution::ThreadPool:
     if (FPlatformProcess::SupportsMultithreading())
     {
     GThreadPool->AddQueuedWork(new TAsyncQueuedWork<ResultType>(MoveTemp(Function), MoveTemp(Promise)));
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
    #if WITH_EDITOR
     case EAsyncExecution::LargeThreadPool:
     if (FPlatformProcess::SupportsMultithreading())
     {
     GLargeThreadPool->AddQueuedWork(new TAsyncQueuedWork<ResultType>(MoveTemp(Function), MoveTemp(Promise)));
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    #endif
    
     default:
     check(false); // not implemented yet!
     }
    
     return MoveTemp(Future);
    }

可以看到,这是一个模板函数,其中实现中有两个C++11的关键字:

  • decltype:自动类型推导,与auto区别在于不需要赋值就可以推导
  • Forward:完美转发,指的是函数模板可以将自己的参数“完美”地转发给内部调用的其它函数,所谓的完美,指的是不仅能准确地转发参数的值,还能保证被转发参数的左、右值属性不变。
    在实现完美转发时,只要函数模板的参数类型为 T&&,则 C++ 可以自行准确地判定出实际传入的实参是左值还是右值,见下面的万能引用
    不过,对于函数模板内部来说,由于形参既有名称又能寻址,因此它还是左值。那么如何才能将函数模板接收到的形参连同其左、右值属性,一起传递给被调用的函数呢?
    C++11 标准引入了一个模板函数 forword<T>(),我们只需要调用该函数,就可以很方便地解决此问题
template <typename T>
void function(T&& t) 
{
  otherdef(forward<T>(t));
}

万能引用

  • C++11 标准中规定,通常情况下右值引用形式的参数只能接收右值,不能接收左值
  • 对于函数模板中使用右值引用语法定义的参数来说,它不再遵守这一规定,既可以接收右值,也可以接收左值
  • 此时的右值引用又被称为“万能引用”
template <typename T>
void function(T&& t) 
{
  otherdef(t);
}

此模板函数的参数 t 既可以接收左值,也可以接收右值,这里在实际使用中,会需要用到一个叫做引用折叠的概念,C++ 11标准为了更好地实现完美转发,特意为模板函数中的参数指定了新的类型匹配规则:

  • 当实参为左值或者左值引用(A&)时,函数模板中 T&& 将转变为 A&(A& && = A&)
  • 当实参为右值或者右值引用(A&&)时,函数模板中 T&& 将转变为 A&&(A&& && = A&&)

这里给出Async方法使用的一个简单示例代码:

// 使用全局函数
int TestFunc()
{
return 123;
}

TFunction<int()> Task = TestFunc();
auto Result = Async(EAsyncExecution::Thread, Task);

// 使用lambda
TFunction<int()> Task = []()
{
return 123;
}

auto Result = Async(EAsyncExecution::Thread, Task);


// 使用inline lambda
auto Result = Async<int>(EAsyncExecution::Thread, []() {
return 123;
}

此外,值得一提的是,Async方法的亮点在于其返回了一个TFuture<T>值:

  1. 这个值可以获得Lambda返回值,即通过调用Get函数即可得到,不过这种方法虽然可以获得返回值,但是是会造成主线程阻塞,这个问题可以通过在Tick里调用FutureResult.IsReady,等它准备好了再调用Get获取返回值。此外,当没有返回值的时候,它的主线程执行时长是稍差于TaskGraph和AsyncTask的。
  2. 这个值也可以判断Lambda的逻辑有没有执行完
  3. 这个值还支持执行完成的函数回调

基于Async的函数,我们有两个函数变种:

  • AsyncThread
    依然是Async函数调用,不过第一个参数为EAsyncExecution::Thread
  • AsyncPool
    依然是Async函数调用,不过第一个参数为EAsyncExecution::ThreadPool
  1. ParallelFor
    这个方法本质是通过TaskGraph创建了多个Task并行执行任务,不过实测发现,这种方式的执行时间非常慢(时间消耗是不使用这种方法的4~5倍),因此如果不是特别复杂的逻辑,不建议使用ParallelFor。

3. 实现原理

3.1 FRunnable方式

这种方式的核心包含三个结构,分别是FRunnable、FRunnableThread以及FThreadManager

FRunnable

这是一个在线程上执行的对象的封装,或者说交付给线程执行的函数体的封装,可以理解成是线程的数据。

在实际使用中,FRunnable会被作为参数传入FRunnableThread,FRunnableThread会在特定时机调用其Run接口。

考虑到硬件不支持多线程的情况,提供了GetSingleThreadInterface接口,在单线程情况下,引擎会通过这个接口返回对象的Tick完成任务驱动。

这个结构包含了如下几个关键的方法:

  • FRunnable::Init:完成初始化

  • FRunnable::Run:多线程执行

  • FRunnable::Stop

  • FRunnable::Exit:执行完毕后的清理工作

  • FRunnable::GetSingleThreadInterface:单线程执行

FRunnableThread

这是线程对象基类,会用来驱动FRunnable。

这个类型是平台无关的线程对象的抽象,不同平台的线程都会继承自他:

  • Win平台:Windows的Thread API

  • 其他平台:pthread

这个结构包含了如下的一些方法:

  • FRunnableThread::Create

    • 这是一个静态工厂方法,用于完成FRunnableThread的创建

    • 线程模式不同,表现不同

      • 多线程:调用各个平台的实现接口创建出平台线程子类并运行

      • 单线程:创建FFakeThread(伪线程)对象,后续在主线程中Tick中驱动

    • 底层调用CreateInternal实现,做了跨平台处理

      • 平台具有如下区别:

        • Android和iOS都是采用的 pthread标准线程库

        • Windows平台是单独实现的

      • 线程创建完毕后会统一调用FThreadManager::Get().AddThread(ThreadID, this);将线程本身添加至管理器

    • 参数

      • 线程可运行对象指针

      • 线程名字

      • InStackSize:前线程栈空间大小(请勿轻易改动,0 表示使用默认值 1MiB)

      • InThreadPri:当前线程调度优先级,优先级高会先执行(不强制,操作系统有最终解释权)

      • InThreadAffinityMask:

        • 表示当前线程在哪些CPU上执行(不强制,操作系统有最终解释权)

        • 十六进制数(默认值 0xFFFFFFFFFFFFFFFF 表示哪个CPU都行)

        • 参数作用:

          • 因为 core cache 的命中率会极大的影响性能,线程绑定 CPU 运行能够最大化增加 cache 的命中率, 也减少了多核之间的同步开销

          • 设置 ThreadAffinity 对性能的提升得视具体情况而定,推荐使用默认值

  • FRunnableThread::Kill:会先执行 runnable 对象的 stop 函数,然后根据传入的bShouldWait 参数决定是否等待线程执行完毕。如果不等待,则强制杀死线程,可能会造成内存泄漏

  • FRunnableThread::Suspend

  • FRunnableThread::WaitForCompletion:阻塞调用例程直到线程执行完毕

  • FRunnableThread::GetThreadID:每个线程都有一个线程ID,线程ID在它所属的进程环境中有效,线程ID是唯一的

  • FRunnableThread::GetThreadName:为增加标识性,UE4还增加了线程名称,线程名称可以重复

FRunnableThread有若干个子类,其中FRunnableThreadWin是Windows平台的线程;FFakeThread则是伪线程结构,这类任务后续会在主线程中通过Tick进行驱动。

FThreadManager

这是一个单例,凡是通过FRunnableThread创建的线程都是需要通过FThreadManager进行统一管理的,此外内部还维护着一个线程ID到FFakeThread对象的TMap,包含了如下的一些方法:

  • FThreadManager::AddThread

  • FThreadManager::RemoveThread

  • FThreadManager::ForEachThread

  • FThreadManager::Tick

    • 对fake thread及其对应的runnable objects进行tick

    • 在FEngineLoop::Tick方法中被调用

3.2 AsyncTask方式

这种方式同样包含了若干个关键的类型,下面一一进行介绍。

FQueuedThreadPool

队列线程池对象基类,规范了线程池接口,整套线程池是基于FRunnable和FRunnableThread实现的。

一个问题是,我们为什么需要线程池呢?这是因为线程过多会带来调度开销,进而影响缓存局部性和整体性能,频繁创建和销毁线程也会带来极大的开销,而我们更加关心的是任务可以并发执行,并不想管理线程的创建,销毁和调度。通过将任务处理成队列,交由线程池统一执行,可以提升任务的执行效率。

线程池由若干个Worker线程,和一个同步队列构成:

  • 同步队列执行的任务抽象为IQueuedWork

  • 线程池中的Worker线程都是消费者,会不停地从队列中取出IQueuedWork,并执行work。

引擎初始化(FEngineLoop::PreInit)时,如果启用了多线程模式,则会创建下面四种线程池(按顺序):

  • GThreadPool:

    • 工作线程数量有限制。DedicatedServer方式运行只有一个工作线程,DedicatedServer模式下,游戏大部分逻辑运行在GameThread。如果开启单线程模式,可避免线程间切换,提高Cache命中率,会提升约10% CPU性能,可作为一种优化思路

    • 线程优先级TPri_SlightlyBelowNormal,数值为14,优先级范围是1-31,31为最高优先级

  • GBackgroundPriorityThreadPool:

    • 工作线程数量,DedicatedServer运行模式为1,其它运行模式为2

    • 线程优先级为TPri_Lowest(=1)

  • GLargeThreadPool:

    • WITH_EDITOR模式下创建:Editor做关照等大运算量的工作项

    • 工作线程数量取决于逻辑核心数量

    • 优先级是TPri_Normal(=15)

  • GIOTheadPool:

    • 在GLargeThreadPool初始化代码后完成初始化

    • 优先级TPri_AboveNormal(=25)

    • 主要用来处理IO相关的任务,例如从大文件中读取数据

FQueuedThreadPool的方法可以大致分为两类:

  • 线程池相关

    • FQueuedThreadPool::Allocate:静态函数,用于创建FQueuedThreadPool

    • FQueuedThreadPool::Create:创建线程池

    • FQueuedThreadPool::Destroy:清理所有线程并销毁线程池

  • 线程相关

    • FQueuedThreadPool::AddQueuedWork

    • FQueuedThreadPool::RetractQueuedWork:尝试删除或取消一个work对象,如果work不在队列当中,或者请求删除时已经在执行和执行完成,都无法取消

这种类型有一个实现子类是FQueuedThreadPoolBase,下面会有介绍。

IQueuedWork

这是可在线程池中运行的任务基类,同时也是对同步队列执行的任务抽象,值得注意的是,基本上所有的异步任务统一都继承至 IQueuedWork。包含IQueuedWork::DoThreadedWork与IQueuedWork::Abandon,子类实现有FAsyncTask,FAutoDeleteAsyncTask与FAsyncEncode。

FQueuedThread

这是线程池worker线程的实现(是一个FRunnable的实现类),内部包含一个FRunnableThread的实例对象。

提供了工厂方法Create,这个方法会创建一个线程对象并运行,通常和FQueuedThreadPool结合使用。

包含如下的成员变量:

  • FEvent* DoWorkEvent:用来触发线程执行的事件,在FQueuedThread::Run中有一个循环调用,在循环体中会对此变脸进行Wait检查,如果返回true就继续等待,否则就开始执行线程

  • TAtomic<bool> TimeToDie:判断线程是否该终结了

  • IQueuedWork* volatile QueuedWork:线程正在执行的工作

  • FQueuedThreadPoolBase* OwningThreadPool:线程所从属的线程池

  • FRunnableThread* Thread:对应的执行线程

FQueuedThreadPoolBase

这是队列线程池的实现类,父类为FQueuedThreadPool,维护一个IQueuedWork任务队列和FQueuedThread线程列表,工作过程可以描述为:

  • 生产者创建了一个IQueuedWork实现对象

  • 调用AddQueuedWork接口,向线程池添加要执行的work

    • 根据线程池状态不同进行不同处理

      • 如果线程池中还有空闲线程:

        • QueuedThreads不为空,那么QueuedWork一定为空

        • 这时候就从空闲线程数组中,取一个线程,并直接唤醒该线程执行由生产者当前传递进来的work

          • 可以避免惊群效应[11]

          • UE4每次获取空闲线程都是取数组的最末尾的空闲线程,也就是最近归还的work线程,好处是:

            • 最近归还意味最近使用

            • 操作系统还未对它进行context切换,或者它的context数据还留存在缓存当中的概率更大

            • 优先使用该线程,就有更大的概率获取较为低廉的线程切换开销

      • 如果线程池中已经没有空闲的线程

        • 也就意味着QueuedThreads为空

        • 由于没有空闲线程可用,因此就直接将work入队即可

这个类型包含了如下的一些成员:

  • FThreadPoolPriorityQueue QueuedWork:跟互斥锁SynchQueue一起,组成了一个线程安全的同步队列

  • TArray<FQueuedThread*> QueuedThreads:管理着空闲的线程,即FQueuedThread归还自己到线程池的空闲队列

  • TArray<FQueuedThread*> AllThreads:管理着全部的worker线程

  • FCriticalSection* SynchQueue

  • bool TimeToDie

除此之外,有几个方法需要关注一下:

  • FQueuedThreadPoolBase::Create:线程池创建逻辑,会依次创建每个worker线程

  • FQueuedThreadPoolBase::Destroy:线程池销毁逻辑,依次向每个worker线程发出销毁的命令,并等待线程退出,线程池的销毁会放弃还未执行的work

FAsyncTask

这是一个模板类,需要将要执行的任务作为模板参数传入,在DoWork接口中完成任务的执行。

基于FQueuedThreadPool完成线程管理,在FAsyncTask::Start中完成QueuedWork到QueuedPool的添加,在此函数中如果传入StartSynchronousTask参数为true,也可以走同步执行。

调用方法有StartBackgroundTask与StartSynchronousTask的区别:

  • StartBackgroundTask:利用线程池里空闲的线程来执行

  • StartSynchronousTask:主线程执行

    • 主线程会等AsyncTask里面的逻辑执行完了之后才会继续往下走,其实相当于单线程执行。

StartSynchronousTask存在的目的是为多线程代码提供灵活性:当我们在使用多线程时发现部分逻辑代码只能跑在主线程或者它跑异步线程其实并没有变快,这个时候想把它改成单线程的时候就很方便。

FAutoDeleteAsyncTask

这个类型与FAsyncTask类似,都是模板类,其父类都是IQueuedWork,任务都是作为模板参数,都是基于FQueuedThreadPool完成线程管理,不同的是,在任务完成后会通过线程池的Destroy函数删除自身或者在执行DoWork后删除自身,而FAsyncTask需要手动delete。

线程池的Task类

线程池的Task类是可以传入FAsyncTask用作具体的任务逻辑对应的类型,这个类型需要实现DoWork接口。这里可以参考FAsyncReleaseFbxScene中的实现,FAsyncReleaseFbxScene继承自FNonAbandonableTask。

继承自FNonAbandonableTask的Task表明任务不可以放弃,必须执行完成,具体而言,这类任务不可以在执行阶段终止,即使中途执行了Abandon函数也会去触发DoWork函数。

这里的一个疑问是,我们为什么要继承FNonAbandonableTask?这是因为当线程池被销毁的时候,会调用Abandon函数,而继承FNonAbandonableTask的话这个时候就不会丢弃而且等待执行完;当然,如果需要丢弃,则就不要继承,并且自己实现CanAbandon和Abandon函数。源码里可丢弃的任务参考:FAsyncStatsFile。

3.3 TaskGraph方式

这种方式有如下的一些类型数据:

TaskGraph的任务类

这类class需要手动定义,无需继承特定接口。需要声明DoTask函数来表示要执行的任务内容,GetDesiredThread函数来表示要在哪个线程上面执行:

  • GetDesiredThread:可以指定使用哪种线程。除了AnyThread还有GameThread、RHIThread等多种线程设置

  • GetSubsequentsMode:任务完成模式

    • ESubsequentsMode::TrackSubsequents:追踪完成状态,这是最常用的模式

    • ESubsequentsMode::FireAndForget:做了以后无法得知是否完成,只有没有任何依赖的Task才用

  • DoTask,需要用到如下参数:

    • ENamedThreads::Type

    • const FGraphEventRef&,用来传递任务完成状态

下面给出几个任务类的示例:

  • FTickFunctionTask:调用tick函数的任务类,包括actor与component的tick

  • FReturnGraphTask

  • FBaseGraphTask:TaskGraph子系统中任务基类,规范了任务生命周期中必须的几个阶段,运行过程中会校验,同时定义了任务执行接口

    • 一个子类为:FGraphTask

      • 带有模板类参数TTASK,这个参数即为我们前面的任务类

      • 成员方法FGraphTask::CreateTask

        • 参数

          • Prerequisties(FGraphEventArray类型)表示该TTASK依赖的前序任务列表

          • TTask必须实现DoTask方法,否则编译报错

        • 在这个方法中会创建TTASK对象,该任务执行完成后,会试着启动Prerequisties表示的任务

        • 最后返回FGraphEventRef,用于实现Task依赖,经由此返回值可以知道前序Task是否完成

FGraphEvent

这是任务依赖关系工具类,维护了依赖该Event的所有FBaseGraphTask列表,并用来传递任务完成状态,如果某个任务完成了,就会将其完成的事件传递给下游,这就是FGraphEvent的主要职责,此外,FGraphEventRef是FGraphEvent的指针。

包含如下成员变量:

  • FGraphEventArray EventsToWaitFor:前序事件列表

  • TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0> SubsequentList:依赖于当前事件的线程列表

线程类

FTaskGraphImplementation

这是一个单例,是task graph系统的核心部分,其父类是FTaskGraphInterface(Task Graph System的接口类型)。Task的控制、创建与分配等逻辑都是通过这个单例来完成的。

初始化在FEngineLoop.PreInit里面进行,会默认构建24个FWorkerThread工作线程(这里支持最大的线程数量也就是24):

  • 5个是默认带名字的线程:StatThread,RHIThread,AudioThread,GameThread,ActualRenderingThread

  • N(由CPU核数决定)个非指定名称的任意线程(参考NumberOfWorkerThreadsToSpawn变量),不过需要注意的是,如果平台本身不支持多线程,那么其他的工作也会在GameThread里面进行

Runnable线程创建根据类型的不同,会有不同的时机:

  • 引擎会在合适时机为带名字的线程创建Runnable线程,不需手动创建:StatThread以及RenderingThread会在FEngineLoop.PreInit里创建,并处理名字对应的一些工作。

  • 无名线程需要手动创建Runnable线程,同时设置其优先级比前面线程的优先级要低,处理其他的工作

FWorkerThread

这个类型并非真正线程,而是对线程对象的封装,包含了如下两个成员变量:

  • FTaskThreadBase* TaskGraphWorker:线程可执行对象

  • FRunnableThread* RunnableThread:线程对象

FTaskThreadBase

这个类型用于管理当前任务的FTaskThread,用于执行任务的线程基类(线程可执行对象),继承自FRunnable与FSingleThreadRunnable,子类包含有FTaskThreadAnyThread与FNamedTaskThread。

FTaskThreadAnyThread是无名Task线程执行体,TaskGraph子系统初始化时新创建的工作线程,分为高、中、低(Linux平台Nice值分别为3、5、10)三种优先级。已创建未使用的线程属于此类,放到此线程的任务,执行顺序按照优先顺序逐个进行,优先顺序可以调整,具体的执行顺序可以通过IncomingAnyThreadTasks数组得到。内部维护一个任务队列,提供接口循环执行队列中的任务。

FNamedTaskThread是有名字的Task线程执行体,非TaskGraph子系统内部创建的线程多属于此类,比如GameThread,RenderThread都属于此列。放到此线程的任务,执行顺序按照优先顺序逐个进行,优先顺序不可调整,具体而言是通过FThreadTaskQueue来处理执行顺序。同样内部会维护任务队列,提供接口循环执行队列中的任务。

4. 线程同步

当多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图,当多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图,如果变量是只读的,多个线程同时读取该变量也不会有一致性问题,当某个线程可以修改变量,而其他线程也可以读取或者修改这个变量的时候,就需要对这些线程进行同步,以确保它们在访问变量的存储内容时不会访问到无效的数值。

UE提供了多种同步机制

Atomics 原子机制

这种机制可以用来保证CPU在读取和写入内存时总线操作是不可分割的,即任意线程对变量的一次操作结束后,其他线程读取到的都是最新的(已经写入完成的)数据。基于这个机制,可以进行比较快的进行比较和解锁操作。

class FThreadSafeCounter{
public:
int32 Add( int32 Amount ) {
 return FPlatformAtomics::InterlockedAdd(&Counter, Amount);
 }
private:
 volatile int32 Counter; // 因为值可能以编译器无法预测的异步方式被改变,声明为volatile禁用优化
};

UE4封装了一系列基础的原子操作,window平台下对应的是FWindowsPlatformAtomics类,里面包括读写,加减,与或,以及原子编程的核心方法 compare and swap(CAS),CompareAndSwap是实现无锁数据结构的最基本的操作,UE中的接口为:InterlockedCompareExchange(volatile int64* Dest, int64 Exchange, int64 Comparand),当Dest指向的值和Comparand相等时,将Exchange存到Dest指向的地址,否则什么也不做,这个接口可以用于实现基于循环的自旋锁,可以参考FLockFreePointerFIFO 与FLockFreePointerLIFO 类型中的实现,下面也给出一个示例代码:

while(true)
{
   TDoublePtr Local;
   Local.AtomicRead(Current); // 需要保证读取操作也是原子的,所以用AtomicRead
   TDoublePtr New;
   New.SetPtr(Item); // 新的指针赋值
   if (Current.InterlockedCompareExchange(New, Local))
   {
       break;
   }
}

在Local.AtomicRead(Current)语句中,我们通过原子操作完成Current到Local的赋值。

在Current.InterlockedCompareExchange(New, Local)语句中,尝试将New赋值给Current,但是担心Current在其他线程中被修改,这里通过InterlockedCompareExchange判断Local跟Current是否相等,如果相等就进行赋值,如果不等就继续循环,完成再次赋值,从而排除当前线程被其他线程的干扰。不过这里无法解决ABA的问题,即其他线程改完又改回原值,此时无法检测到,为了解决这个问题,UE4选择了一种比较简单常用的方法,就是给每个需要操作的值加标记,每次操作该值之前都给标记加1,这样CAS操作检测的时候就算值相等,标记不相等也不能返回true。

Locking 锁机制

UE提供了四种不同的锁:

  • FCriticalSection:临界区

    • 根据各个平台的互斥锁进行的抽象

      • Windows 平台是基于Windows平台的临界区

      • iOS, Android,Linux平台则是使用的POSIX的线程标准实现

  • FSpinLock:自旋锁

  • FScopeLock:区域锁,基于作用域的锁,在构造时对当前区域加锁,离开作用域时执行析构并解锁,类似class有

    • FScopedMovementUpdate

    • FScopeCycleCounter

    • FScopedEvent

  • FRWLock:读写锁

Signaling 信号机制

类型为FSemaphore,是一种信号量与互斥锁类型,这种类型包含了一种信号机制,但不是所有平台都支持。更加常用的线程间通信机制是 FEvent

Waiting

这是一种等待同步机制,包含如下的一些事件类型:

  • FEvent:阻塞直至被触发或者超时,经常被用来激活其他工作线程

  • FScopedEvent:这是对FEvent的一次包装,阻塞在域代码退出时

5. 其他

使用的一些tips:

  • UE4常见的容器类【TArray, TMap, TSet】通常都不是线程安全的,需要我们仔细编写代码保证线程安全

常见的线程安全类有:

  • FThreadSafeCounter计数器

  • FThreadSingleton 单例类

  • FThreadIdleStats 线程空闲状态统计类

  • TLockFreePointerList 无锁队列

  • TQueue队列

6. 参考

[1]. UE4的多线程——无锁LockFree

[2]. UE4异步编程专题 - 线程池FQueuedThreadPool

[3]. 《Exploring in UE4》多线程机制详解[原理分析]

[4]. UE多线程框架

[5]. [原创]UE基础—多线程(一)

[6]. UE4异步操作总结

[7]. UE4异步编程专题 - 多线程

[8]. UE4 C++基础 - 多线程

[9]. 【UE·引擎篇】Runnable、TaskGraph、AsyncTask、Async多线程开发指南

[10]. C++11完美转发及实现方法详解

[11]. 关于网络编程中惊群效应那点事儿

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容