IJKPlayer播放流程分析——PacketQueue

一、概述

IJKPlayer是b站开源的基于FFmpeg的播放器,而保利威官方的点播播放器在开源IJKPlayer的基础上增加了PlaySafe视频全流程加密体系、首屏秒开和众多可自定义的功能。我们借着保利威的点播播放器来探究一下视频播放的流程中的PacketQueue数据结构。

播放器主要的播放流程代码都在ff_ffplay.c这个文件里面。在阅读ffplay的代码前,需要先具备一定的音视频基础,这方面可以先阅读雷神的博客。在这篇文章中,我们先来分析ffplay中的PacketQueue这个结构体,这样才能更好理解后续的IJKPlayer播放流程。Ijk源码雷霄骅的博客

当我们给播放器设定一个url 播放视频的时候,底层的FFmpeg会发起网络请求去拉取视频的数据,这里先忽略解协议、解封装相关的知识;然后播放器中的read_thread线程会负责把一个个的音视频数据包AVPacket从FFmpeg层中读取出来存放到一个叫做PacketQueue的FIFO队列中去,而这个队列是用链表来实现的。下面结合网上众多大佬的分析和自己这段时间对播放器的学习理解,来分析一下PacketQueue的实现和相关操作方法。

二、队列和节点的定义

播放器中使用 PacketQueue来保存解封装后的音视频数据AVPacket。那么PacketQueue是怎么样的一个东西呢?我们先来看一下它的定义:

typedef struct PacketQueue {
    MyAVPacketList *first_pkt, *last_pkt;   //队首节点,队尾节点
    int nb_packets;                         //队列中的节点数量
    int size;                               //统计队列内所有节点的数据大小
    int64_t duration;                       //统计队列内所有节点的音视频时长
    int abort_request;                      //是否要中止队列操作
    int serial;                             //序列号,用于区别前后数据包是否连续
    SDL_mutex *mutex;                       //用于维持PacketQueue的多线程安全
    SDL_cond *cond;                         //用于读、写线程相互通知
    MyAVPacketList *recycle_pkt;      // 循环节点,用于回收利用节点的内存,减少内存频繁申请创建
    int recycle_count;
    int alloc_count;
 
    int is_buffer_indicator;
} PacketQueue;

从结构体定义上我们可以猜到:PacketQueue是用链表实现的一个FIFO队列,这个结构体内定义了队列的属性。

然后链表的节点使用MyAVPacketList这个结构体来表示:

typedef struct MyAVPacketList {
    AVPacket pkt;   // 解封装后的音视频数据包
    struct MyAVPacketList *next;    // 指向下一个节点
    int serial;     // 序列号,用于区别前后数据包是否连续
} MyAVPacketList;

三、队列的操作函数

上面讲了队列和链表节点的定义,接下来我们从队列的操作函数来看看这个链表是如何工作的:

1、packet_queue_init 初始化队列

初始化函数用于创建队列所需的内存空间,并创建mutex和cond用于控制链表工作流程:

static int packet_queue_init(PacketQueue *q)
{
    memset(q, 0, sizeof(PacketQueue));
    q->mutex = SDL_CreateMutex();
    if (!q->mutex) {
        av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
        return AVERROR(ENOMEM);
    }
    q->cond = SDL_CreateCond();
    if (!q->cond) {
        av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
        return AVERROR(ENOMEM);
    }
    q->abort_request = 1;
    return 0;
}

2、packet_queue_destroy:销毁队列

销毁函数负责清理释放节点、mutex和cond:

static void packet_queue_destroy(PacketQueue *q)
{
    packet_queue_flush(q);  //清除队列中所有的节点
 
    SDL_LockMutex(q->mutex);
    while(q->recycle_pkt) {
        MyAVPacketList *pkt = q->recycle_pkt;
        if (pkt)
            q->recycle_pkt = pkt->next;
        av_freep(&pkt);
    }
    SDL_UnlockMutex(q->mutex);
 
    SDL_DestroyMutex(q->mutex);
    SDL_DestroyCond(q->cond);
}

