分布式选修课上讲了多线程编程,布置了一个生产者消费者的作业,觉得挺有意思,并且网络上的消费者问题多使用c语言编写,故在此记录c++解决方法
由于是消费者线程各自计数,故使用一个全局数组变量存储各线程的结果,最后在主函数求和,主要结构及解释如下
完整代码如下
#include<iostream>
#include<pthread.h>
using namespace std;
int ans[100];
pthread_mutex_t count_mutex;
pthread_cond_t condc;
pthread_cond_t condp;
int store[11]; //定义缓冲区,store[0]代表空,不使用
int flag = 0; //栈标记
int consum = 1; //记录消费数量
struct sum_data //记录是第几个消费者线程
{
int number;
};
void *consumer(void *threadarg) //消费者
{
struct sum_data *my_data;
my_data = (struct sum_data *) threadarg;
int s = 0;
while(1)
{
pthread_mutex_lock(&count_mutex);
while(flag == 0 && consum <= 100)
{
pthread_cond_wait(&condc,&count_mutex);
}
if(consum > 100)
{
pthread_cond_signal(&condc);
pthread_mutex_unlock(&count_mutex);
break;
}
s += store[flag];
flag -= 1;
consum += 1;
pthread_cond_signal(&condp);
pthread_mutex_unlock(&count_mutex);
}
ans[my_data->number] = s;
pthread_exit(NULL);
}
void *producer(void *threadarg) //生产者
{
int i;
for(i = 1;i <= 100;i++)
{
pthread_mutex_lock(&count_mutex);
while(flag == 10)
{
pthread_cond_wait(&condp,&count_mutex);
}
flag += 1;
store[flag] = i;
pthread_cond_signal(&condc);
pthread_mutex_unlock(&count_mutex);
}
pthread_exit(NULL);
}
int main()
{
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&condc, NULL);
pthread_cond_init(&condp, NULL);
int N,M;
N = 5;
int ret = 0;
pthread_t tids[100];
struct sum_data td[100];
pthread_t produce;
ret = pthread_create(&produce,NULL,producer,NULL); //生产者线程创建
for(int i = 1;i <= N;i++) //消费者线程创建
{
td[i].number = i-1;
ret = pthread_create(&tids[i],NULL,consumer,(void *)&td[i]);
}
for(int i = 1;i <= N;i++) //等待消费者线程
{
pthread_join(tids[i], NULL);
}
int sum = 0;
for(int i = 0;i < N;i++)
{
sum += ans[i];
}
cout << sum;
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&condc);
pthread_cond_destroy(&condp);
pthread_exit(NULL);
return 0;
}
完成基础问题后,老师提出可以改进成任意指定生产者与消费者数量的形式,故进行了扩展,将一个生产者,五个消费者的情况拓展为数量为任意输入值,其消费者的线程结构同上,不再重复解释,只解释生产者的改动
完整代码如下
#include<iostream>
#include<pthread.h>
using namespace std;
int ans[100];
pthread_mutex_t count_mutex;
pthread_cond_t condc;
pthread_cond_t condp;
int store[11]; //定义缓冲区
int flag = 0; //栈标记
int consum = 1; //记录消费数量
int produce = 1; //记录生产数量
struct sum_data
{
int number;
};
void *consumer(void *threadarg) //消费者
{
struct sum_data *my_data;
my_data = (struct sum_data *) threadarg;
int s = 0;
while(1)
{
pthread_mutex_lock(&count_mutex);
while(flag == 0 && consum <= 100)
{
pthread_cond_wait(&condc,&count_mutex);
}
if(consum > 100)
{
pthread_cond_signal(&condc);
pthread_mutex_unlock(&count_mutex);
break;
}
s += store[flag];
flag -= 1;
consum += 1;
pthread_cond_signal(&condp);
pthread_mutex_unlock(&count_mutex);
}
ans[my_data->number] = s;
pthread_exit(NULL);
}
void *producer(void *threadarg) //生产者
{
while(1)
{
pthread_mutex_lock(&count_mutex);
while(flag == 10)
{
pthread_cond_wait(&condp,&count_mutex);
}
if(produce > 100)
{
pthread_mutex_unlock(&count_mutex);
break;
}
flag += 1;
store[flag] = produce;
produce += 1;
pthread_cond_signal(&condc);
pthread_mutex_unlock(&count_mutex);
}
pthread_exit(NULL);
}
int main()
{
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&condc, NULL); //消费者条件变量
pthread_cond_init(&condp, NULL); //生产者条件变量
int N,M;
cin >> M >> N;
//N = 5;
int ret = 0;
pthread_t tids[100];
struct sum_data td[100];
for(int i = 1;i <= M;i++) //创建生产者线程
{
ret = pthread_create(&tids[i],NULL,producer,NULL);
}
for(int i = 1;i <= N;i++) //创建消费者线程
{
td[i].number = i-1;
ret = pthread_create(&tids[i+M],NULL,consumer,(void *)&td[i]);
}
for(int i = 1;i <= M+N;i++) //等待所有线程
{
pthread_join(tids[i], NULL);
}
int sum = 0;
for(int i = 0;i < N;i++)
{
sum += ans[i];
}
cout << sum;
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&condc);
pthread_cond_destroy(&condp);
pthread_exit(NULL);
return 0;
}
比较值得注意的是两个线程中两个条件变量的使用,以及信号的传导,理清关系才不会无从下手
如果对其中pthread变量和函数的使用不太清楚,建议先了解相关知识
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)