webrtc的线程模块设计的源码分析

对于一个大的项目,比如webrtc或者其它公司内项目,如果采用了并发的设计,那线程的模型就非常非常重要了,可以这么说一定程度上决定了项目的成败,而webrtc的线程模型值得深入学习,网上有很多都是基于m85分支甚至更老的分支总结,现依据最近的m115分支总结下,设计思想大体没有太多变化,但是代码实现还是有很多差别。
webrtc中一个TaskQueue就是一个线程,在webrtc中有两种线程封装,一种是rtc:thread(也是基于TaskQueueBase的实现),他包含了某个SockerServer,用户处理网络相关的请求,比如PeerConnection中就有network_thread、worker_thread、signaling_thread;另一种就是TaskQueue,也封装了线程的实现,应用范围也很广,比如音视频的编解码和渲染都有对应的TaskQueue线程,那么这些线程是如何实现的呢?这两种又有什么区别呢?从网上盗了一张图,该图总结了webrtc的线程模型,以供参考,后续会逐步展开对webrtc源码的研究。


image.png

包括三个部分,api层面的TaskQueue接口、具体实现层面的TaskQueue和rtc::Thread

api层面的TaskQueue接口
定义了TaskQueueBase和TaskQueueFactory这个两个最重要虚基类

Image.png

  • TaskQueueBase
    实现了PostTask、PostDelayedTask、PostDelayedHighPrecisionTask、PostDelayedTaskWithPrecision,具体的PostxxxxxImpl虚函数由其子类实现,比如rtc_base下的TaskQueueWin
namespace webrtc {

class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
public:
  enum class DelayPrecision {
    // This may include up to a 17 ms leeway in addition to OS timer precision.
    // See PostDelayedTask() for more information.
    kLow,

    // This does not have the additional delay that kLow has, but it is still
    // limited by OS timer precision. See PostDelayedHighPrecisionTask() for
    // more information.
    kHigh,
  };

  virtual void Delete() = 0;
  void PostTask(absl::AnyInvocable<void() &&> task,
         const Location& location = Location::Current()) {
    PostTaskImpl(std::move(task), PostTaskTraits{}, location);
  }

  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
                       TimeDelta delay,
                       const Location& location = Location::Current()) {
    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = false},
                        location);
  }

  void PostDelayedHighPrecisionTask(
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    PostDelayedTaskImpl(std::move(task), delay,
                        PostDelayedTaskTraits{.high_precision = true},
                        location);
  }

  // As specified by `precision`, calls either PostDelayedTask() or
  // PostDelayedHighPrecisionTask().
  void PostDelayedTaskWithPrecision(
      DelayPrecision precision,
      absl::AnyInvocable<void() &&> task,
      TimeDelta delay,
      const Location& location = Location::Current()) {

    switch (precision) {
      case DelayPrecision::kLow:
        PostDelayedTask(std::move(task), delay, location);
        break;

      case DelayPrecision::kHigh:
        PostDelayedHighPrecisionTask(std::move(task), delay, location);
        break;
    }
  }

  // Returns the task queue that is running the current thread.
  // Returns nullptr if this thread is not associated with any task queue.
  // May be called on any thread or task queue, including this task queue.

  static TaskQueueBase* Current();
  bool IsCurrent() const { return Current() == this; }

