十三: 音视频队列的讲解(下):C++多线程队列使用

一. 多线程队列的框图:

image.png

上图是同一个典型的多线程入队,出队的过程。这里需要创建两个线程,一个是入队线程、一个是出队线程。入队线程主要是通过push的api向Queue的队尾插入数据,插入数据的同时通过pthread_cond_broadcast通知出队线程取出数据。此时出队线程正在等待入队线程的唤醒(pthread_cond_wait),若收到唤醒通知则让队列数据出队。

二·.Linux多线程的基本API:

2.1. pthread_mutex_lock****:

int pthread_mutex_lock(pthread_mutex_t *mutex);

第一个传入参数:pthread_mutex_t结构体指针

功能:这个是互斥锁加锁功能,就是每次线程调用的时候都会把锁加上,使其保证访问数据的原子性,直到解锁为止。

2.2. pthread_mutex_unlock:

int pthread_mutex_unlock(pthread_mutex_t *mutex);

第一个传入参数:pthread_mutex_t结构体指针

功能:这个是互斥锁解锁功能,就是每次线程访问完资源的时候都会把锁解锁。

2.3. pthread_cond_broadcast****:

*int pthread_cond_broadcast(pthread_cond_t cond)

传入参数:pthread_cond_t的结构体指针

功能:唤醒所有正在pthread_cond_wait(线程等待)的线程

2.4. pthread_cond_wait:

int pthread_cond_wait (pthread_cond_t *__restrict __cond , pthread_mutex_t *__restrict __mutex)

第一个参数:pthread_cond_t的结构体指针

第二个参数:pthread_mutex_t结构体指针

功能:线程等待并挂起,若被唤醒了,则直接跳出挂起状态。

三. 推流项目中视频队列的实现:

image.png

这张图是视频队列实现的过程,VIDEO_QUEUE是一个类。这个类里面,封装了添加视频队列(putVideoPacketQueue)、获取视频队列数据(getVideoPacketQueue)、获取视频队列长度(getVideoQueueSize)。

image.png

3.1. VIDEO_QUEUE****构造器

这里创建一个VIDEO_QUEUE的C++的构造器,C++构造器主要初始化了线程的量。包括:线程锁的初始化(pthread_mutex_init)、线程条件变量的初始化(pthread_cond_init)。

3.2. putVideoPacketQueue****的讲解:

image.png

putVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后进行入队操作video_packet_queue.push(video_packet),入队完成之后再通知出队线程取出队列数据pthread_cond_broadcast,最后解锁pthread_mutex_unlock

3.3. getVideoPacketQueue****的讲解:

image.png

getVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后判断视频队列是否有数据(video_packet_queue.size()==0)。若没有数据,则用pthread_cond_wait去等待线程被唤醒。若队列有数据则唤醒的此线程,则直接从队列取出数据。这里取出数据分两步:第一步,先把队列移动到最前面video_packet_queue.front()。第二步,video_packet_queue.pop出队并删除数据。

3.4. getVideoQueueSize****的讲解:

image.png

getVideoQueueSize主要是获取当前队列的长度,获取长度的步骤跟上面也差不多。

首先,pthread_mutex_lock加锁,然后通过count = video_packet_queue.size(),获取队列的数量。然后pthread_mutex_unlock解锁。

­­

四.推流项目中音频队列的实现:

image.png

这张图是音频队列实现的过程,AUDIO_QUEUE是一个类。这个类里面,封装了添加音频队列(putAudioPacketQueue)、获取视频队列数据(getAudioPacketQueue)、获取音频队列长度(getAudioQueueSize)。

image.png

4.1. AUDIO_QUEUE****构造器

这里创建一个AUDIO_QUEUE的C++的构造器,C++构造器主要初始化了线程的量。包括:线程锁的初始化(pthread_mutex_init)、线程条件变量的初始化(pthread_cond_init)。

4.2. putAudioPacketQueue****讲解:

image.png

putAudioPacketQueue主要是audio_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后进行入队操作audio_packet_queue.push(audio_packet),入队完成之后再通知出队线程取出队列数据pthread_cond_broadcast,最后解锁pthread_mutex_unlock

4.3 getAudioPacketQueue****的讲解:

image.png

getAudioPacketQueue主要是audio_data_packet入队的过程,入队前需要加锁pthread_mutex_lock

然后判断视频队列是否有数据(audio_packet_queue.size()==0),若音频队列为空则用pthread_cond_wait去等待线程被唤醒。若唤醒的此线程,则直接从队列取出数据。这里取出数据分两步:第一步,先把队列移动到最前面audio_packet_queue.front()。第二步,audio_packet_queue.pop出队并删除数据。

