Linux生产者消费者模型(POSIX信号量)

2023-11-16

目录

一.生产者消费者模型

1.基本概念

2.模型特点

3.模型优点 

二.基于BlockingQueue的生产者消费者模型

1.基本概念

2.单生产者、单消费者为例进行模拟实现 

3.基于计算任务的生产者消费者模型

三.POSIX信号量

1.基本概念

2.信号量函数

三. 二元信号量模拟实现互斥功能  

四.基于环形队列的生产者消费者模型

1.生产者和消费者关心不同资源

2.需要遵守的两个原则

3.代码模拟实现

4.信号量保护环形队列 


一.生产者消费者模型

1.基本概念

  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
  • 生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。

                

2.模型特点

(1)生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护

(2)生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

  • 介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
  • 其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

                         

(3)生产者和消费者之间为什么会存在同步关系?

  • 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
  • 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
  • 虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

(4)让消费者和生产者协同工作,合适的时候可能一直运行,生产者和消费者并不会因为要互相等待对方的结果而阻塞,相当于双方可以并发执行.

(5)为什么要有生产者消费者模型?     本质是用代码进行解耦的过程
                                

 3.模型优点 

  • 解耦。
  • 支持并发。
  • 支持忙闲不均 (哪边的线程忙可以多分配一些线程)

(1)如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合

(2)对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合

                

                

二.基于BlockingQueue的生产者消费者模型

1.基本概念

  • 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
  • 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
  • 当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)

                 

2.单生产者、单消费者为例进行模拟实现 

(1) 生产和消费步调一致,生产一个消费一个

①BlockQueue.hpp

#pragma oncec

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

#define NUM 5

template<class T>
class BlockQueue
{
private:
	bool IsFull()
	{
		return _q.size() == _cap;
	}
	bool IsEmpty()
	{
		return _q.empty();
	}

public:
	BlockQueue(int cap = NUM)
		: _cap(cap)
	{
		pthread_mutex_init(&_mutex, nullptr);
		pthread_cond_init(&_full, nullptr);
		pthread_cond_init(&_empty, nullptr);
	}
	~BlockQueue()
	{
		pthread_mutex_destroy(&_mutex);
		pthread_cond_destroy(&_full);
		pthread_cond_destroy(&_empty);
	}

public:
	//向阻塞队列插入数据(生产者调用)
	void Push(const T& data)
	{
		pthread_mutex_lock(&_mutex);
		while (IsFull()){
			//不能进行生产,直到阻塞队列可以容纳新的数据
			pthread_cond_wait(&_full, &_mutex);
		}
		_q.push(data);
		pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程

		pthread_mutex_unlock(&_mutex);
	}

	//从阻塞队列获取数据(消费者调用)
	void Pop(T& data)
	{
		pthread_mutex_lock(&_mutex);
		while (IsEmpty()){
			//不能进行消费,直到阻塞队列有新的数据
			pthread_cond_wait(&_empty, &_mutex);
		}
		data = _q.front();
		_q.pop();

		pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程

		pthread_mutex_unlock(&_mutex);
	}

private:
	std::queue<T> _q; //阻塞队列
	int _cap; //阻塞队列容纳数据的最大个数
	pthread_mutex_t _mutex;
	pthread_cond_t _full;  
	pthread_cond_t _empty; 
};

程序说明:  

  • 由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
  • 将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
  • 这里设置BlockingQueue存储数据的上限为5,当阻塞队列中存储了五组数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
  • 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
  • 生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒。
  • 消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒。
  • 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待。
  • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
  • 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。
  • 当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后需要唤醒在full条件变量下等待的生产者线程。

                                 

判断是否满足生产消费条件时不能用if,而应该用while:

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行,就会出现错误(没有数据还拿,没有空间还放)。
  • 在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。
     

②main.cc

#include "BlockQueue.hpp"

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		sleep(1);
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}

void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}

int main()
{
	srand((unsigned int)time(nullptr));//生成随机数

	pthread_t producer, consumer;
	BlockQueue<int>* bq = new BlockQueue<int>;

	//创建生产者线程和消费者线程
	pthread_create(&producer, nullptr, Producer, bq);
	pthread_create(&consumer, nullptr, Consumer, bq);

	//join生产者线程和消费者线程
	pthread_join(producer, nullptr);
	pthread_join(consumer, nullptr);

	delete bq;
	return 0;
}

