WebRTC PacedSender 原理分析(一)

PacedSender 的族普关系

paced_class_uml_001.png
  • PacedSender继承Module类,实现其Process和TimeUntilNextProcess方法,其中TimeUntilNextProcess的实现便是相隔多少时间Process函数会被paced_thread回调一次

  • PacedSender类依赖PacingController类事实上,PacedSender把大部分工作都交给了PacingController

    和PacketRouter

PacedSender 入队操作

#modules/pacing/paced_sender.cc
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.EnqueuePacket(std::move(packet));
}
  • 通过RTPSenderVideo::SendVideoPacket将rtp包通过回调EnqueuePacket将rtp包存入PacedSender所管理的队列当中
  • PacedSender::EnqueuePacket把工作交给PacingController
#modules/pacing/pacing_controller.cc
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";

  Timestamp now = CurrentTime();
  prober_.OnIncomingPacket(packet->payload_size());

  if (packet->capture_time_ms() < 0) {
    packet->set_capture_time_ms(now.ms());
  }

  RTC_CHECK(packet->packet_type());
  int priority = GetPriorityForType(*packet->packet_type());
  packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
  • 如果capture_time_ms小于0,在这里为期初始化时间
  • 获取优先级,kAudio(0),kRetransmission(1),kVideo(2),kPadding(3)
  • 最终根据优先级将packet放入packet_queue_队列

RoundRobinPacketQueue原理分析

RoundRobinPacketQueue_01.png
  • RoundRobinPacketQueue队列的核心实现是内部管理4个数据结构
  • streams_容器用来管理以ssrc为key,以Stream对象为value的容器,依次可以看出,对于不同ssrc的流都会被该容器所管理
  • rtp_packets_列表用来托管真正的rtp流对应std::unique_ptr<RtpPacketToSend>,所有的的发送真实的rtp流都会存到这里,后续发送到网络通过从该列表中获得发送
  • enqueue_times_集合用来记录每次rtp流入队列的时间
  • 在每个数据包插入到队列的时候会创建一个QueuedPacket,同时会根据QueuedPacket的优先级创建一个StreamPrioKey对象,并且会以此对象为key,该包的ssrc值为value,将其插入到stream_priorities_集合
#modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(int priority,
                                 Timestamp enqueue_time,
                                 uint64_t enqueue_order,
                                 std::unique_ptr<RtpPacketToSend> packet) {
  uint32_t ssrc = packet->Ssrc();
  uint16_t sequence_number = packet->SequenceNumber();
  int64_t capture_time_ms = packet->capture_time_ms();
  DataSize size =
      DataSize::bytes(send_side_bwe_with_overhead_
                          ? packet->size()
                          : packet->payload_size() + packet->padding_size());
  auto type = packet->packet_type();
  RTC_DCHECK(type.has_value());

  rtp_packets_.push_front(std::move(packet));
  Push(QueuedPacket(
      priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
      size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
      enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
  • 首先得到ssrc,sequence_number,capture_time_ms,size(rtp包的大小)
  • 将RtpPacketToSend包通过rtp_packets_.push_front存入rtp_packets_列表
  • 以各参数创建QueuedPacket,由此可见每个RtpPacketToSend对应一个QueuedPacket,但是它并不正在存放RtpPacketToSend数据,只是记录了其szie,ssrc,sequence_number,以及rtp_packets_.begin()迭代器头,因为每次将RtpPacketToSend插入到rtp_packets_列表都是从头部插入,这里相当于得到其索引,便于后续发送到网络使用

QueuedPacket数据结构的实现

RoundRobinPacketQueue_QueuedPacket.png
  • QueuedPacket重要的成员变量就是packet_it_,它就是真实rtp包的索引所在
  • QueuedPacket提供了如下函数用于获取当前QueuedPacket对应的RtpPacketToSend包
#modules/pacing/round_robin_packet_queue.cc
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
  return packet_it_ ? std::move(**packet_it_) : nullptr;
}
  • 与上面分析对应通过std::move(**packet_it_)返回

Stream数据结构的实现

RoundRobinPacketQueue_Stream.png
  • 以下结合代码来分析该数据结构
##modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  auto stream_info_it = streams_.find(packet.ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.ssrc();
  }

  Stream* stream = &stream_info_it->second;

  if (stream->priority_it == stream_priorities_.end()) {
    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  } else if (packet.priority() < stream->priority_it->first.priority) {
    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
    // and insert a new one with the new priority. Note that |priority_| uses
    // lower ordinal for higher priority.
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  }
  RTC_CHECK(stream->priority_it != stream_priorities_.end());

  // In order to figure out how much time a packet has spent in the queue while
  // not in a paused state, we subtract the total amount of time the queue has
  // been paused so far, and when the packet is popped we subtract the total
  // amount of time the queue has been paused at that moment. This way we
  // subtract the total amount of time the packet has spent in the queue while
  // in a paused state.
  UpdateQueueTime(packet.enqueue_time());
  packet.SubtractPauseTime(pause_time_sum_);

  size_packets_ += 1;
  size_ += packet.size();

  stream->packet_queue.push(packet);
}
  • 根据QueuedPacket的ssrc来查询streams_集合中是否有该ssrc对应的Stream对象,如果没有则根据该ssrc实例化一个Stream对象并以ssrc为key将其插入到streams_集合
  • 在插入后Stream的成员变量priority_it是指向stream_priorities_.end的
  • 下面的处理如果Stream的成员变量priority_it是指向stream_priorities_.end则为当前的QueuedPacket包通过stream_priorities__.emplace 以StreamPrioKey对象为key,以ssrc为value插入到stream_priorities_集合当中并放回当前迭代器赋值给Stream的成员变量priority_it
  • 假设同一路stream也就是同一个ssrc,在插入的时候,本次的priority小于上一次的priority(越小优先级越高?),那么首先需要将原来stream_priorities_管理的擦除,然后在重新创建StreamPrioKey插入到stream_priorities_
  • 最后通过stream->packet_queue.push(packet)将QueuedPacket插入到Stream管理的packet_queue集合当中
  • 经过以上分析大致可得出如下关系