4.4. getAudioQueueSize****的讲解:

image.png

getAudioQueueSize主要是获取当前队列的长度,获取长度的步骤跟上面也差不多。首先,pthread_mutex_lock加锁,然后通过count = audio_packet_queue.size(),获取队列的数量。然后pthread_mutex_unlock解锁。

new与extern

image.png
image.png

在main函数中new的对象可供所有cpp文件使用,extern表示借用全局变量的声明

new

在C++中,new是一个运算符,用于动态分配内存。它的一般形式如下:

pointer = new type;
pointer = new type[numberOfElements];
第一个表达式将分配一个类型为type的对象并返回指向该对象的指针;第二个表达式将分配一个大小为numberOfElements的type数组,并返回指向该数组的第一个元素的指针。

例如,以下代码将分配一个整数并将其值设置为42:

int *p = new int;
*p = 42;
以下代码将分配一个大小为10的整数数组:

int *a = new int[10];
在使用new运算符分配内存时,需要注意以下几点:

new运算符会抛出bad_alloc异常,如果无法分配所需的内存,则会抛出该异常。

在使用完new运算符分配的内存后,必须使用delete运算符释放该内存。这可以通过以下方式实现:

delete pointer;
delete[] pointer;

第一个表达式释放由单个对象使用new运算符分配的内存;第二个表达式释放由数组使用new[]运算符分配的内存。忘记释放分配的内存可能会导致内存泄漏。

在使用new运算符分配内存时,应该始终检查返回的指针是否为NULL。如果返回的指针为NULL,则说明内存分配失败。

可以使用placement new运算符来将对象放置到预先分配的内存中。这可以通过以下方式实现:

void *memory = operator new(sizeof(MyClass));
MyClass *p = new (memory) MyClass;

在这个例子中,operator new运算符分配了一个大小为sizeof(MyClass)的内存块,并返回指向该块的指针。然后,placement new运算符在此内存中构造了一个名为MyClass的对象。使用placement new时,需要确保内存块的大小至少等于所创建对象的大小。

总之,new运算符是C++中动态分配内存的一种常用方式。它提供了一种方便的方法来创建对象或数组,并且可以与其他C++特性如类、异常和模板结合使用。
但是需要注意,在使用完动态分配的内存后,必须使用delete运算符释放该内存,否则可能会导致内存泄漏。

extern

在C++中,如果需要定义一个全局变量或函数,可以将其声明为extern。这意味着变量或函数的定义将在另一个源文件中进行,并且编译器将在链接时查找该定义。例如:

// File1.cpp
int myGlobalInt = 42;

// File2.cpp
extern int myGlobalInt;
void myFunction() {
    // 使用myGlobalInt
}

在上面的例子中,myFunction()函数声明了一个外部整数变量myGlobalInt,表示它已经在另一个源文件中定义。因此,在链接时,编译器将在程序中查找myGlobalInt的实际定义。

另一个情况是在C++类的实现中,需要在多个源文件中共享静态数据成员。与上述情况相似,可以将静态成员声明为extern,以便在其他源文件中访问。例如:

// MyClass.h
class MyClass {
public:
    static int myStaticInt;
};

// MyClass.cpp
#include "MyClass.h"
int MyClass::myStaticInt = 42;

// AnotherFile.cpp
#include "MyClass.h"
extern int MyClass::myStaticInt;
void myFunction() {
    // 使用myStaticInt
}

在上面的例子中,MyClass类声明了一个静态整数变量myStaticInt。MyClass.cpp文件定义了这个静态成员,并将其初始化为42。在AnotherFile.cpp中,使用extern关键字声明该静态成员,表示它已经在另一个源文件中定义。然后可以在myFunction()函数中访问和修改myStaticInt。

例图:

多线程框图.png

队列

消息队列(queue)是一种先进先出(FIFO)的数据结构,它支持在队尾添加新元素和在队头移除并返回队头元素。C++ STL提供了queue类,其中包含front和pop方法可以用来获取队头元素和移除队头元素。

具体来说:

front() 返回队头元素,但不会将其从队列中移除。

pop() 将队头元素从队列中移除,但不返回该元素的值。

下面给出一个示例代码,展示如何使用queue::front()和queue::pop()方法:

#include <iostream>
#include <queue>

int main () {
    std::queue<int> myqueue;
    for (int i=1;i<=5;i++) myqueue.push(i);

    std::cout << "myqueue.front() is now " << myqueue.front() << '\n';

    std::cout << "Popping out elements...";
    while (!myqueue.empty())
    {
        std::cout << ' ' << myqueue.front();
        myqueue.pop();
    }
    std::cout << '\n';

    return 0;
}

