操作系统 线程同步实验
一、实验目标:
- 顺序表循环队列实现的实验目标:
掌握使用顺序表和循环队列实现队列的基本操作,如队列的插入、删除、遍历等,同时了解循环队列的内部实现原理和利用循环队列解决实际问题的方法。
- Linux生产者/消费者问题的多线程示例实验目标:
熟悉Linux下多线程编程的开发环境和基本思路,学会使用线程同步机制(如互斥锁、条件变量等)解决多线程编程中的共享资源竞争问题,实现一个生产者/消费者模型的多线程示例程序。同时,掌握Linux下多线程编程的基本调试技巧和方法。
二、实验内容:
本次实验主要包括两部分,一是顺序表循环队列的实现,二是Linux生产者/消费者问题的多线程示例。在顺序表循环队列的实现中,将通过数组实现一个循环队列,支持入队和出队操作,并对队列进行相关的操作和处理。在Linux生产者/消费者问题的多线程示例中,将使用多线程技术对商品进行生产和消费,同时通过使用锁机制实现对生产和消费过程的同步和协调。
三、实验步骤:
1、顺序循环队列的实现
队列的结构体变量定义和相关操作函数的声明 在顺序表循环队列的实现中,首先定义了队列结构体变量,并声明了相关操作函数,包括队列的初始化、入队、出队、遍历等操作,具体代码如下:
首先定义循环队列结构如下:
#define QUEUE_SIZE 5 //队列最大容纳QUEUE_SIZE-1个元素
typedef struct{
int aData[QUEUE_SIZE]; //队列元素
int dwHead; //指向队首元素
int dwTail; //指向队尾元素的下一个元素
}T_QUEUE, *PT_QUEUE;
提供判空、判满、查询等辅助函数:
//判断循环队列是否为空
int IsQueEmpty(PT_QUEUE ptQue)
{
return ptQue->dwHead == ptQue->dwTail;
}
//判断循环队列是否为满
int IsQueFull(PT_QUEUE ptQue)
{
return (ptQue->dwTail + 1) % QUEUE_SIZE == ptQue->dwHead;
}
//获取循环队列元素数目
int QueDataNum(PT_QUEUE ptQue)
{
return (ptQue->dwTail - ptQue->dwHead + QUEUE_SIZE) % QUEUE_SIZE;
}
//获取循环队列队首位置
int GetQueHead(PT_QUEUE ptQue)
{
return ptQue->dwHead;
}
//获取循环队列队首元素
int GetQueHeadData(PT_QUEUE ptQue)
{
return ptQue->aData[ptQue->dwHead];
}
//获取循环队列队尾位置
int GetQueTail(PT_QUEUE ptQue)
{
return ptQue->dwTail;
}
基于上述结构,定义初始化、入队出队和显示函数:
//初始化循环队列
void InitQue(PT_QUEUE ptQue)
{
memset(ptQue, 0, sizeof(*ptQue));
}
//向循环队列中插入元素
void EnterQue(PT_QUEUE ptQue, int dwElem)
{
if(IsQueFull(ptQue))
{
printf("Elem %d cannot enter Queue %p(Full)!\n", dwElem, ptQue);
return;
}
ptQue->aData[ptQue->dwTail]= dwElem;
ptQue->dwTail = (ptQue->dwTail + 1) % QUEUE_SIZE;
}
//从循环队列中取出元素
int LeaveQue(PT_QUEUE ptQue)
{
if(IsQueEmpty(ptQue))
{
printf("Queue %p is Empty!\n", ptQue);
return -1;
}
int dwElem = ptQue->aData[ptQue->dwHead];
ptQue->dwHead = (ptQue->dwHead + 1) % QUEUE_SIZE;
return dwElem;
}
//从队首至队尾依次显示队中元素值
void DisplayQue(PT_QUEUE ptQue)
{
if(IsQueEmpty(ptQue))
{
printf("Queue %p is Empty!\n", ptQue);
return;
}
}
printf("Queue Element: ");
int dwIdx = ptQue->dwHead;
while((dwIdx % QUEUE_SIZE) != ptQue->dwTail)
printf("%d ", ptQue->aData[(dwIdx++) % QUEUE_SIZE]);
printf("\n");
测试代码中展示了循环队列的基本操作,包括队列的初始化、入队、出队以及队列元素的遍历输出。
生产者消费者问题多线程实现
#define PRODUCER_NUM 5 //生产者数
#define CONSUMER_NUM 3 //消费者数
#define PRD_NUM 20 //最多生产的产品数
#define DELAY_TIME 3 //生产(或消费)任务之间的最大时间间隔
#define QUEUE_SIZE (PRD_NUM+1) //队列最大容纳QUEUE_SIZE-1个元素
T_QUEUE gtQueue;
pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //Full->Not Full
pthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //Empty->Not Empty
在多线程实现生产者消费者问题时,需要定义线程共享的全局数据,包括生产者消费者线程数目,产品数目,队列大小,互斥锁和条件变量等。
void *ProducerThread(void *pvArg)
{
pthread_detach(pthread_self());
intptr_t dwThrdNo = (intptr_t)pvArg;
while(1)
{
pthread_mutex_lock(>QueLock);
while(IsQueFull(>Queue)) //队列由满变为非满时,生产者线程被唤醒
pthread_cond_wait(>PrdCond, >QueLock);
EnterQue(>Queue, GetQueTail(>Queue)); //将队列元素下标作为元素值入队
if(QueDataNum(>Queue) == 1) //当生产者开始产出后,通知(唤醒)消费者线程
pthread_cond_broadcast(>CsmCond);
printf("[Producer %2ld]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));
pthread_mutex_unlock(>QueLock);
sleep(rand()%DELAY_TIME + 1);
}
}
void *ConsumerThread(void *pvArg)
{
pthread_detach(pthread_self());
intptr_t dwThrdNo = (intptr_t)pvArg;
while(1)
{
pthread_mutex_lock(>QueLock);
while(IsQueEmpty(>Queue)) //队列由空变为非空时,消费者线程将被唤醒
pthread_cond_wait(>CsmCond, >QueLock);
if(GetQueHead(>Queue) != GetQueHeadData(>Queue))
{
printf("[Consumer %2ld]Product: %d, Expect: %d\n", dwThrdNo,
GetQueHead(>Queue), GetQueHeadData(>Queue));
exit(0);
}
LeaveQue(>Queue);
if(QueDataNum(>Queue) == (PRD_NUM-1)) //当队列由满变为非满时,通知(唤醒)生产者线程
pthread_cond_broadcast(>PrdCond);
printf("[Consumer %2ld]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));
pthread_mutex_unlock(>QueLock);
sleep(rand()%DELAY_TIME + 1);
}
}
int main(void)
{
InitQue(>Queue);
srand(getpid()); //初始化随机函数发生器
pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM];
int dwThrdIdx;
for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++)
{ //创建CONSUMER_NUM个消费者线程,传入(void*)dwThrdIdx作为ConsumerThread的参数
pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)(intptr_t)dwThrdIdx);
}
sleep(2);
for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++)
{
pthread_create(&aThrd[dwThrdIdx+CONSUMER_NUM], NULL, ProducerThread, (void*)(intptr_t)dwThrdIdx);
}
while(1);
return 0 ;
}
在主函数中,先初始化互斥锁和条件变量,然后创建消费者线程和生产者线程,最后进行阻塞等待。在生产者线程中,使用条件变量实现队列非满时才进行入队操作,并使用生产者唤醒消费者的方式通知队列非空;在消费者线程中,使用条件变量实现队列非空时才进行出队操作,并使用消费者唤醒生产者的方式通知队列非满。这样就能够实现基本的生产者消费者同步。
四、实验结果:
1、循环队列测试结果
2、生产者-消费者示例运行结果
五、源代码
1、循环队列
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#define QUEUE_SIZE 21//
typedef struct{
/* data */
int aData[QUEUE_SIZE];
int dwHead;
int dwTail;
}T_QUEUE, *PT_QUEUE;
int IsQueFull(PT_QUEUE ptQue) {
return (ptQue->dwTail+1) %QUEUE_SIZE == ptQue->dwHead;
}
int IsQueEmpty(PT_QUEUE ptQue) {
return ptQue->dwHead == ptQue->dwTail;
}
int QueDataNum(PT_QUEUE ptQue) {
return (ptQue->dwHead - ptQue->dwTail + QUEUE_SIZE) % QUEUE_SIZE;
}
int GetQueHead(PT_QUEUE ptQue) {
return ptQue->dwHead;
}
int GetQueHeadData(PT_QUEUE ptQue) {
return ptQue->aData[ptQue->dwHead];
}
int GetQueTail(PT_QUEUE ptQue) {
return ptQue->dwTail;
}
void InitQue(PT_QUEUE ptQue) {
memset(ptQue,0, sizeof(*ptQue));
}
void EnterQue(PT_QUEUE ptQue, int dwElem)
{
if(IsQueFull(ptQue))
{
printf("Elem %d cannot enter Queue %p(Full)!\n", dwElem, ptQue);
return;
}
ptQue->aData[ptQue->dwTail]= dwElem;
ptQue->dwTail = (ptQue->dwTail + 1) % QUEUE_SIZE;
}
int LeaveQue(PT_QUEUE ptQue) {
if (IsQueEmpty(ptQue))
{
/* code */
printf("Queue %p is Empty!\n", ptQue);
return -1;
}
int dwElem = ptQue->aData[ptQue->dwHead];
ptQue->dwHead = (ptQue->dwHead+1)%QUEUE_SIZE;
return dwElem;
}
void DisplayQue(PT_QUEUE ptQue) {
if (IsQueEmpty(ptQue))
{
/* code */
printf("Queue %p is Empty!\n", ptQue);
return ;
}
printf("Queue Element: ");
int dwIdx = ptQue->dwHead;
while((dwIdx % QUEUE_SIZE) != ptQue->dwTail)
printf("%d", ptQue->aData[(dwIdx++) % QUEUE_SIZE]);
printf("\n");
}
// 测试函数
static T_QUEUE gtQueue;
void QueueTest(void)
{
InitQue(>Queue);
printf("Enter Queue 1,2,3,4,5...\n");
EnterQue(>Queue, 1);
EnterQue(>Queue, 2);
EnterQue(>Queue, 3);
EnterQue(>Queue, 4);
EnterQue(>Queue, 5);
printf("Queue Status: Empty(%d), Full(%d)\n", IsQueEmpty(>Queue), IsQueFull(>Queue));
printf("Queue Elem Num: %u\n", QueDataNum(>Queue));
printf("Head: %u(%d)\n", GetQueHead(>Queue), GetQueHeadData(>Queue));
printf("Tail: %u\n", GetQueTail(>Queue));
DisplayQue(>Queue);
printf("\nLeave Queue...\n");
printf("Leave %d\n", LeaveQue(>Queue));
printf("Leave %d\n", LeaveQue(>Queue));
printf("Leave %d\n", LeaveQue(>Queue));
DisplayQue(>Queue);
printf("\nEnter Queue 6,7...\n");
EnterQue(>Queue, 6);
EnterQue(>Queue, 7);
DisplayQue(>Queue);
printf("\nLeave Queue...\n");
printf("Leave %d\n", LeaveQue(>Queue));
printf("Leave %d\n", LeaveQue(>Queue));
printf("Leave %d\n", LeaveQue(>Queue));
DisplayQue(>Queue);
}
// 在当前函数测试时打开
// int main() {
// QueueTest();
// return -1;
// }
2、生产消费者
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include <stdint.h>//用于定义intptr_t类型
#include<pthread.h>//线程相关操作的头文件
#include<semaphore.h> //信号量相关操作的头文件
#include "T_QUEUE.c"//自定义队列的头文件
#define PRODUCER_NUM 5 //生产者线程数量
#define CONSUMER_NUM 3 //消费者线程数量
#define PRD_NUM 20//队列最大容量
#define DELAY_TIME 3 //任务生产或消费的最长延迟时间
static T_QUEUE gtQueue;//定义一个队列的全局变量
pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;//定义静态互斥锁
pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //定义生产者条件变量,当队列未满时通知生产者继续生产,只在添加元素时使用
pthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //定义消费者条件变量,当队列不为空时通知消费者继续消费,只在删除元素时使用
void *ProducerThread(void *pvArg)//生产者线程函数
{
pthread_detach(pthread_self());//以分离状态创建线程,释放线程运行后的资源
intptr_t dwThrdNo = (intptr_t)pvArg;//强制类型转换,获取线程编号,并将pvArg转化为intptr_t类型
while(1)//无限生产
{
pthread_mutex_lock(>QueLock);//上锁,保证线程操作间的互斥性
while(IsQueFull(>Queue))//如果队列满了,等待条件变量
pthread_cond_wait(>PrdCond, >QueLock);//等待是否队列非满状态
EnterQue(>Queue, GetQueTail(>Queue)); //把新产品放到队列尾部
if(QueDataNum(>Queue) == 1)//如果现在队列中就只有一个产品
pthread_cond_broadcast(>CsmCond);//广播消费者,通知其可以消费了
printf("[Producer %2ld]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));//打印当前队列中产品的数量
pthread_mutex_unlock(>QueLock);//解锁,方便其他线程进行操作
sleep(rand()%DELAY_TIME + 1);//模拟生产时间
}
}
void *ConsumerThread(void *pvArg)//消费者线程函数
{
pthread_detach(pthread_self());//以分离状态创建线程,释放线程运行后的资源
intptr_t dwThrdNo = (intptr_t)pvArg;//强制类型转换,获取线程编号,并将pvArg转化为intptr_t类型
while(1)//无限消费
{
pthread_mutex_lock(>QueLock);//上锁,保证线程操作间的互斥性
while(IsQueEmpty(>Queue))//如果队列为空,等待条件变量
pthread_cond_wait(>CsmCond, >QueLock);//等待是否队列非空状态
if(GetQueHead(>Queue) != GetQueHeadData(>Queue))//取队列头部产品并且和期望产品编号比较
{
printf("[Consumer %2ld]Product: %d, Expect: %d\n", dwThrdNo,
GetQueHead(>Queue), GetQueHeadData(>Queue));
exit(0);//如果不匹配则退出程序
}
LeaveQue(>Queue);//取出队列头部
if(QueDataNum(>Queue) == (PRD_NUM-1))//如果现在队列中就只有一个产品
pthread_cond_broadcast(>PrdCond);//广播生产者,通知其可以生产了
printf("[Consumer %2ld]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));//打印当前队列中产品的数量
pthread_mutex_unlock(>QueLock);//解锁,方便其他线程进行操作
sleep(rand()%DELAY_TIME + 1);//模拟消费时间
}
}
int main(void)
{
InitQue(>Queue);//初始化队列
srand(getpid()); //以当前进程ID作为随机数生成种子
pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM];//线程号数组
int dwThrdIdx;
for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++)//创建消费者线程
{
pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)(intptr_t)dwThrdIdx);
}
sleep(2);//留2秒的时间给消费者线程准备
for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++)//创建生产者线程
{
pthread_create(&aThrd[dwThrdIdx+CONSUMER_NUM], NULL, ProducerThread, (void*)(intptr_t)dwThrdIdx);
}
while(1);//永远运行主线程
return 0 ;
}