【VxWorks5.5源码阅读笔记一】消息队列模块

2023-05-16

前面写了几篇VxWorks5.5操作系统下的任务间通信,POSIX接口,网络Socket编程。

因此有必要来阅读一下VxWorks5.5的源码来熟悉一下,一来可以了解一下各种接口的底层实现,二来可以看看C语言的高级用法,三学习一下C语言文件的编写规范。

我下载的这个版本(不确定是否完整)有两千多个文件,因此想要完全弄清弄懂是非常难的,而且VxWorks操作系统还是微内核,如果像Windows或者linux的源码,那估计都是几万几十万文件吧,可见操作系统是多么的复杂,想要弄懂也是非常难的。

当然我们不需要完全弄懂操作系统的所有,我们有个大致的理解,然后在需要的时候可以针对操作系统的某一个小方面来进行了解。

当然看代码,我们当然选择source insight了,我用的是4.0版本。

这次先看看消息队列的源码实现

msgQlib.h

/* msgQLib.h - message queue library header file */

/* Copyright 1984-2001 Wind River Systems, Inc. */

/*
modification history
--------------------
02m,19oct01,bwa  Added MSG_Q_EVENTSEND_ERR_NOTIFY option.
02l,17apr98,rlp  canceled MSG_Q_INFO modification for backward compatibility.
02k,04nov97,rlp  modified MSG_Q_INFO structure for tracking messages sent.
02j,13jul93,wmd  use MEM_ROUND_UP to determine MSG_NODE_SIZE.
02i,22sep92,rrr  added support for c++
02h,04jul92,jcf  cleaned up.
01g,26may92,rrr  the tree shuffle
01f,04oct91,rrr  passed through the ansification filter
		  -changed copyright notice
01e,05oct90,dnw  changed MSG_Q_INFO structure.
01d,05oct90,dnw  changed function declarations for new interface.
01c,05oct90,shl  added ANSI function prototypes.
                 made #endif ANSI style.
                 added copyright notice.
01b,07aug90,shl  moved function declarations to end of file.
01a,10may90,dnw  written
*/

#ifndef __INCmsgQLibh
#define __INCmsgQLibh

#ifdef __cplusplus
extern "C" {
#endif

#include "vxWorks.h"
#include "vwModNum.h"


/* generic status codes */

#define S_msgQLib_INVALID_MSG_LENGTH		(M_msgQLib | 1)
#define S_msgQLib_NON_ZERO_TIMEOUT_AT_INT_LEVEL	(M_msgQLib | 2)
#define S_msgQLib_INVALID_QUEUE_TYPE		(M_msgQLib | 3)

/* message queue options */

#define MSG_Q_TYPE_MASK	0x01	/* mask for pend queue type in options */
#define MSG_Q_FIFO	0x00	/* tasks wait in FIFO order */
#define MSG_Q_PRIORITY	0x01	/* tasks wait in PRIORITY order */
#define MSG_Q_EVENTSEND_ERR_NOTIFY 0x02 /* notify when eventRsrcSend fails */


/* message send priorities */

#define MSG_PRI_NORMAL	0	/* normal priority message */
#define MSG_PRI_URGENT	1	/* urgent priority message */

/* message queue typedefs */

typedef struct msg_q *MSG_Q_ID;	/* message queue ID */

typedef struct			/* MSG_Q_INFO */
    {
    int     numMsgs;		/* OUT: number of messages queued */
    int     numTasks;		/* OUT: number of tasks waiting on msg q */

    int     sendTimeouts;	/* OUT: count of send timeouts */
    int     recvTimeouts;	/* OUT: count of receive timeouts */

    int     options;		/* OUT: options with which msg q was created */
    int     maxMsgs;		/* OUT: max messages that can be queued */
    int     maxMsgLength;	/* OUT: max byte length of each message */

    int     taskIdListMax;	/* IN: max tasks to fill in taskIdList */
    int *   taskIdList;		/* PTR: array of task ids waiting on msg q */

    int     msgListMax;		/* IN: max msgs to fill in msg lists */
    char ** msgPtrList;		/* PTR: array of msg ptrs queued to msg q */
    int *   msgLenList;		/* PTR: array of lengths of msgs */

    } MSG_Q_INFO;

/* macros */

/* The following macro determines the number of bytes needed to buffer
 * a message of the specified length.  The node size is rounded up for
 * efficiency.  The total buffer space required for a pool for
 * <maxMsgs> messages each of up to <maxMsgLength> bytes is:
 *
 *    maxMsgs * MSG_NODE_SIZE (maxMsgLength)
 */

#define MSG_NODE_SIZE(msgLength) \
	(MEM_ROUND_UP((sizeof (MSG_NODE) + msgLength)))

/* function declarations */

#if defined(__STDC__) || defined(__cplusplus)

extern STATUS 	msgQLibInit (void);
extern MSG_Q_ID msgQCreate (int maxMsgs, int maxMsgLength, int options);
extern STATUS 	msgQDelete (MSG_Q_ID msgQId);
extern STATUS 	msgQSend (MSG_Q_ID msgQId, char *buffer, UINT nBytes,
			  int timeout, int priority);
extern int 	msgQReceive (MSG_Q_ID msgQId, char *buffer, UINT maxNBytes,
			     int timeout);
extern STATUS 	msgQInfoGet (MSG_Q_ID msgQId, MSG_Q_INFO *pInfo);
extern int 	msgQNumMsgs (MSG_Q_ID msgQId);
extern void 	msgQShowInit (void);
extern STATUS 	msgQShow (MSG_Q_ID msgQId, int level);

#else	/* __STDC__ */

extern STATUS 	msgQLibInit ();
extern MSG_Q_ID 	msgQCreate ();
extern STATUS 	msgQDelete ();
extern STATUS 	msgQSend ();
extern int 	msgQReceive ();
extern STATUS 	msgQInfoGet ();
extern int 	msgQNumMsgs ();
extern void 	msgQShowInit ();
extern STATUS 	msgQShow ();

#endif	/* __STDC__ */

#ifdef __cplusplus
}
#endif

