licode笔记之线程模型ThreadPool和Worker

Scheduler


ThreadPool构造函数中创建了一个Scheduler对象。
Scheduler构造参数kNumThreadsPerScheduler,值为2。
Scheduler使用boost::thread_group创建了kNumThreadsPerScheduler个线程,也就是2个线程。
线程方法为Scheduler::serviceQueue。
在Scheduler::serviceQueue方法中,循环检测任务队列判断超时来执行到期任务。
使用了条件变量std::condition_variable和互斥锁std::mutex来并发任务。

这个Scheduler实例同时又被传入下面的Worker对象中用来实现定时计划任务。

Worker


ThreadPool构造时传入numWorkers,默认配置文件设置此值为24,根据numWorkers创建对应的worker,保存在workers队列。
Worker构造函数中创建了service_和service_worker_:

service_{},
service_worker_{new asio_worker::element_type(service_)}

service_和service_worker_的定义:

typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
boost::asio::io_service service_;
asio_worker service_worker_;

创建ThreadPool之后,要调用ThreadPool::start()

void ThreadPool::start() {
  std::vector<std::shared_ptr<std::promise<void>>> promises(workers_.size());
  int index = 0;
  for (auto worker : workers_) {
    promises[index] = std::make_shared<std::promise<void>>();
    worker->start(promises[index++]);
  }
  for (auto promise : promises) {
    promise->get_future().wait();
  }
}

逐个调用 worker->start(promises[index++]);

void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
  auto this_ptr = shared_from_this();
  auto worker = [this_ptr, start_promise] {
    start_promise->set_value();
    if (!this_ptr->closed_) {
      return this_ptr->service_.run();
   }
    return size_t(0);
  };
  group_.add_thread(new boost::thread(worker));
}

每个Woker内部都创建了一个线程,在线程中把boost::asio::io_service service_启动run起来.
因为io_service关联了一个io_service::work任务对象,所以io_service会一直运行。

Worker的使用者调用Worker::task将Task任务放入队列中等待执行。

void Worker::task(Task f) {
  service_.post(f);
}

用户可以使用ThreadPool::getLessUsedWorker()从workers中获取一个当前引用计数最少的Worker:

std::shared_ptr<Worker> ThreadPool::getLessUsedWorker() {
  std::shared_ptr<Worker> chosen_worker = workers_.front();
  for (auto worker : workers_) {
    if (chosen_worker.use_count() > worker.use_count()) {
      chosen_worker = worker;
    }
  }
  return chosen_worker;
}

线程安全


如果想要一组Task同步执行那么需要在同一个Worker实例中加入Task。
如果想并发执行Task,那么可以每次从ThreadPool::getLessUsedWorker()取出一个Worker来加入Task。
因为Task执行在Worker的线程中,跟调用者不在同一线程,如果Task中的代码跟调用者有资源竞争那么需要自己实现加锁。

因为licode使用node.js做为sdk库的调用者,所以创建的WebRtcConnection和MediaStream都执行在Node.js中的单线程中。
每个WebRtcConnection和MediaStream又独立使用Worker对象来执行异步线程任务。
所以WebRtcConnection和MediaStream的内部代码执行在两个线程下,一个Node.js主线程,一个Worker线程。
当释放WebRtcConnection和MediaStream对象时,在Node.js中调用对象的close方法同时释放对象引用,close方法会在Worker中执行syncClose来释放内部资源。

我看了一下加入到Worker中的Task基本都没有加锁,那么开发时需要严格保证两个线程中的资源不能冲突。

关于IOThreadPool和IOWorker


默认配置在整个系统中只创建了一个IOWorker。
实际上IOWorker只在使用NicerConnection时才有用,而如果使用LibNiceConnection,那么此IOWorker不需要start,也就没有任何作用。
因为libnice有自己的内部线程,所以不需要外部线程来处理IO。

libnice收到数据后回调LibNiceConnection::onData,加锁boost::mutex::scoped_lock lock(close_mutex_);检测当前IceState状态。
在LibNiceConnection::close也会加锁设置IceState并且释放libnice。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。