protected:
  // This is currently only present here to simplify introduction of future
  // planned task queue changes.
  struct PostTaskTraits {};

  struct PostDelayedTaskTraits {
    // If `high_precision` is false, tasks may execute within up to a 17 ms
    // leeway in addition to OS timer precision. Otherwise the task should be
    // limited to OS timer precision. See PostDelayedTask() and
    // PostDelayedHighPrecisionTask() for more information.

    bool high_precision = false;
  };

  class RTC_EXPORT CurrentTaskQueueSetter {
   public:
    explicit CurrentTaskQueueSetter(TaskQueueBase* task_queue);
    CurrentTaskQueueSetter(const CurrentTaskQueueSetter&) = delete;
    CurrentTaskQueueSetter& operator=(const CurrentTaskQueueSetter&) = delete;
    ~CurrentTaskQueueSetter();

   private:
    TaskQueueBase* const previous_;
  };

  // Subclasses should implement this method to support the behavior defined in
  // the PostTask and PostTaskTraits docs above.
  virtual void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                            const PostTaskTraits& traits,
                            const Location& location) = 0;

  // Subclasses should implement this method to support the behavior defined in
  // the PostDelayedTask/PostHighPrecisionDelayedTask and PostDelayedTaskTraits
  // docs above.

  virtual void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                                   TimeDelta delay,
                                   const PostDelayedTaskTraits& traits,
                                   const Location& location) = 0;

  // Users of the TaskQueue should call Delete instead of directly deleting
  // this object.

  virtual ~TaskQueueBase() = default;
};

struct TaskQueueDeleter {
  void operator()(TaskQueueBase* task_queue) const { task_queue->Delete(); }
};

}  // namespace webrtc

  • TaskQueueFactory
    创建TaskQueue的工厂类基类,
namespace webrtc {

// The implementation of this interface must be thread-safe.
class TaskQueueFactory {

public:
  // TaskQueue priority levels. On some platforms these will map to thread
  // priorities, on others such as Mac and iOS, GCD queue priorities.
  enum class Priority { NORMAL = 0, HIGH, LOW };

  virtual ~TaskQueueFactory() = default;
  virtual std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const = 0;
};

}  // namespace webrtc

CreateDefaultTaskQueueFactory接口
在default_task_queue_factory.h文件中,申明了CreateDefaultTaskQueueFactory接口

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials = nullptr);
}  // namespace webrtc

在default_task_queue_factory_win.cc、default_task_queue_factory_stdlib.cc、default_task_queue_factory_stdlib_or_libevent_experiment.cc、default_task_queue_factory_libevent.cc、default_task_queue_factory_gcd.cc都实现了CreateDefaultTaskQueueFactory,比如default_task_queue_factory_win.cc中的实现如下,即CreateTaskQueueWinFactory的实现最终在rtc_base\task_queue_win.cc中。

\\ api\default_task_queue_factory_win.cc

namespace webrtc {

std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
    const FieldTrialsView* field_trials) {
  return CreateTaskQueueWinFactory();
}

}  // namespace webrtc

\\ rtc_base\task_queue_win.cc

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

CreateDefaultTaskQueueFactory接口有多种实现,那到底使用那一个呢?这是在编译期间决定的,在api/task_queue的BUILD.gn中指定了按照编译开关决定使用那个实现,比如Android使用default_task_queue_factory_stdlib_or_libevent_experiment.cc的实现,windows使用default_task_queue_factory_win.cc的实现。