程序说明:

  • 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。

                

③结果 : 生产者是每隔一秒生产一个数据,所以消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。

        

 (2)生产快,消费慢

①代码: 只需要改变main.cc中的执行函数

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}

void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}

 ②结果: 生产者生产的很快,运行后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了 ; 顺序消费。

                                                        

(3)生产慢,消费快

①代码

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		sleep(1);
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}
void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}

②结果 : 虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的。

                

(4) 生产/消费超过一定高低水位线再通知另一方

①代码 : 生产者生产快不sleep,消费者消费慢sleep

//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
	pthread_mutex_lock(&_mutex);
	while (IsFull()){
		//不能进行生产,直到阻塞队列可以容纳新的数据
		pthread_cond_wait(&_full, &_mutex);
	}
	_q.push(data);

    //高低水位线
	if (_q.size() >= _cap / 2){
		pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
	}

	pthread_mutex_unlock(&_mutex);
}

//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{
	pthread_mutex_lock(&_mutex);
	while (IsEmpty()){
		//不能进行消费,直到阻塞队列有新的数据
		pthread_cond_wait(&_empty, &_mutex);
	}
	data = _q.front();
	_q.pop();

    //高低水位线
	if (_q.size() <= _cap / 2){
		pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
	}

	pthread_mutex_unlock(&_mutex);
}

②结果 :运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。

                

3.基于计算任务的生产者消费者模型

(1)代码 

①Task.hpp : 定义一个Task类,这个类当中包含一个Run成员函数,该函数代表着如何让消费者处理拿到的数据

#pragma once
#include <iostream>

class Task
{
public:
	Task(int x = 0, int y = 0, int op = 0)
		: _x(x), _y(y), _op(op)
	{}
	~Task()
	{}

	void Run()
	{
		int result = 0;
		switch (_op)
		{
		case '+':
			result = _x + _y;
			break;
		case '-':
			result = _x - _y;
			break;
		case '*':
			result = _x * _y;
			break;
		case '/':
			if (_y == 0){
				std::cout << "Warning: div zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x / _y;
			}
			break;
		case '%':
			if (_y == 0){
				std::cout << "Warning: mod zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x % _y;
			}
			break;
		default:
			std::cout << "error operation!" << std::endl;
			break;
		}
		std::cout << _x << _op << _y << "=" << result << std::endl;
	}

private:
	int _x;
	int _y;
	char _op;
};

②main.cc : 对线程执行函数稍作修改, 此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。

void* Producer(void* arg)
{
	BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
	const char* arr = "+-*/%";
	//生产者不断进行生产
	while (true){
		int x = rand() % 100;
		int y = rand() % 100;
		char op = arr[rand() % 5];
		Task t(x, y, op);
		bq->Push(t); //生产数据
		std::cout << "producer task done" << std::endl;
	}
}

void* Consumer(void* arg)
{
	BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		Task t;
		bq->Pop(t); //消费数据
		t.Run(); //处理数据
	}
}

                 

(2)结果 : 运行代码,当阻塞队列被生产者打满后消费者被唤醒,此时消费者在消费数据时执行的就是计算任务,当阻塞队列当中的数据被消费到低水位线时又会唤醒生产者进行生产。

                

 (3)小结

我们想让生产者消费者模型处理某一种任务时,大体的框架已经搭建好了,就只需要提供对应的Task类,然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可

                                

                

        

                

三.POSIX信号量

1.基本概念

  • 信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理
  • 每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特定的临界资源的权限,当操作完毕后就应该释放信号量
  • 信号量存在的价值 : 1.进行同步与互斥    2.更细粒度的临界资源的管理
     

(1)为什么要有信号量? 提高效率

  • 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
  • 如果我们将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。        

(2)信号量的PV操作:

  • P操作:我们将申请信号( count-- )量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
  • V操作:我们将释放信号量( count++ )称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。