#endif /* __INCmsgQLibh */

这是头文件的内容,我们可以看到msgQlib.h的内容主要由以下几部分组成


文件模块功能说明和修改历史,这是对一个文件的说明和版本记录;


#ifndef __INCmsgQLibh
#define __INCmsgQLibh

#ifdef __cplusplus
extern "C" {
#endif

这边的预编译可以防止头文件被重复包含,external “C”可以让C++用C编译该头文件中的代码,这样可以和与该头文件对应的库文件的符号表一致,可以使C++调用C语言写的库文件。


#include "vxWorks.h"
#include "vwModNum.h"

头文件包含,只包含用到的头文件,且先包含基础头文件


/* generic status codes */

#define S_msgQLib_INVALID_MSG_LENGTH		(M_msgQLib | 1)
#define S_msgQLib_NON_ZERO_TIMEOUT_AT_INT_LEVEL	(M_msgQLib | 2)
#define S_msgQLib_INVALID_QUEUE_TYPE		(M_msgQLib | 3)

/* message queue options */

#define MSG_Q_TYPE_MASK	0x01	/* mask for pend queue type in options */
#define MSG_Q_FIFO	0x00	/* tasks wait in FIFO order */
#define MSG_Q_PRIORITY	0x01	/* tasks wait in PRIORITY order */
#define MSG_Q_EVENTSEND_ERR_NOTIFY 0x02 /* notify when eventRsrcSend fails */


/* message send priorities */

#define MSG_PRI_NORMAL	0	/* normal priority message */
#define MSG_PRI_URGENT	1	/* urgent priority message */	(M_msgQLib | 3)

宏定义,可供自己模块和包含该头文件的模块使用


typedef struct msg_q *MSG_Q_ID;	/* message queue ID */

typedef struct			/* MSG_Q_INFO */
    {
    int     numMsgs;		/* OUT: number of messages queued */
    int     numTasks;		/* OUT: number of tasks waiting on msg q */

    int     sendTimeouts;	/* OUT: count of send timeouts */
    int     recvTimeouts;	/* OUT: count of receive timeouts */

    int     options;		/* OUT: options with which msg q was created */
    int     maxMsgs;		/* OUT: max messages that can be queued */
    int     maxMsgLength;	/* OUT: max byte length of each message */

    int     taskIdListMax;	/* IN: max tasks to fill in taskIdList */
    int *   taskIdList;		/* PTR: array of task ids waiting on msg q */

    int     msgListMax;		/* IN: max msgs to fill in msg lists */
    char ** msgPtrList;		/* PTR: array of msg ptrs queued to msg q */
    int *   msgLenList;		/* PTR: array of lengths of msgs */

    } MSG_Q_INFO;

/* macros */

/* The following macro determines the number of bytes needed to buffer
 * a message of the specified length.  The node size is rounded up for
 * efficiency.  The total buffer space required for a pool for
 * <maxMsgs> messages each of up to <maxMsgLength> bytes is:
 *
 *    maxMsgs * MSG_NODE_SIZE (maxMsgLength)
 */

#define MSG_NODE_SIZE(msgLength) \
	(MEM_ROUND_UP((sizeof (MSG_NODE) + msgLength)))

结构体类型定义,在此处定义该模块所需的和其他包含该模块所需的结构体类型。


#if defined(__STDC__) || defined(__cplusplus)

extern STATUS 	msgQLibInit (void);
extern MSG_Q_ID msgQCreate (int maxMsgs, int maxMsgLength, int options);
extern STATUS 	msgQDelete (MSG_Q_ID msgQId);
extern STATUS 	msgQSend (MSG_Q_ID msgQId, char *buffer, UINT nBytes,
			  int timeout, int priority);
extern int 	msgQReceive (MSG_Q_ID msgQId, char *buffer, UINT maxNBytes,
			     int timeout);
extern STATUS 	msgQInfoGet (MSG_Q_ID msgQId, MSG_Q_INFO *pInfo);
extern int 	msgQNumMsgs (MSG_Q_ID msgQId);
extern void 	msgQShowInit (void);
extern STATUS 	msgQShow (MSG_Q_ID msgQId, int level);

#else	/* __STDC__ */

extern STATUS 	msgQLibInit ();
extern MSG_Q_ID 	msgQCreate ();
extern STATUS 	msgQDelete ();
extern STATUS 	msgQSend ();
extern int 	msgQReceive ();
extern STATUS 	msgQInfoGet ();
extern int 	msgQNumMsgs ();
extern void 	msgQShowInit ();
extern STATUS 	msgQShow ();

根据不同的编译器使用不同形式的外部函数声明。

注意观察一下发现,好像是如果是C++编译器的外部函数声明比C编译器的外部函数声明会多函数参数,这个不知道是啥原因,难道C编译器的外部函数声明不需要说明函数参数吗?

上面的推测好像是正确的,C++编译器在对函数生成符号表时,会根据函数参数和函数名一起生成符号表;而C编译器只会根据函数名来生成符号表。

因此对于C++编译器,我们最好把函数参数写出,而对于C语言编译器,我们可以不写,在单独编译包含了该头文件的源文件是,我们不需要详细说明函数参数,等到链接时再确定。

先不管,用代码测试一下

//main.c
#include <stdio.h>
#include "add.h"

int main(void) {
	add(1, 3);
}

//add.h
extern int add();//或者extern int add(int a, int b);都可以通过编译

//add.c
int add(int a, int b) {
	return a + b;
}

上面的C语言项目使用C编译器可以通过编译,而且是不管add.h中的函数参数可以有也可以没有都可以正常编译运行。

当我们把代码改成如下

//main.cpp
#include <iostream>
#include "add.h"

using namespace std;
int main(void) {
	add(1, 3);
}

//add.h
extern int add();//或者extern int add(int a, int b);都可以通过编译

//add.c
int add(int a, int b) {
	return a + b;
}

因为我们使用的头文件是 iostream,所以IDE会使用C++编译器。在使用C++编译器时,该项目是不能通过编译的,提示[Error] too many arguments to function ‘int add()’,意思是你声明的时候没有参数,现在又有了两个参数,因此报错了。

这就是因为C++编译器对函数名的处理了,在main.cpp文件中,我们通过头文件包含的add函数没有参数,因此C++编译器将其处理成符号名 add ,而在add.c 文件中,add函数是有参数的,因此C++编译器会把add函数处理成符号为 add_int_int之类的符号名,当然在他们单独编译时是可能正常的,但到链接时,add函数的两个符号名不能匹配起来,因此会造成错误;而C编译器可以正常处理的原因就是,在C编译器下,函数的符号名只和函数名有关,和函数参数无关。

意外之喜,没想到刚开始看源码,就收获了知识的理解。


头文件内容总结

整个头文件的内容就是这些了,最有意思的应该就是

typedef struct			/* MSG_Q_INFO */
    {
    int     numMsgs;		/* OUT: number of messages queued */
    int     numTasks;		/* OUT: number of tasks waiting on msg q */

    int     sendTimeouts;	/* OUT: count of send timeouts */
    int     recvTimeouts;	/* OUT: count of receive timeouts */

    int     options;		/* OUT: options with which msg q was created */
    int     maxMsgs;		/* OUT: max messages that can be queued */
    int     maxMsgLength;	/* OUT: max byte length of each message */

    int     taskIdListMax;	/* IN: max tasks to fill in taskIdList */
    int *   taskIdList;		/* PTR: array of task ids waiting on msg q */

    int     msgListMax;		/* IN: max msgs to fill in msg lists */
    char ** msgPtrList;		/* PTR: array of msg ptrs queued to msg q */
    int *   msgLenList;		/* PTR: array of lengths of msgs */

    } MSG_Q_INFO;

看含义应该是消息队列信息结构体类型的定义

里面有各种含义的整型,有数组指针,有二维指针,通过这些数据结构的特性来相应的存储消息队列的各种信息。

int * taskIdList; /* PTR: array of task ids waiting on msg q */任务ID数组

char ** msgPtrList; /* PTR: array of msg ptrs queued to msg q */数组,数组存储的每个元素是消息的字符指针

int * msgLenList; /* PTR: array of lengths of msgs */数组,每个消息的长度


下面就是找来msgQlib.c文件来看看吧

首先了解一下VxWorks中常见的宏定义

#define BOOL int
#define LOCAL static
#define IMPORT extern
#define ERROR -1
#define NULL 0
#define TRUE 1
#define FALSE 0

#define FAST	register
#define IMPORT	extern
#define LOCAL	static

typedef	char		INT8;
typedef	short		INT16;
typedef	int		INT32;
typedef	long long	INT64;

typedef	unsigned char	UINT8;
typedef	unsigned short	UINT16;
typedef	unsigned int	UINT32;
typedef	unsigned long long UINT64;

typedef	unsigned char	UCHAR;
typedef unsigned short	USHORT;
typedef	unsigned int	UINT;
typedef unsigned long	ULONG;

typedef	int		BOOL;
typedef	int		STATUS;
typedef int 		ARGINT;

typedef void		VOID;

#ifdef __cplusplus
typedef int 		(*FUNCPTR) (...);     /* ptr to function returning int */
typedef void 		(*VOIDFUNCPTR) (...); /* ptr to function returning void */
typedef double 		(*DBLFUNCPTR) (...);  /* ptr to function returning double*/
typedef float 		(*FLTFUNCPTR) (...);  /* ptr to function returning float */
#else
typedef int 		(*FUNCPTR) ();	   /* ptr to function returning int */
typedef void 		(*VOIDFUNCPTR) (); /* ptr to function returning void */
typedef double 		(*DBLFUNCPTR) ();  /* ptr to function returning double*/
typedef float 		(*FLTFUNCPTR) ();  /* ptr to function returning float */
#endif			/* _cplusplus */

可以看出VxWorks中的宏定义还是很不错的,清晰好用。

下m面先看msgCreate函数

/*******************************************************************************
*
* msgQCreate - create and initialize a message queue
*
* This routine creates a message queue capable of holding up to <maxMsgs>
* messages, each up to <maxMsgLength> bytes long.  The routine returns 
* a message queue ID used to identify the created message queue in all 
* subsequent calls to routines in this library.  The queue can be created 
* with the following options:
* .iP "MSG_Q_FIFO  (0x00)" 8
* queue pended tasks in FIFO order.
* .iP "MSG_Q_PRIORITY  (0x01)"
* queue pended tasks in priority order.
* .iP "MSG_Q_EVENTSEND_ERR_NOTIFY (0x02)"
* When a message is sent, if a task is registered for events and the
* actual sending of events fails, a value of ERROR is returned and the errno
* is set accordingly. This option is off by default.
* .LP
*
* RETURNS:
* MSG_Q_ID, or NULL if error.
*
* ERRNO: S_memLib_NOT_ENOUGH_MEMORY, S_intLib_NOT_ISR_CALLABLE
*
* SEE ALSO: msgQSmLib
*/

MSG_Q_ID msgQCreate
    (
    int         maxMsgs,        /* max messages that can be queued */
    int         maxMsgLength,   /* max bytes in a message */
    int         options         /* message queue options */
    )
    {
    MSG_Q_ID	msgQId;
    void *	pPool;		/* pointer to memory for messages */
    UINT	size = (UINT) maxMsgs * MSG_NODE_SIZE (maxMsgLength);

    if (INT_RESTRICT () != OK)		/* restrict ISR from calling */
	return (NULL);

    if ((!msgQLibInstalled) && (msgQLibInit () != OK))
	return (NULL);			/* package init problem */

    if ((msgQId = (MSG_Q_ID)objAllocExtra (msgQClassId, size, &pPool)) == NULL)
	return (NULL);

    if (msgQInit (msgQId, maxMsgs, maxMsgLength, options, pPool) != OK)
	{
	objFree (msgQClassId, (char *) msgQId);
	return (NULL);
	}

#ifdef WV_INSTRUMENTATION
    /* windview - level 1 event logging routine */
    EVT_OBJ_4 (OBJ, msgQId, msgQClassId, EVENT_MSGQCREATE, 
		msgQId, maxMsgs, maxMsgLength, options);
#endif

    return ((MSG_Q_ID) msgQId);
    }

先看一下MSG_Q_ID的结构体定义

typedef struct msg_q *MSG_Q_ID;	/* message queue ID */
typedef struct msg_q		/* MSG_Q */
    {
    OBJ_CORE		objCore;	/* object management */
    Q_JOB_HEAD		msgQ;		/* message queue head */
    Q_JOB_HEAD		freeQ;		/* free message queue head */
    int			options;	/* message queue options */
    int			maxMsgs;	/* max number of messages in queue */
    int			maxMsgLength;	/* max length of message */
    int			sendTimeouts;	/* number of send timeouts */
    int			recvTimeouts;	/* number of receive timeouts */
    EVENTS_RSRC		events;		/* VxWorks events */
    } MSG_Q;   
typedef struct obj_core		/* OBJ_CORE */
    {
    struct obj_class *pObjClass;	/* pointer to object's class */
    } OBJ_CORE;
typedef struct				/* Head of job queue */
    {
    Q_JOB_NODE *first;			/* first node in queue */
    Q_JOB_NODE *last;			/* last node in queue */
    int		count;			/* number of nodes in queue */
    Q_CLASS    *pQClass;		/* must be 4th long word */
    Q_HEAD	pendQ;			/* queue of blocked tasks */
    } Q_JOB_HEAD;
typedef struct qJobNode			/* Node of a job queue */
    {
    struct qJobNode *next;
    } Q_JOB_NODE;

可以看到MSG_Q是结构体的名称,MSG_Q_ID是结构体的指针(指针也可以看成是结构体的ID)。

msgQCreate函数其实最重要的是调用了msgQInit函数,msgQInit函数的代码如下

/*******************************************************************************
*
* msgQInit - initialize a message queue
*
* This routine initializes a message queue data structure.  Like msgQCreate()
* the resulting message queue is capable of holding up to <maxMsgs> messages,
* each of up to <maxMsgLength> bytes long.
* However, instead of dynamically allocating the MSG_Q data structure,
* this routine takes a pointer <pMsgQ> to the MSG_Q data structure to be
* initialized, and a pointer <pMsgPool> to the buffer to be use to hold
* queued messages.  <pMsgPool> must point to a 4 byte aligned buffer
* that is (<maxMsgs> * MSG_NODE_SIZE (<maxMsgLength>)).
*

* The queue can be created with the following options:
*
*	MSG_Q_FIFO	queue pended tasks in FIFO order
*	MSG_Q_PRIORITY	queue pended tasks in priority order
*
* RETURNS: OK or ERROR.
*
* ERRNO: S_msgQLib_INVALID_QUEUE_TYPE
*
* SEE ALSO: msgQCreate()
*
* NOMANUAL
*/

STATUS msgQInit
    (
    FAST MSG_Q *pMsgQ,          /* pointer to msg queue to initialize */
    int         maxMsgs,        /* max messages that can be queued */
    int         maxMsgLength,   /* max bytes in a message */
    int         options,        /* message queue options */
    void *      pMsgPool        /* pointer to memory for messages */
    )
    {
    FAST int		nodeSize = MSG_NODE_SIZE (maxMsgLength);
    FAST int		ix;
    FAST Q_CLASS_ID	msgQType;

    if ((!msgQLibInstalled) && (msgQLibInit () != OK))
	return (ERROR);				/* package init problem */

    bzero ((char *) pMsgQ, sizeof (*pMsgQ));	/* clear out msg q structure */

    /* initialize internal job queues */

    switch (options & MSG_Q_TYPE_MASK)
        {
        case MSG_Q_FIFO:	msgQType = Q_FIFO;	break;
        case MSG_Q_PRIORITY:	msgQType = Q_PRI_LIST;	break;

        default:
            errnoSet (S_msgQLib_INVALID_QUEUE_TYPE);
	    return (ERROR);
        }

    if ((qInit ((Q_HEAD *) &pMsgQ->msgQ, qJobClassId, msgQType,
	 0, 0, 0, 0, 0, 0, 0, 0, 0) != OK) ||
	(qInit ((Q_HEAD *) &pMsgQ->freeQ, qJobClassId, msgQType,
	 0, 0, 0, 0, 0, 0, 0, 0, 0) != OK))
	return (ERROR);


    /* put msg nodes on free list */

    for (ix = 0; ix < maxMsgs; ix++)
	{
	qJobPut (pMsgQ, &pMsgQ->freeQ, (Q_JOB_NODE *) pMsgPool,
		 Q_JOB_PRI_DONT_CARE);
	pMsgPool = (void *) (((char *) pMsgPool) + nodeSize);
	}


    /* initialize rest of msg q */

    pMsgQ->options	= options;
    pMsgQ->maxMsgs	= maxMsgs;
    pMsgQ->maxMsgLength	= maxMsgLength;

    eventInit (&pMsgQ->events);

#ifdef WV_INSTRUMENTATION
    if (wvObjIsEnabled)
    {
    /* windview - connect level 1 event logging routine */
    objCoreInit (&pMsgQ->objCore, msgQInstClassId);
    }
    else
#endif
    objCoreInit (&pMsgQ->objCore, msgQClassId);

    return (OK);
    }

注意一些bzero函数

void bzero
    (
    char *buffer,               /* buffer to be zeroed       */
    int nbytes                  /* number of bytes in buffer */
    )
    {
    bfill (buffer, nbytes, 0);
    }

bfill函数

/*******************************************************************************
*
* bfill - fill a buffer with a specified character
*
* This routine fills the first <nbytes> characters of a buffer with the
* character <ch>.  Filling is done in the most efficient way possible,
* which may be long-word, or even multiple-long-word stores, on some
* architectures.  In general, the fill will be significantly faster if
* the buffer is long-word aligned.  (For filling that is restricted to
* byte stores, see the manual entry for bfillBytes().)
*
* RETURNS: N/A
*
* SEE ALSO: bfillBytes()
*/

void bfill
    (
    FAST char *buf,           /* pointer to buffer              */
    int nbytes,               /* number of bytes to fill        */
    FAST int ch     	      /* char with which to fill buffer */
    )
    {
#if (CPU_FAMILY != I960) || !defined(__GNUC__) || defined(VX_IGNORE_GNU_LIBS)
    FAST long *pBuf;
    char *bufend = buf + nbytes;
    FAST char *buftmp;
    FAST long val;

    if (nbytes < 10)
	goto byte_fill;

    val = (ch << 24) | (ch << 16) | (ch << 8) | ch;

    /* start on necessary alignment */

    while ((int)buf & ALIGNMENT)
	*buf++ = ch;

    buftmp = bufend - sizeof (long); /* last word boundary before bufend */

    pBuf = (long *)buf;

    /* fill 4 bytes at a time; don't exceed buf endpoint */

    do
	{
	*pBuf++ = val;
	}
    while ((char *)pBuf < buftmp);

    buf = (char *)pBuf - sizeof (long);

    /* fill remaining bytes one at a time */

byte_fill:
    while (buf < bufend)
	*buf++ = ch;
#else	/* IGNORE GNU LIBS */
    (void) memset ((void *)buf, (int) ch, (size_t) nbytes);
#endif	/* IGNORE GNU LIBS */
    }

可以看到会根据预编译选择自己实现memset或者直接调用memset函数。


下面看看msgQSend函数

/*******************************************************************************
*
* msgQSend - send a message to a message queue
*
* This routine sends the message in <buffer> of length <nBytes> to the message
* queue <msgQId>.  If any tasks are already waiting to receive messages
* on the queue, the message will immediately be delivered to the first
* waiting task.  If no task is waiting to receive messages, the message
* is saved in the message queue and if a task has previously registered to 
* receive events from the message queue, these events are sent in the context 
* of this call.  This may result in the unpending of the task waiting for 
* the events.  If the message queue fails to send events and if it was 
* created using the MSG_Q_EVENTSEND_ERR_NOTIFY option, ERROR is returned 
* even though the send operation was successful.
*
* The <timeout> parameter specifies the number of ticks to wait for free
* space if the message queue is full.  The <timeout> parameter can also have 
* the following special values:
* .iP "NO_WAIT  (0)" 8
* return immediately, even if the message has not been sent.  
* .iP "WAIT_FOREVER  (-1)"
* never time out.
* .LP
*
* The <priority> parameter specifies the priority of the message being sent.
* The possible values are:
* .iP "MSG_PRI_NORMAL  (0)" 8
* normal priority; add the message to the tail of the list of queued 
* messages.
* .iP "MSG_PRI_URGENT  (1)"
* urgent priority; add the message to the head of the list of queued messages.
* .LP
*
* USE BY INTERRUPT SERVICE ROUTINES
* This routine can be called by interrupt service routines as well as
* by tasks.  This is one of the primary means of communication
* between an interrupt service routine and a task.  When called from an
* interrupt service routine, <timeout> must be NO_WAIT.
*
* RETURNS: OK on success or ERROR otherwise.
*
* ERRNO:
* .iP "S_distLib_NOT_INITIALIZED"
* Distributed objects message queue library (VxFusion) not initialized.
* .iP "S_smObjLib_NOT_INITIALIZED"
* Shared memory message queue library (VxMP Option) not initialized.
* .iP "S_objLib_OBJ_ID_ERROR"
* Invalid message queue ID.
* .iP "S_objLib_OBJ_DELETED"
* Message queue deleted while calling task was pended.
* .iP "S_objLib_OBJ_UNAVAILABLE"
* No free buffer space when NO_WAIT timeout specified.
* .iP "S_objLib_OBJ_TIMEOUT"
* Timeout occurred while waiting for buffer space.
* .iP "S_msgQLib_INVALID_MSG_LENGTH"
* Message length exceeds limit.
* .iP "S_msgQLib_NON_ZERO_TIMEOUT_AT_INT_LEVEL"
* Called from ISR with non-zero timeout.
* .iP "S_eventLib_EVENTSEND_FAILED"
* Message queue failed to send events to registered task.  This errno 
* value can only exist if the message queue was created with the 
* MSG_Q_EVENTSEND_ERR_NOTIFY option.
* .LP
*
* SEE ALSO: msgQSmLib, msgQEvStart
*/

STATUS msgQSend
    (
    FAST MSG_Q_ID       msgQId,         /* message queue on which to send */
    char *              buffer,         /* message to send */
    FAST UINT           nBytes,         /* length of message */
    int                 timeout,        /* ticks to wait */
    int                 priority        /* MSG_PRI_NORMAL or MSG_PRI_URGENT */
    )
    {
    FAST MSG_NODE *	pMsg;

    if (ID_IS_SHARED (msgQId))			/* message Q is shared? */
        {
		if (ID_IS_DISTRIBUTED (msgQId)) /* message queue is distributed? */
			{
			if (msgQDistSendRtn == NULL)
				{
				errno = S_distLib_NOT_INITIALIZED; 
				return (ERROR);
				}
		 	return ((*msgQDistSendRtn) (msgQId, buffer, nBytes,
			 	timeout, WAIT_FOREVER, priority));
			}

        if (msgQSmSendRtn == NULL)
            {
            errno = S_smObjLib_NOT_INITIALIZED;
            return (ERROR);
            }

        return ((*msgQSmSendRtn) (SM_OBJ_ID_TO_ADRS (msgQId), buffer, nBytes,
				  timeout, priority));
	}

    /* message queue is local */

    if (!INT_CONTEXT ())
	TASK_LOCK ();
    else
	{
	if (timeout != 0)
	    {
	    errnoSet (S_msgQLib_NON_ZERO_TIMEOUT_AT_INT_LEVEL);
	    return (ERROR);
	    }
	}

restart:
    if (OBJ_VERIFY (msgQId, msgQClassId) != OK)
	{
	if (!INT_CONTEXT ())
	    TASK_UNLOCK ();
	return (ERROR);
	}

#ifdef WV_INSTRUMENTATION
    /* windview - level 1 event logging routine */
    EVT_OBJ_5 (OBJ, msgQId, msgQClassId, EVENT_MSGQSEND, msgQId, 
	       buffer, nBytes, timeout, priority);
#endif

    if (nBytes > msgQId->maxMsgLength)
	{
	if (!INT_CONTEXT ())
	    TASK_UNLOCK ();
	errnoSet (S_msgQLib_INVALID_MSG_LENGTH);
	return (ERROR);
	}

    pMsg = (MSG_NODE *) qJobGet (msgQId, &msgQId->freeQ, timeout);

    if (pMsg == (MSG_NODE *) NONE)
	{
	timeout = SIG_TIMEOUT_RECALC(timeout);
	goto restart;
	}

    if (pMsg == NULL)
	{
#if FALSE
	msgQId->sendTimeouts++;
#else

	/*
	 * The timeout stat should only be updated if a timeout has occured.
	 * An OBJ_VERIFY needs to be performed to catch the case where a 
	 * timeout indeed occured, but the message queue is subsequently 
	 * deleted before the current task is rescheduled.
	 */

	if (errnoGet() == S_objLib_OBJ_TIMEOUT) 
            {
	    if (OBJ_VERIFY (msgQId, msgQClassId) == OK)
	        msgQId->sendTimeouts++;
            else
                errnoSet(S_objLib_OBJ_DELETED);
            }

#endif /* FALSE */

	if (!INT_CONTEXT ())
	    TASK_UNLOCK ();
	return (ERROR);
	}

    pMsg->msgLength = nBytes;
    bcopy (buffer, MSG_NODE_DATA (pMsg), (int) nBytes);

    if (qJobPut (msgQId, &msgQId->msgQ, &pMsg->node, priority) != OK)
	{
	if (!INT_CONTEXT ())
	    TASK_UNLOCK ();

	return (ERROR); /* errno set by qJobPut() */
	}

    if (!INT_CONTEXT ())
	TASK_UNLOCK ();

    return (OK);
    }

看一下里面的bcopy函数

/*******************************************************************************
*
* bcopy - copy one buffer to another
*
* This routine copies the first <nbytes> characters from <source> to
* <destination>.  Overlapping buffers are handled correctly.  Copying is done
* in the most efficient way possible, which may include long-word, or even
* multiple-long-word moves on some architectures.  In general, the copy
* will be significantly faster if both buffers are long-word aligned.
* (For copying that is restricted to byte, word, or long-word moves, see
* the manual entries for bcopyBytes(), bcopyWords(), and bcopyLongs().)
*
* RETURNS: N/A
*
* SEE ALSO: bcopyBytes(), bcopyWords(), bcopyLongs()
*/

void bcopy
    (
    const char *source,       	/* pointer to source buffer      */
    char *destination,  	/* pointer to destination buffer */
    int nbytes          	/* number of bytes to copy       */
    )
    {
    FAST char *dstend;
    FAST long *src;
    FAST long *dst;
    int tmp = destination - source;

    if (tmp <= 0 || tmp >= nbytes)
	{
	/* forward copy */

	dstend = destination + nbytes;

	/* do byte copy if less than ten or alignment mismatch */

	if (nbytes < 10 || (((int)destination ^ (int)source) & ALIGNMENT))
	    goto byte_copy_fwd;

	/* if odd-aligned copy byte */

	while ((int)destination & ALIGNMENT)
	    *destination++ = *source++;

	src = (long *) source;
	dst = (long *) destination;

	do
	    {
	    *dst++ = *src++;
	    }
	while (((char *)dst + sizeof (long)) <= dstend);

	destination = (char *)dst;
	source      = (char *)src;

byte_copy_fwd:
	while (destination < dstend)
	    *destination++ = *source++;
	}
    else
	{
	/* backward copy */

	dstend       = destination;
	destination += nbytes - sizeof (char);
	source      += nbytes - sizeof (char);

	/* do byte copy if less than ten or alignment mismatch */

	if (nbytes < 10 || (((int)destination ^ (int)source) & ALIGNMENT))
	    goto byte_copy_bwd;

	/* if odd-aligned copy byte */

	while ((int)destination & ALIGNMENT)
	    *destination-- = *source--;

	src = (long *) source;
	dst = (long *) destination;

	do
	    {
	    *dst-- = *src--;
	    }
	while (((char *)dst + sizeof(long)) >= dstend);

	destination = (char *)dst + sizeof (long);
	source      = (char *)src + sizeof (long);

byte_copy_bwd:
	while (destination >= dstend)
	    *destination-- = *source--;
	}
    }
#endif	/* IGNORE GNU LIBS */

再看一下msgQReceive

/*******************************************************************************
*
* msgQReceive - receive a message from a message queue
*
* This routine receives a message from the message queue <msgQId>.
* The received message is copied into the specified <buffer>, which is
* <maxNBytes> in length.  If the message is longer than <maxNBytes>,
* the remainder of the message is discarded (no error indication
* is returned).
*
* The <timeout> parameter specifies the number of ticks to wait for 
* a message to be sent to the queue, if no message is available when
* msgQReceive() is called.  The <timeout> parameter can also have 
* the following special values: 
* .iP "NO_WAIT  (0)" 8
* return immediately, whether a message has been received or not.  
* .iP "WAIT_FOREVER  (-1)"
* never time out.
* .LP
*
* WARNING: This routine must not be called by interrupt service routines.
*
* RETURNS:
* The number of bytes copied to <buffer>, or ERROR.
*
* ERRNO: S_distLib_NOT_INITIALIZED, S_smObjLib_NOT_INITIALIZED,
*        S_objLib_OBJ_ID_ERROR, S_objLib_OBJ_DELETED,
*        S_objLib_OBJ_UNAVAILABLE, S_objLib_OBJ_TIMEOUT,
*        S_msgQLib_INVALID_MSG_LENGTH, S_intLib_NOT_ISR_CALLABLE
*
* SEE ALSO: msgQSmLib
*/

int msgQReceive
    (
    FAST MSG_Q_ID       msgQId,         /* message queue from which to receive */
    char *              buffer,         /* buffer to receive message */
    UINT                maxNBytes,      /* length of buffer */
    int                 timeout         /* ticks to wait */
    )
    {
    FAST MSG_NODE *	pMsg;
    FAST int		bytesReturned;

    if (INT_RESTRICT() != OK) /* errno set by INT_RESTRICT() */
	return ERROR;

    if (ID_IS_SHARED (msgQId))			/* message Q is shared? */
        {
		if (ID_IS_DISTRIBUTED (msgQId)) /* message queue is distributed? */
			{
			if (msgQDistReceiveRtn == NULL)
				{
				errno = S_distLib_NOT_INITIALIZED;
			 	return (ERROR);
				}
			return ((*msgQDistReceiveRtn) (msgQId, buffer,
				maxNBytes, timeout, WAIT_FOREVER));
			}

        if (msgQSmReceiveRtn == NULL)
            {
            errno = S_smObjLib_NOT_INITIALIZED;
            return (ERROR);
            }

        return ((*msgQSmReceiveRtn) (SM_OBJ_ID_TO_ADRS (msgQId), buffer,
				     maxNBytes, timeout));
	}

    /* message queue is local */

    /* even though maxNBytes is unsigned, check for < 0 to catch possible
     * caller errors
     */
    if ((int) maxNBytes < 0)
	{
	errnoSet (S_msgQLib_INVALID_MSG_LENGTH);
	return (ERROR);
	}

    TASK_LOCK ();

restart:
    if (OBJ_VERIFY (msgQId, msgQClassId) != OK)
	{
	TASK_UNLOCK ();
	return (ERROR);
	}

#ifdef WV_INSTRUMENTATION
    /* windview - level 1 event logging routine */
    EVT_OBJ_4 (OBJ, msgQId, msgQClassId, EVENT_MSGQRECEIVE, msgQId, 
	       buffer, maxNBytes, timeout);
#endif

    pMsg = (MSG_NODE *) qJobGet (msgQId, &msgQId->msgQ, timeout);

    if (pMsg == (MSG_NODE *) NONE)
	{
	timeout = SIG_TIMEOUT_RECALC(timeout);
	goto restart;
	}

    if (pMsg == NULL)
	{
#if FALSE
	msgQId->recvTimeouts++;
#else
	/*
	 * The timeout stat should only be updated if a timeout has occured.
	 * An OBJ_VERIFY needs to be performed to catch the case where a 
	 * timeout indeed occured, but the message queue is subsequently 
	 * deleted before the current task is rescheduled.
	 */

	if (errnoGet() == S_objLib_OBJ_TIMEOUT) 
            {
	    if (OBJ_VERIFY (msgQId, msgQClassId) == OK)
	        msgQId->sendTimeouts++;
            else
                errnoSet(S_objLib_OBJ_DELETED);
            }

#endif /* FALSE */

	TASK_UNLOCK ();
	return (ERROR);
	}

    bytesReturned = min (pMsg->msgLength, maxNBytes);
    bcopy (MSG_NODE_DATA (pMsg), buffer, bytesReturned);

    qJobPut (msgQId, &msgQId->freeQ, &pMsg->node, Q_JOB_PRI_DONT_CARE);

    TASK_UNLOCK ();

    return (bytesReturned);
    }

msgQDelete函数

STATUS msgQDelete
    (
    MSG_Q_ID msgQId     /* message queue to delete */
    )
    {
    return (msgQDestroy (msgQId, TRUE));
    }

msgQDestroy函数

/*******************************************************************************
*
* msgQDestroy - destroy message queue
*
* RETURNS: OK or ERROR.
*
* ERRNO: S_distLib_NO_OBJECT_DESTROY
*
*/

LOCAL STATUS msgQDestroy
    (
    MSG_Q_ID msgQId,    /* message queue to destroy */
    BOOL     dealloc    /* deallocate memory associated with message queue */
    )
    {
    Q_JOB_NODE *pNode;
    FAST int	timeout;
    FAST int	nMsgs;

    int errnoCopy;

    if (ID_IS_SHARED (msgQId))  		/* message Q is shared?*/
	{
		if (ID_IS_DISTRIBUTED (msgQId)) /* message queue is distributed? */
			{
			errno = S_distLib_NO_OBJECT_DESTROY;
		 	return (ERROR);             /* cannot delete distributed msgQ */
			}
	
		errno = S_smObjLib_NO_OBJECT_DESTROY;
        return (ERROR);				/* cannot delete sm. msgQ */
	}

    if (INT_RESTRICT () != OK)			/* restrict isr use */
	return (ERROR);

    TASK_SAFE ();				/* TASK SAFE */
    TASK_LOCK ();				/* LOCK PREEMPTION */

#ifdef WV_INSTRUMENTATION

    /* Indicate that msgQDelete has been initiated */

    /* windview - level 1 event logging routine */
    EVT_OBJ_1 (OBJ, msgQId, msgQClassId, EVENT_MSGQDELETE, msgQId);
#endif

    if (OBJ_VERIFY (msgQId, msgQClassId) != OK)	/* validate message queue id */
	{
	TASK_UNLOCK ();				/* UNLOCK PREEMPTION */
	TASK_UNSAFE ();				/* TASK UNSAFE */
	return (ERROR);
	}

    objCoreTerminate (&msgQId->objCore);	/* INVALIDATE */

#ifdef WV_INSTRUMENTATION

    /*  Indicate that the msgQDelete has succeeded (before TASK_UNLOCK, as
     *  that causes unnecessary WV parser confusion).
     */

    /* windview - level 2 instrumentation
     * EVENT_OBJ_MSGDELETE needs to return the msgQId so MSG_OFFSET is
     * used to calulate the msgQId from the pQHead
     */
    EVT_TASK_1 (EVENT_OBJ_MSGDELETE, msgQId);
#endif

    TASK_UNLOCK ();				/* UNLOCK PREEMPTION */


    /* gobble up all messages in the message and free queues */

    timeout = NO_WAIT;		/* first time through gobble without waiting */
    nMsgs = 0;

    errnoCopy = errnoGet ();

    while (nMsgs < msgQId->maxMsgs)
	{
	while (((pNode = qJobGet (msgQId, &msgQId->freeQ, timeout)) != NULL) &&
	       (pNode != (Q_JOB_NODE *) NONE))
	    nMsgs++;

	while (((pNode = qJobGet (msgQId, &msgQId->msgQ, timeout)) != NULL) &&
	       (pNode != (Q_JOB_NODE *) NONE))
	    nMsgs++;

	timeout = 1;		/* after first time, wait a bit */
	}

    errnoSet (errnoCopy);

    /* terminate both the queues */

    /*
     * Since eventTerminate() can wake up a task, we want to put all tasks
     * in the ready queue before doing a windExit(), so that it is sure that
     * the task of highest priority runs first. To achieve that, the
     * statements 'kernelState = TRUE;' and 'windExit();' have been moved
     * from qJobTerminate() to here. This is the only place that qJobTerminate
     * is called.
     */

    kernelState = TRUE;

    qJobTerminate (&msgQId->msgQ);
    qJobTerminate (&msgQId->freeQ);

    eventTerminate (&msgQId->events);/* free task waiting for events if any */

    windExit ();

    if (dealloc)
	objFree (msgQClassId, (char *) msgQId);

    TASK_UNSAFE ();				/* TASK UNSAFE */

    return (OK);
    }

总结,这次是第一次看VxWorks的源码,看的是比较简单的消息队列模块,但是发现还是没有完全看懂。其中在C语言层面使用了很多预编译,结构体复杂运用,链表,数组,二维指针,函数指针,各种花哨的宏定义运用,很多都没有完全弄懂。除了语言层面,消息队列模块还和其他很多模块有交联,这也增加了理解的难度。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【VxWorks5.5源码阅读笔记一】消息队列模块 的相关文章

随机推荐