@(嵌入式)
FreeRtos
简述
FreeRTOS 任务间通信方式有
* 消息通知 Notifications(V8.20版本开始支持)
* 消息队列 Queues
* 二进制信号量 Binary Semaphores
* 计数信号量 Counting Semaphores
* 互斥锁 Mutexes
* 递归互斥锁 Recursive Mutexes
上面这几中方式中, 除了消息通知, 其他几种实现都是基于消息队列。消息队列作为主要的通信方式, 支持在任务间, 任务和中断间传递消息内容。
这一章介绍 FreeRtos 消息队列的基本使用, 重点分析其实现的方式。
分析的源码版本是 v9.0.0
Queue 使用
FreeRTOS 官方提供了比较详细的接口使用文档 ( 戳我 ), 因此这里不花费太多的篇幅重复。
只是简单地介绍下, 主要使用到的接口以及示例。
创建一个消息队列
使用消息队列前, 需要先创建队列, 并拿到返回的句柄用于操作队列。
QueueHandle_t xQueue;
xQueue = xQueueCreate( 10, sizeof( unsigned long ) );
把队列比喻为一个邮箱, 那么队列项目是每封邮件的的大小, 深度是邮箱最大可以存储多少封邮件。
发送消息 & 接受消息
队列的基本操作就是出队(接收消息)和入队(发送消息), 如上图所示, 有两个任务 A 和 B, A 发送消息给任务 B
void funOfTaskA()
{
unsigned long pxMessage;
if( xQueue != 0 ) {
xQueueSend( xQueue, ( void * ) &pxMessage, ( TickType_t ) 0 );
}
}
void funOfTaskB()
{
if( xQueueReceive( xQueue, &( pxMessage ), ( TickType_t ) 10 ) )
{
}
}
上面例子, 第一个函数发送消息到队列, 如果队列已经满了, 直接返回不阻塞。 第二过函数接收队列消息, 如果队列中没有消息, 会阻塞任务等待最长10个 Ticks。
FreeRTOS 的队列内容是内存拷贝, 我们将要发送的内容的地址传递给发送函数,该函数会将地址上的内容拷贝到自己的存储区域;而接收函数会将消息内容拷贝到我们传递给他的指针指向的内存区域。
如果消息内容太大, 队列需要提前占用的存储空间对应也会变大, 消息传递过程的内存拷贝也会导致效率下降。 对于这种情况, 可以通过传递指针而不是实际内容代替, 消息中是指向数据的指针, 接收任务接收消息后通过该指针读取到实际的内容。
如下例子所示
struct AMessage
{
char ucMessageID;
char ucData[ 20 ];
} xMessage;
QueueHandle_t xQueue;
void vATask( void *pvParameters )
{
struct AMessage *pxMessage;
xQueue = xQueueCreate( 10, sizeof( struct AMessage * ) );
if( xQueue == 0 )
{
}
pxMessage = & xMessage;
xQueueSend( xQueue, ( void * ) &pxMessage, ( TickType_t ) 0 );
}
void vADifferentTask( void *pvParameters )
{
struct AMessage *pxRxedMessage;
if( xQueue != 0 )
{
if( xQueueReceive( xQueue, &( pxRxedMessage ), ( TickType_t ) 10 ) )
{
}
}
}
简单的队列使用基本如上, 更加详细的操作, 请直接参考 官方文档。
注意,在中断中使用 FreeRTOS 的接口, 需是结尾带有 FromISR
的。
Queue 实现
按照上面举例的顺序, 从创建队列 -> 发送消息 -> 接收消息 依次展开分析 FreeRTOS 的队列源码实现。 这部分代码在源码目录下 queue.c 中。
数据结构
队列实现围绕其数据结构, 如下说明队列的数据结构, 其每个数据成员的作用。
姑且不管是否理解, 后续会一步一步介绍它的具体应用。
typedef struct QueueDefinition
{
int8_t *pcHead;
int8_t *pcTail;
int8_t *pcWriteTo;
union
{
int8_t *pcReadFrom;
UBaseType_t uxRecursiveCallCount;
} u;
List_t xTasksWaitingToSend;
List_t xTasksWaitingToReceive;
volatile UBaseType_t uxMessagesWaiting;
UBaseType_t uxLength;
UBaseType_t uxItemSize;
volatile int8_t cRxLock;
volatile int8_t cTxLock;
#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated;
#endif
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
} xQUEUE;
typedef xQUEUE Queue_t;
队列创建
前面举例创建列表调用的 API xQueueCreate
实际是一个宏定义, 后面真正实现函数是
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
const uint8_t ucQueueType )
相比该宏, 函数多了一个参数, 在开头提到过 FreeRTOS 的信号量,互斥锁也是基于队列实现的, 而这个函数的第三个参数的作用用于指定创建的对象类型, 这个类型变量主要用于调试的。
旧版本这个函数还会包括其他参数, 用于指定队列内存是堆还是静态,但是新版本 FreeRTOS 用不同函数实现不同内存申请方式, 对应的静态内存创建队列的接口是 :
QueueHandle_t xQueueGenericCreateStatic( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
StaticQueue_t *pxStaticQueue,
const uint8_t ucQueueType )
多了两个参数用于传递静态内存的指针。
主要分析动态申请内存的函数, 静态函数的差别主要在于调用时传入队列的内存。创建队列的函数本身比较简单, 基本就是申请内存, 初始化内存, 返回指针。 看下源代码 :
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize, const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
size_t xQueueSizeInBytes;
uint8_t *pucQueueStorage;
if( uxItemSize == ( UBaseType_t ) 0 )
{
xQueueSizeInBytes = ( size_t ) 0;
}
else
{
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
}
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
if( pxNewQueue != NULL )
{
pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
#if( configSUPPORT_STATIC_ALLOCATION == 1 )
{
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
return pxNewQueue;
}
队列创建函数申请了队列及其存储队列项所需要的内存块, 而后对其进行初始化,
队列内存结构如下 :
Queue 内存的分配情况如上图所示, 在该数据结构中, 其中有两个链表变量 xTasksWaitingToReceive
和 xTasksWaitingToSend
, 当某个任务调用队列 API 接收函数准备接收消息时, 队列刚好没有内容, 如果设置了阻塞时间, 则该任务会被插入到 xTasksWaitingToReceive
链表中, 等待新消息; 对应的, 发送消息的任务发送消息时碰上队列满了, 也会被插入到 xTasksWaitingToSend
链表中, 等待其他任务读取消息后空出空间。
参数初始化函数如下所示,
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
const uint8_t ucQueueType,
Queue_t *pxNewQueue )
{
( void ) ucQueueType;
if( uxItemSize == ( UBaseType_t ) 0 )
{
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */
#if( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL;
}
#endif /* configUSE_QUEUE_SETS */
traceQUEUE_CREATE( pxNewQueue );
}
发送消息
同创建队列的函数一样, 一般我们调用的接收 API 实际上也是一个宏, 对某些参数做了特定设置, 后面继续介绍到的发送 API 也是如此。 发送消息函数最终实现基本是以下两个函数 :
* xQueueGenericSend
在普通任务函数中调用的接口
* xQueueGenericSendFromISR
在中断中调用的接口。 因为 FreeRTOS 是一个实时操作系统, 为了保证中断发生时的实时响应, 做了优先级设置。 在中断中直接调用普通的系统接口函数可能导致阻塞其他中断, 为了避免这种情况, 提供了特定接口, 中断中调用系统的接口(FromISR后缀),会短期修改该中断的优先级,避免影响其他中断, 保证实时性。 同时, 在中断中调用的接口, 不会阻塞挂起。
任务中调用发送函数
以下主要分析普通任务下调用队列发送函数 xQueueGenericSend
, 对源码的简化处理, 去除了集合部分的代码, 做了一些注释说明 :
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
for( ;; )
{
taskENTER_CRITICAL();
{
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
#if ( configUSE_QUEUE_SETS == 1 )
#else /* configUSE_QUEUE_SETS */
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
else if( xYieldRequired != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
#endif /* configUSE_QUEUE_SETS */
taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
taskEXIT_CRITICAL();
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE )
{
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();
vTaskSuspendAll();
prvLockQueue( pxQueue );
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
if( prvIsQueueFull( pxQueue ) != pdFALSE )
{
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
return errQUEUE_FULL;
}
}
}
方便理解, 参考如下流程图。
函数首先判断了队列是否有空间存储新的数据, 如果队列已经满, 则会判断调用函数是否设置了阻塞等待时间,没有设置阻塞时间函数会直接返回队列满, 而如果设置超时, 并且时间没有到期,则会将当前任务插入到队列的等待链表, 并且触发任务切换, 释放 CPU 的使用, 等到队列有空间或者超时溢出再切换回来。
对于正常情况下, 数据可以插入队列, 调用拷贝函数将新数据保存到队列的队列项存储区域, 更新队列相关指针和参数, 对于拷贝函数, 在队列作为互斥锁时, 发送消息实际上就是释放锁, 而互斥锁为了避免任务优先级反转, 如果拿锁的任务优先级低于等待锁的任务, 拿锁任务优先级会段时间提高(优先级继承), 当释放锁的时候, 发现有优先级继承,说明有一个更高优先级的任务在等待当前任务放锁, 所以这时候需要进行任务切换。 处理优先级继承问题,在函数 prvCopyDataToQueue
处理。
拷贝新数据后, 对应需要检查队列当前等待接收链表 xTasksWaitingToReceive
是否有任务等待, 将最高优先级任务解除阻塞到就绪, 并判断新就绪任务优先级是否高于当前任务, 是的话, 触发任务切换。
中断中调用发送函数
相比在任务中调用的发送函数,在中断中调用的函数会更加简单一些, 没有任务阻塞操作。
FromISR 后缀的函数, 函数开头会先修改调用中断的优先级, 避免影响其他中断的响应, 保证系统的实时性。
函数 xQueueGenericSend
中插入数据后, 会检查等待接收链表是否有任务等待,如果有会恢复就绪。如果恢复的任务优先级比当前任务高, 则会触发任务切换;但是在中断中调用的这个函数的做法是返回一个参数标志是否需要触发任务切换,并不在中断中切换任务。
在任务中调用的函数中有锁定和解锁队列的操作, 锁定队列的时候, 队列的事件链表不能被修改。 而下面这个函数中,被中断调用, 当遇到队列被锁定的时候, 将新数据插入到队列后, 并不会直接恢复因为等待接收的任务, 而是累加了计数, 当队列解锁的时候, 会根据这个计数, 对应恢复几个任务。
遇到队列满的情况, 函数会直接返回, 而不是阻塞等待, 因为在中断中阻塞是不允许的!!!
BaseType_t xQueueGenericSendFromISR(
QueueHandle_t xQueue,
const void * const pvItemToQueue,
BaseType_t * const pxHigherPriorityTaskWoken,
const BaseType_t xCopyPosition )
{
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
const int8_t cTxLock = pxQueue->cTxLock;
( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
if( cTxLock == queueUNLOCKED )
{
#if ( configUSE_QUEUE_SETS == 1 )
#else /* configUSE_QUEUE_SETS */
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
if( pxHigherPriorityTaskWoken != NULL )
{
*pxHigherPriorityTaskWoken = pdTRUE;
}
}
}
}
#endif /* configUSE_QUEUE_SETS */
}
else
{
pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
}
xReturn = pdPASS;
}
else
{
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
return xReturn;
}
接收消息
接收 API 和发送 API 差不多, 也是实现了几个宏, 但是实际实现的函数是xQueueGenericReceive``和 ``xQueueGenericReceiveFromISR
这两个。
有了上面发送函数的介绍, 接收函数基本思路差不多, 所以, 以下, 主要简单地分析下任务中调用的发送函数。
源代码注释如下 :
BaseType_t xQueueGenericReceive( QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait,
const BaseType_t xJustPeeking )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
int8_t *pcOriginalReadPosition;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
for( ;; )
{
taskENTER_CRITICAL();
{
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
pcOriginalReadPosition = pxQueue->u.pcReadFrom;
prvCopyDataFromQueue( pxQueue, pvBuffer );
if( xJustPeeking == pdFALSE )
{
pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
pxQueue->pxMutexHolder = ( int8_t * ) pvTaskIncrementMutexHeldCount();
}
}
#endif /* configUSE_MUTEXES */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
}
else
{
pxQueue->u.pcReadFrom = pcOriginalReadPosition;
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
}
taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
taskEXIT_CRITICAL();
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE )
{
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();
vTaskSuspendAll();
prvLockQueue( pxQueue );
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
taskENTER_CRITICAL();
{
vTaskPriorityInherit( ( void * ) pxQueue->pxMutexHolder );
}
taskEXIT_CRITICAL();
}
}
#endif
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
return errQUEUE_EMPTY;
}
}
}
}
任务调用接收函数收取队列消息, 函数首先判断当前队列是否有未读消息, 如果没有, 则会判断参数 xTicksToWait
, 决定直接返回函数还是阻塞等待。
如果队列中有消息未读, 首先会把待读的消息复制到传进来的指针所指内, 然后判断函数参数 xJustPeeking == pdFALSE
的时候, 符合的话, 说明这个函数读取了数据, 需要把被读取的数据做出队处理, 如果不是, 则只是查看(peek),只是返回数据,但是不会把数据清除。
对于正常读取数据的操作, 清除数据后队列会空出空位, 所以查看等待链表中是否有任务等发送数据而被挂起, 有的话恢复一个任务就绪, 并根据优先级判断是否需要出触发 PendSV 执行任务切换。
对于互斥锁, 接收消息函数对应的是请求锁, 所以会增加拿锁次数的记录。
对于只是查看数据的, 由于没有清除数据, 所以没有空间新空出,不需要检查发送等待链表, 但是会检查接收等待链表, 如果有任务挂起会切换其到就绪并判断是否需要切换。
到此, 对 FreeRTOS 队列的介绍完毕。
后续会专门一章分析下其信号量和互斥锁 基于队列的实现。
参考
- FreeRTOS Queue
- FreeRTOS Queue API
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)