3、packet_queue_start:开始队列

队列开始的时候存入了一个flush_pkt,它是一个特殊的AVPacket,主要用来当做不连续数据之间的标记。

static void packet_queue_start(PacketQueue *q)
{
    SDL_LockMutex(q->mutex);
    q->abort_request = 0;
    packet_queue_put_private(q, &flush_pkt); 
    SDL_UnlockMutex(q->mutex);
}

4、packet_queue_abort:中止队列

中止标记为置1,SDL_CondSignal的作用在于确保当前等待该条件的线程能被激活并继续执行退出流程。

static void packet_queue_abort(PacketQueue *q)
{
    SDL_LockMutex(q->mutex);
    q->abort_request = 1;
    SDL_CondSignal(q->cond);
    SDL_UnlockMutex(q->mutex);
}

5、packet_queue_put:往队列中存入一个节点

这个函数主要完成下面的操作:
a)、链表插入节点的实现。
b)、维护serial。当链表中插入了一个flush_pkt后,serial会增加1。
c)、更新队列属性。

static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
    int ret;
    SDL_LockMutex(q->mutex);
    ret = packet_queue_put_private(q, pkt); //存入节点
    SDL_UnlockMutex(q->mutex);
    if (pkt != &flush_pkt && ret < 0)
        av_packet_unref(pkt);       //存入失败则释放AVPacket
    return ret;
}
 
 
// 链表插入节点的实现
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
    MyAVPacketList *pkt1;
    //如果已中止,则插入节点失败
    if (q->abort_request)
       return -1;

#ifdef FFP_MERGE
    pkt1 = av_malloc(sizeof(MyAVPacketList));   //分配节点内存
#else
    pkt1 = q->recycle_pkt;                      // 回收利用旧节点的内存空间
    if (pkt1) {
        q->recycle_pkt = pkt1->next;
        q->recycle_count++;
    } else {
        q->alloc_count++;
        pkt1 = av_malloc(sizeof(MyAVPacketList));
    }
#endif
    if (!pkt1)
        return -1;
    pkt1->pkt = *pkt;        //浅拷贝AVPacket的结构体,AVPacket.data等内存并没有拷贝
    pkt1->next = NULL;
    if (pkt == &flush_pkt)  //如果插入的节点是flush_pkt,需要增加队列的序列号,以区分不连续的两段数据
        q->serial++;
    pkt1->serial = q->serial;     //用队列序列号作为节点序列号
 
    //队列操作:如果队列是空的,新增节点为队头;否则,新增节点为原队尾节点的next。
    if (!q->last_pkt)
        q->first_pkt = pkt1;
    else
        q->last_pkt->next = pkt1;
    q->last_pkt = pkt1;
 
    //计算更新队列的节点数、cache大小、cache总时长
    q->nb_packets++;
    q->size += pkt1->pkt.size + sizeof(*pkt1);
    q->duration += FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
 
    //发出信号,通知等待中的读线程读取数据
    SDL_CondSignal(q->cond);
    return 0;
}

6、packet_queue_get:从队列中获取一个节点

这个函数内容较长,我们先看下函数整体流程为:加锁、进入循环、取队头节点然后break。然后再看细节的for循环内部做了什么。

我们知道队列是一个FIFO先进先出的模型,所以从队头拿数据。对于没有取到数据的情况,根据block参数进行判断是否阻塞,如果阻塞,通过SDL_CondWait等待信号。如果有取到数据,则转移队头、维护计算队列属性、输出结果、释放节点内存等操作。

