Posix信号量

2023-11-01

一、Posix信号量

1.概述

信号量是一种用于提供不同进程间或一个给定进程的不同线程间同步手段的原语。三种类型的信号量:

  1. Posix有名信号量:使用Posix IPC名字标识,可用于进程或线程间的同步。
  2. Posix基于内存的信号量:存放在共享内存区中,可用于进程或线程间的同步。
  3. System V信号量:在内核中维护,可用于进程或线程间的同步。

只考虑不同进程间的同步。首先考虑二值信号量: 其值或为0或为1的信号量。如下图所示:
在这里插入图片描述
图中画出该信号量是由内核来维护的(对于SystemV信号量是正确的),其值可以是0或1。 Posix信号量不必在内核中维护。

Posix信号量是由可能与文件系统中的路径名对应的名字来标识的。因此,下图是Posix有名信号量的更为实际的图示。
在这里插入图片描述

一个进程可以在某个信号量上执行的三种操作:

  1. 创建:调用者指定初始值,二值信号量,通常是1或0。

  2. 等待:测试信号量值,小于或等于0,阻塞;大于0将将其减1。(while(semaphore_value<=0) ; semaphore--;)

  3. 挂出:将信号量的值加1。若有一些进程阻塞着等待该信号量的值变为大于0,其中一个进程现在就可能被唤醒。考虑到访问同一信号量的其他进程,挂出操作也必须是原子的。(semaphore_value++)。

没有使用其值仅为0或1的二值信号量,这种信号量称为计数信号量,二值信号量可用互斥目的,就像互斥锁一样,如下所示:
在这里插入图片描述
信号量初始化为1,sem_wait调用等待其值变为大于0,然后将它减1,sem_post调用则将其值加1(从0变为1),然后唤醒阻塞在sem_wait调用中等待该信号量的任何线程。

除可以像互斥锁那样使用外,信号量还有一个互斥锁没有提供的特性:互斥锁必须总是由锁住它的线程解锁,信号量的挂出却不必由执行过它的等待操作的同一线程执行。

使用两个二值信号量和生产者-消费者问题的一个简化版本提供展示这种特性的一个例子。

下展示了往某个共享缓冲区中放置 一个条目的一 个生产者以及取走该条 目的一个消费者。为简单起见,假设该缓冲区只容纳一个条目。
在这里插入图片描述
下图显示生产者和消费者伪代码:
在这里插入图片描述
步骤:

  1. 生产者初始化缓冲区和两个信号量。

  2. 假设消费者接着运行 。 它阻塞在sem_wait调用中,因为get的值为0。

  3. 一 段时间后生产者接着运行:

    1. 当它调用sem_wait后,put的值由1减为0, 于是生产者往缓冲区中放宽 一 个条目,然后它调用sem_post, 把get的值由0增为1。
    2. 既然有一个线程(即消费者)阻塞在该信号量上等待其值变为正数,该线程将被标记成准备好运行。
    3. 但是假设生产者继续运行,生产者随后会阻塞在for循环顶部的sem_wait调用中,因为put的值为0。生产者必须等待到消费者腾空缓冲区。
  4. 消费者从set_wait调用中返回,将get信号量的值由1减为0。然后处理缓冲区中的数据,然后调用sem_post,把put的值由0增为1。既然有一个线程(生产者)阻塞在该信号量上等待其值变为正数,该线程将被标记成准备好运行。但是假设消费者继续运行,消费者随后会阻塞在for循环顶部的sem_wait调用中,因为get的值为0。

  5. 生产者从sem_wait调用中返回,把数据放入缓冲区中,上述情形循环继续。若每次调用sem_post时,即使当时有一个进程正在等待并随后被标记成准各好运行,调用者也继续运行。是调用者继续运行还是刚变成准备好状态的线程运行无关紧要。

信号量、互斥锁和条件变量之间的三个差异:

  1. 互斥锁必须总是由给它上锁的线程解锁,信号量的挂出却不必由执行过它的等待操作的同一 线程执行。

  2. 互斥锁要么被锁住,要么被解开(二值状态,类似于二值信号量)。

  3. 既然信号量有一个与之关联的状态(它的计数值),则信号量挂出操作总是被记住。

  4. 当向条件变量发送信号时,如果没有线程等待在该条件变量上,那么该信号将丢失。

二、.Posix提供两种信号量:有名信号量和基于内存的信号量

POSIX信号量有两种形式,差异在于创建和销 毁的形式上:

  1. 命名的:命名信号量可以通过名字访问,因此可以被任何已知它们名字的进程中的 线程使用。

  2. 未命名的:未命名信号量只存在于内存中,并要求能使用 信号量的进程必须可以访问内存。意味着只能应用在同一进程中的线 程,或者不同进程中已经映射相同内存内容到它们的地址空间中的线程。

在这里插入图片描述

下图显示某个进程由两个线程共享一个Posix基于内存的信号量。

在这里插入图片描述
下图显示某个共享内存区中由两个进程共享的一个Posix基于内存的信号量。
在这里插入图片描述

三、命名信号量

要使用命名信号量必须要使用下列函数:

  1. sem_open()函数打开或者创建一个信号量并返回一个句柄以供后继调用使用,如果这个调用会创建信号量的话还会对所创建的信号量进行初始化。

  2. sem_post(sem)和 sem_wait(sem)函数分别递增和递减一个信号量值。

  3. sem_getvalue()函数获取一个信号量的当前值。

  4. sem_close()函数删除调用进程与它之前打开的一个信号量之间的关联关系

  5. sem_unlink())函数删除一个信号量名字并将其标记为在所有进程关闭该信号量时删除该信号量。

1.sem_open和sem_close函数

//创建一个新的命名信号量或者使用一个现有信号量
//使用现有信号量,只需指定两个参数:信号量的名字和oflag参数的0值。
#include<semaphore.h>
sem_t *sem_open(const char *name,int oflag,../*mode_t mode,unsigned int value*/);