rtc_library("default_task_queue_factory") {
  visibility = [ "*" ]
  if (!is_ios && !is_android) {
    poisonous = [ "default_task_queue" ]
  }

  sources = [ "default_task_queue_factory.h" ]
  deps = [
    ":task_queue",
    "../../api:field_trials_view",
    "../../rtc_base/memory:always_valid_pointer",
  ]

  if (rtc_enable_libevent) {
    if (is_android) {
      sources +=
          [ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
      deps += [
        "../../api/transport:field_trial_based_config",
        "../../rtc_base:logging",
        "../../rtc_base:rtc_task_queue_libevent",
        "../../rtc_base:rtc_task_queue_stdlib",
      ]
    } else {
      sources += [ "default_task_queue_factory_libevent.cc" ]
      deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
    }
  } else if (is_mac || is_ios) {
    sources += [ "default_task_queue_factory_gcd.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
  } else if (is_win && current_os != "winuwp") {
    sources += [ "default_task_queue_factory_win.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_win" ]
  } else {
    sources += [ "default_task_queue_factory_stdlib.cc" ]
    deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
  }
}

至此在api层面交代了TaskQueue的架构设计,即编译开关加上不同的工厂创建方法,对外提供了统一的创建TaskQueue的接口,具体接口的实现在rtc_base目录下。

  • rtc_base对TaskQueueBase和TaskQueueFactory的实现
  • rtc_base/platform_thread.cc
    跨平台封装了线程的实现,对于windows调用CreateThread创建线程,对于Linux调用pthread_create创建线程,创建线程的时候可以设置线程的优先级,优先级定义如下:
enum class ThreadPriority {
  kLow = 1,
  kNormal,
  kHigh,
  kRealtime,
};
  • TaskQueueBase、TaskQueueFactory的子类实现和对应创建TaskQueueFactory实例的接口
    不同的实现细节都不同,主要体现在事件循环的机制上。
  • task_queue_win.h \ task_queue_win.cc
namespace webrtc {
class TaskQueueWin : public TaskQueueBase {
  ..
};

class TaskQueueWinFactory : public TaskQueueFactory {
public:

  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
  return std::make_unique<TaskQueueWinFactory>();
}

}  // namespace webrtc
  • task_queue_stdlib.h \ task_queue_stdlib.cc
namespace webrtc {
class TaskQueueStdlib final : public TaskQueueBase {
  ..
};

class TaskQueueStdlibFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
  }
};

std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
  return std::make_unique<TaskQueueStdlibFactory>();
}

} // namespace webrtc
  • task_queue_libevent.h \ task_queue_libevent.cc
namespace webrtc {
namespace {
class TaskQueueLibevent final : public TaskQueueBase {
  ..

};

class TaskQueueLibeventFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueLibevent(name,
                              TaskQueuePriorityToThreadPriority(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
  return std::make_unique<TaskQueueLibeventFactory>();
}
  • task_queue_gcd.h \ task_queue_gcd.cc
namespace webrtc {
namespace {
class TaskQueueGcd final : public TaskQueueBase {
  ..
};

class TaskQueueGcdFactory final : public TaskQueueFactory {
public:
  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
      absl::string_view name,
      Priority priority) const override {
    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
  }
};
}  // namespace

std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
  return std::make_unique<TaskQueueGcdFactory>();
}

}  // namespace webrtc

rtc::Thread

实现见rtc_base\thread.h, rtc_base\thread.cc两个文件。

  • rtc::Thread和ThreadManager

除了拥有TaskQueue的PostTask、PostDelayTask方法外还有以下功能特性,这些功能特性方便我们对所有的线程的健康状态进行监控:

  • 阻塞调用的接口BlockingCall,当然也可以设置线程禁止阻塞调用;
  void BlockingCall(
      FunctionView<void()> functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    BlockingCallImpl(std::move(functor), location);
  }

  template <typename Functor,
            typename ReturnT = std::invoke_result_t<Functor>,
            typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>

  ReturnT BlockingCall(
      Functor&& functor,
      const webrtc::Location& location = webrtc::Location::Current()) {
    ReturnT result;
    BlockingCall([&] { result = std::forward<Functor>(functor)(); }, location);
    return result;
  }
  • 统计一段时间内当前线程有多少个阻塞调用,并将结果输出到日志中,具体查看RTC_LOG_THREAD_BLOCK_COUNT宏的实现;
  • 防止阻塞调用过载,具体查看RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN宏的实现;
  • 调试阻塞调用的耗时,具体查看RegisterSendAndCheckForCycles函数的实现,这样可以检测目标线程是否死锁;
  • 巡检所有线程是否健康,具体看ProcessAllMessageQueuesInternal
  • 统计任务执行的耗时,大于某个阈值则会告警,具体看Dispatch的dispatch_warning_ms_定义
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
  TRACE_EVENT0("webrtc", "Thread::Dispatch");
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time = TimeMillis();
  std::move(task)();
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch.";

    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_ = diff + 1;
  }
}
  • 事件循环单次循环的耗时限制,超过限制后进入wait状态,具体看Get函数的实现。

参考文档:

https://zhuanlan.zhihu.com/p/136070941

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357

推荐阅读更多精彩内容