标签:切换 assert blog sts 另一个 ddc sse 超时 保存
@(嵌入式)
FreeRTOS 信号量和互斥锁是基于队列实现的, 队列介绍见 << FreeRTOS 消息队列 >>。 使用信号量需要在源文件中包含头文件 semphr.h , 该文件定义了信号量的 API, 实际我们使用的信号量 API 都是宏定义, 宏的实际是队列提供的函数。
FreeRTOS 信号量包括二进制信号量、计数信号量、互斥锁和递归互斥锁。 这篇文章介绍如何使用这些信号量就行任务间同步以及其实现。
分析的源码版本是 v9.0.0
二进制信号量可以用于互斥和同步, 多用于同步。 可以把二进制信号量看成一个深度为1的队列(实际FreeRTOS也是这么实现的), 调用信号量获取函数, 设置阻塞时间, 在该时间内没有收到信号, 任务会被挂起, 当收到信号或者超时, 任务恢复,函数返回。
多个任务同时阻塞在一个信号量, 当信号量有效时, 最高优先级的任务最先解除阻塞。
举个使用场景, 一个任务读取一个外设,一直等待外设可读占用CPU效率低, 可以调用信号量获取函数阻塞等待, 当外设可读,在其中断函数中发送信号量,唤醒任务执行读取操作。
(中断中必须使用带有 FromISR结尾的 API)

