一. 多线程队列的框图:
上图是同一个典型的多线程入队,出队的过程。这里需要创建两个线程,一个是入队线程、一个是出队线程。入队线程主要是通过push的api向Queue的队尾插入数据,插入数据的同时通过pthread_cond_broadcast通知出队线程取出数据。此时出队线程正在等待入队线程的唤醒(pthread_cond_wait),若收到唤醒通知则让队列数据出队。
二·.Linux多线程的基本API:
2.1. pthread_mutex_lock****:
int pthread_mutex_lock(pthread_mutex_t *mutex);
第一个传入参数:pthread_mutex_t结构体指针
功能:这个是互斥锁加锁功能,就是每次线程调用的时候都会把锁加上,使其保证访问数据的原子性,直到解锁为止。
2.2. pthread_mutex_unlock:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
第一个传入参数:pthread_mutex_t结构体指针
功能:这个是互斥锁解锁功能,就是每次线程访问完资源的时候都会把锁解锁。
2.3. pthread_cond_broadcast****:
*int pthread_cond_broadcast(pthread_cond_t cond)
传入参数:pthread_cond_t的结构体指针
功能:唤醒所有正在pthread_cond_wait(线程等待)的线程
2.4. pthread_cond_wait:
int pthread_cond_wait (pthread_cond_t *__restrict __cond , pthread_mutex_t *__restrict __mutex)
第一个参数:pthread_cond_t的结构体指针
第二个参数:pthread_mutex_t结构体指针
功能:线程等待并挂起,若被唤醒了,则直接跳出挂起状态。
三. 推流项目中视频队列的实现:
这张图是视频队列实现的过程,VIDEO_QUEUE是一个类。这个类里面,封装了添加视频队列(putVideoPacketQueue)、获取视频队列数据(getVideoPacketQueue)、获取视频队列长度(getVideoQueueSize)。
3.1. VIDEO_QUEUE****构造器:
这里创建一个VIDEO_QUEUE的C++的构造器,C++构造器主要初始化了线程的量。包括:线程锁的初始化(pthread_mutex_init)、线程条件变量的初始化(pthread_cond_init)。
3.2. putVideoPacketQueue****的讲解:
putVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后进行入队操作video_packet_queue.push(video_packet),入队完成之后再通知出队线程取出队列数据pthread_cond_broadcast,最后解锁pthread_mutex_unlock。
3.3. getVideoPacketQueue****的讲解:
getVideoPacketQueue主要是video_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后判断视频队列是否有数据(video_packet_queue.size()==0)。若没有数据,则用pthread_cond_wait去等待线程被唤醒。若队列有数据则唤醒的此线程,则直接从队列取出数据。这里取出数据分两步:第一步,先把队列移动到最前面video_packet_queue.front()。第二步,video_packet_queue.pop出队并删除数据。
3.4. getVideoQueueSize****的讲解:
getVideoQueueSize主要是获取当前队列的长度,获取长度的步骤跟上面也差不多。
首先,pthread_mutex_lock加锁,然后通过count = video_packet_queue.size(),获取队列的数量。然后pthread_mutex_unlock解锁。
四.推流项目中音频队列的实现:
这张图是音频队列实现的过程,AUDIO_QUEUE是一个类。这个类里面,封装了添加音频队列(putAudioPacketQueue)、获取视频队列数据(getAudioPacketQueue)、获取音频队列长度(getAudioQueueSize)。
4.1. AUDIO_QUEUE****构造器:
这里创建一个AUDIO_QUEUE的C++的构造器,C++构造器主要初始化了线程的量。包括:线程锁的初始化(pthread_mutex_init)、线程条件变量的初始化(pthread_cond_init)。
4.2. putAudioPacketQueue****讲解:
putAudioPacketQueue主要是audio_data_packet_t入队的过程,入队前需要加锁pthread_mutex_lock。然后进行入队操作audio_packet_queue.push(audio_packet),入队完成之后再通知出队线程取出队列数据pthread_cond_broadcast,最后解锁pthread_mutex_unlock。
4.3 getAudioPacketQueue****的讲解:
getAudioPacketQueue主要是audio_data_packet入队的过程,入队前需要加锁pthread_mutex_lock,
然后判断视频队列是否有数据(audio_packet_queue.size()==0),若音频队列为空则用pthread_cond_wait去等待线程被唤醒。若唤醒的此线程,则直接从队列取出数据。这里取出数据分两步:第一步,先把队列移动到最前面audio_packet_queue.front()。第二步,audio_packet_queue.pop出队并删除数据。
4.4. getAudioQueueSize****的讲解:
getAudioQueueSize主要是获取当前队列的长度,获取长度的步骤跟上面也差不多。首先,pthread_mutex_lock加锁,然后通过count = audio_packet_queue.size(),获取队列的数量。然后pthread_mutex_unlock解锁。
new与extern
在main函数中new的对象可供所有cpp文件使用,extern表示借用全局变量的声明
new
在C++中,new是一个运算符,用于动态分配内存。它的一般形式如下:
pointer = new type;
pointer = new type[numberOfElements];
第一个表达式将分配一个类型为type的对象并返回指向该对象的指针;第二个表达式将分配一个大小为numberOfElements的type数组,并返回指向该数组的第一个元素的指针。
例如,以下代码将分配一个整数并将其值设置为42:
int *p = new int;
*p = 42;
以下代码将分配一个大小为10的整数数组:
int *a = new int[10];
在使用new运算符分配内存时,需要注意以下几点:
new运算符会抛出bad_alloc异常,如果无法分配所需的内存,则会抛出该异常。
在使用完new运算符分配的内存后,必须使用delete运算符释放该内存。这可以通过以下方式实现:
delete pointer;
delete[] pointer;
第一个表达式释放由单个对象使用new运算符分配的内存;第二个表达式释放由数组使用new[]运算符分配的内存。忘记释放分配的内存可能会导致内存泄漏。
在使用new运算符分配内存时,应该始终检查返回的指针是否为NULL。如果返回的指针为NULL,则说明内存分配失败。
可以使用placement new运算符来将对象放置到预先分配的内存中。这可以通过以下方式实现:
void *memory = operator new(sizeof(MyClass));
MyClass *p = new (memory) MyClass;
在这个例子中,operator new运算符分配了一个大小为sizeof(MyClass)的内存块,并返回指向该块的指针。然后,placement new运算符在此内存中构造了一个名为MyClass的对象。使用placement new时,需要确保内存块的大小至少等于所创建对象的大小。
总之,new运算符是C++中动态分配内存的一种常用方式。它提供了一种方便的方法来创建对象或数组,并且可以与其他C++特性如类、异常和模板结合使用。
但是需要注意,在使用完动态分配的内存后,必须使用delete运算符释放该内存,否则可能会导致内存泄漏。
extern
在C++中,如果需要定义一个全局变量或函数,可以将其声明为extern。这意味着变量或函数的定义将在另一个源文件中进行,并且编译器将在链接时查找该定义。例如:
// File1.cpp
int myGlobalInt = 42;
// File2.cpp
extern int myGlobalInt;
void myFunction() {
// 使用myGlobalInt
}
在上面的例子中,myFunction()函数声明了一个外部整数变量myGlobalInt,表示它已经在另一个源文件中定义。因此,在链接时,编译器将在程序中查找myGlobalInt的实际定义。
另一个情况是在C++类的实现中,需要在多个源文件中共享静态数据成员。与上述情况相似,可以将静态成员声明为extern,以便在其他源文件中访问。例如:
// MyClass.h
class MyClass {
public:
static int myStaticInt;
};
// MyClass.cpp
#include "MyClass.h"
int MyClass::myStaticInt = 42;
// AnotherFile.cpp
#include "MyClass.h"
extern int MyClass::myStaticInt;
void myFunction() {
// 使用myStaticInt
}
在上面的例子中,MyClass类声明了一个静态整数变量myStaticInt。MyClass.cpp文件定义了这个静态成员,并将其初始化为42。在AnotherFile.cpp中,使用extern关键字声明该静态成员,表示它已经在另一个源文件中定义。然后可以在myFunction()函数中访问和修改myStaticInt。
例图:
队列
消息队列(queue)是一种先进先出(FIFO)的数据结构,它支持在队尾添加新元素和在队头移除并返回队头元素。C++ STL提供了queue类,其中包含front和pop方法可以用来获取队头元素和移除队头元素。
具体来说:
front() 返回队头元素,但不会将其从队列中移除。
pop() 将队头元素从队列中移除,但不返回该元素的值。
下面给出一个示例代码,展示如何使用queue::front()和queue::pop()方法:
#include <iostream>
#include <queue>
int main () {
std::queue<int> myqueue;
for (int i=1;i<=5;i++) myqueue.push(i);
std::cout << "myqueue.front() is now " << myqueue.front() << '\n';
std::cout << "Popping out elements...";
while (!myqueue.empty())
{
std::cout << ' ' << myqueue.front();
myqueue.pop();
}
std::cout << '\n';
return 0;
}
在以上代码中,我们首先创建一个空队列 myqueue,然后用 push() 方法向队列尾部添加元素,接着用 front() 方法返回队头元素并输出它的值,然后用 pop() 方法将队头元素移除并输出队列中所有元素的值。
输出结果如下:
myqueue.front() is now 1
Popping out elements... 1 2 3 4 5
可以看出,front() 方法返回队头元素1,而 pop() 方法将所有元素从队列中移除并输出在屏幕上。需要注意的是,在队列中没有元素的情况下调用front()方法会导致未定义行为,因此使用前必须要检查队列是否为空。
项目中相关的使用(以vi为例,ai类似):
1.在main函数中的init_rv1126_first_assignment(protocol_type, network_address);里的camera_venc_thread线程中将视频数据入到压缩队列
void *camera_venc_thread(void *args)
{
pthread_detach(pthread_self());//自我设置本线程为分离属性设置线程分离属性
MEDIA_BUFFER mb = NULL;
VENC_PROC_PARAM venc_arg = *(VENC_PROC_PARAM *)args;
free(args);
printf("video_venc_thread...\n");
while (1)
{
// 从指定通道中获取VENC数据
mb = RK_MPI_SYS_GetMediaBuffer(RK_ID_VENC, venc_arg.vencId, -1);//-1:设置为不阻塞
if (!mb)
{
printf("get venc media buffer error\n");
break;
}
// int naluType = RK_MPI_MB_GetFlag(mb);
// 分配video_data_packet_t结构体
video_data_packet_t *video_data_packet = (video_data_packet_t *)malloc(sizeof(video_data_packet_t));
// 把VENC视频缓冲区数据传输到video_data_packet的buffer中
memcpy(video_data_packet->buffer, RK_MPI_MB_GetPtr(mb), RK_MPI_MB_GetSize(mb));
// 把VENC的长度赋值给video_data_packet的video_frame_size中
video_data_packet->video_frame_size = RK_MPI_MB_GetSize(mb);
// video_data_packet->frame_flag = naluType;
// 入到视频压缩队列
video_queue->putVideoPacketQueue(video_data_packet);
// printf("#naluType = %d \n", naluType);
// 释放VENC资源
RK_MPI_MB_ReleaseBuffer(mb);
}
2.在main函数中的init_rv1126_first_assignment(protocol_type, network_address);里的push_server_thread线程中的deal_video_avpacket将视频数据从压缩队列中取出以及写入到复合流中 **
2.1push_server_thread的实现
void *push_server_thread(void *args)
{
pthread_detach(pthread_self());
RKMEDIA_FFMPEG_CONFIG ffmpeg_config = *(RKMEDIA_FFMPEG_CONFIG *)args;
free(args);
AVOutputFormat *fmt = NULL;
int ret;
while (1)
{
/*
我们以转换到同一时基下的时间戳为例,假设上一时刻音、视频帧的保存时间戳都是0。
当前任意保存一种视频帧,例如保存视频的时间戳为video_t1。接着比较时间戳,发现音频时间戳为0 < video_t1,保存一帧音频,时间戳为audio_t1。
继续比较时间戳,发现audio_t1 < video_t1,选择保存一帧音频,时间戳为audio_t2。
再一次比较时间戳video_t1 < audio_t2,选择保存一帧视频,时间戳为video_t2。
int av_compare_ts(int64_t ts_a, AVRational_tb_b,int64_t ts_b, AVRational tb_b)
{
int64_t a = tb_a.num * (int64_t)tb_b.den;
int64_t b = tb_b.num * (int64_t)tb_a.den;
if ((FFABS64U(ts_a)|a|FFABS64U(ts_b)|b) <= INT_MAX)
return (ts_a*a > ts_b*b) - (ts_a*a < ts_b*b);
if (av_rescale_rnd(ts_a, a, b, AV_ROUND_DOWN) < ts_b)
return -1;
if (av_rescale_rnd(ts_b, b, a, AV_ROUND_DOWN) < ts_a)
return -1;
return 0;
}
*/
ret = av_compare_ts(ffmpeg_config.video_stream.next_timestamp,
ffmpeg_config.video_stream.enc->time_base,
ffmpeg_config.audio_stream.next_timestamp,
ffmpeg_config.audio_stream.enc->time_base);
if (ret <= 0)
{
ret = deal_video_avpacket(ffmpeg_config.oc, &ffmpeg_config.video_stream); // 处理FFMPEG视频数据
if (ret == -1)
{
printf("deal_video_avpacket error\n");
break;
}
}
else
{
ret = deal_audio_avpacket(ffmpeg_config.oc, &ffmpeg_config.audio_stream); // 处理FFMPEG音频数据
if (ret == -1)
{
printf("deal_video_avpacket error\n");
break;
}
}
}
av_write_trailer(ffmpeg_config.oc); // 写入AVFormatContext的尾巴
free_stream(ffmpeg_config.oc, &ffmpeg_config.video_stream); // 释放VIDEO_STREAM的资源
free_stream(ffmpeg_config.oc, &ffmpeg_config.audio_stream); // 释放AUDIO_STREAM的资源
avio_closep(&ffmpeg_config.oc->pb); // 释放AVIO资源
avformat_free_context(ffmpeg_config.oc); // 释放AVFormatContext资源
return NULL;
}
2.2deal_video_avpacket的实现
int deal_video_avpacket(AVFormatContext *oc, OutputStream *ost)
{
int ret;
AVCodecContext *c = ost->enc;
AVPacket *video_packet = get_ffmpeg_video_avpacket(ost->packet); // 从RV1126视频编码数据赋值到FFMPEG的Video AVPacket中
if (video_packet != NULL)
{
video_packet->pts = ost->next_timestamp++; // VIDEO_PTS按照帧率进行累加
}
ret = write_ffmpeg_avpacket(oc, &c->time_base, ost->stream, video_packet); // 向复合流写入视频数据
if (ret != 0)
{
printf("write video avpacket error");
return -1;
}
return 0;
}
2.2.1get_ffmpeg_video_avpacket从队列中取出视频数据并将RV1126视频编码数据赋值到FFMPEG的Video AVPacket中
extern VIDEO_QUEUE *video_queue;
extern AUDIO_QUEUE *audio_queue;
// 从RV1126视频编码数据赋值到FFMPEG的Video AVPacket中
AVPacket *get_ffmpeg_video_avpacket(AVPacket *pkt)
{
video_data_packet_t *video_data_packet = video_queue->getVideoPacketQueue(); // 从视频队列获取数据
if (video_data_packet != NULL)
{
/*
重新分配给定的缓冲区
1. 如果入参的 AVBufferRef 为空,直接调用 av_realloc 分配一个新的缓存区,并调用 av_buffer_create 返回一个新的 AVBufferRef 结构;
2. 如果入参的缓存区长度和入参 size 相等,直接返回 0;
3. 如果对应的 AVBuffer 设置了 BUFFER_FLAG_REALLOCATABLE 标志,或者不可写,再或者 AVBufferRef data 字段指向的数据地址和 AVBuffer 的 data 地址不同,递归调用 av_buffer_realloc 分配一个新
的 buffer,并将 data 拷贝过去;
4. 不满足上面的条件,直接调用 av_realloc 重新分配缓存区。
*/
int ret = av_buffer_realloc(&pkt->buf, video_data_packet->video_frame_size + 70);
if (ret < 0)
{
return NULL;
}
pkt->size = video_data_packet->video_frame_size; // rv1126的视频长度赋值到AVPacket Size
memcpy(pkt->buf->data, video_data_packet->buffer, video_data_packet->video_frame_size); // rv1126的视频数据赋值到AVPacket data
pkt->data = pkt->buf->data; // 把pkt->buf->data赋值到pkt->data
pkt->flags |= AV_PKT_FLAG_KEY; // 默认flags是AV_PKT_FLAG_KEY
if (video_data_packet != NULL)
{
free(video_data_packet);
video_data_packet = NULL;
}
return pkt;
}
else
{
return NULL;
}
}
2.2.2write_ffmpeg_avpacket向复合流写入视频数据
int write_ffmpeg_avpacket(AVFormatContext *fmt_ctx, const AVRational *time_base, AVStream *st, AVPacket pkt)
{
/将输出数据包时间戳值从编解码器重新调整为流时基 */
av_packet_rescale_ts(pkt, *time_base, st->time_base);
pkt->stream_index = st->index;
return av_interleaved_write_frame(fmt_ctx, pkt);
}