//返回值:若成功,返回指向信号量的指针;若出错,返回SEM_FAILED
  1. oflag参数有O_CREAT标志集时,若命名信号量不存在,则创建新的;若存在被利用。指定O_CREAT需提供额为两个参数。

  2. mode参数指定谁可以访问信号量,取值和打开文件的权限位相同,赋值给信号量的权限可以被调用者的文件创建屏蔽字修改。只有读和写访问要紧,但是打开一个现有信号量不允许指定模式。实现经常为读和写打开信号量。

  3. value参数用来指定信号量1的初始值,取值范围为0~SEM_VALUE_MAX

  4. 确保创建的是信号量,可以设置参数为O_CREAT|O_EXCL,若信号量已经存在,会导致sem_open失败。

//释放任何信号量相关资源
#include <semaphore.h> 
int sem_close(sem_t *sem); 
//进程没有首先调用sem_close而退出,那么内核将自动关闭任何打开的信号量。
//返回值:若成功,返回0;若出错,返回-1

例:创建有名信号量,运行命令行选项指定独占创建的-e和指定1一个初始值的-i。

int main(int argc, char **argv)
{
	int		c, flags;
	sem_t	*sem;
	unsigned int	value;

	flags = O_RDWR | O_CREAT;
	value = 1;
	while ( (c = getopt(argc, argv, "ei:")) != -1) {
		switch (c) {
		case 'e':
			flags |= O_EXCL;
			break;

		case 'i':
			value = atoi(optarg);
			break;
		}
	}
	if (optind != argc - 1)
		err_quit("usage: semcreate [ -e ] [ -i initalvalue ] <name>");

	sem = sem_open(argv[optind], flags, FILE_MODE, value);

	sem_close(sem);
	exit(0);
}

2.sem_unlink函数

//来销毁一个命名信号量。
#include <semaphore.h> 
int sem_unlink(const char *name);
//如果没有打开的信号量引用,则该信号量会被销毁。否则,销毁将延迟到最后一个打开的引用关闭。
// 返回值:若成功,返回0;若出错,返回-1

例:删除一个有名信号量名字。

int main(int argc, char **argv)
{
	if (argc != 2)
		err_quit("usage: semunlink <name>");

	em_unlink(argv[1]);

	exit(0);
}

3.sem_wait函数

//来实现信号量的减1操作
#include <semaphore.h> 
int sem_trywait(sem_t *sem); 
int sem_wait(sem_t *sem); 
//两个函数的返回值:若成功,返回0;若出错则,返回-1
  1. 使用sem_wait函数时,如果信号量是0则会发生阻塞,,直到成功使用信号量减1或被信号中断才返回。

  2. sem_trywait函数可以避免阻塞,调用此函数时,如果信号量为0,则不会阻塞,而是返回-1并且将errno置为EAGAIN。

下列函数是sem_wait函数的一个变体,允许调用者为被阻塞的时间指定一个限制。

//阻塞一段确定的时间。
#include <semaphore.h> 
#include <time.h> 
int sem_timedwait(sem_t *restrict sem, const struct timespec *restrict tsptr); 
//放弃等待信号量时,用tsptr指定绝对时间,超时是基于CLOCK_REALTIME时钟的。
//信号量可以立即减1,则超时不重要了,尽管指定是过去某个时间,信号量减1操作仍然会成功。
//超时到期并且信号量计数没能减1,此函数将返回-1且errno设置为ETIMEFOUT。
//返回值:若成功,返回0;若出错,返回−1

例:打开一个信号量,调用sem_wait(信号量的当前值小于或等于0,则阻塞结束阻塞信号量减1)。

int main(int argc, char **argv)
{
	sem_t	*sem;
	int		val;

	if (argc != 2)
		err_quit("usage: semwait <name>");

	sem = sem_open(argv[1], 0);
	Sem_wait(sem);
	Sem_getvalue(sem, &val);
	printf("pid %ld has semaphore, value = %d\n", (long) getpid(), val);

	pause();	/* blocks until killed */
	exit(0);
}

5.sem_post函数

//使信号量值增1
#include <semaphore.h> 
int sem_post(sem_t *sem); 
//调用此函数时,在调用sem_wait(或者sem_timedwait)中发生进程阻塞,那么进程会被唤醒并且被sem_post增1的信号量计数会再次被sem_wait(或者sem_timedwait)减1。
//返回值:若成功,返回0;若出错,返回−1

例:挂出有名信号量,然后取得并输出该信号量的值。

int main(int argc, char **argv)
{
	sem_t	*sem;
	int		val;

	if (argc != 2)
		err_quit("usage: sempost <name>");

	sem = sem_open(argv[1], 0);
	Sem_post(sem);
	Sem_getvalue(sem, &val);
	printf("value = %d\n", val);

	exit(0);
}

6.sem_getvalue函数

//来检索信号量值。
#include <semaphore.h>
 int sem_getvalue(sem_t *restrict sem, int *restrict valp); 
 //成功valp指向的整数值将包含信号量值。
 //注:试图使用刚读出的值信号量的值可能改变了,除非使用额外的同步机制来避免这种竞争。否则此函数只能用于调式。
//返回值:若成功,返回0;若出错,返回−1



//例:获取POSIX信号量值
#include <semaphore.h>
#include "tlpi_hdr.h"
int main(int argc, char *argv[])
{
    sem_t *sem;

    if (argc < 2 || strcmp(argv[1], "--help") == 0)
        usageErr("%s sem-name\n", argv[0]);

    sem = sem_open(argv[1], 0);
    if (sem == SEM_FAILED)
        errExit("sem_open");

    if (sem_wait(sem) == -1)
        errExit("sem_wait");

    printf("%ld sem_wait() succeeded\n", (long) getpid());
    exit(EXIT_SUCCESS);
}

四、生产者-消费者问题

解决生产者-消费者问题方案:

1.消费者在生产者完成启动的,使用单个**互斥锁(同步各个生产者)**就能解决问题。