// 信号量句柄
SemaphoreHandle_t xSemaphore;
void vATask( void * pvParameters )
{
// 创建二进制信号量
xSemaphore = xSemaphoreCreateBinary();
if( xSemaphore == NULL )
{
//heap 空间不够 ,创建失败
}
else
{
// 信号量获取 设置阻塞时间 10 ticks
if( xSemaphoreTake( xSemaphore, ( TickType_t ) 10 ) == pdTRUE )
{
// 获取到信号量 !
//...
// 如果任务中发送信号量
//xSemaphoreGive( xSemaphore );
}
else
{
// 等待 10 tick 无法获取信号量
// 超时返回
}
}
}
// 中断
void vTimerISR( void * pvParameters )
{
static signed BaseType_t xHigherPriorityTaskWoken;
xHigherPriorityTaskWoken = pdFALSE;
// 发送信号量
// 传递参数判断是否有高优先级任务就绪
xSemaphoreGiveFromISR( xSemaphore, &xHigherPriorityTaskWoken );
// 判断是否需要触发任务切换
portYIELD_FROM_ISR( xHigherPriorityTaskWoken );
}
如果把信号量作为互斥锁使用, 则任务在获取信号后,处理完毕需要返回。
FreeRTOS 在 8.02 版本提供了一种更加轻量级的任务同步, 任务通知, 由于该方式是集合在任务控制块的,所以不需要额外的内存消耗,推荐使用。
以下看看 FreeRTOS 如何基于队列实现信号量的。
在信号量定义头文件可以找到该宏的定义, 可以发现, 创建一个信号量,实际上是创建了一个队列, 队列深度设置为1个, 同时, semSEMAPHORE_QUEUE_ITEM_LENGTH 这个宏的值为0, 即 item size 为0, 表示信号量这个队列没有队列项存储空间, 因为对于信号量,没有这个需要,
对于二进制信号量, 创建后默认初始化其 uxMessageWaiting 为0, 当有信号发出时, 该值变为1(最大也只能为1),此时信号量有效, 如果有任务获取消费了信号量,该变量再次变为0, 信号量无效, 有任务在次调用获取信号量,可能阻塞等待或者返回信号量空。
#define xSemaphoreCreateBinary() \
xQueueGenericCreate( ( UBaseType_t ) 1, semSEMAPHORE_QUEUE_ITEM_LENGTH, queueQUEUE_TYPE_BINARY_SEMAPHORE )
任务调用接口获取信号量, 可以通过如下宏实现 :
#define xSemaphoreTake( xSemaphore, xBlockTime ) \
xQueueGenericReceive( ( QueueHandle_t ) ( xSemaphore ), NULL,( xBlockTime ), pdFALSE )
这个函数是供任务调用的, 可以看到该函数实际上调用的是队列的接收函数, 由于没有数据可读, 传递的指针为 NULL。
函数调用设置阻塞时间,如果调用函数时信号量无效, 则会阻塞任务等待。
如果是在中断中, 则必须调用如下宏
#define xSemaphoreTakeFromISR( xSemaphore, pxHigherPriorityTaskWoken )\
xQueueReceiveFromISR( ( QueueHandle_t ) ( xSemaphore ), NULL, ( pxHigherPriorityTaskWoken ) )
函数 xSemaphoreTakeFromISR是供中断调用的,做了中断优先级处理,并且不会阻塞。
释放信号量的地方可能是中断,或者是任务中, 对应调用不同接口。
如果在中断中调用发送信号量, 需要调用的 API 是 xSemaphoreGiveFromISR, 查看该宏定义如下 :
#define xSemaphoreGiveFromISR( xSemaphore, pxHigherPriorityTaskWoken ) \
xQueueGiveFromISR( ( QueueHandle_t ) ( xSemaphore ), ( pxHigherPriorityTaskWoken ) )
该宏实际调用的函数 xQueueGiveFromISR定义于 queue.c中,
在<< FreeRTOS 消息队列 >> 介绍过, 队列在中断中调用的发送函数却是 xQueueGenericSendFromISR。
对比了一下两个函数的差别, 发现 xQueueGiveFromISR相比队列默认用的, 差别是没有调用了内存拷贝的函数,因为对于信号量而言, 发送的消息队列不关心其内容,在前面在创建信号量也提过, 对应创建的队列是没有队列项存储空间的, 其 item size 是0。 而主要操作的是变量 uxMessagesWaiting的值。
简化以下该函数 ,如下
BaseType_t xQueueGiveFromISR( QueueHandle_t xQueue,
BaseType_t * const pxHigherPriorityTaskWoken )
{
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
// 对应 item size == 0 的队列
configASSERT( pxQueue->uxItemSize == 0 );
// 互斥锁不能在中断中使用
configASSERT( !( ( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) && ( pxQueue->pxMutexHolder != NULL ) ) );
// 设置中断优先级
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
// 获取当前队列可读消息数
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
// 如果队列仍有空间
if( uxMessagesWaiting < pxQueue->uxLength )
{
const int8_t cTxLock = pxQueue->cTxLock;
// 互斥锁不能在中断中使用
// 不需要考虑互斥锁类型信号量
// 增加未读消息数量
pxQueue->uxMessagesWaiting = uxMessagesWaiting + 1;
// 队列没有被锁定 唤醒阻塞等待的最高优先级任务
if( cTxLock == queueUNLOCKED )
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
if( pxHigherPriorityTaskWoken != NULL )
{
// 中断中调用的这个函数名,高优先级任务就绪
// 设置参数,表示需要切换任务
*pxHigherPriorityTaskWoken = pdTRUE;
}
}
}
}
else
{
// 如果队列被锁定, 不能在此修改事件链表
// 增加计数, 解锁的时候处理
pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
}
xReturn = pdPASS;
}
else
{
// 队列满 直接返回
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
return xReturn;
}
在任务中释放信号量调用的 API 是
#define xSemaphoreGive( xSemaphore ) \
xQueueGenericSend( ( QueueHandle_t ) ( xSemaphore ), NULL, semGIVE_BLOCK_TIME, queueSEND_TO_BACK )
可以看到实际调用的函数是 xQueueGenericSend, 这个函数在队列一章做了比较详细地介绍过,此处不贴源码。
对于信号量, 由于没有消息内容, 所以传递的指针设置为 NULL, 不会执行任务拷贝函数,在函数中判断队列是否满, 如果没有满, 就增加未读消息数的变量, 查看是否有任务等待信号而被阻塞,恢复最高优先级的任务。 如果任务满, 按照设定的阻塞时间阻塞挂起任务等待。
二进制信号量是长度为1的队列, 计数信号量则是长度可以大于1的信号量, 当设置长度为1, 其行为和二进制型号量一样。
当任务调用 API 释放信号量, 信号量未读计数加1, 任务调用接收函数处理信号量, 则对应减1,初始化信号量计数为0。
所以, 使用上, 计数信号量和二进制信号量是差不多。
查看计数信号量创建宏 :
#define xSemaphoreCreateCounting( uxMaxCount, uxInitialCount ) \
xQueueCreateCountingSemaphore( ( uxMaxCount ), ( uxInitialCount ) )
相比二进制信号量, 计数信号量创建时需要设置两个参数,一个是最大的计数值, 另一个初始化计数值。
实际函数是在队列中实现, 对应查看队列中该函数是如何实现的, 看到其代码如下 :
QueueHandle_t xQueueCreateCountingSemaphore( const UBaseType_t uxMaxCount, const UBaseType_t uxInitialCount )
{
QueueHandle_t xHandle;
configASSERT( uxMaxCount != 0 );
configASSERT( uxInitialCount <= uxMaxCount );
// 申请队列, 深度由第一个参数决定
// item size 也是为 0
xHandle = xQueueGenericCreate( uxMaxCount,
queueSEMAPHORE_QUEUE_ITEM_LENGTH,
queueQUEUE_TYPE_COUNTING_SEMAPHORE );
if( xHandle != NULL )
{
// 初始化信号量计数值
( ( Queue_t * ) xHandle )->uxMessagesWaiting = uxInitialCount;
}
return xHandle;
}
计数型号量其他地方使用同二进制一样,不继续讨论。
当一个任务访问一个资源时, 需要获取令牌, 在其使用期间,其他任务不能使用该资源, 使用完后, 释放令牌, 其他任务可以访问, 保证资源在一段时间只能由一个任务读取修改。

