- 作为webrtc pacer发送的重要预算模块被定义为IntervalBudget
- 该模块分成如下四个步骤来完成每此应该发送多少数据量的计算
- pacer模块将连续的时间分成不同时间间隔的时间块,发送一段数据后休息一会,然后再发送一段数据,然后再休息一会
- 假设上次休息的时间到本次发送的时间间隔为delta_time_ms
- 那么本次要发送多少数据呢?这里则使用当前的发送速率,配合流逝的delta_time_ms进行计算。
- 本文理解每次循环发送表示至少发送delta_time_ms的数据量,而每发送成功一包表示本次循环发送中的一次。
1)设置目标发送码率
namespace {
constexpr int64_t kWindowMs = 500;
}
void IntervalBudget::set_target_rate_kbps(int target_rate_kbps) {
target_rate_kbps_ = target_rate_kbps;
max_bytes_in_budget_ = (kWindowMs * target_rate_kbps_) / 8;
bytes_remaining_ = std::min(std::max(-max_bytes_in_budget_, bytes_remaining_),
max_bytes_in_budget_);
}
- 在
IntervalBudget
中根据目标码率配合时间窗口进行每相隔指定的时间范围内应该发送预算多少字节。
- 按照上面的假设经计算得出
max_bytes_in_budget_
按照500ms最大时间窗口进行最大预算,假设目标发送码率为240kbps则得出本次预算最大发送为15000字节。
- 计算剩余预算,
bytes_remaining_
首次默认情况下为0,那么经过赋值后本次剩余的预算为0(初始值)。
- 此函数计算得出当前目标码率下以500ms为时间窗口得出最大的发送预算,也就是500ms内按照该目标发送码率最大可以发送1500字节的数据。
2)增加发送预算
void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {
int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
if (bytes_remaining_ < 0 || can_build_up_underuse_) {
// We overused last interval, compensate this interval.
bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);
} else {
// If we underused last interval we can't use it this interval.
bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);
}
}
- 以delta时间为单位,假设以30ms的时间进行预算增加。
- 首先delta时间内根据目标发送码率得出
240kbps * 30 / 1000 / 8
等于900
字节。
- 计算
bytes_remaining_
,初始值为0,和最大值进行比较取最小值。
- 如果已经发送了一些数据,此时
bytes_remaining_
大于0,此时剩余量为上一次的剩余量加上delta时间内要发送的字节数据。
- 首次调用该函数计算得到bytes_remaining_应该为900字节。
3)调整剩余预算
void IntervalBudget::UseBudget(size_t bytes) {
bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),
-max_bytes_in_budget_);
}
- 每成功发送一包数据后需要调用该函数对使用预算进行更新。
- 这里假设在发送数据前调用了
set_target_rate_kbps(240)
,IncreaseBudget(30)
,也就是目标发送码率240kbps,发送时间为30ms。经上述的计算得出,预算该30ms内可以发送900字节的数据。
- 假设第一次发送数据发送了200字节,那么发送成功后调用该函数,算出剩余的预算还有900-200=700字节。
4)获取剩余预算
size_t IntervalBudget::bytes_remaining() const {
return rtc::saturated_cast<size_t>(std::max<int64_t>(0, bytes_remaining_));
}
5)使用案例
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
Timestamp previous_process_time = last_process_time_;
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
if (elapsed_time > TimeDelta::Zero()) {
...
if (mode_ == ProcessMode::kPeriodic) {
// In periodic processing mode, the IntevalBudget allows positive budget
// up to (process interval duration) * (target rate), so we only need to
// update it once before the packet sending loop.
// 设置当前实时发送速率
media_budget_.set_target_rate_kbps(target_rate.kbps());
// 调用IncreaseBudget进行预算计算,假设 elapsed_time为30ms, taeget rate 为240kbps
UpdateBudgetWithElapsedTime(elapsed_time);
} else {
media_rate_ = target_rate;
}
}
...
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
// 循环发送数据
while (!paused_) {
// Fetch the next packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
...
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// Send done, update send/process time to the target send time.
// 每成功发送一包数据后需要将当前发送的数据量调用UseBudget来更新本次循环预算的剩余
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
}