(3)PV操作必须是原子操作

  • 多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
  • 但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
  • 内存当中变量的++、--操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++、--操作。

(4)申请信号量失败被挂起等待

  • 当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
  •  信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。

(5)信号量结构的大致理解伪代码:

                 

2.信号量函数

(1) 初始化

函数 :  int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:需要初始化的信号量。
  • pshared:传入0值表示线程间共享,传入非零值表示进程间共享(常用0)
  • value:信号量的初始值(计数器的初始值)。

返回值: 初始化信号量成功返回0,失败返回-1。


注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。

(2)销毁

函数 : int sem_destroy(sem_t *sem);
 

参数: sem:需要销毁的信号量。

返回值:销毁信号量成功返回0,失败返回-1。

(3)等待信号量(申请信号量 ,P())

函数 : int sem_wait(sem_t *sem);


参数:sem:需要等待的信号量。


返回值:

  • 等待信号量成功返回0,信号量的值减一
  • 等待信号量失败返回-1,信号量的值保持不变

(4)发布信号量(释放信号量,V())

函数 : int sem_post(sem_t *sem);

参数:sem:需要发布的信号量。


返回值:

  • 发布信号量成功返回0,信号量的值加一
  • 发布信号量失败返回-1,信号量的值保持不变

        

                

                

三. 二元信号量模拟实现互斥功能  

  • 信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。
  • 信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。

示例,实现一个多线程抢票系统,使用二元信号量模拟实现多线程互斥,,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。

1.代码

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

class Sem{
public:
	Sem(int num)
	{
		sem_init(&_sem, 0, num);
	}
	~Sem()
	{
		sem_destroy(&_sem);
	}
	void P()
	{
		sem_wait(&_sem);
	}
	void V()
	{
		sem_post(&_sem);
	}
private:
	sem_t _sem;
};


Sem sem(1); //二元信号量
int tickets = 1000;

void* TicketGrabbing(void* arg)
{
	std::string name = (char*)arg;
	while (true){
		sem.P();
		if (tickets > 0){
			usleep(1000);
			std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
			sem.V();
		}
		else{
			sem.V();
			break;
		}
	}
	std::cout << name << " quit..." << std::endl;
	pthread_exit((void*)0);
}

int main()
{
	pthread_t tid1, tid2, tid3, tid4;
	pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
	pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
	pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
	pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
	
	pthread_join(tid1, nullptr);
	pthread_join(tid2, nullptr);
	pthread_join(tid3, nullptr);
	pthread_join(tid4, nullptr);
	return 0;
}

2.结果 : 未出现数据不一致问题 

                

                

                        

四.基于环形队列的生产者消费者模型

1.生产者和消费者关心不同资源

(1)生产者关注的是空间资源,消费者关注的是数据资源

  • 生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产。
  • 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。

         

(2)blank_sem和data_sem的初始值设置