//pkt: 输出参数,即MyAVPacketList节点中的AVPacket
//block: 是否需要在没节点可取的情况下阻塞等待
//serial: 输出参数,即节点的序列号
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
    MyAVPacketList *pkt1;
    int ret;
    SDL_LockMutex(q->mutex); // 加锁
 
    for (;;) {
        if (q->abort_request) {     
            ret = -1;
            break;
        }
 
        // 。。。省略代码,取队头节点,break 。。。
    }
    SDL_UnlockMutex(q->mutex);
    return ret;
}
 
 
// for循环的主体:取队头节点,break
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
    // 。。。省略。。。
    for (;;) {
        // 。。。省略。。。
 
        pkt1 = q->first_pkt;     // 从队头拿数据
        if (pkt1) {
            q->first_pkt = pkt1->next;        // 队头移到第二个节点
            if (!q->first_pkt)
                q->last_pkt = NULL;
 
            // 队列属性维护:节点数、cache大小、cache总时长
            q->nb_packets--;
            q->size -= pkt1->pkt.size + sizeof(*pkt1);
            q->duration -= FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
 
            *pkt = pkt1->pkt;        // 输出AVPacket,这里拷贝了AVPacket结构体,AVPacket的data只拷贝了指针
            if (serial)             // 输出serial
                *serial = pkt1->serial;
 
#ifdef FFP_MERGE
            av_free(pkt1);          // 释放节点内存
#else
            pkt1->next = q->recycle_pkt;
            q->recycle_pkt = pkt1;
#endif
            ret = 1;
            break;
        } else if (!block) {        // 队列中没有数据,返回失败结果
            ret = 0;
            break;
        } else {                    // 队列中没有数据,阻塞等待新节点的插入
            SDL_CondWait(q->cond, q->mutex);  
        }
    }
    // 。。。省略。。。
}

7、packet_queue_put_nullpacket:存入一个空包到队列中

播放器定义了插入空包就代表流的结束,当视频读取完成的时候将会放入空包。该函数的实现是构建一个空包,然后调用packet_queue_put存入队列。

static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
    AVPacket pkt1, *pkt = &pkt1;
    av_init_packet(pkt);
    pkt->data = NULL;
    pkt->size = 0;
    pkt->stream_index = stream_index;
    return packet_queue_put(q, pkt);
}

8、packet_queue_flush:清空队列

用于清空链表中的所有节点。比如用于销毁队列、seek操作等。该函数主要是遍历链表释放节点和AVPacket的内存。最后将PacketQueue的属性恢复为初始状态。

static void packet_queue_flush(PacketQueue *q)
{
    MyAVPacketList *pkt, *pkt1;
    SDL_LockMutex(q->mutex);
    for (pkt = q->first_pkt; pkt; pkt = pkt1) {
        pkt1 = pkt->next;
        av_packet_unref(&pkt->pkt);
#ifdef FFP_MERGE
        av_freep(&pkt);
#else
        pkt->next = q->recycle_pkt;
        q->recycle_pkt = pkt;
#endif
    }
    q->last_pkt = NULL;
    q->first_pkt = NULL;
    q->nb_packets = 0;
    q->size = 0;
    q->duration = 0;
    SDL_UnlockMutex(q->mutex);
}

四、packet queue的关键点分析

1、PacketQueue 的内存管理

MyAVPacketList 的内存是完全由 PacketQueue 维护的,在存入节点的时候创建内存 ,在取出节点的时候销毁内存。而 AVPacket 一部分是 AVPacket 结构体的内存,这部分跟随 MyAVPacketList创建和销毁;另一部分是 AVPacket 字段指向的内存data,这部分是在取出节点之后用 av_packet_unref 函数释放。内存示意如下图所示:


image.png

2、serial序列号的变化过程

如下图所示,从队头到队尾标注了3个节点和它们的 serial,以及放入flush_pkt之后 Packet Queue 的 serial 变化。可以看到放入 flush_pkt 之后,serial 增加了1。


image.png

到这里PacketQueue的分析就结束了,在后续的文章中我们会继续分析另一个用于存放解码后的音视频数据AVFrame的队列FrameQueue。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容