前言
生产者—消费者模式,生产者这边负责生产产品、而消费者负责消费产品,对于消费者来说,没有产品的时候只能等待产品出来,有产品就使用它。这里我们使用一个变量来表示这个这个产品,生产者生产一件产品变量加 1,消费者消费一次变量减 1。下面将讲述相关于进程线程解决生产者消费者问题。其中用到互斥锁,条件变量
互斥锁
简单来说互斥锁就是一个上锁和解锁的过程,当生产者生产产品,消费者消耗产品都需要上锁解锁。 当我们生产产品时要上锁,上锁的目的就是不让其他线程进入,起到保护和免打扰的过程。因为线程的安全性是一个十分严重的问题。下面是互斥锁的使用方法:
互斥锁(mutex)
又叫互斥量,从本质上说是一把锁,在访问共享资源之前对互斥锁进行上锁,在访问完成后释放互斥锁(解锁);对互斥锁进行上锁之后,任何其它试图再次对互斥锁进行加锁的线程都会被阻塞,直到当前线程释放互斥锁。如果释放互斥锁时有一个以上的线程阻塞,那么这些阻塞的线程会被唤醒
互斥锁初始化
1、使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥锁
互 斥锁使 用 pthread_mutex_t 数 据类型 表示, pthread_mutex_t 其 实是一个 结构体 类型, 而宏PTHREAD_MUTEX_INITIALIZER 其实是一个对结构体赋值操作的封装,如下所示:
# define PTHREAD_MUTEX_INITIALIZER \{ { 0 , 0 , 0 , 0 , 0 , __PTHREAD_SPINS, { 0 , 0 } } }
所以由此可知,使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥锁的操作如下:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
PTHREAD_MUTEX_INITIALIZER 宏已经携带了互斥锁的默认属性。
2、使用 pthread_mutex_init()函数初始化互斥锁
使用 PTHREAD_MUTEX_INITIALIZER 宏只适用于在定义的时候就直接进行初始化,对于其它情况则不能使用这种方式,譬如先定义互斥锁,后再进行初始化,或者在堆中动态分配的互斥锁,譬如使用 malloc()函数申请分配的互斥锁对象,那么在这些情况下,可以使用 pthread_mutex_init()函数对互斥锁进行初始化,其函数原型如下所示:
# include <pthread.h>
int pthread_mutex_init ( pthread_mutex_t * mutex, const pthread_mutexattr_t * attr) ;
使用该函数需要包含头文件<pthread.h>。
函数参数和返回值含义如下:
mutex:
参数 mutex 是一个 pthread_mutex_t 类型指针,指向需要进行初始化操作的互斥锁对象;
attr:
参数 attr 是一个 pthread_mutexattr_t 类型指针,指向一个 pthread_mutexattr_t 类型对象,该对象用于定义互斥锁的属性(在 12.2.6 小计中介绍),若将参数 attr 设置为 NULL,则表示将互斥锁的属性设置为默认值,在这种情况下其实就等价于 PTHREAD_MUTEX_INITIALIZER 这种方式初始化,而不同之处在于,使用宏不进行错误检查。
返回值:
成功返回 0;失败将返回一个非 0 的错误码。
💡 Tips:注意,当在 Ubuntu 系统下执行"man 3 pthread_mutex_init"命令时提示找不到该函数,并不是 Linux下没有这个函数,而是该函数相关的 man 手册帮助信息没有被安装,这时我们只需执行"sudo apt-get install manpages-posix-dev"安装即可。
使用 pthread_mutex_init()函数对互斥锁进行初始化示例:
pthread_mutex_t mutex;
pthread_mutex_init ( & mutex, NULL ) ;
或者:
pthread_mutex_t * mutex = malloc ( sizeof ( pthread_mutex_t ) ) ;
pthread_mutex_init ( mutex, NULL ) ;
互斥锁加锁和解锁
互斥锁初始化之后,处于一个未锁定状态,调用函数 pthread_mutex_lock()可以对互斥锁加锁、获取互斥锁,而调用函数 pthread_mutex_unlock()可以对互斥锁解锁、释放互斥锁。其函数原型如下所示:
# include <pthread.h>
int pthread_mutex_lock ( pthread_mutex_t * mutex) ;
int pthread_mutex_unlock ( pthread_mutex_t * mutex) ;
使用这些函数需要包含头文件<pthread.h>,参数 mutex 指向互斥锁对象;pthread_mutex_lock()和pthread_mutex_unlock()在调用成功时返回 0;失败将返回一个非 0 值的错误码。
调用 pthread_mutex_lock()函数对互斥锁进行上锁,如果互斥锁处于未锁定状态,则此次调用会上锁成功,函数调用将立马返回;如果互斥锁此时已经被其它线程锁定了,那么调用 pthread_mutex_lock()会一直阻塞,直到该互斥锁被解锁,到那时,调用将锁定互斥锁并返回。
调用 pthread_mutex_unlock()函数将已经处于锁定状态的互斥锁进行解锁。以下行为均属错误:
对处于未锁定状态的互斥锁进行解锁操作; 解锁由其它线程锁定的互斥锁。
如 果 有 多 个 线 程 处 于 阻 塞 状 态 等 待 互 斥 锁 被 解 锁 , 当 互 斥 锁 被 当 前 锁 定 它 的 线 程 调 用pthread_mutex_unlock()函数解锁后,这些等待着的线程都会有机会对互斥锁上锁,但无法判断究竟哪个线程会如愿以偿! 使用示例
# include <stdio.h>
# include <stdlib.h>
# include <pthread.h>
# include <unistd.h>
# include <string.h>
static pthread_mutex_t mutex;
static int g_count = 0 ;
static void * new_thread_start ( void * arg) {
int loops = * ( ( int * ) arg) ;
int l_count, j;
for ( j = 0 ; j < loops; j++ ) {
pthread_mutex_lock ( & mutex) ;
l_count = g_count;
l_count++ ;
g_count = l_count;
pthread_mutex_unlock ( & mutex) ;
}
return ( void * ) 0 ;
}
static int loops;
int main ( int argc, char * argv[ ] )
{
pthread_t tid1, tid2;
int ret;
if ( 2 > argc)
loops = 10000000 ;
else
loops = atoi ( argv[ 1 ] ) ;
pthread_mutex_init ( & mutex, NULL ) ;
ret = pthread_create ( & tid1, NULL , new_thread_start, & loops) ;
if ( ret) {
fprintf ( stderr , "pthread_create error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
ret = pthread_create ( & tid2, NULL , new_thread_start, & loops) ;
if ( ret) {
fprintf ( stderr , "pthread_create error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
ret = pthread_join ( tid1, NULL ) ;
if ( ret) {
fprintf ( stderr , "pthread_join error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
ret = pthread_join ( tid2, NULL ) ;
if ( ret) {
fprintf ( stderr , "pthread_join error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
printf ( "g_count = %d\n" , g_count) ;
exit ( 0 ) ;
}
在测试运行,使用默认值 1000 万次 通过测试结果可以看到确实得到了我们想看到的正确结果,每次对 g_count 的累加总是能够保持正确,但是在运行程序的过程中,明显会感觉到锁消耗的时间会比较长,这就涉及到性能的问题。 有人会想到pthread_mutex_trylock()函数,在这里我们不过多介绍。
回到主题!!! 生产者与消费者
示例代码如下所示:
# include <stdio.h>
# include <stdlib.h>
# include <pthread.h>
# include <unistd.h>
# include <string.h>
static pthread_mutex_t mutex;
static int g_avail = 0 ;
static void * consumer_thread ( void * arg)
{
for ( ; ; ) {
pthread_mutex_lock ( & mutex) ;
while ( g_avail > 0 )
g_avail-- ;
pthread_mutex_unlock ( & mutex) ;
}
return ( void * ) 0 ;
}
int main ( int argc, char * argv[ ] )
{
pthread_t tid;
int ret;
pthread_mutex_init ( & mutex, NULL ) ;
ret = pthread_create ( & tid, NULL , consumer_thread, NULL ) ;
if ( ret) {
fprintf ( stderr , "pthread_create error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
for ( ; ; )
{
pthread_mutex_lock ( & mutex) ;
g_avail++ ;
pthread_mutex_unlock ( & mutex) ;
}
exit ( 0 ) ;
}
此代码中,主线程作为“生产者”,新创建的线程作为“消费者”,运行之后它们都回处于死循环中,所以代码中没有加入销毁互斥锁、等待回收新线程相关的代码,进程终止时会自动被处理。
上述代码虽然可行,但由于新线程中会不停的循环检查全局变量 g_avail 是否大于 0,故而造成 CPU 资源的浪费。采用条件变量这一问题就可以迎刃而解!条件变量允许一个线程休眠(阻塞等待)直至获取到另一个线程的通知(收到信号)再去执行自己的操作,譬如上述代码中,当条件 g_avail > 0 不成立时,消费者线程会进入休眠状态,而生产者生成产品后(g_avail++,此时 g_avail 将会大于 0),向处于等待状态的线程发出“信号”,而其它线程收到“信号”之后,便会被唤醒!
💡 Tips:这里提到的信号并不是第八章内容所指的信号,需要区分开来!
前面说到,条件变量通常搭配互斥锁来使用,是因为条件的检测是在互斥锁的保护下进行的,也就是说条件本身是由互斥锁保护的,线程在改变条件状态之前必须首先锁住互斥锁,不然就可能引发线程不安全的问题。
1. 条件变量初始化
条件变量使用 pthread_cond_t 数据类型来表示,类似于互斥锁,在使用条件变量之前必须对其进行初始化。初始化方式同样也有两种:使用宏 PTHREAD_COND_INITIALIZER 或者使用函数 pthread_cond_init(),使用宏的初始化方法与互斥锁的初始化宏一样,这里就不再重述!譬如:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_init ( )
函数原型如下所示:
# include <pthread.h>
int pthread_cond_destroy ( pthread_cond_t * cond) ;
int pthread_cond_init ( pthread_cond_t * cond, const pthread_condattr_t * attr) ;
同样,使用这些函数需要包含头文件<pthread.h>,使用 pthread_cond_init()函数初始化条件变量,当不再使用时,使用 pthread_cond_destroy()销毁条件变量。
参数 cond 指向 pthread_cond_t 条件变量对象,对于 pthread_cond_init()函数,类似于互斥锁,在初始化条件变量时设置条件变量的属性,参数 attr 指向一个 pthread_condattr_t 类型对象,pthread_condattr_t 数据类型用于描述条件变量的属性。可将参数 attr 设置为 NULL,表示使用属性的默认值来初始化条件变量,与使用 PTHREAD_COND_INITIALIZER 宏相同。
函数调用成功返回 0,失败将返回一个非 0 值的错误码。
对于初始化与销毁操作,有以下问题需要注意:
在使用条件变量之前必须对条件变量进行初始化操作,使用 PTHREAD_COND_INITIALIZER 宏或者函数 pthread_cond_init()都行; 对已经初始化的条件变量再次进行初始化,将可能会导致未定义行为; 对没有进行初始化的条件变量进行销毁,也将可能会导致未定义行为; 对某个条件变量而言,仅当没有任何线程等待它时,将其销毁才是最安全的; 经 pthread_cond_destroy()销毁的条件变量,可以再次调用 pthread_cond_init()对其进行重新初始化。
2. 通知和等待条件变量
条件变量的主要操作便是发送信号(signal)和等待。发送信号操作即是通知一个或多个处于等待状态的线程,某个共享变量的状态已经改变,这些处于等待状态的线程收到通知之后便会被唤醒,唤醒之后再检查条件是否满足。等待操作是指在收到一个通知前一直处于阻塞状态。
函数 pthread_cond_signal()和 pthread_cond_broadcast()均可向指定的条件变量发送信号,通知一个或多个处于等待状态的线程。调用 pthread_cond_wait()函数是线程阻塞,直到收到条件变量的通知。
pthread_cond_signal()和 pthread_cond_broadcast()函数原型如下所示:
# include <pthread.h>
int pthread_cond_broadcast ( pthread_cond_t * cond) ;
int pthread_cond_signal ( pthread_cond_t * cond) ;
使用这些函数需要包含头文件<pthread.h>,参数 cond 指向目标条件变量,向该条件变量发送信号。调用成功返回 0;失败将返回一个非 0 值的错误码。
pthread_cond_signal()和 pthread_cond_broadcast()的区别在于:二者对阻塞于 pthread_cond_wait()的多个线程对应的处理方式不同,pthread_cond_signal()函数至少能唤醒一个线程,而 pthread_cond_broadcast()函数则能唤醒所有线程。使用pthread_cond_broadcast()函数总能产生正确的结果,唤醒所有等待状态的线程,但函数 pthread_cond_signal()会更为高效,因为它只需确保至少唤醒一个线程即可,所以如果我们的程序当中,只有一个处于等待状态的线程,使用 pthread_cond_signal()更好,具体使用哪个函数根据实际情况进行选择!
pthread_cond_wait()
函数原型如下所示:
# include <pthread.h>
int pthread_cond_wait ( pthread_cond_t * cond, pthread_mutex_t * mutex) ;
当程序当中使用条件变量,当判断某个条件不满足时,调用 pthread_cond_wait()函数将线程设置为等待状态(阻塞)。
pthread_cond_wait()函数包含两个参数:
cond:
指向需要等待的条件变量,目标条件变量;
mutex:
参数 mutex 是一个 pthread_mutex_t 类型指针,指向一个互斥锁对象;前面开头便给大家介绍了,条件变量通常是和互斥锁一起使用,因为条件的检测(条件检测通常是需要访问共享资源的)是在互斥锁的保护下进行的,也就是说条件本身是由互斥锁保护的。
返回值:
调用成功返回 0;失败将返回一个非 0 值的错误码。
在 pthread_cond_wait()函数内部会对参数 mutex 所指定的互斥锁进行操作,通常情况下,条件判断以及pthread_cond_wait()函数调用均在互斥锁的保护下,也就是说,在此之前线程已经对互斥锁加锁了。调用pthread_cond_wait()函数时,调用者把互斥锁传递给函数,函数会自动把调用线程放到等待条件的线程列表上,然后将互斥锁解锁;当thread_cond_wait()被唤醒返回时,会再次锁住互斥锁。
注意的是,条件变量并不保存状态信息,只是传递应用程序状态信息的一种通讯机制。如果调用pthread_cond_signal()和 pthread_cond_broadcast()向指定条件变量发送信号时,若无任何线程等待该条件变量,这个信号也就会不了了之。
当调用 pthread_cond_broadcast()同时唤醒所有线程时,互斥锁也只能被某一线程锁住,其它线程获取锁失败又会陷入阻塞。
使用示例
使用条件变量对示例代码 12.3.1 进行修改,当消费者线程没有产品可消费时,让它处于等待状态,知道生产者把产品生产出来;当生产者把产品生产出来之后,再去通知消费者。
# include <stdio.h>
# include <stdlib.h>
# include <pthread.h>
# include <unistd.h>
# include <string.h>
static pthread_mutex_t mutex;
static pthread_cond_t cond;
static int g_avail = 0 ;
static void * consumer_thread ( void * arg) {
for ( ; ; ) {
pthread_mutex_lock ( & mutex) ;
while ( 0 >= g_avail)
pthread_cond_wait ( & cond, & mutex) ;
while ( 0 < g_avail)
g_avail-- ;
pthread_mutex_unlock ( & mutex) ;
}
return ( void * ) 0 ;
}
int main ( int argc, char * argv[ ] )
{
pthread_t tid;
int ret;
pthread_mutex_init ( & mutex, NULL ) ;
pthread_cond_init ( & cond, NULL ) ;
ret = pthread_create ( & tid, NULL , consumer_thread, NULL ) ;
if ( ret) {
fprintf ( stderr , "pthread_create error: %s\n" , strerror ( ret) ) ;
exit ( - 1 ) ;
}
for ( ; ; ) {
pthread_mutex_lock ( & mutex) ;
g_avail++ ;
pthread_mutex_unlock ( & mutex) ;
pthread_cond_signal ( & cond) ;
}
exit ( 0 ) ;
}
全局变量 g_avail 作为主线程和新线程之间的共享资源,两个线程在访问它们之间首先会对互斥锁进行上锁,消费者线程中,当判断没有产品可被消费时(g_avail <= 0),调用 pthread_cond_wait()使得线程陷入等待状态,等待条件变量,等待生产者制造产品;调用 pthread_cond_wait()后线程阻塞并解锁互斥锁;而在生产者线程中,它的任务是生产产品(使用g_avail++来模拟),产品生产完成之后,调用pthread_mutex_unlock()将互斥锁解锁,并调用 pthread_cond_signal()向条件变量发送信号;这将会唤醒处于等待该条件变量的消费者线程,唤醒之后再次自动获取互斥锁,然后再对产品进行消费(g_avai–模拟)。
3. 条件变量的判断条件
使用条件变量,都会有与之相关的判断条件,通常情况下,会涉及到一个或多个共享变量。譬如在示例代码 12.3.2 中,与条件变量相关的判断是(0 >= g_avail)。细心的读者会发现,在这份示例代码中,我们使用了 while 循环、而不是 if 语句,来控制对 pthread_cond_wait()的调用,这是为何呢?
必须使用 while 循环,而不是 if 语句,这是一种通用的设计原则:当线程从 pthread_cond_wait()返回时,并不能确定判断条件的状态,应该立即重新检查判断条件,如果条件不满足,那就继续休眠等待。
从 pthread_cond_wait()返回后,并不能确定判断条件是真还是假,其理由如下:
当有多于一个线程在等待条件变量时,任何线程都有可能会率先醒来获取互斥锁,率先醒来获取到互斥锁的线程可能会对共享变量进行修改,进而改变判断条件的状态。譬如示例代码 12.3.2 中,如果有两个或更多个消费者线程,当其中一个消费者线程从 pthread_cond_wait()返回后,它会将全局共享变量 g_avail 的值变成 0,导致判断条件的状态由真变成假。 可能会发出虚假的通知。
4. 条件变量的属性
如前所述,调用 pthread_cond_init()函数初始化条件变量时,可以设置条件变量的属性,通过参数 attr 指定。参数 attr 指向一个 pthread_condattr_t 类型对象,该对象对条件变量的属性进行定义,当然,如果将参数attr 设置为 NULL,表示使用默认值来初始化条件变量属性。
关于条件变量的属性本书不打算深入讨论,条件变量包括两个属性:进程共享属性和时钟属性。每个属性都提供了相应的 get 方法和 set 方法
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)