现在我们用信号量来描述环形队列当中的空间资源(blank_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:

  • blank_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
  • data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。

                 

(3)生产者和消费者申请和释放资源

①生产者申请空间资源,释放数据资源

对于生产者来说,生产者每次生产数据前都需要先申请blank_sem:

  • 如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
  • 如果blank_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。

当生产者生产完数据后,应该释放data_sem:

  • 虽然生产者在进行生产前是对blank_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作而不是blank_sem。
  • 生产者在生产数据前申请到的是blank位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置,而应该是data位置。
  • 当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行V操作。

                         

②消费者申请数据资源,释放空间资源

对于消费者来说,消费者每次消费数据前都需要先申请data_sem:

  • 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
  • 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。

当消费者消费完数据后,应该释放blank_sem:

  • 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对blank_sem进行V操作而不是data_sem。
  • 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作blank位置,而不是data位置。
  • 当消费者消费完数据后,意味着环形队列当中多了一个blank位置,因此我们应该对blank_sem进行V操作。

                 

 2.需要遵守的两个原则

(1)生产者和消费者不能对同一个位置进行访问(互斥)

  • 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
  • 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。

(2)无论是生产者还是消费者,都不应该将对方套一个圈以上(格子的数量有限)

  • 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
  • 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
     

                         

3.代码模拟实现

(1) 代码

①RingQueue.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>

#define NUM 10

template<class T>
class RingQueue
{
private:
	//P操作
	void P(sem_t& s)
	{
		sem_wait(&s);
	}
	//V操作
	void V(sem_t& s)
	{
	    sem_post(&s);
	}
public:
	RingQueue(int cap = NUM)
		: _cap(cap), _p_pos(0), _c_pos(0)
	{
		_q.resize(_cap);
		sem_init(&_blank_sem, 0, _cap); //blank_sem初始值设置为环形队列的容量
		sem_init(&_data_sem, 0, 0); //data_sem初始值设置为0
	}
	~RingQueue()
	{
	sem_destroy(&_blank_sem);
	sem_destroy(&_data_sem);
	}

public:
	//向环形队列插入数据(生产者调用)
	void Push(const T& data)
	{
		P(_blank_sem); //生产者关注空间资源
		_q[_p_pos] = data;
		V(_data_sem); //生产

		//更新下一次生产的位置
		_p_pos++;
		_p_pos %= _cap;
	}

	//从环形队列获取数据(消费者调用)
	void Pop(T& data)
	{
		P(_data_sem); //消费者关注数据资源
		data = _q[_c_pos];
		V(_blank_sem);

		//更新下一次消费的位置
		_c_pos++;
		_c_pos %= _cap;
	}

private:
	std::vector<T> _q; //环形队列
	int _cap; //环形队列的容量上限
	int _p_pos; //生产位置
	int _c_pos; //消费位置
	sem_t _blank_sem; //描述空间资源
	sem_t _data_sem; //描述数据资源
};

程序解释: 

  • 当不设置环形队列的大小时,我们默认将环形队列的容量上限设置为10。
  • 代码中的RingQueue是用vector实现的,生产者每次生产的数据放到vector下标为p_pos的位置,消费者每次消费的数据来源于vector下标为c_pos的位置。
  • 生产者每次生产数据后p_pos都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
  • 消费者每次消费数据后c_pos都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
  • p_pos只会由生产者线程进行更新,c_pos只会由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将p_pos和c_pos的更新放到了V操作之后,就是为了尽量减少临界区的代码 , 提高效率
  • 这里也可以设置高低水位线,数据/空格子数量达到一定程度再消费
     

②main.cc实现单生产者,单消费者模型

#include "RingQueue.hpp"

void* Producer(void* arg)
{
	RingQueue<int>* rq = (RingQueue<int>*)arg;
	while (true){
		sleep(1);
		int data = rand() % 100 + 1;
		rq->Push(data);
		std::cout << "Producer: " << data << std::endl;
	}
}

void* Consumer(void* arg)
{
	RingQueue<int>* rq = (RingQueue<int>*)arg;
	while (true){
		sleep(1);
		int data = 0;
		rq->Pop(data);
		std::cout << "Consumer: " << data << std::endl;
	}
}

int main()
{
	srand((unsigned int)time(nullptr));

	pthread_t producer, consumer;
	RingQueue<int>* rq = new RingQueue<int>;
	pthread_create(&producer, nullptr, Producer, rq);
	pthread_create(&consumer, nullptr, Consumer, rq);
	
	pthread_join(producer, nullptr);
	pthread_join(consumer, nullptr);
	delete rq;
	return 0;
}
  •  环形队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个环形队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入

                 

(2)结果

①生产者消费者同步

②生产快消费慢

  • 生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经为0了,于是生产者只能在blank_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对blank_sem进行了V操作,生产者才会被唤醒进而继续进行生产。
  • 但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。

                 

③生产慢消费快 

  • 虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对data_sem进行了V操作,消费者才会被唤醒进而进行消费。
  • 但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。

                                        

4.信号量保护环形队列 

(1)在blank_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。

①因为只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:

  • 环形队列为空时
  • 环形队列为满时

②但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:

  • 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0
  • 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0

                         

(2)当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行.
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux生产者消费者模型(POSIX信号量) 的相关文章

  • 干净地销毁System V共享内存段

    我在用shmget shmat and shmctl分别获取和创建共享内存段 将其附加到进程地址空间中并删除它 我想知道进程是否仍然可以使用共享内存段 即使它已被分离并要求使用删除 shmctl id IPC RMID 在一个过程中 我无法
  • 类似 wget 的 BitTorrent 客户端或库? [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 是否有任何
  • 访问 Linux 线程(pthreads)的本地堆栈

    我目前正在实现一个使用多线程但对总内存消耗有要求的应用程序 我希望有一个主线程执行 I O 并有几个工作线程执行计算 目前 我在主堆栈上有几个可供工作人员访问的数据结构 我使用 OpenMP 进行工作分配 由于主 工作者模式不能很好地与 O
  • Linux shell 命令逐块读取/打印文件

    是否有一个标准的 Linux 命令可以用来逐块读取文件 例如 我有一个大小为 6kB 的文件 我想读取 打印第一个 1kB 然后是第二个 1kB 看来猫 头 尾在这种情况下不起作用 非常感谢 你可以这样做read n在循环中 while r
  • 当存在点和下划线时,使用 sed 搜索并替换

    我该如何更换foo with foo sed 只需运行 sed s foo foo g file php 不起作用 逃离 sed s foo foo g file php Example cat test txt foo bar sed s
  • 如何真正释放 Linux 中的大页面以供新进程使用?

    真的找不到太多关于此的信息 希望有人可以提供帮助 我正在假脱机使用 100GB java 堆作为大数据缓存 为了避免与文件系统缓存等内容发生冲突 并且因为它通常性能更好 我将其分配在大页面中 我保留了 51 200 x 2MB 大页面 一切
  • Linux下的C#,Process.Start()异常“没有这样的文件或目录”

    我在使用 Process 类调用程序来启动程序时遇到问题 可执行文件的层次结构位于 bin 目录下 而当前工作目录需要位于 lib 目录下 project bin a out this is what I need to call lib
  • 即使 makefile 和源代码存在,为什么“Build Project”在 Eclipse Helios CDT 中显示为灰色?

    我无法构建我的项目 我在 Eclipse Helios 中创建了一个新的 CDT 项目 并告诉它使用现有的源代码和 makefile 这两者都正确显示在 Package 和 Project 视图中 然而 项目 菜单中的 构建全部 和 构建项
  • bash双括号问题

    我对 bash 脚本非常陌生 在使用双括号时遇到了问题 我似乎无法让它们在 Ubuntu Server 11 10 中工作 我的下面的脚本位于 if test sh 中 bin bash if 14 14 then echo FOO fi
  • Mono 和 WebRequest 速度 - 测试

    在 mono 4 6 2 linux 中 我注意到 wget 下载文件的速度与webclient DownloadString 所以我做了一个小测试来调查 为什么 wget 明显比 C 快 根据我自己的实验 使用 wget 下载 手动读取文
  • 有关 Linux 内存类型的问题

    关于Linux内存我有以下问题 我知道活动内存是最常访问的内存部分 但是有人可以解释一下 linux 如何考虑将内存位置用于活动内存或非活动内存 主动存储器由哪些部分组成 磁盘 文件缓存是否被视为活动内存的一部分 有什么区别Buffers
  • 使用 C++ 输出字符串覆盖 Linux 终端上的最后一个字符串

    假设我有一个命令行程序 有没有办法让我说的时候 std cout lt lt stuff 如果我不做std cout lt lt n 在另一个之间std cout lt lt stuff 东西的另一个输出将覆盖同一行上的最后一个东西 清理行
  • 无法为 Python 3.4 创建工作虚拟环境

    I 安装Python 3 4 2 https docs python org 3 using unix html building python和我的 Linux Mint 17 1 中的 Virtualenv 12 0 5 然后我尝试创建
  • 原生 Linux 应用程序可像 ResHacker 一样编辑 Win32 PE

    我想运行自动修改 dll服务 用户提交特定的 dll 我在服务器上修改它 然后用户可以下载 dll的修改版本 是否有任何本机 Linux 应用程序提供常见的 Win32 PE 修改功能 例如图标 字符串 加速器 对话等 至少提供命令行或脚本
  • 在 Ubuntu 上纯粹通过 bash 脚本安装 mysql 5.7

    我想要一个无需任何手动输入即可安装 MySQL 5 7 实例的 bash 脚本 我正在关注数字海洋教程 https www digitalocean com community tutorials how to install mysql
  • 在中断时获取 current->pid

    我正在Linux调度程序上写一些东西 我需要知道在我的中断到来之前哪个进程正在运行 当前的结构可用吗 如果我在中断处理程序中执行 current gt pid 我是否可以获得我中断的进程的 pid 你可以 current gt pid存在并
  • 是否有可能通过 mmap 匿名内存“打孔”?

    考虑一个使用大量大致页面大小的内存区域 例如 64 kB 左右 的程序 每个内存区域的寿命都相当短暂 在我的特定情况下 这些是绿色线程的替代堆栈 如何最好地分配这些区域 以便一旦该区域不再使用 它 们的页面可以返回到内核 天真的解决方案显然
  • 如何在两个不同帐户之间设置无密码身份验证

    我们可以在两台机器的两种不同用途之间设置无密码身份验证吗 例如 计算机A有用户A 计算机B有用户B 我们可以设置密码 ssh 以便计算机 A 上的用户 A 使用其用户帐户 A 登录计算机 B 谢谢你 如果我理解你的问题 你能设置一下吗ssh
  • 如何仅将整个嵌套目录中的头文件复制到另一个目录,在复制到新文件夹后保持相同的层次结构

    我有一个目录 其中有很多头文件 h 和其他 o 和 c 文件以及其他文件 这个目录里面有很多嵌套的目录 我只想将头文件复制到一个单独的目录 并在新目录中保留相同的结构 cp rf oldDirectory newDirectory将复制所有
  • Windows 与 Linux 文本文件读取

    问题是 我最近从 Windows 切换到 Ubuntu 我的一些用于分析数据文件的 python 脚本给了我错误 我不确定如何正确解决 我当前仪器的数据文件输出如下 Header 有关仪器等的各种信息 Data 状态 代码 温度 字段等 0

随机推荐

  • Pycharm中的常用快捷方式

    最重要的快捷键 ctrl shift A 万能命令行 shift两次 查看资源文件 新建工程第一步操作 module设置把空包分层去掉 compact empty middle package 设置当前的工程是utf 8 设置的Editor
  • 关于OCA,OCP,OCM认证的的区别

    可能大家知道OCA OCP OCM的关系是一个比一个难考 一个比一个含金量高 但是你知道具体的考试科目 考试方式 就业形势区别吗 不知道的话这篇通俗易懂的文章会让你一目了然 区别一 含金量 OCA 数据库专业人员踏上Oracle数据库认证之
  • MySQL数据库入门超级详细教程

    文章目录 MySQL 1 数据库软件安装 2 为什么要用数据库 3 什么是数据库 4 数据库管理系统 DBMS 5 MySQL 介绍 6 SQL 6 1 SQL 语句概述 6 2 SQL 基本操作 7 表结构操作 7 1 创建数据表 7 2
  • Windows10系统下彻底删除卸载MySQL

    本文介绍 在Windows10系统下 如何彻底删除卸载MySQL 1 停止MySQL服务 开始 所有应用 Windows管理工具 服务 将MySQL服务停止 2 卸载mysql server 控制面板 所有控制面板项 程序和功能 将mysq
  • vscode+Electron环境搭建 helloword

    0 Electron是什么简介 Electron 简单来说就是一个基于Chrome Nodejs的容器 可以用纯前端的方式实现跨平台的桌面应用开发 代码由js css html构成 它支持把整个项目编译成exe 由于它支持Nodejs 所以
  • sqli-labs(22)

    接下里我们进入第二二关 好像和第21关一样 cookie的base64加密注入 闭合变成了双引号而已 0X01 构造语句进行尝试 union select 1 2 3 IiB1bmlvbiBzZWxlY3QgMSwyLDMj 嘿嘿 好像成功
  • 我的世界超富的java种子_《我的世界》最富有的四个种子,第一名有4个村庄,这科学吗?...

    原标题 我的世界 最富有的四个种子 第一名有4个村庄 这科学吗 投胎是一项技术活 这个定律在Minecraft同样适用 好的出生点意味着好的开始 但不是人人都有这种运气 没关系 有种子嘛 Seed 382686119982684279 出生
  • Unity 改变鼠标指针的方法

    在网上查的帖子 先看一下 Texture2D ClickedCursorImg 把鼠标指针改为ClickedCursorImg Cursor SetCursor ClickedCursorImg Vector2 zero CursorMod
  • Api Savior 文档生成 idea 插件进阶教程

    原文地址见 Github Wiki Spring MVC 注解支持表 注解 注解字段 是否支持 作用描述 备注 RequestMapping value path 支持 绑定一个或多个 url RequestMapping method 支
  • JetBrains系列--工具使用方法

    JetBrains系列 工具使用方法 介绍 常用IDE 2 1 IDEA 2 2 pycharm 2 3 goland 2 4 clion 3 快捷方式 4 说明 JetBrains系列 工具使用方法 介绍 JetBrains 系列IDE是
  • 如何在Vue项目中给路由跳转加上进度条

    在平常浏览网页时 我们会注意到在有的网站中 当点击页面中的链接进行路由跳转时 页面顶部会有一个进度条 用来标示页面跳转的进度 如下图所示 虽然实际用处不大 但是对用户来说 有个进度条会大大减轻用户的等待压力 提升用户体验 本篇文章就来教你如
  • 程序员口中常说的API是什么意思?什么是API?

    什么是API 我的回答 API 应用程序编程接口 一般来说 这是一套明确定义的各种软件组件之间的通信方法 什么是API 我们不妨用一个小故事展示出来 研发人员A开发了软件A 研发人员B正在研发软件B 有一天 研发人员B想要调用软件A的部分功
  • Xilinx Vivado .coe文件生成

    一 COE格式文件生成 由于Quartus ii软件ROM用的是mif格式的文件 且可以用软件Guagle wave生成正弦波 三角波 锯齿波 我们可以利用这个软件先生成数据 然后再将其转化为符合COE格式的文件 具体请参考以下步骤 1 先
  • JavaWeb中如何将JSP文件的编码格式修改为UTF-8

    目录 一 修改原因 二 修改步骤 在使用eclipse学习jsp时 很多默认的编码都是ISO 8859 15 而我们需要使用的是utf 8编码 我们第一个接触改变jsp编码的方式可能都是在需要修改的jsp中修改 如下 将charset与pa
  • python线程与进程概述_1.24

    多进程与多线程 进程 Process 是计算机中的程序关于某数据集合上的一次运行活动 是系统进行资源分配和调度的基本单位 是操作系统结构的基础 线程 Thread 有时被称为轻量级进程 Lightweight Process LWP 是程序
  • Java 20新特性:Scoped Values 作用域值(孵化器)

    以下内容由New Bing自动生成 仅介绍了Scoped Values的部分内容 如果需要详细的Scoped Values信息 可以查阅官方JEP 429文档 Java JEP 429是 JDK 20 中引入的唯一一个新特性 目前还处于孵化
  • android点击按钮弹出输入框,android 弹出框(输入框和选择框)

    1 输入框 final EditText inputServer new EditText this inputServer setFilters new InputFilter new InputFilter LengthFilter 5
  • tcp短连接TIME_WAIT问题解决方法大全(3)——tcp_tw_recycle

    tcp tw recycle和tcp timestamps 参考官方文档 http www kernel org doc Documentation networking ip sysctl txt tcp tw recycle解释如下 t
  • ELK日志收集分析服务

    任务要求 搭建ELK集群 收集日志信息并展示 任务拆解 认识ELK 部署elasticsearch集群并了解其基本概念 安装elasticsearch head实现图形化操作 安装logstash收集日志 安装kibana日志展示 安装fi
  • Linux生产者消费者模型(POSIX信号量)

    目录 一 生产者消费者模型 1 基本概念 2 模型特点 3 模型优点 二 基于BlockingQueue的生产者消费者模型 1 基本概念 2 单生产者 单消费者为例进行模拟实现 3 基于计算任务的生产者消费者模型 三 POSIX信号量 1