1)前言
- WebRtc基于transport wide cc 的延迟动态码率估计主要分成四大部分,如下:
- 第一部分、在发送端,当rtp包扩展transport wide cc 协议在发送过程中,当包发送到网络环境过程中的处理和顺利发送到网络环境后的处理,最后作用到
GoogCcNetworkController
模块。 - 第二部分、接收端接收到带transport wide cc 协议的rtp包后的处理,主要是生成基于transport wide cc 的RTCP报文,并定时将报文发送给发送端。
- 第三部分、发送端收到接收端基于transport wide cc 协议的rtcp 反馈报文,并对其进行解析,解析完成后进行再封装将其作用到
GoogCcNetworkController
模块。 - 第四部分、
GoogCcNetworkController
模块基于延迟模型根据transport wide cc feedback进行码率估计。 - 本文重点分析第一部分,分析其工作流程并结合代码分析其代码数据传递链路,以及其最终对
GoogCcNetworkController
模块的影响。
2)工作流程一
- 如要使用transport-wide-cc-extensions就必须在sdp协议中扩展其协议,按照transport-wide-cc协议可得知
a=extmap:5 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions
必须在rtp包头部扩展如上协议。
其大致发送流程如下:
- 首先在
PacketRouter
模块的SendPacket函数中对跟进transport-wide-cc头扩展支持为rtp包添加TransportSequenceNumber,其实现如下:
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
rtc::CritScope cs(&modules_crit_);
// With the new pacer code path, transport sequence numbers are only set here,
// on the pacer thread. Therefore we don't need atomics/synchronization.
if (packet->IsExtensionReserved<TransportSequenceNumber>()) {
packet->SetExtension<TransportSequenceNumber>(AllocateSequenceNumber());
}
......
}
- 其次在
RTPSender
模块的TrySendPacket函数中会对上述封装的TransportSequenceNumber进行解析,并通过AddPacketToTransportFeedback函数将其传递给TransportFeedbackObserver
模块,其处理代码如下:
bool RTPSender::TrySendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(packet);
.....
// Downstream code actually uses this flag to distinguish between media and
// everything else.
if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true;
options.included_in_allocation = true;
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
......
return true;
}
void RTPSender::AddPacketToTransportFeedback(
uint16_t packet_id,
const RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info) {
if (transport_feedback_observer_) {
size_t packet_size = packet.payload_size() + packet.padding_size();
if (send_side_bwe_with_overhead_) {
packet_size = packet.size();
}
RtpPacketSendInfo packet_info;
packet_info.ssrc = SSRC();
packet_info.transport_sequence_number = packet_id;
packet_info.has_rtp_sequence_number = true;
packet_info.rtp_sequence_number = packet.SequenceNumber();
packet_info.length = packet_size;
packet_info.pacing_info = pacing_info;
transport_feedback_observer_->OnAddPacket(packet_info);
}
}
- 生成RtpPacketSendInfo结构,记录当前要发送的rtp包的信息,如SequenceNumber、TransportSequenceNumber、包大小等。
-
TransportFeedbackObserver
模块的派生及其依赖关系如下:
-
RtpTransportControllerSend
模块的OnAddPacket函数实现如下
void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) {
transport_feedback_adapter_.AddPacket(
packet_info,
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load()
: 0,
Timestamp::ms(clock_->TimeInMilliseconds()));
}
- 该函数把任务交给
TransportFeedbackAdapter
模块,它的实现如下:
const int64_t kNoTimestamp = -1;
const int64_t kSendTimeHistoryWindowMs = 60000;
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
size_t overhead_bytes,
Timestamp creation_time) {
{
rtc::CritScope cs(&lock_);
PacketFeedback packet(creation_time.ms(),
packet_info.transport_sequence_number,
packet_info.length + overhead_bytes, local_net_id_,
remote_net_id_, packet_info.pacing_info);
if (packet_info.has_rtp_sequence_number) {
packet.ssrc = packet_info.ssrc;
packet.rtp_sequence_number = packet_info.rtp_sequence_number;
}
packet.long_sequence_number =
seq_num_unwrapper_.Unwrap(packet.sequence_number);
/*历史记录的生命周期是当前时间和PacketFeedback包创建的时间的差值小于60000的窗口,
也就是,历史记录可以保留6秒以内的待发送包信息?,超过该时间窗口的历史记录将被清除*/
while (!history_.empty() &&
creation_time.ms() - history_.begin()->second.creation_time_ms >
packet_age_limit_ms_) {//默认值为kSendTimeHistoryWindowMs,6s的时间窗口
// TODO(sprang): Warn if erasing (too many) old items?
RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
history_.insert(std::make_pair(packet.long_sequence_number, packet));
}
....
}
- 该函数的核心作用是,利用RtpPacketSendInfo(包含,seq,transport_seq,发送包的大小等)信息创建PacketFeedback实例。
- 将PacketFeedback实例以rtp transport seq 为key,以PacketFeedback为value插入到history_容器当中。
- 此时PacketFeedback记录的信息是创建时间、transport seq、ssrc、本次待发送的rtp包大小等信息。
-
其中history_是一个map集合,它和TransportFeedbackAdapter的关系如下:
- 那么这个历史记录有什么作用?接着看下文分析。
- 以上分析了,RTP包扩展transport seq的发送到网络层前的处理流程,接下来分析当数据发送到网络层socket后的回调流程。
3)工作流程二
-
首先介绍RTP数据流发送反馈原理,如下图:
首先,从信号注册说起,PeerConnection在用户添加音频、视频、数据轨的时候在其内部会调用对应的CreatexxChannel函数,在该函数中会为其对应的Media通道注册对应的信号函数。
其次、当RTP包发送到网络层后会通过信号机制将发送信息经过信号将其反馈到BaseChnnel,同时BaseChnnel同样会经过信号将信息通过SignalSentPacket触发
PeerConnection
模块的OnSentPacket_w函数。最终经过函数回调,消息经Call模块,到达worker线程,最终反馈到
RtpTransportControllerSend
模块。本文从上图的第五步说起,其实现如下:
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
task_queue_.PostTask([this, packet_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
}
- 该函数的核心业务分成三大成分。
- 其一是调用
TransportFeedbackAdapter
模块的ProcessSentPacket打包SentPacket消息。 - 其二是将生成的SentPacket结构消息作用到GoogCcNetworkController模块,同时调用PostUpdates()进行码率更新(如果符合条件的话)。
- 其三是将当前发送的字节数更新到
pacer
模块,这样pacer模块就知道当前网络中有多少数据正在发送,从而进行拥塞控制。
3.1)发送反馈SentPacket包的封装
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
const rtc::SentPacket& sent_packet) {
rtc::CritScope cs(&lock_);
// TODO(srte): Only use one way to indicate that packet feedback is used.
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
int64_t unwrapped_seq_num =
seq_num_unwrapper_.Unwrap(sent_packet.packet_id);
auto it = history_.find(unwrapped_seq_num);
if (it != history_.end()) {
bool packet_retransmit = it->second.send_time_ms >= 0;
it->second.send_time_ms = sent_packet.send_time_ms;
last_send_time_ms_ =
std::max(last_send_time_ms_, sent_packet.send_time_ms);
// TODO(srte): Don't do this on retransmit.
if (pending_untracked_size_ > 0) {
if (sent_packet.send_time_ms < last_untracked_send_time_ms_)
RTC_LOG(LS_WARNING)
<< "appending acknowledged data for out of order packet. (Diff: "
<< last_untracked_send_time_ms_ - sent_packet.send_time_ms
<< " ms.)";
it->second.unacknowledged_data += pending_untracked_size_;
pending_untracked_size_ = 0;
}
if (!packet_retransmit) {
AddInFlightPacketBytes(it->second);
auto packet = it->second;
SentPacket msg;
msg.size = DataSize::bytes(packet.payload_size);
msg.send_time = Timestamp::ms(packet.send_time_ms);
msg.sequence_number = packet.long_sequence_number;
msg.prior_unacked_data = DataSize::bytes(packet.unacknowledged_data);
msg.data_in_flight = GetOutstandingData();
return msg;
}
}
} else if (sent_packet.info.included_in_allocation) {
if (sent_packet.send_time_ms < last_send_time_ms_) {
RTC_LOG(LS_WARNING) << "ignoring untracked data for out of order packet.";
}
pending_untracked_size_ += sent_packet.info.packet_size_bytes;
last_untracked_send_time_ms_ =
std::max(last_untracked_send_time_ms_, sent_packet.send_time_ms);
}
return absl::nullopt;
}
- 承接上面工作流程一中的分析,有涉及到history_容器,此处,根据已发送的包的信息,从该容器中根据seq number进行查询,查询到后,对立面的数据进行更新,主要是更新其发送时间。
- 同时通过AddInFlightPacketBytes函数将已发送的RTP包中的实际palyload大小,记录到in_flight_bytes_容器当中。
- 生成SentPacket并为其进行初始化,主要包含(实际发送数据的大小、发送时间、transport seq number、以及data_in_flight),其中data_in_flight表示一共已经发送了多少字节的数据到网络层了(以时间6s为窗口)。
-
in_flight_bytes_
容器定义在TransportFeedbackAdapter
模块中,其定义如下:
using RemoteAndLocalNetworkId = std::pair<uint16_t, uint16_t>;
std::map<RemoteAndLocalNetworkId, size_t> in_flight_bytes_;
- 通过AddInFlightPacketBytes函数将每次发送了多少字节的数据填入该容器,进行发送字节统计,最终会作用到
pacer
模块。 - 通过RemoveInFlightPacketBytes函数在每次接收到接收端发送回来的twcc feedback报告后根据收到的seq number 将对应seq number的包的大小计数从in_flight_bytes_中进行移除,同时在发送数据包的时候在
TransportFeedbackAdapter
模块AddPacket函数中会判断发送包的生命周期,如果超时(大于6s)进行移除。
void TransportFeedbackAdapter::AddInFlightPacketBytes(
const PacketFeedback& packet) {
....
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second += packet.payload_size;
} else {
in_flight_bytes_[{packet.local_net_id, packet.remote_net_id}] =
packet.payload_size;
}
}
- 每次发送进行累加。
void TransportFeedbackAdapter::RemoveInFlightPacketBytes(
const PacketFeedback& packet) {
....
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second -= packet.payload_size;
if (it->second == 0)
in_flight_bytes_.erase(it);
}
}
- 超时或者对应的seq 包已经收到接收端发回来的twcc报告进行移除操作。
DataSize TransportFeedbackAdapter::GetOutstandingData() const {
rtc::CritScope cs(&lock_);
auto it = in_flight_bytes_.find({local_net_id_, remote_net_id_});
if (it != in_flight_bytes_.end()) {
return DataSize::bytes(it->second);
} else {
return DataSize::Zero();
}
}
- 综上所述:in_flight_bytes_容器描述的应当是以6s为最大时间窗口,描述当前总共有多少字节的数据正处于网络发送当中
- 返回已发送的总字节数,最终对
pacer
模块有用。 - 到此为止SentPacket封装完成,主要包含(本次发送了多少字节的数据、本包的seq、本次发送时间、一共有多少字节的数据在发送[最多6s])
3.2)发送反馈SentPacket包作用到GoogCcNetworkController模块
NetworkControlUpdate GoogCcNetworkController::OnSentPacket(
SentPacket sent_packet) {
alr_detector_->OnBytesSent(sent_packet.size.bytes(),
sent_packet.send_time.ms());
acknowledged_bitrate_estimator_->SetAlr(
alr_detector_->GetApplicationLimitedRegionStartTime().has_value());
if (!first_packet_sent_) {
first_packet_sent_ = true;
// Initialize feedback time to send time to allow estimation of RTT until
// first feedback is received.
bandwidth_estimation_->UpdatePropagationRtt(sent_packet.send_time,
TimeDelta::Zero());
}
bandwidth_estimation_->OnSentPacket(sent_packet);
if (congestion_window_pushback_controller_) {
congestion_window_pushback_controller_->UpdateOutstandingData(
sent_packet.data_in_flight.bytes());
NetworkControlUpdate update;
MaybeTriggerOnNetworkChanged(&update, sent_packet.send_time);
return update;
} else {
return NetworkControlUpdate();
}
}
该函数会调用
AlrDetector
模块进行带宽受限区域探测,如果网络受限会导致AlrDetector
模块中的alr_started_time_ms_成员被赋值,也就是受限的起使时间,其原理就是通过理论预设的码率,和预设的带宽利用率,以及每次发送数据的时间间隔,然后配合本次实际发送的字节数,进行比较,比较预算应该发送多少字节的数据和实际发送了多少字节的数据的比例,来判断当前发送到底有没有充分利用好网络带宽。其详细的原理可以参考WebRTC动态码率-AlrDetector原理。经
AlrDetector
模块受限探测后,将探测结果通过调用AcknowledgedBitrateEstimator
模块的SetAlr函数设置到该模块,探测结果可能会没有值,假设带宽利用率不错的话。调用
SendSideBandwidthEstimation
模块的OnSentPacket函数记录本次发送的包信息,为基于丢包的动态码率估计提供入参信息,基于丢包率的动态码率估计可以参考WebRTC动态码率-基于丢包的码率估计原理最后通过
CongestionWindowPushbackController
模块来触发码率估计,然而截止m79版本CongestionWindowPushbackController模块默认并未开启,用户可以通过配置"WebRTC-CongestionWindow/QueueSize:100,MinBitrate:100000/" FieldTrials来启用,该模块放到后续分析。在
CongestionWindowPushbackController
模块未开启的情况下,该函数默认返回了一个NetworkControlUpdate实例。
3.3)当前发送总字节数作用到pacer模块
- 回到上图中的步骤8,transport wide cc 发送反馈的最后环节就是将当前统计到的正在网络环境下处于正在发送的总字节数,告诉
pacer
模块。
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
rtc::CritScope cs(&critsect_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
}
void PacingController::UpdateOutstandingData(DataSize outstanding_data) {
outstanding_data_ = outstanding_data;
}
- 更新当前网络中时间有多少值在发送。
- 如何使用该值?
bool PacingController::Congested() const {
if (congestion_window_size_.IsFinite()) {
/*congestion_window_size_为经过GoogCcNetworkController模块动态码率估计后得出的网络拥塞窗口上限,
经RtpTransportControllerSend::PostUpdates()函数将拥塞窗口上限配置到pacer模块。
*/
return outstanding_data_ >= congestion_window_size_;
}
return false;
}
- 通过判断当前网络中实际在发送的字节总数大小和拥塞窗口上限做比较,来得出当前网络是否拥塞。
- 如果网络拥塞则发送数据的时候会取消本次发送。
- 逻辑如下:
- 本文到此分析结束
4)总结
- 本文主要介绍基于transport wide cc的rtp数据发送在发送端的预处理逻辑和原理,主要分析了在发送过程中的回调机制。
- 同时分析在RTP数据发送过程中的消息回调对其他如
AlrDetector
模块和GoogCcNetworkController
模块和pacer
模块的影响。 - 通过本文的分析可以清晰的看出基于transport wide cc的rtp数据发送在发送过程中为后续基于延迟的码率估计和基于丢包的码率估计提供入参,从而可以看出webrtc 动态码率估计得复杂性,同时通过本文的分析对后续分析基于延迟的码率估计奠定基础。