1)前言
经过对WebRtc Video Receiver 创建分析(一)、WebRtc Video Receiver RTP包接收分析(二)、以及NACK 模块的工作原理进行了深入的分析。
按照在WebRtc Video Receiver 创建分析(一)中所提到的视频接收模块的分块,本文着重讲解视频接收模块组包的实现原理。
-
重新回顾视频接收模块对RTP数据流的处理流程如下图:
首先经过
Call
模块处理将rtp视频数据送到RtpVideoStreamReceiver::OnRtpPacket
函数,然后将调用RtpVideoStreamReceiver::ReceivePacket
函数进行RTP包解析。解析完之后的数据,会通过
RtpVideoStreamReceiver::OnReceivedPayloadData
回调,在该函数中会将rtp数据包打包成PacketBuffer::Packet包,然后将PacketBuffer::Packet包插入到packet_buffer_
,上图第8步。在
PacketBuffer::InsertPacket
函数中插入完后会调用PacketBuffer::FindFrames函数查找有没有合适的帧。最后
PacketBuffer::InsertPacket
函数会返回struct InsertResult
结构,然后RtpVideoStreamReceiver2
模块回调OnInsertedPacket
函数对其进行处理。
struct InsertResult {
std::vector<std::unique_ptr<Packet>> packets;
// Indicates if the packet buffer was cleared, which means that a key
// frame request should be sent.
bool buffer_cleared = false;
};
- 如果buffer_cleared为true的话,
RtpVideoStreamReceiver2
模块的OnInsertedPacket
函数会发起关键帧请求处理,此处是和m79版本当中较大的变化。 - 本文首先分析
PacketBuffer
的数据结构,然后再分析其组帧原理,以及组帧后的触发机制。
2)video_coding::PacketBuffer数据结构分析
-
成员关系如下图:
-
video_coding::PacketBuffer
的数据存储主要依赖于其成员变量buffer_
当中他的默认大小为512,最大可支持到2048,支持动态扩容,最大扩容到2048每次以512的步进进行扩张,通过调用PacketBuffer::ExpandBufferSize()
函数来达到目的。 - 相比m79版本,PacketBuffer中所维护的数据结构变得简单了些。
-
video_coding::PacketBuffer::Packet
的定义如下
struct Packet {
// If all its previous packets have been inserted into the packet buffer.
// Set and used internally by the PacketBuffer.
bool continuous = false;
bool marker_bit = false;
uint8_t payload_type = 0;
uint16_t seq_num = 0;
uint32_t timestamp = 0;
// NTP time of the capture time in local timebase in milliseconds.
int64_t ntp_time_ms = -1;
int times_nacked = -1;
rtc::CopyOnWriteBuffer video_payload;
RTPVideoHeader video_header;
RtpPacketInfo packet_info;
};
-
struct Packet
定义在PacketBuffer
内部。 - 根据其注释,若一帧数据全部收到那么该帧对应的各Packet的continuous成员应该都会被成置true
- 通过
PacketBuffer::PotentialNewFrame(uint16_t seq_num)
根据传入的seq number来查找潜在的帧。
bool PacketBuffer::PotentialNewFrame(uint16_t seq_num) const {
//通过取模运算来获取传入seq numer对赢的Packet在buffer_中的位置索引
size_t index = seq_num % buffer_.size();
//得到前一个包的索引
int prev_index = index > 0 ? index - 1 : buffer_.size() - 1;
//得到seq_number对应的Packet实例引用
const auto& entry = buffer_[index];
//得到seq_number的前一个包对应的Packet实例引用
const auto& prev_entry = buffer_[prev_index];
//如果entry为空说明当前seq_num对应的Packet还没有被插到buffer_中,返回false
//说明当前seq num还没有潜在的帧存在
if (entry == nullptr)
return false;
if (entry->seq_num != seq_num)
return false;
//如果seq num对应的包是一帧数据的第一个包,则说明前面可能有一帧数据
if (entry->is_first_packet_in_frame())
return true;
if (prev_entry == nullptr)
return false;
//上一个包的seq不等于当前seq num -1 表明丢包
if (prev_entry->seq_num != static_cast<uint16_t>(entry->seq_num - 1))
return false;
if (prev_entry->timestamp != entry->timestamp)
return false;
//如前面所以条件都满足
if (prev_entry->continuous)
return true;
return false;
}
- 很明显相比m79版本要见多许多。
- 潜在一帧的条件其一,若传入的seq num 对应的Packet为一帧中的首个包,则表示可能前面有一帧完整的数据
- 其二、如果seq 连续,并且和前一个包的timestamp不一样,这里充分利用同一帧数据的timestamp一样的条件
3)PacketBuffer::InsertPacket 工作流程
PacketBuffer::InsertResult PacketBuffer::InsertPacket(
std::unique_ptr<PacketBuffer::Packet> packet) {
PacketBuffer::InsertResult result;
MutexLock lock(&mutex_);
uint16_t seq_num = packet->seq_num;
//计算索引
size_t index = seq_num % buffer_.size();
//首次接收到rtp包,更新first_seq_num_为seq_num
if (!first_packet_received_) {
first_seq_num_ = seq_num;
first_packet_received_ = true;
} else if (AheadOf(first_seq_num_, seq_num)) {//如果收到重传恢复的包
// If we have explicitly cleared past this packet then it's old,
// don't insert it, just silently ignore it.
if (is_cleared_to_first_seq_num_) {
return result;
}
first_seq_num_ = seq_num;
}
if (buffer_[index] != nullptr) {
// Duplicate packet, just delete the payload.
if (buffer_[index]->seq_num == packet->seq_num) {
return result;
}
// The packet buffer is full, try to expand the buffer.
while (ExpandBufferSize() && buffer_[seq_num % buffer_.size()] != nullptr) {
}
index = seq_num % buffer_.size();
//容器已经满了,需要清除buffer
// Packet buffer is still full since we were unable to expand the buffer.
if (buffer_[index] != nullptr) {
// Clear the buffer, delete payload, and return false to signal that a
// new keyframe is needed.
RTC_LOG(LS_WARNING) << "Clear PacketBuffer and request key frame.";
ClearInternal();
//RtpVideoStreamReceiver2::OnInsertedPacket()函数根据该标识进行关键帧请求
result.buffer_cleared = true;
return result;
}
}
int64_t now_ms = clock_->TimeInMilliseconds();
last_received_packet_ms_ = now_ms;
if (packet->video_header.frame_type == VideoFrameType::kVideoFrameKey ||
last_received_keyframe_rtp_timestamp_ == packet->timestamp) {
last_received_keyframe_packet_ms_ = now_ms;
last_received_keyframe_rtp_timestamp_ = packet->timestamp;
}
packet->continuous = false;
buffer_[index] = std::move(packet);
/*4) 更新丢包容器*/
UpdateMissingPackets(seq_num);
/*5) 组帧处理*/
result.packets = FindFrames(seq_num);
return result;
}
-
InsertPacket
函数,根据seq 得到索引。 - 在插包之前,首先会判断,容器是否已经满了,如果满了说明丢包严重,会进行扩容处理,如果扩容后,继续接收包,发现还是丢包严重,buffer_得不到释放,则会清空buffer,并且设置
result.buffer_cleared
为true,这样RtpVideoStreamReceiver2
模块会根据组帧结果发送关键帧请求。 - 通过
std::move(packet);
将包插入到buffer_
对应的位置当中。 - 调用
UpdateMissingPackets
进行丢包统计。 - 调用
FindFrames
进行组帧。
4) 更新丢包记录
void PacketBuffer::UpdateMissingPackets(uint16_t seq_num) {
if (!newest_inserted_seq_num_)
newest_inserted_seq_num_ = seq_num;
const int kMaxPaddingAge = 1000;
//如果不丢包的话条件会一直成立
if (AheadOf(seq_num, *newest_inserted_seq_num_)) {
uint16_t old_seq_num = seq_num - kMaxPaddingAge;
auto erase_to = missing_packets_.lower_bound(old_seq_num);
missing_packets_.erase(missing_packets_.begin(), erase_to);
// Guard against inserting a large amount of missing packets if there is a
// jump in the sequence number.
if (AheadOf(old_seq_num, *newest_inserted_seq_num_))
*newest_inserted_seq_num_ = old_seq_num;
++*newest_inserted_seq_num_;
//如果条件成立则表示丢包,missing_packets_插入丢失的包号
while (AheadOf(seq_num, *newest_inserted_seq_num_)) {
missing_packets_.insert(*newest_inserted_seq_num_);
++*newest_inserted_seq_num_;
}
} else {//收到恢复的包
missing_packets_.erase(seq_num);
}
}
- 在
PacketBuffer::InsertPacket
函数每次插入数据后都会调用该函数来刷新missing_packets_
丢包管理容器。 - 第一次调用会更新
newest_inserted_seq_num_
,表示最新插入的seq number。 - 以上分两种情况讨论,其一是如果在插入过程中有被恢复的包被插入(之前丢过的包),假设先插入1434号包,后插入1433号包,此时
newest_inserted_seq_num_
的值为1434,seq_num的值为1433,从而导致AheadOf(seq_num, *newest_inserted_seq_num_)
的返回值为false,所以会走else分支,在插入恢复包的过程中只是通过missing_packets_.erase(seq_num);
将对应的1433从丢包记录中进行移除。 - 其二是在每次插入的过程中通过
AheadOf(seq_num, *newest_inserted_seq_num_)
来判断是否有丢包,从而将丢包的seq 插入到missing_packets_
容器。 -
++*newest_inserted_seq_num_
自加操作,此时newest_inserted_seq_num_
的值为1433,通过while(AheadOf(seq_num, *newest_inserted_seq_num_))
循环来进行丢包统计,将被丢失包的seq 插入到missing_packets_
容器。
5) PacketBuffer::FindFrames查找合适的帧
std::vector<std::unique_ptr<PacketBuffer::Packet>> PacketBuffer::FindFrames(
uint16_t seq_num) {
std::vector<std::unique_ptr<PacketBuffer::Packet>> found_frames;
//在for循环条件中根据PotentialNewFrame查找潜在帧
for (size_t i = 0; i < buffer_.size() && PotentialNewFrame(seq_num); ++i) {
//得到索引
size_t index = seq_num % buffer_.size();
//能到这里将Packet.continuous设置成true,说明对应当前(之前的帧就不一定了)帧的
//每一个包是连续的
buffer_[index]->continuous = true;
// If all packets of the frame is continuous, find the first packet of the
// frame and add all packets of the frame to the returned packets.
// 如果该seq 对应的包是当前帧的最后一个包再进行实际操作,进行逆向查找。
if (buffer_[index]->is_last_packet_in_frame()) {
uint16_t start_seq_num = seq_num;
// Find the start index by searching backward until the packet with
// the |frame_begin| flag is set.
int start_index = index;
size_t tested_packets = 0;
int64_t frame_timestamp = buffer_[start_index]->timestamp;
// Identify H.264 keyframes by means of SPS, PPS, and IDR.
bool is_h264 = buffer_[start_index]->codec() == kVideoCodecH264;
bool has_h264_sps = false;
bool has_h264_pps = false;
bool has_h264_idr = false;
bool is_h264_keyframe = false;
int idr_width = -1;
int idr_height = -1;
//第2部分,以当前seq的包对应的位置为索引进行逆向查找找出当前帧第一个包的位置
//也就是start_seq_num
while (true) {
++tested_packets;
//如果是h264,找到该帧的首个包则跳出该循环,核心就是这一句代码。。
if (!is_h264 && buffer_[start_index]->is_first_packet_in_frame())
break;
//以下操作是对H264数据进行校验
if (is_h264) {
const auto* h264_header = absl::get_if<RTPVideoHeaderH264>(
&buffer_[start_index]->video_header.video_type_header);
if (!h264_header || h264_header->nalus_length >= kMaxNalusPerPacket)
return found_frames;
for (size_t j = 0; j < h264_header->nalus_length; ++j) {
if (h264_header->nalus[j].type == H264::NaluType::kSps) {
has_h264_sps = true;
} else if (h264_header->nalus[j].type == H264::NaluType::kPps) {
has_h264_pps = true;
} else if (h264_header->nalus[j].type == H264::NaluType::kIdr) {
has_h264_idr = true;
}
}
/*通过WebRTC-SpsPpsIdrIsH264Keyframe/Enabled/来开启
sps_pps_idr_is_h264_keyframe_
* 表示idr包必须前面有sps pps 等信息,表示当前帧是否为关键帧
*/
if ((sps_pps_idr_is_h264_keyframe_ && has_h264_idr && has_h264_sps &&
has_h264_pps) ||
(!sps_pps_idr_is_h264_keyframe_ && has_h264_idr)) {
//判断当前帧是否为关键帧
is_h264_keyframe = true;
// Store the resolution of key frame which is the packet with
// smallest index and valid resolution; typically its IDR or SPS
// packet; there may be packet preceeding this packet, IDR's
// resolution will be applied to them.
if (buffer_[start_index]->width() > 0 &&
buffer_[start_index]->height() > 0) {
idr_width = buffer_[start_index]->width();
idr_height = buffer_[start_index]->height();
}
}
}
if (tested_packets == buffer_.size())
break;
start_index = start_index > 0 ? start_index - 1 : buffer_.size() - 1;
// In the case of H264 we don't have a frame_begin bit (yes,
// |frame_begin| might be set to true but that is a lie). So instead
// we traverese backwards as long as we have a previous packet and
// the timestamp of that packet is the same as this one. This may cause
// the PacketBuffer to hand out incomplete frames.
// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7106
//同一帧数据的timestamp是相等的,如果不相等说明不是同一帧
if (is_h264 && (buffer_[start_index] == nullptr ||
buffer_[start_index]->timestamp != frame_timestamp)) {
break;
}
--start_seq_num;
}//while (true)结束,已经得到当前帧的首个包的seq
//第3部分判断帧的连续性
if (is_h264) {
// Warn if this is an unsafe frame.
if (has_h264_idr && (!has_h264_sps || !has_h264_pps)) {
RTC_LOG(LS_WARNING)
<< "Received H.264-IDR frame "
"(SPS: "
<< has_h264_sps << ", PPS: " << has_h264_pps << "). Treating as "
<< (sps_pps_idr_is_h264_keyframe_ ? "delta" : "key")
<< " frame since WebRTC-SpsPpsIdrIsH264Keyframe is "
<< (sps_pps_idr_is_h264_keyframe_ ? "enabled." : "disabled");
}
// Now that we have decided whether to treat this frame as a key frame
// or delta frame in the frame buffer, we update the field that
// determines if the RtpFrameObject is a key frame or delta frame.
// 得到该帧的首个包的在buffer_中的索引。
const size_t first_packet_index = start_seq_num % buffer_.size();
// h264数据,这里解析判断当前帧是否为关键帧,并初始化Packet的
// ideo_header.frame_type成员变量
if (is_h264_keyframe) {
buffer_[first_packet_index]->video_header.frame_type =
VideoFrameType::kVideoFrameKey;
if (idr_width > 0 && idr_height > 0) {
// IDR frame was finalized and we have the correct resolution for
// IDR; update first packet to have same resolution as IDR.
buffer_[first_packet_index]->video_header.width = idr_width;
buffer_[first_packet_index]->video_header.height = idr_height;
}
} else {
buffer_[first_packet_index]->video_header.frame_type =
VideoFrameType::kVideoFrameDelta;
}
// If this is not a keyframe, make sure there are no gaps in the packet
// sequence numbers up until this point.
// 对于H264数据,若当前组好的帧为P帧那么必须要有前向参考帧才能正常解码, 通过
// missing_packets_.upper_bound(start_seq_num) 判断missing_packets_容器中
// 是否有start_seq_num之前的包还没有收到,如果有则直接返回,不再继续组帧了
if (!is_h264_keyframe && missing_packets_.upper_bound(start_seq_num) !=
missing_packets_.begin()) {
return found_frames;
}
// 举个例子,假设25~27号为一帧完整的数据,到这个地方,程序也发现了,但是由于丢包的
// 原因假设此时missing_packets_容器中记录的数据为 20 23 30 31,又由于此帧为非
// 关键帧所以帧不连续,则不再继续进行组帧操作。
// 由此也可以看出,对于H264数据,只要是有一帧完整的I帧率到达此处则可以继续往下执行
}
// 第4部分将已经发现的帧对应的Packet插入到found_frames容器
const uint16_t end_seq_num = seq_num + 1;
// Use uint16_t type to handle sequence number wrap around case.
uint16_t num_packets = end_seq_num - start_seq_num;
found_frames.reserve(found_frames.size() + num_packets);
for (uint16_t i = start_seq_num; i != end_seq_num; ++i) {
std::unique_ptr<Packet>& packet = buffer_[i % buffer_.size()];
RTC_DCHECK(packet);
RTC_DCHECK_EQ(i, packet->seq_num);
// Ensure frame boundary flags are properly set.
packet->video_header.is_first_packet_in_frame = (i == start_seq_num);
packet->video_header.is_last_packet_in_frame = (i == seq_num);
found_frames.push_back(std::move(packet));
}
// 把missing_packets_容器中小于seq的序号进行清除。
// 对于H264如果P帧的前向参考帧丢失,那么在之前就会返回,程序运行不到此处。
// 程序运行到这里,假设该帧是关键帧率,但是前面有丢失的帧,buffer_还没有被清理,
// 在该帧进入解码之前会调用ClearTo函数对seq 之前的buffer_进行清除。
missing_packets_.erase(missing_packets_.begin(),
missing_packets_.upper_bound(seq_num));
}
++seq_num;
}
每收到一个包都会调用该函数,分成4部分进行分析。
第1部分是外部for循环,调用
PotentialNewFrame
查找当前传入的seq 是否可能会存在一潜在的帧。如果第1部分的条件成立,则判断当前seq对应的包是否是一帧中的最后一个包,如果是则执行第2部分逻辑处理,第2部分的核心逻辑是使用while(true)循环以当前seq 进行逆向查找,并得出当前帧的第一个seq包号。
第3部分是判断帧的连续性,对于H264数据,如果发现当前帧不是关键帧并且它的前向参考帧率有丢包情况,则会直接返回,不再进行组帧。
第4部分,每次for 循环如果找到一帧完整的帧,并且符号解码条件,则会将该帧数据插入到
found_frames
容器,如果前面由丢包,则将该帧对应的seq之前的所有记录在missing_packets_
容器中的包进行清除。-
最后将已组好的一帧数据对应的
std::vector<std::unique_ptr<PacketBuffer::Packet>> found_frames
返回。
RtpVideoStreamReceiver2
模块的OnInsertedPacket
函数对每一个rtp包首先调用webrtc::video_coding::PacketBuffer
模块的InsertPacket()
函数将其插入到PacketBuffer
当中。在插入的过程中会对每一个rtp包进行一次组帧查询操作,将查询到的符合一帧并且可以顺利解码的帧数据封装成
std::vector<std::unique_ptr<PacketBuffer::Packet>>
并将其回调回RtpVideoStreamReceiver2
模块。接着在
OnInsertedPacket
函数中会调用OnAssembledFrame
函数对已经组好的一帧数据进行打包操作。如果在传输的过程出现了严重的丢包现象,导致
PacketBuffer
已经满了,这样会导致PacketBuffer
被清空,从而引发关键帧请求操作。OnAssembledFrame
函数的参数为std::unique_ptr<video_coding::RtpFrameObject> frame
,所以在分析其之前先分析video_coding::RtpFrameObject
的打包操作。
6) OnInsertedPacket组帧处理
- 在分析
OnInsertedPacket
函数之前首先弄清楚PacketBuffer
、RtpFrameObject
、Packet
、以及RtpVideoStreamReceiver
之间的关系。
-
FindFrames
函数最终返回的是一个video_coding::PacketBuffer::InsertResult
数据结构,而该结构中包含了std::vector<std::unique_ptr<PacketBuffer::Packet>>
容器,由上图可知,在组包的过程中最终会将已经准备好的包集合打包成RtpFrameObject
。 - 最终一帧编码视频数据用
RtpFrameObject
来描述,对应一个RtpPacketInfos
,其中每个RtpPacketInfos
对应多个RtpPacketInfo结构,数量对应Packet的数量。 - 其中
Packet
中就有RtpPacketInfo
成员变量packet_info,在创建Packet
的时候对其进行了初始化,RtpPacketInfo
模块初始化的时候保存了当前包的rtp头部和该包的接收时间。
void RtpVideoStreamReceiver2::OnInsertedPacket(
video_coding::PacketBuffer::InsertResult result) {
RTC_DCHECK_RUN_ON(&worker_task_checker_);
video_coding::PacketBuffer::Packet* first_packet = nullptr;
int max_nack_count;
int64_t min_recv_time;
int64_t max_recv_time;
std::vector<rtc::ArrayView<const uint8_t>> payloads;
RtpPacketInfos::vector_type packet_infos;
bool frame_boundary = true;
for (auto& packet : result.packets) {
// PacketBuffer promisses frame boundaries are correctly set on each
// packet. Document that assumption with the DCHECKs.
....
payloads.emplace_back(packet->video_payload);
packet_infos.push_back(packet->packet_info);
frame_boundary = packet->is_last_packet_in_frame();
//遍历到最后一个包后将各个包打包成video_coding::RtpFrameObject结构
if (packet->is_last_packet_in_frame()) {
auto depacketizer_it = payload_type_map_.find(first_packet->payload_type);
RTC_CHECK(depacketizer_it != payload_type_map_.end());
rtc::scoped_refptr<EncodedImageBuffer> bitstream =
depacketizer_it->second->AssembleFrame(payloads);
if (!bitstream) {
// Failed to assemble a frame. Discard and continue.
continue;
}
const video_coding::PacketBuffer::Packet& last_packet = *packet;
OnAssembledFrame(std::make_unique<video_coding::RtpFrameObject>(
first_packet->seq_num, //
last_packet.seq_num, //
last_packet.marker_bit, //
max_nack_count, //
min_recv_time, //
max_recv_time, //
first_packet->timestamp, //
first_packet->ntp_time_ms, //
last_packet.video_header.video_timing, //
first_packet->payload_type, //
first_packet->codec(), //
last_packet.video_header.rotation, //
last_packet.video_header.content_type, //
first_packet->video_header, //
last_packet.video_header.color_space, //
RtpPacketInfos(std::move(packet_infos)), //
std::move(bitstream)));
}
}
RTC_DCHECK(frame_boundary);
if (result.buffer_cleared) {
RequestKeyFrame();
}
}
首先依据各个数据包构建video_coding::RtpFrameObject。
其次调用
OnAssembledFrame
对组好的聚合包进行投递。如果在插包的时候清除了
PacketBuffer
,则需要发送关键帧请求。-
RtpFrameObject
的派生关系如下:
从
RtpFrameObject
的派生关系来看RtpFrameObject
对应的就是对应编码后的一帧数据。RtpFrameObject
构造函数这里不做详细分析,其内部包含了RTPVideoHeader
,播放延迟、first_seq、last_seq等信息。最终调用
OnAssembledFrame
对当前已组好的聚合帧进行参考帧查找,并对其进行设置。
9) 总结
- 本文着重分析组帧原理,在有这些信息的基础上为后续分析视频帧的解码分析奠定基础。
- 同时根据本文的分析,我们可以得出,在组帧过程中如果发现当前被组帧的包的前面有丢包存在并且该帧为非关键帧,则会直接返回,等待前面已丢包的信息恢复,这样的话话出现延迟的问题
- 那么如何优化呢,结合前面文章的分析我们可以在NACK模块中适当的调节其Process模块检测丢失的包延迟了多长时间,如果超过阀值还未收到该包,应该立即清除该丢失的包,然后发送关键帧请求来降低延迟,同时结合上述分析的逻辑,如果出现该种情况,当收到关键帧的时候,然后组包如果发现有一帧关键帧组包完成则会立马将该关键帧送到解码模块进行解码。
- 留下一个问题,每组完一帧然后就会发送给到解码器进行解码吗?