@(嵌入式)
FreeRtos
简述
FreeRTOS 信号量和互斥锁是基于队列实现的, 队列介绍见 << FreeRTOS 消息队列 >>。 使用信号量需要在源文件中包含头文件 semphr.h , 该文件定义了信号量的 API, 实际我们使用的信号量 API 都是宏定义, 宏的实际是队列提供的函数。
FreeRTOS 信号量包括二进制信号量、计数信号量、互斥锁和递归互斥锁。 这篇文章介绍如何使用这些信号量就行任务间同步以及其实现。
分析的源码版本是 v9.0.0
二进制信号量
二进制信号量可以用于互斥和同步, 多用于同步。 可以把二进制信号量看成一个深度为1的队列(实际FreeRTOS也是这么实现的), 调用信号量获取函数, 设置阻塞时间, 在该时间内没有收到信号, 任务会被挂起, 当收到信号或者超时, 任务恢复,函数返回。
多个任务同时阻塞在一个信号量, 当信号量有效时, 最高优先级的任务最先解除阻塞。
二进制信号量使用
举个使用场景, 一个任务读取一个外设,一直等待外设可读占用CPU效率低, 可以调用信号量获取函数阻塞等待, 当外设可读,在其中断函数中发送信号量,唤醒任务执行读取操作。
(中断中必须使用带有 FromISR
结尾的 API)
// 信号量句柄
SemaphoreHandle_t xSemaphore;
void vATask( void * pvParameters )
{
// 创建二进制信号量
xSemaphore = xSemaphoreCreateBinary();
if( xSemaphore == NULL )
{
//heap 空间不够 ,创建失败
}
else
{
// 信号量获取 设置阻塞时间 10 ticks
if( xSemaphoreTake( xSemaphore, ( TickType_t ) 10 ) == pdTRUE )
{
// 获取到信号量 !
//...
// 如果任务中发送信号量
//xSemaphoreGive( xSemaphore );
}
else
{
// 等待 10 tick 无法获取信号量
// 超时返回
}
}
}
// 中断
void vTimerISR( void * pvParameters )
{
static signed BaseType_t xHigherPriorityTaskWoken;
xHigherPriorityTaskWoken = pdFALSE;
// 发送信号量
// 传递参数判断是否有高优先级任务就绪
xSemaphoreGiveFromISR( xSemaphore, &xHigherPriorityTaskWoken );
// 判断是否需要触发任务切换
portYIELD_FROM_ISR( xHigherPriorityTaskWoken );
}
如果把信号量作为互斥锁使用, 则任务在获取信号后,处理完毕需要返回。
FreeRTOS 在 8.02 版本提供了一种更加轻量级的任务同步, 任务通知, 由于该方式是集合在任务控制块的,所以不需要额外的内存消耗,推荐使用。
二进制信号量实现
以下看看 FreeRTOS 如何基于队列实现信号量的。
创建信号量
在信号量定义头文件可以找到该宏的定义, 可以发现, 创建一个信号量,实际上是创建了一个队列, 队列深度设置为1个, 同时, semSEMAPHORE_QUEUE_ITEM_LENGTH
这个宏的值为0, 即 item size 为0, 表示信号量这个队列没有队列项存储空间, 因为对于信号量,没有这个需要,
对于二进制信号量, 创建后默认初始化其 uxMessageWaiting
为0, 当有信号发出时, 该值变为1(最大也只能为1),此时信号量有效, 如果有任务获取消费了信号量,该变量再次变为0, 信号量无效, 有任务在次调用获取信号量,可能阻塞等待或者返回信号量空。
#define xSemaphoreCreateBinary() \
xQueueGenericCreate( ( UBaseType_t ) 1, \
semSEMAPHORE_QUEUE_ITEM_LENGTH, \
queueQUEUE_TYPE_BINARY_SEMAPHORE )
获取信号量
任务调用接口获取信号量, 可以通过如下宏实现 :
#define xSemaphoreTake( xSemaphore, xBlockTime ) \
xQueueGenericReceive( ( QueueHandle_t ) ( xSemaphore ), \
NULL,( xBlockTime ), pdFALSE )
这个函数是供任务调用的, 可以看到该函数实际上调用的是队列的接收函数, 由于没有数据可读, 传递的指针为 NULL。
函数调用设置阻塞时间,如果调用函数时信号量无效, 则会阻塞任务等待。
如果是在中断中, 则必须调用如下宏
#define xSemaphoreTakeFromISR( xSemaphore, pxHigherPriorityTaskWoken )\
xQueueReceiveFromISR( ( QueueHandle_t ) ( xSemaphore ), \
NULL, ( pxHigherPriorityTaskWoken ) )
函数 xSemaphoreTakeFromISR
是供中断调用的,做了中断优先级处理,并且不会阻塞。
释放信号量
释放信号量的地方可能是中断,或者是任务中, 对应调用不同接口。
中断中释放
如果在中断中调用发送信号量, 需要调用的 API 是 xSemaphoreGiveFromISR
, 查看该宏定义如下 :
#define xSemaphoreGiveFromISR( xSemaphore, pxHigherPriorityTaskWoken ) \
xQueueGiveFromISR( ( QueueHandle_t ) ( xSemaphore ),\
( pxHigherPriorityTaskWoken ) )
该宏实际调用的函数 xQueueGiveFromISR
定义于 queue.c
中,
在<< FreeRTOS 消息队列 >> 介绍过, 队列在中断中调用的发送函数却是 xQueueGenericSendFromISR
。
对比了一下两个函数的差别, 发现 xQueueGiveFromISR
相比队列默认用的, 差别是没有调用了内存拷贝的函数,因为对于信号量而言, 发送的消息队列不关心其内容,在前面在创建信号量也提过, 对应创建的队列是没有队列项存储空间的, 其 item size 是0。 而主要操作的是变量 uxMessagesWaiting
的值。
简化以下该函数 ,如下
BaseType_t xQueueGiveFromISR( QueueHandle_t xQueue,
BaseType_t * const pxHigherPriorityTaskWoken )
{
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
// 对应 item size == 0 的队列
configASSERT( pxQueue->uxItemSize == 0 );
// 互斥锁不能在中断中使用
configASSERT( !( ( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) && ( pxQueue->pxMutexHolder != NULL ) ) );
// 设置中断优先级
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
// 获取当前队列可读消息数
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
// 如果队列仍有空间
if( uxMessagesWaiting < pxQueue->uxLength )
{
const int8_t cTxLock = pxQueue->cTxLock;
// 互斥锁不能在中断中使用
// 不需要考虑互斥锁类型信号量
// 增加未读消息数量
pxQueue->uxMessagesWaiting = uxMessagesWaiting + 1;
// 队列没有被锁定 唤醒阻塞等待的最高优先级任务
if( cTxLock == queueUNLOCKED )
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
if( pxHigherPriorityTaskWoken != NULL )
{
// 中断中调用的这个函数名,高优先级任务就绪
// 设置参数,表示需要切换任务
*pxHigherPriorityTaskWoken = pdTRUE;
}
}
}
}
else
{
// 如果队列被锁定, 不能在此修改事件链表
// 增加计数, 解锁的时候处理
pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
}
xReturn = pdPASS;
}
else
{
// 队列满 直接返回
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
return xReturn;
}
任务中释放
在任务中释放信号量调用的 API 是
#define xSemaphoreGive( xSemaphore ) \
xQueueGenericSend( ( QueueHandle_t ) ( xSemaphore ),\
NULL, \
semGIVE_BLOCK_TIME, \
queueSEND_TO_BACK )
可以看到实际调用的函数是 xQueueGenericSend
, 这个函数在队列一章做了比较详细地介绍过,此处不贴源码。
对于信号量, 由于没有消息内容, 所以传递的指针设置为 NULL, 不会执行任务拷贝函数,在函数中判断队列是否满, 如果没有满, 就增加未读消息数的变量, 查看是否有任务等待信号而被阻塞,恢复最高优先级的任务。 如果任务满, 按照设定的阻塞时间阻塞挂起任务等待。
计数信号量
二进制信号量是长度为1的队列, 计数信号量则是长度可以大于1的信号量, 当设置长度为1, 其行为和二进制型号量一样。
当任务调用 API 释放信号量, 信号量未读计数加1, 任务调用接收函数处理信号量, 则对应减1,初始化信号量计数为0。
所以, 使用上, 计数信号量和二进制信号量是差不多。
查看计数信号量创建宏 :
#define xSemaphoreCreateCounting( uxMaxCount, uxInitialCount ) \
xQueueCreateCountingSemaphore( ( uxMaxCount ), ( uxInitialCount ) )
相比二进制信号量, 计数信号量创建时需要设置两个参数,一个是最大的计数值, 另一个初始化计数值。
实际函数是在队列中实现, 对应查看队列中该函数是如何实现的, 看到其代码如下 :
QueueHandle_t xQueueCreateCountingSemaphore( const UBaseType_t uxMaxCount, const UBaseType_t uxInitialCount )
{
QueueHandle_t xHandle;
configASSERT( uxMaxCount != 0 );
configASSERT( uxInitialCount <= uxMaxCount );
// 申请队列, 深度由第一个参数决定
// item size 也是为 0
xHandle = xQueueGenericCreate( uxMaxCount,
queueSEMAPHORE_QUEUE_ITEM_LENGTH,
queueQUEUE_TYPE_COUNTING_SEMAPHORE );
if( xHandle != NULL )
{
// 初始化信号量计数值
( ( Queue_t * ) xHandle )->uxMessagesWaiting = uxInitialCount;
}
return xHandle;
}
计数型号量其他地方使用同二进制一样,不继续讨论。
互斥锁
当一个任务访问一个资源时, 需要获取令牌, 在其使用期间,其他任务不能使用该资源, 使用完后, 释放令牌, 其他任务可以访问, 保证资源在一段时间只能由一个任务读取修改。
与二进制信号量最大的不同在于, 互斥信号量带有优先级继承的机制,这个机制用于减低优先级反转的影响。
举个例子, 三个任务优先级从大到小依次 A > B > C, 某种情况下, 任务C 获取了互斥锁, 之后任务A 请求拿锁被挂起, 任务C 继续运行, 如果没有优先级继承, 任务B 就绪,由于优先级高于当前的任务C, 所以开始运行, 这样导致任务C 无法及时放锁,进而导致任务A 无法运行, 但是任务A 的优先级比B 高, 这就是优先级反转。
如果加入优先级继承, 任务C 拿锁, 任务A 请求拿锁被挂起时, 由于C < A, 通过继承机制, 提高C 的优先级,使其等于 A, 这样, 以上任务B 就无法抢占C, 任务C 结束释放锁让后恢复其本来优先级, 任务A 开始运行。
创建互斥信号量
使用互测锁前需要创建互斥锁, 需要调用 API 的定义 :
#define xSemaphoreCreateMutex() xQueueCreateMutex( queueQUEUE_TYPE_MUTEX )
查找对应的实现函数 xQueueCreateMutex
源码 , 函数调用了队列创建函数创建了一个深度为1, item size 为 0 的队列, 到这里看起来和二进制信号量一样。
队列创建后,调用了专门初始化互斥信号量的函数初始化。
QueueHandle_t xQueueCreateMutex( const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
const UBaseType_t uxMutexLength = ( UBaseType_t ) 1, uxMutexSize = ( UBaseType_t ) 0;
pxNewQueue = ( Queue_t * ) xQueueGenericCreate( uxMutexLength, uxMutexSize, ucQueueType );
prvInitialiseMutex( pxNewQueue );
return pxNewQueue;
}
初始化互斥信号量,
static void prvInitialiseMutex( Queue_t *pxNewQueue )
{
if( pxNewQueue != NULL )
{
// 初始化互斥信号量的参数
// 互斥信号量当前有效
pxNewQueue->pxMutexHolder = NULL;
// 标记类型
pxNewQueue->uxQueueType = queueQUEUE_IS_MUTEX;
// 递归信号量用
pxNewQueue->u.uxRecursiveCallCount = 0;
// 发送一个消息到队列
// 这样,第一个拿信号量的任务不会被挂起
( void ) xQueueGenericSend( pxNewQueue, NULL, ( TickType_t ) 0U, queueSEND_TO_BACK );
}
}
拿锁
任务拿锁可以通过调用 API 定义如下
#define xSemaphoreTake( xSemaphore, xBlockTime ) \
xQueueGenericReceive( ( QueueHandle_t ) ( xSemaphore ), NULL, ( xBlockTime ), pdFALSE )
如果任务调用此函数时互斥锁有效,则拿到锁后返回。
这个函数在队列文章中介绍过,该函数中, 对于互斥锁有一些特殊处理,主要是实现了优先级继承机制。
在一个任务拿锁后, 其他任务尝试拿锁失败,如果设置了阻塞时间,则该任务会被阻塞,在进入阻塞前, 函数会判断当前任务的优先级是否高于拥有锁任务的优先级,如果高于, 则会先提高拥有锁任务的优先级。
实现的代码如下
点击源码
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
taskENTER_CRITICAL();
{
// 判断是否需要修改拿锁任务优先级
vTaskPriorityInherit( ( void * ) pxQueue->pxMutexHolder );
}
taskEXIT_CRITICAL();
}
}
#endif
查看 vTaskPriorityInherit
函数的实现, 该函数比较当前任务(拿锁失败,阻塞等待)和拿锁任务优先级, 进行优先级继承处理。
void vTaskPriorityInherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxTCB = ( TCB_t * ) pxMutexHolder;
// 判断是否有任务拿着锁
if( pxMutexHolder != NULL )
{
// 判断当前任务的优先级是否比拿锁任务高
if( pxTCB->uxPriority < pxCurrentTCB->uxPriority )
{
// 修改拿锁任务在事件链表项的优先级值
if( ( listGET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ) ) &
taskEVENT_LIST_ITEM_VALUE_IN_USE ) == 0UL )
{
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ),
( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxCurrentTCB->uxPriority );
}
//如果拿锁任务在就绪链表, 移动拿锁任务到新优先级任务链表
if( listIS_CONTAINED_WITHIN( &( pxReadyTasksLists[ pxTCB->uxPriority ] ),
&( pxTCB->xStateListItem ) ) != pdFALSE )
{
// 先从旧就绪链表移除
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
taskRESET_READY_PRIORITY( pxTCB->uxPriority );
}
// 修改优先级, 优先级继承, 并加入到新的优先级就绪链表
pxTCB->uxPriority = pxCurrentTCB->uxPriority;
prvAddTaskToReadyList( pxTCB );
}
else
{
// 如果拿锁任务没有在就绪链表, 直接修改优先级值即可
pxTCB->uxPriority = pxCurrentTCB->uxPriority;
}
}
}
}
互斥锁不能在中断使用, 因为中断函数没有优先级继承,同时, 中断函数不能阻塞。
放锁
任务使用资源后, 需要释放互斥锁,这样其他任务才能正常拿锁使用资源。
释放锁的接口定义如下 :
#define xSemaphoreGive( xSemaphore ) \
xQueueGenericSend( ( QueueHandle_t ) ( xSemaphore ), NULL, semGIVE_BLOCK_TIME, queueSEND_TO_BACK )
宏实际调用的函数实际也解析过了, 上面提到拿锁的时候, 任务堵塞会进行优先级继承处理, 因此,当一个任务获取了互斥锁, 在其使用期间, 其本身优先级可能已经被提高过了, 当其释放锁的时候, 需要恢复到原来的优先级。还锁的操作是向队列发送返回一个消息,在拷贝消息内容的函数,判断队列是互斥锁时, 会调用优先级继承解除函数, 恢复任务的优先级。
优先级恢复函数如下,
BaseType_t xTaskPriorityDisinherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxTCB = ( TCB_t * ) pxMutexHolder;
BaseType_t xReturn = pdFALSE;
if( pxMutexHolder != NULL )
{
// 当前任务是拿锁任务
// 这个队列是互斥锁类型
configASSERT( pxTCB == pxCurrentTCB );
configASSERT( pxTCB->uxMutexesHeld );
( pxTCB->uxMutexesHeld )--;
// 拿锁任务的优先级被修改了
if( pxTCB->uxPriority != pxTCB->uxBasePriority )
{
// 锁彻底释放
if( pxTCB->uxMutexesHeld == ( UBaseType_t ) 0 )
{
// 从链表移除任务
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
taskRESET_READY_PRIORITY( pxTCB->uxPriority );
}
// 恢复优先级
pxTCB->uxPriority = pxTCB->uxBasePriority;
// 恢复任务TCB 其他和优先级相关的参数
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ), ( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxTCB->uxPriority );
// 重新插入到就绪链表
prvAddTaskToReadyList( pxTCB );
// 发生优先级继承
// 说明有高优先级等锁, 所以提示需要任务切换
xReturn = pdTRUE;
}
}
}
return xReturn;
}
任务发生优先级继承, 本身优先级被修改提高,任务原始优先级会保存在 pxTCB->uxBasePriority
中, 恢复的时候使用。
优先级继承恢复后, 可知有更高优先级任务阻塞等待锁,所以需要返回提示任务切换, 对应队列发送函数中的特殊处理一段 , 根据内存拷贝函数 prvCopyDataToQueue
返回值判断是否需要触发任务切换, 因为任务拷贝函数调用了上面这个函数恢复优先级,需不需要触发任务切换的返回值就是由这个函数提供的。
递归互斥锁
获取递归互斥量的任务可以重复获取该递归互斥量。使用xSemaphoreTakeRecursive() 函数成功获取几次递归互斥量,对应的就要使用xSemaphoreGiveRecursive()函数返还几次,在此之前递归互斥量都处于无效状态, 其他任务无法获取, 必须等待获取的任务释放完毕。
递归互斥锁创建调用接口 :
#define xSemaphoreCreateRecursiveMutex() \
xQueueCreateMutex( queueQUEUE_TYPE_RECURSIVE_MUTEX )
实际调用函数同普通互斥锁一样。
获取递归信号量
递归信号量在同一个任务可以多次拿取, 其调用的接口不同其他信号量的 xSemaphoreTake
, 而是如下宏 :
#define xSemaphoreTakeRecursive( xMutex, xBlockTime ) \
xQueueTakeMutexRecursive( ( xMutex ), ( xBlockTime ) )
查看具体实现函数 xQueueTakeMutexRecursive
, 看看有什么不同。
BaseType_t xQueueTakeMutexRecursive( QueueHandle_t xMutex, TickType_t xTicksToWait )
{
BaseType_t xReturn;
Queue_t * const pxMutex = ( Queue_t * ) xMutex;
configASSERT( pxMutex );
if( pxMutex->pxMutexHolder == ( void * ) xTaskGetCurrentTaskHandle() )
{
// 如果尝试拿锁的任务是当前拿了锁的任务, 则递增拿锁次锁
( pxMutex->u.uxRecursiveCallCount )++;
xReturn = pdPASS;
}
else
{
// 其他任务尝试拿锁, 同使用普通互斥锁一样
xReturn = xQueueGenericReceive( pxMutex, NULL, xTicksToWait, pdFALSE );
// 阻塞等待
// 如果恢复后,在超时时间内拿到锁
// 递增计数, 第一次拿锁!!
if( xReturn != pdFAIL )
{
( pxMutex->u.uxRecursiveCallCount )++;
}
}
return xReturn;
}
相比普通互斥锁,递归互斥锁允许同一个任务多次拿锁, 所以其拿锁接口会判断拿锁任务是否是拥有锁的任务,如果是, 则递增拿锁次数, 其他任务处理则和普通互斥锁一样,阻塞等待。
释放递归信号量
响应的, 递归互斥锁释放调用的接口定义 :
#define xSemaphoreGiveRecursive( xMutex ) \
xQueueGiveMutexRecursive( ( xMutex ) )
#endif
查看实际实现的函数 :
BaseType_t xQueueGiveMutexRecursive( QueueHandle_t xMutex )
{
BaseType_t xReturn;
Queue_t * const pxMutex = ( Queue_t * ) xMutex;
configASSERT( pxMutex );
// 判断尝试放锁的任务是否拿着锁
if( pxMutex->pxMutexHolder == ( void * ) xTaskGetCurrentTaskHandle() )
{
// 递归互斥锁, 拿几个需要对应放几个
( pxMutex->u.uxRecursiveCallCount )--;
// 计数清零,说明可以真正放锁
if( pxMutex->u.uxRecursiveCallCount == ( UBaseType_t ) 0 )
{
// 返回锁
( void ) xQueueGenericSend( pxMutex, NULL, queueMUTEX_GIVE_BLOCK_TIME, queueSEND_TO_BACK );
}
xReturn = pdPASS;
}
else
{
// 当前任务不是持有锁的任务, 无法释放
xReturn = pdFAIL;
}
// 还不能真正放锁
return xReturn;
}
对应拿锁的处理, 以上的函数也就很好理解了。
FreeRTOS 信号量记录到此结束。