与二进制信号量最大的不同在于, 互斥信号量带有优先级继承的机制,这个机制用于减低优先级反转的影响。
举个例子, 三个任务优先级从大到小依次 A > B > C, 某种情况下, 任务C 获取了互斥锁, 之后任务A 请求拿锁被挂起, 任务C 继续运行, 如果没有优先级继承, 任务B 就绪,由于优先级高于当前的任务C, 所以开始运行, 这样导致任务C 无法及时放锁,进而导致任务A 无法运行, 但是任务A 的优先级比B 高, 这就是优先级反转。
如果加入优先级继承, 任务C 拿锁, 任务A 请求拿锁被挂起时, 由于C < A, 通过继承机制, 提高C 的优先级,使其等于 A, 这样, 以上任务B 就无法抢占C, 任务C 结束释放锁让后恢复其本来优先级, 任务A 开始运行。
使用互测锁前需要创建互斥锁, 需要调用 API 的定义 :
#define xSemaphoreCreateMutex() xQueueCreateMutex( queueQUEUE_TYPE_MUTEX )
查找对应的实现函数 xQueueCreateMutex源码 , 函数调用了队列创建函数创建了一个深度为1, item size 为 0 的队列, 到这里看起来和二进制信号量一样。
队列创建后,调用了专门初始化互斥信号量的函数初始化。
QueueHandle_t xQueueCreateMutex( const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
const UBaseType_t uxMutexLength = ( UBaseType_t ) 1, uxMutexSize = ( UBaseType_t ) 0;
pxNewQueue = ( Queue_t * ) xQueueGenericCreate( uxMutexLength, uxMutexSize, ucQueueType );
prvInitialiseMutex( pxNewQueue );
return pxNewQueue;
}
初始化互斥信号量,
static void prvInitialiseMutex( Queue_t *pxNewQueue )
{
if( pxNewQueue != NULL )
{
// 初始化互斥信号量的参数
// 互斥信号量当前有效
pxNewQueue->pxMutexHolder = NULL;
// 标记类型
pxNewQueue->uxQueueType = queueQUEUE_IS_MUTEX;
// 递归信号量用
pxNewQueue->u.uxRecursiveCallCount = 0;
// 发送一个消息到队列
// 这样,第一个拿信号量的任务不会被挂起
( void ) xQueueGenericSend( pxNewQueue, NULL, ( TickType_t ) 0U, queueSEND_TO_BACK );
}
}
任务拿锁可以通过调用 API 定义如下
#define xSemaphoreTake( xSemaphore, xBlockTime ) \
xQueueGenericReceive( ( QueueHandle_t ) ( xSemaphore ), NULL, ( xBlockTime ), pdFALSE )
如果任务调用此函数时互斥锁有效,则拿到锁后返回。
这个函数在队列文章中介绍过,该函数中, 对于互斥锁有一些特殊处理,主要是实现了优先级继承机制。
在一个任务拿锁后, 其他任务尝试拿锁失败,如果设置了阻塞时间,则该任务会被阻塞,在进入阻塞前, 函数会判断当前任务的优先级是否高于拥有锁任务的优先级,如果高于, 则会先提高拥有锁任务的优先级。
实现的代码如下
点击源码
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
taskENTER_CRITICAL();
{
// 判断是否需要修改拿锁任务优先级
vTaskPriorityInherit( ( void * ) pxQueue->pxMutexHolder );
}
taskEXIT_CRITICAL();
}
}
#endif
查看 vTaskPriorityInherit 函数的实现, 该函数比较当前任务(拿锁失败,阻塞等待)和拿锁任务优先级, 进行优先级继承处理。
void vTaskPriorityInherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxTCB = ( TCB_t * ) pxMutexHolder;
// 判断是否有任务拿着锁
if( pxMutexHolder != NULL )
{
// 判断当前任务的优先级是否比拿锁任务高
if( pxTCB->uxPriority < pxCurrentTCB->uxPriority )
{
// 修改拿锁任务在事件链表项的优先级值
if( ( listGET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ) ) &
taskEVENT_LIST_ITEM_VALUE_IN_USE ) == 0UL )
{
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ),
( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxCurrentTCB->uxPriority );
}
//如果拿锁任务在就绪链表, 移动拿锁任务到新优先级任务链表
if( listIS_CONTAINED_WITHIN( &( pxReadyTasksLists[ pxTCB->uxPriority ] ),
&( pxTCB->xStateListItem ) ) != pdFALSE )
{
// 先从旧就绪链表移除
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
taskRESET_READY_PRIORITY( pxTCB->uxPriority );
}
// 修改优先级, 优先级继承, 并加入到新的优先级就绪链表
pxTCB->uxPriority = pxCurrentTCB->uxPriority;
prvAddTaskToReadyList( pxTCB );
}
else
{
// 如果拿锁任务没有在就绪链表, 直接修改优先级值即可
pxTCB->uxPriority = pxCurrentTCB->uxPriority;
}
}
}
}
互斥锁不能在中断使用, 因为中断函数没有优先级继承,同时, 中断函数不能阻塞。
任务使用资源后, 需要释放互斥锁,这样其他任务才能正常拿锁使用资源。
释放锁的接口定义如下 :
#define xSemaphoreGive( xSemaphore ) \
xQueueGenericSend( ( QueueHandle_t ) ( xSemaphore ), NULL, semGIVE_BLOCK_TIME, queueSEND_TO_BACK )
宏实际调用的函数实际也解析过了, 上面提到拿锁的时候, 任务堵塞会进行优先级继承处理, 因此,当一个任务获取了互斥锁, 在其使用期间, 其本身优先级可能已经被提高过了, 当其释放锁的时候, 需要恢复到原来的优先级。还锁的操作是向队列发送返回一个消息,在拷贝消息内容的函数,判断队列是互斥锁时, 会调用优先级继承解除函数, 恢复任务的优先级。
优先级恢复函数如下,
BaseType_t xTaskPriorityDisinherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxTCB = ( TCB_t * ) pxMutexHolder;
BaseType_t xReturn = pdFALSE;
if( pxMutexHolder != NULL )
{
// 当前任务是拿锁任务
// 这个队列是互斥锁类型
configASSERT( pxTCB == pxCurrentTCB );
configASSERT( pxTCB->uxMutexesHeld );
( pxTCB->uxMutexesHeld )--;
// 拿锁任务的优先级被修改了
if( pxTCB->uxPriority != pxTCB->uxBasePriority )
{
// 锁彻底释放
if( pxTCB->uxMutexesHeld == ( UBaseType_t ) 0 )
{
// 从链表移除任务
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
taskRESET_READY_PRIORITY( pxTCB->uxPriority );
}
// 恢复优先级
pxTCB->uxPriority = pxTCB->uxBasePriority;
// 恢复任务TCB 其他和优先级相关的参数
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ), ( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxTCB->uxPriority );
// 重新插入到就绪链表
prvAddTaskToReadyList( pxTCB );
// 发生优先级继承
// 说明有高优先级等锁, 所以提示需要任务切换
xReturn = pdTRUE;
}
}
}
return xReturn;
}
任务发生优先级继承, 本身优先级被修改提高,任务原始优先级会保存在 pxTCB->uxBasePriority中, 恢复的时候使用。
优先级继承恢复后, 可知有更高优先级任务阻塞等待锁,所以需要返回提示任务切换, 对应队列发送函数中的特殊处理一段 , 根据内存拷贝函数 prvCopyDataToQueue 返回值判断是否需要触发任务切换, 因为任务拷贝函数调用了上面这个函数恢复优先级,需不需要触发任务切换的返回值就是由这个函数提供的。
获取递归互斥量的任务可以重复获取该递归互斥量。使用xSemaphoreTakeRecursive() 函数成功获取几次递归互斥量,对应的就要使用xSemaphoreGiveRecursive()函数返还几次,在此之前递归互斥量都处于无效状态, 其他任务无法获取, 必须等待获取的任务释放完毕。
递归互斥锁创建调用接口 :
#define xSemaphoreCreateRecursiveMutex() \
xQueueCreateMutex( queueQUEUE_TYPE_RECURSIVE_MUTEX )
实际调用函数同普通互斥锁一样。
递归信号量在同一个任务可以多次拿取, 其调用的接口不同其他信号量的 xSemaphoreTake, 而是如下宏 :
#define xSemaphoreTakeRecursive( xMutex, xBlockTime ) \
xQueueTakeMutexRecursive( ( xMutex ), ( xBlockTime ) )
查看具体实现函数 xQueueTakeMutexRecursive, 看看有什么不同。
BaseType_t xQueueTakeMutexRecursive( QueueHandle_t xMutex, TickType_t xTicksToWait )
{
BaseType_t xReturn;
Queue_t * const pxMutex = ( Queue_t * ) xMutex;
configASSERT( pxMutex );
if( pxMutex->pxMutexHolder == ( void * ) xTaskGetCurrentTaskHandle() )
{
// 如果尝试拿锁的任务是当前拿了锁的任务, 则递增拿锁次锁
( pxMutex->u.uxRecursiveCallCount )++;
xReturn = pdPASS;
}
else
{
// 其他任务尝试拿锁, 同使用普通互斥锁一样
xReturn = xQueueGenericReceive( pxMutex, NULL, xTicksToWait, pdFALSE );
// 阻塞等待
// 如果恢复后,在超时时间内拿到锁
// 递增计数, 第一次拿锁!!
if( xReturn != pdFAIL )
{
( pxMutex->u.uxRecursiveCallCount )++;
}
}
return xReturn;
}
相比普通互斥锁,递归互斥锁允许同一个任务多次拿锁, 所以其拿锁接口会判断拿锁任务是否是拥有锁的任务,如果是, 则递增拿锁次数, 其他任务处理则和普通互斥锁一样,阻塞等待。
响应的, 递归互斥锁释放调用的接口定义 :
#define xSemaphoreGiveRecursive( xMutex ) \
xQueueGiveMutexRecursive( ( xMutex ) )
#endif
查看实际实现的函数 :
BaseType_t xQueueGiveMutexRecursive( QueueHandle_t xMutex )
{
BaseType_t xReturn;
Queue_t * const pxMutex = ( Queue_t * ) xMutex;
configASSERT( pxMutex );
// 判断尝试放锁的任务是否拿着锁
if( pxMutex->pxMutexHolder == ( void * ) xTaskGetCurrentTaskHandle() )
{
// 递归互斥锁, 拿几个需要对应放几个
( pxMutex->u.uxRecursiveCallCount )--;
// 计数清零,说明可以真正放锁
if( pxMutex->u.uxRecursiveCallCount == ( UBaseType_t ) 0 )
{
// 返回锁
( void ) xQueueGenericSend( pxMutex, NULL, queueMUTEX_GIVE_BLOCK_TIME, queueSEND_TO_BACK );
}
xReturn = pdPASS;
}
else
{
// 当前任务不是持有锁的任务, 无法释放
xReturn = pdFAIL;
}
// 还不能真正放锁
return xReturn;
}
对应拿锁的处理, 以上的函数也就很好理解了。
FreeRTOS 信号量记录到此结束。
标签:切换 assert blog sts 另一个 ddc sse 超时 保存
原文地址:http://blog.csdn.net/qq_18150497/article/details/52947752