在以上代码中,我们首先创建一个空队列 myqueue,然后用 push() 方法向队列尾部添加元素,接着用 front() 方法返回队头元素并输出它的值,然后用 pop() 方法将队头元素移除并输出队列中所有元素的值。

输出结果如下:

myqueue.front() is now 1
Popping out elements... 1 2 3 4 5

可以看出,front() 方法返回队头元素1,而 pop() 方法将所有元素从队列中移除并输出在屏幕上。需要注意的是,在队列中没有元素的情况下调用front()方法会导致未定义行为,因此使用前必须要检查队列是否为空。

项目中相关的使用(以vi为例,ai类似):

1.在main函数中的init_rv1126_first_assignment(protocol_type, network_address);里的camera_venc_thread线程中将视频数据入到压缩队列

void *camera_venc_thread(void *args)
{
    pthread_detach(pthread_self());//自我设置本线程为分离属性设置线程分离属性
    MEDIA_BUFFER mb = NULL;

    VENC_PROC_PARAM venc_arg = *(VENC_PROC_PARAM *)args;
    free(args);

    printf("video_venc_thread...\n");

    while (1)
    {
        // 从指定通道中获取VENC数据
        mb = RK_MPI_SYS_GetMediaBuffer(RK_ID_VENC, venc_arg.vencId, -1);//-1:设置为不阻塞
        if (!mb)
        {
            printf("get venc media buffer error\n");
            break;
        }

        // int naluType = RK_MPI_MB_GetFlag(mb);
        // 分配video_data_packet_t结构体
        video_data_packet_t *video_data_packet = (video_data_packet_t *)malloc(sizeof(video_data_packet_t));
        // 把VENC视频缓冲区数据传输到video_data_packet的buffer中
        memcpy(video_data_packet->buffer, RK_MPI_MB_GetPtr(mb), RK_MPI_MB_GetSize(mb));
        // 把VENC的长度赋值给video_data_packet的video_frame_size中
        video_data_packet->video_frame_size = RK_MPI_MB_GetSize(mb);
        // video_data_packet->frame_flag = naluType;
        // 入到视频压缩队列
        video_queue->putVideoPacketQueue(video_data_packet);
        // printf("#naluType = %d \n", naluType);
        // 释放VENC资源
        RK_MPI_MB_ReleaseBuffer(mb);
    }

2.在main函数中的init_rv1126_first_assignment(protocol_type, network_address);里的push_server_thread线程中的deal_video_avpacket将视频数据从压缩队列中取出以及写入到复合流中 **

2.1push_server_thread的实现

void *push_server_thread(void *args)
{
    pthread_detach(pthread_self());
    RKMEDIA_FFMPEG_CONFIG ffmpeg_config = *(RKMEDIA_FFMPEG_CONFIG *)args;
    free(args);
    AVOutputFormat *fmt = NULL;
    int ret;

    while (1)
    {
        /*
     我们以转换到同一时基下的时间戳为例,假设上一时刻音、视频帧的保存时间戳都是0。
     当前任意保存一种视频帧,例如保存视频的时间戳为video_t1。接着比较时间戳,发现音频时间戳为0 < video_t1,保存一帧音频,时间戳为audio_t1。
     继续比较时间戳,发现audio_t1 < video_t1,选择保存一帧音频,时间戳为audio_t2。
     再一次比较时间戳video_t1 < audio_t2,选择保存一帧视频,时间戳为video_t2。
     int av_compare_ts(int64_t ts_a, AVRational_tb_b,int64_t ts_b, AVRational tb_b)
     {
         int64_t a = tb_a.num * (int64_t)tb_b.den;
         int64_t b = tb_b.num * (int64_t)tb_a.den;
         if ((FFABS64U(ts_a)|a|FFABS64U(ts_b)|b) <= INT_MAX)
             return (ts_a*a > ts_b*b) - (ts_a*a < ts_b*b);
         if (av_rescale_rnd(ts_a, a, b, AV_ROUND_DOWN) < ts_b)
             return -1;
          if (av_rescale_rnd(ts_b, b, a, AV_ROUND_DOWN) < ts_a)
             return -1;
         return 0;
     }
     */
        ret = av_compare_ts(ffmpeg_config.video_stream.next_timestamp,
                            ffmpeg_config.video_stream.enc->time_base,
                            ffmpeg_config.audio_stream.next_timestamp,
                            ffmpeg_config.audio_stream.enc->time_base);

        if (ret <= 0)
        {
            ret = deal_video_avpacket(ffmpeg_config.oc, &ffmpeg_config.video_stream); // 处理FFMPEG视频数据
            if (ret == -1)
            {
                printf("deal_video_avpacket error\n");
                break;
            }
        }
        else
        {
            ret = deal_audio_avpacket(ffmpeg_config.oc, &ffmpeg_config.audio_stream); // 处理FFMPEG音频数据
            if (ret == -1)
            {
                printf("deal_video_avpacket error\n");
                break;
            }
        }
    }

    av_write_trailer(ffmpeg_config.oc);                         // 写入AVFormatContext的尾巴
    free_stream(ffmpeg_config.oc, &ffmpeg_config.video_stream); // 释放VIDEO_STREAM的资源
    free_stream(ffmpeg_config.oc, &ffmpeg_config.audio_stream); // 释放AUDIO_STREAM的资源
    avio_closep(&ffmpeg_config.oc->pb);                         // 释放AVIO资源
    avformat_free_context(ffmpeg_config.oc);                    // 释放AVFormatContext资源
    return NULL;
}

2.2deal_video_avpacket的实现

int deal_video_avpacket(AVFormatContext *oc, OutputStream *ost)
{
    int ret;
    AVCodecContext *c = ost->enc;
    AVPacket *video_packet = get_ffmpeg_video_avpacket(ost->packet); // 从RV1126视频编码数据赋值到FFMPEG的Video AVPacket中
    if (video_packet != NULL)
    {
        video_packet->pts = ost->next_timestamp++; // VIDEO_PTS按照帧率进行累加
    }

    ret = write_ffmpeg_avpacket(oc, &c->time_base, ost->stream, video_packet); // 向复合流写入视频数据
    if (ret != 0)
    {
        printf("write video avpacket error");
        return -1;
    }

    return 0;
}

2.2.1get_ffmpeg_video_avpacket从队列中取出视频数据并将RV1126视频编码数据赋值到FFMPEG的Video AVPacket中

extern VIDEO_QUEUE *video_queue;
extern AUDIO_QUEUE *audio_queue;

// 从RV1126视频编码数据赋值到FFMPEG的Video AVPacket中
AVPacket *get_ffmpeg_video_avpacket(AVPacket *pkt)
{
    video_data_packet_t *video_data_packet = video_queue->getVideoPacketQueue(); // 从视频队列获取数据

    if (video_data_packet != NULL)
    {
        /*
     重新分配给定的缓冲区
   1.  如果入参的 AVBufferRef 为空,直接调用 av_realloc 分配一个新的缓存区,并调用 av_buffer_create 返回一个新的 AVBufferRef 结构;
   2.  如果入参的缓存区长度和入参 size 相等,直接返回 0;
   3.  如果对应的 AVBuffer 设置了 BUFFER_FLAG_REALLOCATABLE 标志,或者不可写,再或者 AVBufferRef data 字段指向的数据地址和 AVBuffer 的 data 地址不同,递归调用 av_buffer_realloc 分配一个新
的 buffer,并将 data 拷贝过去;
   4.  不满足上面的条件,直接调用 av_realloc 重新分配缓存区。
 */
        int ret = av_buffer_realloc(&pkt->buf, video_data_packet->video_frame_size + 70);
        if (ret < 0)
        {
            return NULL;
        }
        pkt->size = video_data_packet->video_frame_size;                                        // rv1126的视频长度赋值到AVPacket Size
        memcpy(pkt->buf->data, video_data_packet->buffer, video_data_packet->video_frame_size); // rv1126的视频数据赋值到AVPacket data
        pkt->data = pkt->buf->data;                                                             // 把pkt->buf->data赋值到pkt->data
        pkt->flags |= AV_PKT_FLAG_KEY;                                                          // 默认flags是AV_PKT_FLAG_KEY
        if (video_data_packet != NULL)
        {
            free(video_data_packet);
            video_data_packet = NULL;
        }

        return pkt;
    }
    else
    {
        return NULL;
    }
}

2.2.2write_ffmpeg_avpacket向复合流写入视频数据

int write_ffmpeg_avpacket(AVFormatContext *fmt_ctx, const AVRational *time_base, AVStream *st, AVPacket pkt)
{
/
将输出数据包时间戳值从编解码器重新调整为流时基 */
av_packet_rescale_ts(pkt, *time_base, st->time_base);
pkt->stream_index = st->index;

return av_interleaved_write_frame(fmt_ctx, pkt);

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355

推荐阅读更多精彩内容