RoundRobinPacketQueue_Stream_2.png
  • 每一个RtpPacketToSend包对应一个QueuedPacket对象
  • 每一路ssrc对应的stream对应一个Stream,而每一个Stream对象管理着入队的多个QueuedPacket

PacedSender 出队操作

  • PacedSender 出队操作是一个十分复杂的过程,涉及到动态码率估计,webrtc经过bwe发送端码率估计评测出新码率后会将码率作用到paced模块,让PacedSender按照新的码率进行数据发送,本文为便于分析不考虑码率估计进行分析假设码率已知
  • PacedSender 出队操作要从PacedSender派生Module模块谈起,经paced_thread_处理,检测PacedSender重载的TimeUntilNextProcess函数判断下一次回调PacedSender::Process函数
  • webrtc初始化创建PacedSender过程会通过SetPacingRates设置初始化码率
#modules/pacing/paced_sender.cc
int64_t PacedSender::TimeUntilNextProcess() {
  rtc::CritScope cs(&critsect_);

  // When paused we wake up every 500 ms to send a padding packet to ensure
  // we won't get stuck in the paused state due to no feedback being received.
  TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
  if (pacing_controller_.IsPaused()) {
    return std::max(PacingController::kPausedProcessInterval - elapsed_time,
                    TimeDelta::Zero())
        .ms();
  }

  auto next_probe = pacing_controller_.TimeUntilNextProbe();
  if (next_probe) {
    return next_probe->ms();
  }

  const TimeDelta min_packet_limit = TimeDelta::ms(5);
  return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
  • 首先通过pacing_controller_.TimeElapsedSinceLastProcess()得到已经流逝的时间,也就是当前时间和上一次处理时间相减
  • 假设next_probe为-1或nullptr也就是不做码率探测
  • 默认最小发包间隔是5ms,这里将min_packet_limit - elapsed_time和0取最大值,超过5ms则立即执行
void PacedSender::Process() {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.ProcessPackets();
}
  • PacedSender将真正的处理交给PacingController
void PacingController::ProcessPackets() {
  Timestamp now = CurrentTime();
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  ....
  if (paused_)
    return;

  if (elapsed_time > TimeDelta::Zero()) {
    DataRate target_rate = pacing_bitrate_;
    DataSize queue_size_data = packet_queue_.Size();
    if (queue_size_data > DataSize::Zero()) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      packet_queue_.UpdateQueueTime(CurrentTime());
      if (drain_large_queues_) {
        TimeDelta avg_time_left =
            std::max(TimeDelta::ms(1),
                     queue_time_limit - packet_queue_.AverageQueueTime());
        DataRate min_rate_needed = queue_size_data / avg_time_left;
        if (min_rate_needed > target_rate) {
          target_rate = min_rate_needed;
          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                              << target_rate.kbps();
        }
      }
    }

    media_budget_.set_target_rate_kbps(target_rate.kbps());
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  bool is_probing = prober_.IsProbing();
  PacedPacketInfo pacing_info;
  absl::optional<DataSize> recommended_probe_size;
  if (is_probing) {
    pacing_info = prober_.CurrentCluster();
    recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
  }

  DataSize data_sent = DataSize::Zero();
  // The paused state is checked in the loop since it leaves the critical
  // section allowing the paused state to be changed from other code.
  while (!paused_) {
    auto* packet = GetPendingPacket(pacing_info);
    if (packet == nullptr) {
      // No packet available to send, check if we should send padding.
      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
      if (padding_to_add > DataSize::Zero()) {
        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
            packet_sender_->GeneratePadding(padding_to_add);
        if (padding_packets.empty()) {
          // No padding packets were generated, quite send loop.
          break;
        }
        for (auto& packet : padding_packets) {
          EnqueuePacket(std::move(packet));
        }
        // Continue loop to send the padding that was just added.
        continue;
      }

      // Can't fetch new packet and no padding to send, exit send loop.
      break;
    }

    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
    RTC_DCHECK(rtp_packet);
    packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);

    data_sent += packet->size();
    // Send succeeded, remove it from the queue.
    OnPacketSent(packet);
    if (recommended_probe_size && data_sent > *recommended_probe_size)
      break;
  }

  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
    }
  }
}
  • 获取流逝的时间病更新上一次处理的时间为当前时间,流逝时间不得大于2s,如果大于2s则elapsed_time为2s
  • 在drain_large_queues_支持的情况下(一次处理可以发送多个数据包?),根据时间差来计算本次发送的最小码率,如果当前的码率比实际发送的最小码率要小则通过media_budget_.set_target_rate_kbps(target_rate.kbps())设置码率
  • 如果正在进行码率探测,则获取本次码率探测得出的本次推荐发送的数(推荐发送多少数据)
  • 进入while循环通过GetPendingPacket()从RoundRobinPacketQueue中获取QueuedPacket包,然后通过packet->ReleasePacket()得到RtpPacketToSend,最后通过packet_sender_->SendRtpPacket进行发送
  • GetPendingPacket如果在网络拥塞并且码率探测其未进入探测的的情况下会返回空,并且会将从RoundRobinPacketQueue弹出的QueuedPacket重新插入到队列当中,同时跳出循环
  • 如果RoundRobinPacketQueue为空GetPendingPacket获取不到数据while循环会跳出
  • 如果成功发送后recommended_probe_size的值大于0并且实际发送值已经大于或等于recommended_probe_size也会跳出循环结束本次process
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
    const PacedPacketInfo& pacing_info) {
  if (packet_queue_.Empty()) {
    return nullptr;
  }

  // Since we need to release the lock in order to send, we first pop the
  // element from the priority queue but keep it in storage, so that we can
  // reinsert it if send fails.
  RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
  bool apply_pacing = !audio_packet || pace_audio_;
  if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                       pacing_info.probe_cluster_id ==
                                           PacedPacketInfo::kNotAProbe))) {
    packet_queue_.CancelPop();
    return nullptr;
  }
  return packet;
}
  • packet_queue_.BeginPop()弹出QueuedPacket
  • packet_queue_.CancelPop()重新将QueuedPacket加入到队列
  • BeginPop的原理是首先通过GetHighestPriorityStream遍历stream_priorities_获取优先发送的流对应的ssrc
  • 其次通过对应的ssrc查找streams_集合得到Stream,然后通过Stream得到依次要发送的QueuedPacket
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。