2.消费者在生产者完成启动,解决同步问题使用一个互斥锁(同步各个生产者)加上一个条件变量及其互斥锁(同步生产者和消费者)。

对生产者-消费者问题进行扩展:
把共享缓冲区用作一个环绕缓冲区:生产者填写最后一项([buff[NBUFF-1])后,回过来填写第一项([buff[0]),消费者也这么做。这时增加了同步问题。即生产者不能走到消费者的前面。仍然假设生产者和消费者都是线程,也可为进程,前提存在某种在进程间共享缓冲区的方法。

当缓冲区作为一个环形缓冲区考虑,必须由代码维持以下三个条件:

  1. 当缓冲区为空时,消费者不能试图从其中去除一个条目。

  2. 当缓冲区填满时,生产者不能试图往其中放置一个条目。

  3. 共享变量可能描述缓冲区的当前状态,因此生产者和消费者的所有缓冲区操作都必须保护起来,避免竞争状态。

下面的信号量的方案展示了三种不同类型的信号量:

  1. 名为mutex的二值信号量保护两个临界区:一个是往缓冲区中插入一个数据条目(生产者执行),另一个从共享缓冲区中移走一个数据条目(消费者执行)。用互斥锁的二值信号量初始化为1。

  2. 名为nempty的计数信号量统计共享缓冲区中的空槽位数。该信号量初始化为缓冲区中的槽位数(NBUFF)。

  3. 名为nstored的计数信号量统计共享缓冲区中已填写的槽位数。该信号量初始化为0,因为缓冲区开始是空的。

下图展示了程序完成初始化时缓冲区及两个计算信号量的状态。未用的数组元素标以阴影。

在这里插入图片描述

例子中,生产者只是把 0~(NLOOP-1) 存放到共享缓冲区中(buff[0]= 0,buff[1]= 1, 等等),并把该缓冲区用作一 个环绕缓冲区。

消费者从该缓冲区取出这些整数,井验证它们是正确的,若有错误则输出到标准输出上。

下图展示了在生产者往共享缓冲区放置了3个条目之后,但在消息者从该缓冲区取走其中任何条目之前该缓冲区和两个计数信号量的状态。
在这里插入图片描述

接着假设消费者从缓冲区中移走一个条目,如下图所示:

在这里插入图片描述


#define	NBUFF	 10
#define	SEM_MUTEX	"mutex"	 	/* t这些是px_ipc_name()的参数*/
#define	SEM_NEMPTY	"nempty"
#define	SEM_NSTORED	"nstored"

int		nitems;					/* 生产者和消费者只读*/

/* 生产者和消费者共享的数据*/
struct {	
  int	buff[NBUFF];
  sem_t	*mutex, *nempty, *nstored;
} shared;

void	*produce(void *), *consume(void *);

int main(int argc, char **argv)
{
	pthread_t	tid_produce, tid_consume;

	if (argc != 2)
		err_quit("usage: prodcons1 <#items>");
	nitems = atoi(argv[1]);

		/* 创建三个信号量*/
	shared.mutex = sem_open(Px_ipc_name(SEM_MUTEX), O_CREAT | O_EXCL,
							FILE_MODE, 1);
	shared.nempty =sem_open(Px_ipc_name(SEM_NEMPTY), O_CREAT | O_EXCL,
							 FILE_MODE, NBUFF);
	shared.nstored = sem_open(Px_ipc_name(SEM_NSTORED), O_CREAT | O_EXCL,
							  FILE_MODE, 0);

		/* 创建一个生产者线程和一个消费者线程 */
	set_concurrency(2);
	pthread_create(&tid_produce, NULL, produce, NULL);
	pthread_create(&tid_consume, NULL, consume, NULL);

		/* 等待两个线程 */
	pthread_join(tid_produce, NULL);
	pthread_join(tid_consume, NULL);

		/* 移除信号灯 */
	sem_unlink(px_ipc_name(SEM_MUTEX));
	sem_unlink(px_ipc_name(SEM_NEMPTY));
	sem_unlink(px_ipc_name(SEM_NSTORED));
	exit(0);
}


/* 包括prodcon*/
void * produce(void *arg)
{
	int		i;

	for (i = 0; i < nitems; i++) {
		Sem_wait(shared.nempty);	/*等待至少1个空插槽 */
		Sem_wait(shared.mutex);
		shared.buff[i % NBUFF] = i;	/* 将i存储到循环缓冲区 */
		Sem_post(shared.mutex);
		Sem_post(shared.nstored);	/* 又存储了1个项目 */
	}
	return(NULL);
}

void * consume(void *arg)
{
	int		i;

	for (i = 0; i < nitems; i++) {
		Sem_wait(shared.nstored);		/* 等待至少1个存储项 */
		Sem_wait(shared.mutex);
		if (shared.buff[i % NBUFF] != i)
			printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
		Sem_post(shared.mutex);
		Sem_post(shared.nempty);		/* 还有1个空插槽 */
	}
	return(NULL);
}

如果错误的对换了消费者函数中sem_wait调用的顺序,将会产生死锁。生产者在等待mutex信号量,但是消费者却持有该信号量并在等待nstored信号量然而生产者只有获取了mutex信号量才能挂出nstored信号量。

五、未命名信号量

**未命名信号量(也被称为基于内存的信号量)**是连续为sem_t并存储在应用程序分配的内存中的变量。通过将这个信号量放在由几个进程或者线程共享的内存区域中就能使得这个信号量对这些进程或者线程可用。

操作未命名信号量所使用的函数与操作命名信号量使用的函数是一样的(sem_wait()、sem_post()以及 sem_getvalue()等)。此外,还需要用到另外两个函数。

  1. sem_init()对一个信号量进行初始化并通知系统该信号量会在在进程间共享还是在单个进程中的线程间共享。
  2. sem_destroy(sem)函数销毁一个信号量。

这些函数不应该被应用到命名信号量上。

未命名与命名信号量对比:
使用未命名信号量之后就无需为信号量创建一个名字了,这种做法在下列情况中是比较有用的

  • 正在构建动态数据结构,并且其中的每一项都需要一个关联的信号量,做法就是每一项都分配一个未命名信号量。

    • 在线程间共享的信号量不需要名字。将一个未命名信号量作为一个共享(全局或堆上的)变量自动会使之对所有线程可用。

      • 在相关进程间共享的信号量不需要名字。如果一个父进程在一块共享内存区域中(如一个共享匿名映射)分配了一个未命名信号量,那么作为 fork()操作的一部分,子进程会自动继承这个映射,从而继承这个信号量。

1.sem_init函数

//创建未命名的信号量
#include <semaphore.h>
 int sem_init(sem_t *sem, int pshared, unsigned int value);
 //pshared参数表明是否在多个进程中使用信号量,若是设置成非0值。
 //需要声明一个sem_t类型的变量并把它的地址传递给sem_init来实现初始化。
 //value:指定了信号量的初始值。
//要在两个进程之间使用信号量,需要确保sem参数指向两个进程之间共享的内存范围。
 // 返回值:若成功,返回0;若出错,返回−1

2.sem_destroy函数

//丢弃未命名的信号量
#include <semaphore.h> 
int sem_destroy(sem_t *sem); 
//调用此函数后,不能再使用任何带有sem的信号量函数,除非通过调用sem_init重新初始化它。
//返回值:若成功,返回0;若出错,返回−1

例:将上面的生产者-消费者例子转换成使用基于内存的信号量。

#include	"unpipc.h"

#define	NBUFF	 10

int		nitems;					/*生产者和消费者只读 */
struct {	/* 生产者和消费者共享的数据 */
  int	buff[NBUFF];
  sem_t	mutex, nempty, nstored;		/* 信号量,而不是指针*/
} shared;

void	*produce(void *), *consume(void *);

int
main(int argc, char **argv)
{
	pthread_t	tid_produce, tid_consume;

	if (argc != 2)
		err_quit("usage: prodcons2 <#items>");
	nitems = atoi(argv[1]);

		/* 4初始化三个信号量*/
	Sem_init(&shared.mutex, 0, 1);
	Sem_init(&shared.nempty, 0, NBUFF);
	Sem_init(&shared.nstored, 0, 0);

	Set_concurrency(2);
	Pthread_create(&tid_produce, NULL, produce, NULL);
	Pthread_create(&tid_consume, NULL, consume, NULL);

	Pthread_join(tid_produce, NULL);
	Pthread_join(tid_consume, NULL);

	Sem_destroy(&shared.mutex);
	Sem_destroy(&shared.nempty);
	Sem_destroy(&shared.nstored);
	exit(0);
}

void *produce(void *arg)
{
	int		i;

	for (i = 0; i < nitems; i++) {
		sem_wait(&shared.nempty);	/* 等待至少1个空插槽 */
		sem_wait(&shared.mutex);
		shared.buff[i % NBUFF] = i;	/*将i存储到循环缓冲区 */
		sem_post(&shared.mutex);
		sem_post(&shared.nstored);	/*又存储了1个项目 */
	}
	return(NULL);
}

void * consume(void *arg)
{
	int		i;

	for (i = 0; i < nitems; i++) {
		sem_wait(&shared.nstored);		/*等待至少1个存储项*/
		sem_wait(&shared.mutex);
		if (shared.buff[i % NBUFF] != i)
			printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
		sem_post(&shared.mutex);
		sem_post(&shared.nempty);		/* 还有1个空插槽 */
	}
	return(NULL);
}

六、多个生产者、单个消费者

下列是允许多个生产者和单个消费者。


#define	NBUFF	 	 10
#define	MAXNTHREADS	100

int		nitems, nproducers;		/*生产者和消费者只读 */

struct {	/* 生产者和消费者共享的数据 */
  int	buff[NBUFF];
  int	nput;
  int	nputval;
  sem_t	mutex, nempty, nstored;		/* 信号量,而不是指针*/
} shared;

void	*produce(void *), *consume(void *);

int main(int argc, char **argv)
{
	int		i, count[MAXNTHREADS];
	pthread_t	tid_produce[MAXNTHREADS], tid_consume;
//新命令行参数
	if (argc != 3)
		err_quit("usage: prodcons3 <#items> <#producers>");
	nitems = atoi(argv[1]);
	nproducers = min(atoi(argv[2]), MAXNTHREADS);

		/* 初始化三个信号量*/
	sem_init(&shared.mutex, 0, 1);
	sem_init(&shared.nempty, 0, NBUFF);
	sem_init(&shared.nstored, 0, 0);

		/* 创建所有生产者和一个消费者 */
	set_concurrency(nproducers + 1);
	for (i = 0; i < nproducers; i++) {
		count[i] = 0;
		pthread_create(&tid_produce[i], NULL, produce, &count[i]);
	}
	pthread_create(&tid_consume, NULL, consume, NULL);

		/*等待所有生产者和消费者*/
	for (i = 0; i < nproducers; i++) {
		pthread_join(tid_produce[i], NULL);
		printf("count[%d] = %d\n", i, count[i]);	
	}
	pthread_join(tid_consume, NULL);

	em_destroy(&shared.mutex);
	em_destroy(&shared.nempty);
	sem_destroy(&shared.nstored);
	exit(0);
}

//生产者执行函数
void *produce(void *arg)
{
	for ( ; ; ) {
		em_wait(&shared.nempty);	/* 等待至少1个空插槽 */
		em_wait(&shared.mutex);
//生产者线程间的互斥
		if (shared.nput >= nitems) {
			em_post(&shared.nempty);
			em_post(&shared.mutex);
			return(NULL);			/* 全部完成 */
		}

		shared.buff[shared.nput % NBUFF] = shared.nputval;
		shared.nput++;
		shared.nputval++;

		em_post(&shared.mutex);
		em_post(&shared.nstored);	/* 又存储了1个项目*/
		*((int *) arg) += 1;
	}
}


// 消费者使用,验证缓冲区每个项都是正确的,检查到错误就输出一个消息
void *consume(void *arg)
{
	int		i;

	for (i = 0; i < nitems; i++) {
		sem_wait(&shared.nstored);		/* 等待至少1个存储项*/
		sem_wait(&shared.mutex);

		if (shared.buff[i % NBUFF] != i)
			printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

		sem_post(&shared.mutex);
		sem_post(&shared.nempty);		/* 还有1个空插槽 */
	}
	return(NULL);
}

七、多个生产者,多个消费者

多个生产者和消费者,具有多少个消费者是否有意义决定于具体应用,例:

  1. 把IP地址转换成对应主机名。
  2. 读出UDP数据报,对它们进行操作后把结果写入某个数据库的程序。
#define	NBUFF	 	 10
#define	MAXNTHREADS	100

int		nitems, nproducers, nconsumers;		//只读

struct {	/* 生产者和消费者共享的数据 */
  int	buff[NBUFF];
  int	nput;			/* 项目编号: 0, 1, 2, ... */
  int	nputval;		/* 要存储在buff中的值[] */
  int	nget;			/* 项目编号: 0, 1, 2, ... */
  int	ngetval;		/*从buff[]获取的值*/
  sem_t	mutex, nempty, nstored;		/* 信号量,而不是指针*/
} shared;

void	*produce(void *), *consume(void *);

int main(int argc, char **argv)
{
	int		i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
	//tid_produce:保存消费者线程ID;tid_consume:保存消费者处理的条目数
	pthread_t	tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
//增设一个新的命令行选项,由它指定待创建消费者线程的总数。
	if (argc != 4)
		err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
	nitems = atoi(argv[1]);
	nproducers = min(atoi(argv[2]), MAXNTHREADS);
	nconsumers = min(atoi(argv[3]), MAXNTHREADS);

		/* 初始化三个信号量 */
	sem_init(&shared.mutex, 0, 1);
	sem_init(&shared.nempty, 0, NBUFF);
	sem_init(&shared.nstored, 0, 0);

		/* 创建所有生产者和所有消费者 */
	set_concurrency(nproducers + nconsumers);
	for (i = 0; i < nproducers; i++) {
		prodcount[i] = 0;
		pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
	}
	for (i = 0; i < nconsumers; i++) {
		conscount[i] = 0;
		pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
	}

		/* 等待所有生产者和消费者 */
	for (i = 0; i < nproducers; i++) {
		pthread_join(tid_produce[i], NULL);
		printf("producer count[%d] = %d\n", i, prodcount[i]);	
	}
	for (i = 0; i < nconsumers; i++) {
		pthread_join(tid_consume[i], NULL);
		printf("consumer count[%d] = %d\n", i, conscount[i]);	
	}

	sem_destroy(&shared.mutex);
	sem_destroy(&shared.nempty);
	sem_destroy(&shared.nstored);
	exit(0);
}

/* 生产者 */
void *produce(void *arg)
{
	for ( ; ; ) {
		sm_wait(&shared.nempty);	/* 等待至少1个空插槽 */
		sem_wait(&shared.mutex);

		if (shared.nput >= nitems) {
			sem_post(&shared.nstored);	/* 让消费者终止 */
			sem_post(&shared.nempty);
			sem_post(&shared.mutex);
			return(NULL);			/* 全部完成 */
		}

		shared.buff[shared.nput % NBUFF] = shared.nputval;
		shared.nput++;
		shared.nputval++;

		sem_post(&shared.mutex);
		sem_post(&shared.nstored);	/*又存储了1个项目*/
		*((int *) arg) += 1;
	}
}

//消费者
void *consume(void *arg)
{
	int		i;

	for ( ; ; ) {
		sem_wait(&shared.nstored);	/* 等待至少1个存储项 */
		sem_wait(&shared.mutex);

		if (shared.nget >= nitems) {
			sem_post(&shared.nstored);
			sem_post(&shared.mutex);
			return(NULL);			/*全部完成 */
		}

		i = shared.nget % NBUFF;
		if (shared.buff[i] != shared.ngetval)
			printf("error: buff[%d] = %d\n", i, shared.buff[i]);
		shared.nget++;
		shared.ngetval++;

		sem_post(&shared.mutex);
		sem_post(&shared.nempty);	//还有1个空插槽
		*((int *) arg) += 1;
	}
}

八、多个缓冲区

处理典型程序中,有如下循环:

while((n>=read(fdin,buff,BUFFSIZE))>0{
	write(fdout,buff,0)}

下图展示了实现这种操作的一种方法,reader的函数从输入文件读入数据,write函数输出文件写出数据。总共使用一个缓冲区。

在这里插入图片描述
下图显示整个操作的时间线图:读操作5个时间单位写操作7个单位时间,读和写出来之间2个单位时间。

在这里插入图片描述

把应用修改成在两个线程间分割读写操作,两个线程自动共享一个全局缓冲区,如下图所示:

在这里插入图片描述

读写两个线程操作完毕后,需要通知各方,下图给出这种操作的时间线图:

在这里插入图片描述
上面把读和写分隔成两个线程中并不影响整个操作所需时间,没有任何速度优势,只是把整个操作分割到两个线程中。

由于使用一个缓冲区中时间线图忽略了许多细微点,如检测出对一个文件的顺序读后就为读进程执行对下一个磁盘的异步超前读、可以改善执行这种类型操作所花的称为"时钟时间"的实际时间量。还好忽略了其他进程对读入这线程和写出者线程的影响及内核调度算法的效果。

这时可以把文件复制应用修改成使用两个线程和两个缓冲区。经典的双缓冲区方案:

在这里插入图片描述
下图显示双缓冲区方案的时间线图:
在这里插入图片描述

  1. 双缓冲区所花费的总时钟时间几乎只有单缓冲区的一半。

  2. 双缓冲区写操作现在是尽可能快的发生每两个写操作间仅有2个单位时间作为分隔。而一个缓冲区和改成双线程有9个单位时间分隔。

  3. 这种情况有助于像磁带驱动器这样的设备,这些设备尽可能快写入数据的条件下会动作得更快(鱼贯模式)。

例:双缓冲区仅仅是生产者-消费者问题中的一个特例,处理任意数量的缓冲区,而不是双缓冲区。


#define	NBUFF	 8

struct {	/* 生产者和消费者共享的数据 */
  struct {
    char	data[BUFFSIZE];			/* 缓冲器 */
    ssize_t	n;						/*缓冲区中的字节数*/
  } buff[NBUFF];					/* 这些缓冲区计数的NBUFF */
  sem_t	mutex, nempty, nstored;		/* 信号量,而不是指针*/
} shared;

int		fd;							/* 要复制到标准输出的输入文件 */
void	*produce(void *), *consume(void *);

int main(int argc, char **argv)
{
	pthread_t	tid_produce, tid_consume;

	if (argc != 2)
		err_quit("usage: mycat2 <pathname>");

	fd = Open(argv[1], O_RDONLY);

		/*初始化三个信号量*/
	Sem_init(&shared.mutex, 0, 1);
	Sem_init(&shared.nempty, 0, NBUFF);
	Sem_init(&shared.nstored, 0, 0);

		/* 一个生产者线程,一个消费者线程*/
	Set_concurrency(2);
	Pthread_create(&tid_produce, NULL, produce, NULL);	/* 读卡器线程 */
	Pthread_create(&tid_consume, NULL, consume, NULL);	/* 编写器线程 */

	Pthread_join(tid_produce, NULL);
	Pthread_join(tid_consume, NULL);

	Sem_destroy(&shared.mutex);
	Sem_destroy(&shared.nempty);
	Sem_destroy(&shared.nstored);
	exit(0);
}


//生产者
void *produce(void *arg)
{
	int		i;

	for (i = 0; ; ) {
		Sem_wait(&shared.nempty);	/*等待至少1个空插槽*/

		Sem_wait(&shared.mutex);
			/*临界区域 */
		Sem_post(&shared.mutex);

		shared.buff[i].n = Read(fd, shared.buff[i].data, BUFFSIZE);
		if (shared.buff[i].n == 0) {
			Sem_post(&shared.nstored);	/* 又存储了1个项目*/
			return(NULL);
		}
		if (++i >= NBUFF)
			i = 0;					/*循环缓冲器 */

		Sem_post(&shared.nstored);	/*又存储了1个项目 */
	}
}

void *consume(void *arg)
{
	int		i;

	for (i = 0; ; ) {
		Sem_wait(&shared.nstored);		/*等待至少1个存储项 */

		Sem_wait(&shared.mutex);
			/*临界区域*/
		Sem_post(&shared.mutex);

		if (shared.buff[i].n == 0)
			return(NULL);
		Write(STDOUT_FILENO, shared.buff[i].data, shared.buff[i].n);
		if (++i >= NBUFF)
			i = 0;					/* 循环缓冲器*/

		sem_post(&shared.nempty);		/*还有1个空插槽*/
	}
}

九、进程间共享信号量

  1. 进程间共享基于内存信号量的规则:信号量本身(地址作为sem_init第一个参数的sem_t数据类型变量)必须驻留在由所有希望共享它的进程所共享的内存区中而且sem_init的第二个参数必须为1。

  2. 至于有名信号量,不同进程总是能够访问一个有名信号量,只要在调用sem_open时指定相同的名字就行。即使对于某个给定名字的sem_open调用在每个调用进程中可能返回不同的指针,使用该指针的信号量函数所引用的任然是同一个有名信号量。

  3. 在sem_open返回指向某个sem_t数据类型变量的指针后接着调用fork会怎么样?Posix.1表示在父进程中打开的任何信号量仍在子进程中打开。意味着下面代码正确。

sem_t *mutex;
mutex = sem_open(Px_ipc_name(NAME),O_CREAT|O_EXCL,FILE_MODE,0);
if((childpid = fork())==0{
...
sem_wait(mutex):
...
}
...
sem_post(mutex);
...

十、信号量的限制

  1. SEM_NSEMS_MAX : 这是一个进程能够拥有的 POSIX 信号量的最大数目。

  2. SEM_VALUE_MAX :这是一个 POSIX 信号量值能够取的最大值。信号量的取值可以为 0 到这个限制之间的任意一个值。

十一、Posix不同方法实现有名信号量

  1. 使用FIFO实现信号量。

  2. 使用内存映射I/O实现信号量。

  3. 使用System V信号量实现Posix信号量。

十二、POSIX 信号量与 System V 信号量比较

优点:

  1. POSIX IPC 接口更加简单并且与传统的 UNIX 文件模型更加一致

  2. POSIX IPC 对象是引用计数的,这样就简化了确定何时删除一个 IPC 对象的工作。

  3. POSIX 命名信号量消除了 System V 信号量存在的初始化问题(首先,进程 B 在一个未初始化的信号量(即其值是一个任意值)上执行了一个 semop()。其次,进程 A 中的 semctl()调用覆盖了进程 B 所做出的变更)。

  4. 将一个 POSIX 未命名信号量与动态分配的内存对象关联起来更加简单:只需要将信号量嵌入到对象中即可。

  5. 在高度频繁地争夺信号量的场景中,那么 POSIX 信号量的性能与System V 信号量的性能是类似的。但在争夺信号量不那么频繁的场景中,POSIX 信号量的性能要比 System V 信号量好很多。POSIX 在这种场景中之所以能够做得更好是因为它们的实现方式只有在发生争夺的时候才需要执行系统调用,而 System V 信号量操作则不管是否发生争夺都需要执行系统调用。

缺点:

  1. POSIX 信号量的可移植性稍差
  2. POSIX 信号量不支持 System V 信号量中的撤销特性。

十三、POSIX 信号量与 Pthreads 互斥体

  • POSIX 信号量和 Pthreads 互斥体都可以用来同步同一个进程中的线程的动作,并且它们的性能也是相近的

    • 然而互斥体通常是首选方法,因为互斥体的所有权属性能够确保代码具有良好的结构性(只有锁住互斥体的线程才能够对其进行解锁)

      • 与之形成对比的是,一个线程能够递增一个被另一个线程递减的信号量。这种灵活性会导致产生结构糟糕的同步设计。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Posix信号量 的相关文章

  • 抑制 makefile 中命令调用的回显?

    我为一个作业编写了一个程序 该程序应该将其输出打印到标准输出 分配规范需要创建一个 Makefile 当调用它时make run gt outputFile应该运行该程序并将输出写入一个文件 该文件的 SHA1 指纹与规范中给出的指纹相同
  • 无法从 jenkins 作为后台进程运行 nohup 命令

    更新 根据下面的讨论 我编辑了我的答案以获得更准确的描述 我正在尝试从詹金斯运行 nohup 命令 完整的命令是 nohup java jar home jar server process 0 35 jar prod gt gt var
  • UNIX系统调用监视器

    如何监控进程的系统调用 Check strace http linux die net man 1 strace 在最简单的情况下 strace 运行指定的命令直到退出 它拦截并记录进程调用的系统调用以及进程接收的信号 每个系统调用的名称
  • 如何在数组中存储包含双引号的命令参数?

    我有一个 Bash 脚本 它生成 存储和修改数组中的值 这些值稍后用作命令的参数 对于 MCVE 我想到了任意命令bash c echo 0 0 echo 1 1 这解释了我的问题 我将用两个参数调用我的命令 option1 without
  • 如何有效截断文件头?

    大家都知道truncate file size 函数 通过截断文件尾部将文件大小更改为给定大小 但是如何做同样的事情 只截断文件的尾部和头部呢 通常 您必须重写整个文件 最简单的方法是跳过前几个字节 将其他所有内容复制到临时文件中 并在完成
  • 如何查看正在运行的 tcsh 版本?

    如何查看我的 UNIX 终端中运行的 tcsh 的当前版本 看着那 这version多变的 echo version tcsh 6 14 00 Astron 2005 03 25 i386 intel linux options wide
  • 为什么main()后面有函数定义?

    我假设最常用的系统功能之一 ls由最权威的程序员之一 Richard Stallman 编写的最著名的操作系统之一 linux 的代码可能是真正编写良好的代码的一个例子 因此 由于它是开源的 我决定看一下代码 参见例如here http m
  • php exec 返回的结果比直接进入命令行要少

    我有一个 exec 命令 它的行为与通过 Penguinet 给 linux 的相同命令不同 res exec cd mnt mydirectory zcat log file gz echo res 当将命令直接放入命令行时 我在日志文件
  • 通过 sed 使用 unix 变量将数据附加到每行末尾[重复]

    这个问题在这里已经有答案了 我有一个文件 我想使用 SED 将值附加到每行末尾的 unix 变量中 我已经通过 AWK 实现了这一点 但我想在 SED 中实现 像这样的东西 我已经尝试过以下命令 但它不起作用 sed i s BATCH R
  • 我如何知道 C 程序的可执行文件是在前台还是后台运行?

    在我的 C 程序中 我想知道我的可执行文件是否像这样在前台运行 a out 或者像这样 a out 如果你是前台工作 getpgrp tcgetpgrp STDOUT FILENO or STDIN FILENO or STDERR FIL
  • 使用 Grep 查找两个短语之间的文本块(包括短语)

    是否可以使用 grep 来高亮所有以以下内容开头的文本 mutablePath CGPathCreateMutable 并以以下内容结尾 CGPathAddPath skinMutablePath NULL mutablePath 这两个短
  • 如何在unix中移动或复制“find”命令列出的文件?

    我有使用下面的命令看到的某些文件的列表 但是如何将列出的这些文件复制到另一个文件夹 例如 test 中 find mtime 1 exec du hc 添加到 Eric Jablow 的答案中 这是一个可能的解决方案 它对我有用 linux
  • 第二次ftruncate失败

    我试图在首次成功执行 shm open 和 ftruncate 后超出共享内存对象 这是代码 char uuid GenerateUUID int fd shm open uuid O RDWR O CREAT O EXCL S IRUSR
  • Python 如何找到 sys.prefix(或 sys.base_prefix)的值?

    锡上写着什么 我已经解开了如何解开的谜团sys prefix使用虚拟环境时设置 Python 寻找pyvenv cfg file 1 https www python org dev peps pep 0405 specification
  • 安装SIGTSTP前台进程

    我正在尝试为正在运行的前台进程安装 CTRL Z SIGTSTP 处理程序 我设置了处理程序 sigaction 就在我之前wait在父级中 这是正确的地方吗 好像不太好用 EDIT 我正在写一个外壳 这是我的代码的概要 我目前在父级中设置
  • mongodb 正在运行吗?

    我已经在我的 Unix 服务器上安装了 Mongodb 和 PHP 驱动程序 我的问题是如何判断 Mongodb 是否正在运行 是否有一个简单的命令行查询来检查状态 如果我从外壳程序启动一次 如果我退出外壳程序 它会继续运行 情况似乎并非如
  • 通过 Node.js 运行 bash 脚本 - 非法选项 -o pipelinefail

    我正在尝试使用 Node js 执行 bash 脚本child process exec 然而它在文件的第二行爆炸 usr bin env bash set eo pipefail TRACE set x echo we are here
  • 在 Bash 中拆分 csv 文件中的列

    我想从 csv 文件的第二列中提取值并将提取的值存储在新列中 我的数据集示例 page name post id page id A 86680728811 272953252761568 86680728811 A 86680728811
  • 如何使用 Perl 在 Unix 中获取文件创建时间

    如何使用 perl 在 unix 中获取文件创建时间 我有这个命令显示文件的最后修改时间 perl MPOSIX le print strftime d b Y H M localtime lstat 9 for ARGV file txt
  • 从文件中删除包含非英语 (Ascii) 字符的行

    我有一个文本文件 其中包含来自不同语言的字符 例如 中文 拉丁文等 我想删除包含这些非英语字符的所有行 我想包含所有英文字符 a b 数字 0 9 和所有标点符号 我如何使用 awk 或 sed 等 unix 工具来完成此操作 Perl 支

随机推荐

  • VS2015:当前不会命中断点,显示当前不会命中断点 没有与此行关联的可执行代码

    使用VS在Release模式下打断点进行调试 有时候不会命中断点 显示信息如下 当前不会命中断点 显示当前不会命中断点 没有与此行关联的可执行代码 这是因为Release模式下 编译器进行编译优化时 忽略掉了这个断点 为此需要 releas
  • ERROR: You must give at least one requirement to install (see "pip help install")

    使用pip install 命令时遇到You must give at least one requirement to install see pip help install 原因是install 后面没有参数 也就是说没有给想要安装的
  • 蓝牙AOA高精度定位

    01 蓝牙AOA定位技术原理 2019 年初 蓝牙技大联盟宣布蓝牙 5 1 引入了新的 寻向 功能 这个功能可检测蓝牙信号的方向 将大幅提高蓝牙定位的精确度 提供更好的位置服务 结束了以往通过 RSSI 信号强度的方式做低精度指纹定位的历史
  • qt程序设置同时只能运行一个,避免重复打开

    qt程序设置同时只能运行一个 避免重复打开 1 qt程序设置同时只能运行一个 避免重复打开 2 Qt 桌面软件防止重新启动 一 创建互斥量 二 使用QLockFile 创建锁定文件 通过能否锁定来判断是否已经有程序启动 三 使用 系统信号量
  • too few arguments

    改为 解决
  • C#,《小白学程序》第二十七课:大数四则运算之“运算符重载”的算法及源程序

    1 文本格式 using System using System Text using System Collections using System Collections Generic
  • python 默认参数问题

    一个函数参数的默认值 仅仅在该函数定义的时候 被赋值一次 首先来看问题 gt gt gt def add end L L append END return L gt gt gt gt gt gt t add end gt gt gt gt
  • UE4烘焙

    前言 UE4引擎通过editor来创建和管理uasset 但是当游戏发布到不同的平台时需要根据平台转换为不同的格式 而这样的转换过程就叫作烘焙 分为三个步骤 加载包至内存 为包中的每个对象生成目标平台特定数据 Derived Data 把含
  • GY906 MLX90614 非接触式 红外测温传感器 LabVIEW i2c总线数据读取

    GY906使用的红外测温芯片为MLX90614 使用LabVIEW读取i2c总线数据时 需要知道传感器的地址 出厂默认为0x5A 传感器地址支持自己修改 存放在芯片EEPROM的0x0E位置 可以通过访问EEPROM的0x0E单元来获得传感
  • vue-waterfall-easy 的使用

    安装 npm install vue waterfall easy save dev 引用 import vueWaterfallEasy from vue waterfall easy html
  • FindBugs规则整理

    FindBugs是基于Bug Patterns概念 查找javabytecode class文件 中的潜在bug 主要检查bytecode中的bug patterns 如NullPoint空指针检查 没有合理关闭资源 字符串相同判断错 而不
  • 极光笔记

    随着硬件 软件 网络等不断发展 完善 互联网已经渗透到了日常生活中的方方面面 在直接赋能原有行业服务的同时也带来了很多新的服务模式 给人们日常生活带来了极大便利 例如 外卖 快递 跑腿等相关业务更是在我们日常生活中随处可见 业务终端经常处在
  • 8月总共面试31次,我人麻了....

    3年测试经验原来什么都不是 只是给你的简历上画了一笔 一直觉得经验多 无论在哪都能找到满意的工作 但是现实却是给我打了一个大巴掌 事后也不会给糖的那种 先说一下自己的个人情况 普通二本计算机专业毕业 懂python 会写脚本 会seleni
  • 各种距离~~~

    def minkowski distance vec1 vec2 p 3 闵氏距离 当p 1时 就是曼哈顿距离 当p 2时 就是欧氏距离 当p 时 就是切比雪夫距离 param vec1 param vec2 param p return
  • 记录嵌入式面试的流程

    选择嵌入式行业 意味着初期进行很大的付出 希望找到一份满意的工作也是理所当然的 最近正好离开原公司 面试了几家企业 基本参加的都给了offer 这里总结下面试遇到的事情 也是一种积累 我也经历过刚踏入嵌入式行业啥都不会的时候 那时面试都是以
  • Jetson AGX Xavier踩坑记录

    1 联网后 升级所有安装包 并且更新了一下系统 sudo apt get update 2 安装中文输入法 sudo apt get install fcitx googlepinyin 3 安装nano文本编辑器 比较喜欢这个文本编辑器
  • 我的开源项目-RtspServer

    项目地址 GitHub传送门 如果你对如何实现一个RTSP服务器感兴趣 可以看这 从零开始写一个RTSP服务器专栏 文章目录 一 项目介绍 二 功能介绍 三 开发环境 四 使用方法 4 1 传输音视频文件 4 2 采集V4L2摄像头 4 3
  • Qt富文本 - 插入表格/列表/图片

    插入表格 列表 图片 新建桌面应用程序testRichText 基类QMainWindow 勾选创建界面文件 其他选择默认 编辑mainwindow cpp构造函数 mainwindow h ifndef MAINWINDOW H defi
  • 卷积神经网络的架构及其性能分析

    目录 1 概述 2 CNN 3 R CNN 4 Fast R CNN 5 Faster R CNN 6 AlexNet 7 ResNet 8 Mask R CNN 9 YOLO 10 SSD 11 RetinaNet 卷积神经网络现在在图像
  • Posix信号量

    Posix信号量 一 Posix信号量 1 概述 二 Posix提供两种信号量 有名信号量和基于内存的信号量 三 命名信号量 1 sem open和sem close函数 2 sem unlink函数 3 sem wait函数 5 sem