Linux中的多线程剖析

2023-11-16

目录

1、前言

2、多线程理解

2.1 线程

2.2 通俗了解进程和线程

2.2.1 进程是资源分配的基本单位

2.2.2 Linux中的线程是一种轻量化进程

2.3 进程和线程详解

2.3.1 创建一个线程 (pthread_create)

2.3.2 线程自己的一部分数据

2.3.3 线程组

2.3.4 关于进程的其他操作

2.4 Linux线程互斥

2.4.1 互斥量mutex

2.4.2 互斥量的接口

2.5 可重入和线程安全

2.5.1 线程安全

2.5.2 重入

2.6 死锁

2.6.1 死锁四个必要条件

2.6.2 避免死锁

2.7 线程同步

2.7.1 条件变量

2.7.2 同步概念与竞态条件

2.7.3 条件变量函数

2.8 POSIX信号量

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

线程池:

(1),使用条件变量和阻塞队列实现线程池

(2),使用信号量和循环队列实现线程池

4、结语


1、前言

        今天呢,我们来深度理解一下什么时多线程,还有他的相关操作,可是为什么叫Linux中的多线程剖析呢,因为多线程在Linux和Windows系统底层中的实现并不一样,相比之下,Windows的实现更加的复杂,而Linux中的简单一些,有利于理解,好了下面我们进入正片。注:如果本文中有什么错误,请在评论区指出,或者私信作者,谢谢。

2、多线程理解

2.1 线程

        要理解多线程,那我们就先得知道什么叫做线程,,通俗的讲呢,就是一个程序的执行路线就是一个线程,我们写的程序大多都是只有一条执行路线的,从调用main函数,一直结束,都只有一个路线,这就是一个线程。

        可是我们之前接触过一个概念叫做进程,那么线程和进程有什么区别,又有什么联系呢?

2.2 通俗了解进程和线程

        首先呢,线程是运行在进程中的,       

        进程是资源分配的基本单位,而进程是CPU调度的基本单位。

        接下来我们详细理解:

2.2.1 进程是资源分配的基本单位

        在我之前的文章中讲过,在Linux中呢,进程在内存中是由PCB进行管理的,而具体实现就是task_struct 结构体,这个结构体中保存着对应进程的相关信息,比如地址空间,文件描述相关,详细可以参照我之前的文章,

        如下如,Linux中的进程包括,PCB(task_struct),地址空间,页表,以及页表映射在内存中的数据,这些东西组成了一个进程,当然,下面这张图只是一个单线程进程,

2.2.2 Linux中的线程是一种轻量化进程

        那么在Linux中,是怎么对进程进行实现的呢?

         如上图所示,Linux中在设计线程的时候,采用了和进程管理一样的模块描述符,一个task_struct就是一个线程,但是这些task_struct并没有指向自己独有的地址空间和资源,这样的进程结构我们称作轻量化进程(LWP),也就是Linux中对线程的实现,

        这种实现,让CPU不能分辨自己处理的是进程还是线程,但是这就是它设计的精妙之处。

        对于轻量化进程来说,他们都指向同一个地址空间,所以他们可以很轻松的共一些数据和资源,但是对于Windows来说,他们为进程单独设计了一种结构,那样子的话,工程量就非常大了,具体可以自己了解。

2.3 进程和线程详解

        通过上面的通俗了解,我们就对进程和线程有了一定的了解和认识,那么我们详细看看:

2.3.1 创建一个线程 (pthread_create)

        Linux对外提供了一个用户级别的库,不是系统调用接口,这个库当中,就提供了一些对线程操作的函数:

#include <pthread.h>
int pthread_create(
    pthread_t *thread, //返回线程ID
    const pthread_attr_t *attr, //设置线程的属性,attr为NULL表示使用默认属性
    void *(*start_routine) (void*), //是个函数地址,线程启动后要执行的函数
    void *arg //传给线程启动函数的参数
    );
//返回值:成功返回0,错误返回错误码,
int pthread_join(pthread_t thread, void **value_ptr);

        我们可以使用这个函数来创建一个线程,但是等线程执行万指定的函数之后,我们需要对线程资源进行释放

        要注意的是,这里我们使用函数创建的线程是用户态的线程,对应底层的维护是使用轻量级进程来维护的

#include <iostream>
#include <pthread.h>


void* fun1(void* arg){
    std::cout << (char*)arg << std::endl;
    return nullptr;
}
void* fun2(void* arg){
    std::cout << (char*)arg << std::endl;
    return nullptr;
}

int main() {
    pthread_t p1,p2;
    //创建并启动线程
    pthread_create(&p1,nullptr,fun1,(void*)"我是线程p1");
    pthread_create(&p2,nullptr,fun2,(void*)"我是线程p2");

    //阻塞等待线程结束,并对其进行资源释放
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    return 0;
}

        在运行的时候,要注意:要指定pthread库的库名,不然会链接错误

makefile:

.PHONY:all
all:thread01

thread01:thread01.cpp
	g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean
clean:
	rm -f thread01

运行结果:

2.3.2 线程自己的一部分数据

        大家肯定注意到了一个点,那就是pthread_create中,有一个输出型参数,他返回的是什么呢,我们打印 出来看一下

std::cout << "p1:" << p1 << std::endl;
std::cout << "p2:" << p2 << std::endl;
//运行结果:
//p1:139891316045568
//p2:139891307652864

        这么大长串的东西,叫做他的线程ID,但是进程ID那么一点点,这个东西怎么会那么大,大家不要将这两个东西搞混了,这里所说的线程ID并不是我们在监视窗口上看到的那个ID,在监视窗口上那个是线程的tid,这里的进程ID是他在编码层面上的ID,我们可以详细理解一下:

监视窗口上的进程ID(LWP):

        编码层面上的线程ID到底是个什么鬼,这里我们要知道一个东西,就是线程是CPU调度的最小粒子,就是他是要在单独运行的,那我们知道一个程序都是在栈里运行的,但是现在很多个线程都公用一个地址空间,那栈也是公用的吗,并不是,那样就太复杂了,在弹栈,压栈的时候,并没有那么多的寄存器可以供我们维护这个东西,

        所以呢,在地址空间中,有一块叫做共享区的地方,这块地方上面是栈,下边是堆,他们两个相向而生,所以说这块空间非常大,所以呢,在设计之初,就使用这块地方来为本进程创建的线程开辟栈区和一些线程私有的资源区,

         如上图所示,这就是创建的线程的资源分配,我们使用 pthread_create 的时候,返回给我们的线程ID,其实就是对应的线程资源在地址空间中的地址,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID,

pthread_t pthread_self(void);

        线程自己的东西除了栈还有线程ID,一组寄存器,errno,信号屏蔽字,调度优先级

2.3.3 线程组

        没有线程之前,一个进程对应内核里的一个进程描述符,对应一个进程ID。但是引入线程概念之后,情况发生了变化,一个用户进程下管辖N个用户态线程,每个线程作为一个独立的调度实体在内核态都有自己的进程描述符,进程和内核的描述符一下子就变成了1:N关系,POSIX标准又要求进程内的所有线程调用getpid函数时返回相同的进程ID,如何解决上述问题呢?

        Linux内核引入了线程组的概念。

        多线程的进程,又被称为线程组,线程组内的每一个线程在内核之中都存在一个进程描述符(task_struct)与之对应。进程描述符结构体中的pid,表面上看对应的是进程ID,其实不然,它对应的是线程ID;进程描述符中的tgid,含义是Thread Group ID,该值对应的是用户层面的进程ID

        不同于pthread_t类型的线程ID,和进程ID一样,线程ID是pid_t类型的变量,而且是用来唯一标识线程的一个整型变量。

2.3.4 关于进程的其他操作

线程终止

void pthread_exit(void *value_ptr);
//参数
//value_ptr:value_ptr不要指向一个局部变量。
//返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

        需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。 

取消一个执行中的线程

int pthread_cancel(pthread_t thread);
//参数
//thread:线程ID
//返回值:成功返回0;失败返回错误码

等待线程结束

int pthread_join(pthread_t thread, void **value_ptr);
//参数
//thread:线程ID
//value_ptr:它指向一个指针,后者指向线程的返回值
//返回值:成功返回0;失败返回错误码

        调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

        1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。

        2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_CANCELED。

        3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传pthread_exit的参数。

        4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。

分离线程

        默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。

        如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。

int pthread_detach(pthread_t thread);

2.4 Linux线程互斥

我们先来了解一些概念性的东西:

        临界资源:多线程执行流共享的资源就叫做临界资源

        临界区:每个线程内部,访问临界自娱的代码,就叫做临界区

        互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

        原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

2.4.1 互斥量mutex

        大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。

多个线程并发的操作共享变量,会带来一些问题。

我们举一个抢票的例子来说

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>
#define THREADNO 5

int residue = 1000;

void *loot(void* arg){
    while (true) {
        
        if (residue > 0) {
            std::cout << "线程" << (long long)arg + 1 << ":" << residue-- << " 余票:" << residue << std::endl;;
            usleep(1000);
        }
        else {
            
            break;
        }
        
        usleep(2000);
    }
}

int main() {
    //创建线程
    pthread_t tids[THREADNO];
    for (long long i = 0; i < THREADNO; ++i) {
        if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    std::cout << "进程创建完毕" << std::endl;


    for (int i = 0; i < THREADNO; ++i) {
        pthread_join(tids[i],nullptr);
    }

    return 0;
}

        CPU运算速度是非常快的,所以在访问临界资源的时候,就可能出现数据错误,本来只有一千张票,结果发放了一千多张。

为什么可能无法获得错误结果?

        if 语句判断条件为真以后,代码可以并发的切换到其他线程

        usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段

        --ticket操作本身就不是一个原子操作

2.4.2 互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

        方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

        方法2,动态分配:

int pthread_mutex_init(
    pthread_mutex_t *restrict mutex, 
    const pthread_mutexattr_t *restrict attr
);
//参数:
//mutex:要初始化的互斥量 
//attr:NULL

销毁互斥量

销毁互斥量需要注意:

        使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁

        不要销毁一个已经加锁的互斥量

        已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

 互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
//返回值:成功返回0,失败返回错误号

调用pthread_ lock 时,可能会遇到以下情况:

        互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功

        发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

修改上面的抢票代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>

#define THREADNO 2

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
int residue = 100;

void *loot(void* arg){
    while (true) {
        pthread_mutex_lock(&mtx);
        if (residue > 0) {
            std::cout << "线程" << (long long)arg + 1 << ":" << residue << " 余票:" << residue - 1 << std::endl;
            residue--;
            usleep(5000);
            //sleep(1);
        }
        else {
            pthread_mutex_unlock(&mtx);
            break;
        }
        pthread_mutex_unlock(&mtx);
        usleep(2000);
    }
}

int main() {
    //创建线程
    pthread_t tids[THREADNO];
    for (long long i = 0; i < THREADNO; ++i) {
        if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    std::cout << "进程创建完毕" << std::endl;


    for (int i = 0; i < THREADNO; ++i) {
        pthread_join(tids[i],nullptr);
    }

    return 0;
}

        经过上面的例子,大家已经意识到单纯的i++或者++i都不是原子的,有可能会有数据一致性问题为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

2.5 可重入和线程安全

2.5.1 线程安全

        多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

2.5.2 重入

        同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

常见的线程不安全的情况

        不保护共享变量的函数

        函数状态随着被调用,状态发生变化的函数

        返回指向静态变量指针的函数

        调用线程不安全函数的函数

常见的线程安全的情况

        每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的

        类或者接口对于线程来说都是原子操作

        多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

        调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的

        调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构

        可重入函数体内使用了静态的数据结构

常见可重入的情况

        不使用全局变量或静态变量

        不使用用malloc或者new开辟出的空间

        不调用不可重入函数

        不返回静态或全局数据,所有数据都有函数的调用者提供

        使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全联系

        函数是可重入的,那就是线程安全的

        函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题

        如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

可重入与线程安全区别

        可重入函数是线程安全函数的一种

        线程安全不一定是可重入的,而可重入函数则一定是线程安全的。

        如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

2.6 死锁

        死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

2.6.1 死锁四个必要条件

        互斥条件:一个资源每次只能被一个执行流使用

        请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

        不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

        循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

2.6.2 避免死锁

        破坏死锁的四个必要条件

        加锁顺序一致

        避免锁未释放的场景

        资源一次性分配

2.7 线程同步

2.7.1 条件变量

        当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

2.7.2 同步概念与竞态条件

        同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步

        竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

2.7.3 条件变量函数

初始化

int pthread_cond_init(
    pthread_cond_t *restrict cond,
    const pthread_condattr_t *restrict attr); 
//参数:
//cond:要初始化的条件变量 
//attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(
    pthread_cond_t *restrict cond,
    pthread_mutex_t *restrict mutex
); 
//参数:
//cond:要在这个条件变量上等待 
//mutex:互斥量

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond);

简单案例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <string>

struct data{
    char name;
    pthread_mutex_t *mtx;
    pthread_cond_t *cond;
};



void* print(void *arg){
    data *d = (data*)arg;
    char name[64];
    snprintf(name, sizeof(name),"线程%c打印--我是线程%c!",d->name,d->name);
    std::string threadname = name;
    
    while (true) {
        pthread_cond_wait(d->cond,d->mtx);
        std::cout << threadname <<std::endl;
        sleep(1);
        pthread_cond_signal(d->cond);
    }
}


int main() {
    pthread_mutex_t mtx;
    pthread_cond_t cond;

    pthread_mutex_init(&mtx,nullptr);
    pthread_cond_init(&cond,nullptr);

    data d1,d2;
    d1.name = 'A';
    d1.mtx = &mtx;
    d1.cond = &cond;
    d2.name = 'B';
    d2.mtx = &mtx;
    d2.cond = &cond;

    pthread_t p1,p2;
    pthread_create(&p1,nullptr,print,(void*)&d1);
    pthread_create(&p2,nullptr,print,(void*)&d2);
    sleep(1);
    pthread_cond_signal(&cond);
    
    pthread_join(p1,nullptr);
    pthread_join(p2,nullptr);
    while (1);
    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

运行结果:

为什么pthread_ cond_ wait 需要互斥量?

        条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

2.8 POSIX信号量

        POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。  但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); 
//参数:
//pshared:0表示线程间共享,非零表示进程间共享 
//value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem);

发布信号量

//发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

        为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

        解耦、支持并发、支持忙闲不均

线程池:

        一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

        1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

        2.  对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

        3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

        下来我们分别用条件变量和信号量实现线程池,单机版的,我们就设计在线程池内部自产自销的方案实现

(1),使用条件变量和阻塞队列实现线程池

//thread.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdio>
//#include <functional>


class Thread{

    //typedef std::function<void* (void*)> function;
    typedef void*(*function)(void*);

public:
    Thread(){}
    Thread(int num):_tid(0)
    {
        char name[64];
        snprintf(name, sizeof(name),"thread%d",num);
        _name = name;
    }

    void create(function fun,void* arg){
        pthread_create(&_tid, nullptr, fun, arg);
    }

    void join() {
        pthread_join(_tid,nullptr);
    }

    std::string name(){
        return _name;
    }

    ~Thread(){}

private:
    pthread_t _tid;
    std::string _name;
};
//Task.hpp
#pragma once
#include <iostream>

template<class T, class D>
class Task{

public:
    Task(T fun, D num1, D num2):_fun(fun),_arg0(num1),_arg1(num2)
    {}

    T fun() {
        return _fun;
    }

    D arg0() {
        return _arg0;
    }
    D arg1() {
        return _arg1;
    }

private:
    T _fun;
    D _arg0;
    D _arg1;
};
//mypool.hpp
#pragma once

#include "thread.hpp"
#include "Task.hpp"
#include <vector>
#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
#define CAPACITY 10

int add(int x, int y)
{
    return x + y;
}

template <class T, class D>
struct poolData
{
    int _capacity;
    //线程自身
    Thread* _self;
    std::queue<Task<T, D> *> *_task;
    // 锁
    pthread_mutex_t *_mtx;
    // 条件变量
    pthread_cond_t *_full;  // 满了
    pthread_cond_t *_empty; // 空了
};

template <class T, class D>
class Pool
{
public:
    Pool(int num) : _capacity(10)
    {
        _productor = new std::vector<Thread *>(num);
        _consumer = new std::vector<Thread *>(num);
        _task = new std::queue<Task<T, D> *>;
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }

    std::vector<Thread *> *getpro()
    {
        return _productor;
    }
    std::vector<Thread *> *getcon()
    {
        return _consumer;
    }
    std::queue<Task<T, D> *> *gettask()
    {
        return _task;
    }

    void strat()
    {
        //poolData<T, D> prodata[_productor->size()];
        //poolData<T, D> condata[_productor->size()];
        std::vector<poolData<T, D>> prodata(_productor->size());
        std::vector<poolData<T, D>> condata(_productor->size());
        for (int i = 0; i < _productor->size(); ++i)
        {
            //data[i]._consumer = _consumer;
            //data[i]._productor = _productor;
            prodata[i]._task = _task;
            prodata[i]._full = &_full;
            prodata[i]._empty = &_empty;
            prodata[i]._mtx = &_mtx;
            prodata[i]._capacity = _capacity;
            (*_productor)[i] = new Thread(i);
            prodata[i]._self = (*_productor)[i];
            (*_productor)[i]->create(productor, &prodata[i]);

            condata[i]._task = _task;
            condata[i]._full = &_full;
            condata[i]._empty = &_empty;
            condata[i]._mtx = &_mtx;
            condata[i]._capacity = _capacity;
            (*_consumer)[i] = new Thread(i);
            condata[i]._self = (*_consumer)[i];
            (*_consumer)[i]->create(consumer, &condata[i]);
        }
        for (int i = 0; i < _productor->size(); ++i)
        {
            (*_productor)[i]->join();
            (*_consumer)[i]->join();
        }
        for (int i = 0; i < _productor->size(); ++i)
        {
            delete (*_productor)[i];
            delete (*_consumer)[i];
        }
    }

    // 生产者
    static void *productor(void *args)
    {
        poolData<T, D> *pd = (poolData<T, D> *)args;
        //std::vector<Thread *> *pro = pd->_productor;
        // 消费者队列
        // std::vector<Thread*> *con = pd->_consumer;
        Thread* self = pd->_self;
        // 阻塞队列
        std::queue<Task<T, D> *> *task = pd->_task;
        // 锁
        pthread_mutex_t *mtx = pd->_mtx;
        // 条件变量
        pthread_cond_t *full = pd->_full;   // 满了
        pthread_cond_t *empty = pd->_empty; // 空了
        int capacity = pd->_capacity;
        srand((uint64_t)time(nullptr) ^ 0x6666);
        while (true)
        {
            int x = rand() % 1000 + 10;
            int y = rand() % 1000 + 10;

            pthread_mutex_lock(mtx);
            // 如果满了,就挂起
            while (task->size() == capacity){
                std::cout << "生产者挂起!" << std::endl;
                pthread_cond_wait(full, mtx);
            }
                
            
            std::cout << "生产者拿到锁,开始生产"<< std::endl;
            // 添加任务
            task->push(new Task<int (*)(int, int), int>(add, x, y));
            std::cout << "生产者" << self->name() << " 生产:" << x << "+" << y << "=?" << std::endl;
            usleep(rand()%3000);
            std::cout << "生产者生产完毕,释放锁,唤醒线程"<< std::endl;
            pthread_cond_signal(empty);
            pthread_mutex_unlock(mtx);
            

            
            sleep(1);
        }
    }
    // 消费者
    static void *consumer(void *args)
    {
        poolData<T, D> *pd = (poolData<T, D> *)args;
        // std::vector<Thread*>* pro = pd->_productor;
        // 消费者队列
        //std::vector<Thread *> *con = pd->_consumer;
        Thread* self = pd->_self;
        // 阻塞队列
        std::queue<Task<T, D> *> *task = pd->_task;
        // 锁
        pthread_mutex_t *mtx = pd->_mtx;
        // 条件变量
        pthread_cond_t *full = pd->_full;   // 满了
        pthread_cond_t *empty = pd->_empty; // 空了

        while (true)
        {

            pthread_mutex_lock(mtx);
            while (task->empty()) {
                std::cout << "消费者挂起!" << std::endl;
                pthread_cond_wait(empty, mtx);
            }
            std::cout << "消费者拿到锁,开始消费"<< std::endl;
            // 执行任务
            // task->push(Task(add,x,y));
            Task<T, D> *t = task->front();
            task->pop();
            D num = t->fun()(t->arg0(), t->arg1());
            std::cout << "消费者" << self->name() << " 消费:" << t->arg0() << "+" << t->arg1() << "=" << num << std::endl;
            delete t;
            usleep(rand()%3000);
            std::cout << "消费者消费完毕,释放锁,唤醒线程"<< std::endl;

            pthread_cond_signal(full);
            pthread_mutex_unlock(mtx);
            

            //usleep(rand() % 2000);
            sleep(2);
        }
    }

    ~Pool()
    {
        if (!_productor->empty())
        {
            for (int i = 0; i < _productor->size(); ++i)
            {
                if ((*_productor)[i] != nullptr)
                {
                    delete (*_productor)[i];
                }
            }
        }
        if (!_consumer->empty())
        {
            for (int i = 0; i < _consumer->size(); ++i)
            {
                if ((*_consumer)[i] != nullptr)
                {
                    delete (*_consumer)[i];
                }
            }
        }

        while (!_task->empty())
        {
            Task<T,D> *t = _task->front();
            _task->pop();
            delete t;
        }

        delete _productor;
        delete _consumer;
        delete _task;
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

private:
    int _capacity;
    // 生产着队列
    std::vector<Thread *> *_productor;
    // 消费者队列
    std::vector<Thread *> *_consumer;
    // 阻塞队列
    std::queue<Task<T, D> *> *_task;
    // 锁
    pthread_mutex_t _mtx;
    // 条件变量
    pthread_cond_t _full;  // 满了
    pthread_cond_t _empty; // 空了
};
//test.cpp
#include "mypool.hpp"

int main() {
    //std::cout << "hello" << std::endl;
    Pool<int(*)(int,int),int> p(1);
    p.strat();

    return 0;
}

(2),使用信号量和循环队列实现线程池

#pragma once

//mutex.hpp
#include <iostream>
#include <pthread.h>


class mutex{

public:
    mutex(){
        pthread_mutex_init(&_mtx,nullptr);
    }

    void lock(){
        pthread_mutex_lock(&_mtx);
    }

    void unlock(){
        pthread_mutex_unlock(&_mtx);
    }

    ~mutex(){
        pthread_mutex_destroy(&_mtx);
    }


private:
    pthread_mutex_t _mtx;
};
//sem.hpp
#pragma once

#include <iostream>
#include <semaphore.h>

class sem{

public:
    sem(int value)
    {
        sem_init(&_sem, 0, value);
    }

    void p(){
        sem_wait(&_sem);
    }

    void v(){
        sem_post(&_sem);
    }

    ~sem(){
        sem_destroy(&_sem);
    }


private:
    sem_t _sem;
};
//ringqueue.hpp
#include <iostream>
#include <vector>
#include "sem.hpp"
#include "mutex.hpp"

template<class T>
class ringqueue {

public:
    ringqueue(int capacity = 10)
    :_ring_queue(capacity),
    _start(0),
    _tail(0),
    _space_sem(capacity),
    _data_sem(0),
    _mtx()
    {}

    void push(const T &in){

        _space_sem.p();
        _mtx.lock();
        _ring_queue[_start++] = in;
        _start %= _ring_queue.size();
        _data_sem.v();
        _mtx.unlock();
    }

    void pop(T & out){
        _data_sem.p();
        _mtx.lock();
        out = _ring_queue[_tail++];
        _tail %= _ring_queue.size();
        _space_sem.v();
        _mtx.unlock();
    }

    ~ringqueue()
    {
        
    }

private:
    std::vector<T> _ring_queue;
    int _start;
    int _tail;
    sem _space_sem;
    sem _data_sem;
    mutex _mtx;
};
//mypool.hpp
#pragma once

#include "thread.hpp"
#include "Task.hpp"
#include "ringQueue.hpp"
//#include <vector>
//#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
//#define CAPACITY 10

int add(int x, int y)
{
    return x + y;
}

template <class T, class D>
struct poolData
{
    Thread* _self;
    ringqueue<Task<T,D>*>* _rq;
};

template <class T, class D>
class Pool
{
public:
    Pool(int num) 
    :_productor(num),
    _consumer(num)
    {
        
    }

    

    void strat()
    {
        poolData<T,D> prodata[_productor.size()];
        poolData<T,D> condata[_consumer.size()];
        for (int i = 0; i < _productor.size(); ++i) {
            _productor[i] = new Thread(i);
            prodata[i]._self = _productor[i];
            prodata[i]._rq = &_rq;
            _productor[i]->create(productor,&prodata[i]);
            _consumer[i] = new Thread(i);
            condata[i]._self = _consumer[i];
            condata[i]._rq = &_rq;
            _consumer[i]->create(consumer,&condata[i]);
        }

        for (int i = 0; i < _productor.size(); ++i) {
            _productor[i]->join();
            delete _productor[i];
            _consumer[i]->join();
            delete _consumer[i];
        }
    }

    // 生产者
    static void *productor(void *args)
    {
        poolData<T,D> *pd = (poolData<T,D>*)args;
        Thread *self = pd->_self;
        ringqueue<Task<T,D>*> *rq = pd->_rq;
        srand(time(nullptr) ^ 0x6666);
        while (true) {
            int x = rand() % 1000 + 10;
            int y = rand() % 1000 + 10;
            rq->push(new Task<T,D>(add,x,y));
            std::cout << "生产线程" << self->name() << "生产任务:" << x << "+" << y << "=?" << std::endl;
            usleep(rand() %5000 + 3000);
            //sleep(1);
        }
    }
    // 消费者
    static void *consumer(void *args)
    {
        poolData<T,D> *pd = (poolData<T,D>*)args;
        Thread *self = pd->_self;
        ringqueue<Task<T,D>*> *rq = pd->_rq;
        srand(time(nullptr) ^ 0x6666);
        while (true) {
            Task<T,D> *t;
            rq->pop(t);
            int z = t->fun()(t->arg0(),t->arg1());
            std::cout << "消费者" << self->name() << "消费任务:" << t->arg0() << "+" << t->arg1() << "=" << z << std::endl;

            delete t;
            usleep(rand() % 5000 + 3000);
        }
    }

    ~Pool()
    {
        
    }

private:
    ringqueue<Task<T,D>*> _rq;
    std::vector<Thread*> _productor;
    std::vector<Thread*> _consumer;
};
//test.cpp
#include "mypool.hpp"

int main() {
    Pool<int(*)(int,int),int> p(5);
    p.strat();
    return 0;
}

4、结语

        好了,今天的分享就到这里了,如果文章对你有帮助,请留下你的评论,如果有错误,也请评论私信作者,

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux中的多线程剖析 的相关文章

  • dlopen 或 dlclose 未调用信号处理程序

    我在随机时间内收到分段错误 我注册了信号 但发生分段错误时未调用信号处理程序 include
  • 如何在gnuplot中将字符串转换为数字

    有没有办法将表示数字 以科学格式 的字符串转换为 gnuplot 中的数字 IE stringnumber 1 0e0 number myconvert stringnumber plot 1 1 number 我可能使用 shell 命令
  • 如何从 Linux 内核模块获取使用计数?

    我对正在开发的内核模块的使用计数有疑问 我想打印它以进行调试 如何从模块代码中获取它 有问题的内核版本 Linux 2 6 32 module refcount http lxr linux no linux v2 6 34 1 inclu
  • 错误:命令“c++”失败,退出状态为 1

    所以我尝试按照以下说明安装 Pyv8https andrewwilkinson wordpress com 2012 01 23 integrating python and javascript with pyv8 https andre
  • 在 Linux 上创建线程与进程的开销

    我试图回答在 python 中创建线程与进程有多少开销的问题 我修改了类似问题的代码 该问题基本上运行一个带有两个线程的函数 然后运行带有两个进程的相同函数并报告时间 import time sys NUM RANGE 100000000
  • 用于读取文件的 Bash 脚本

    不知道为什么最后一行没有从脚本中删除 bin bash FILENAME 1 while read line do cut d f2 echo line done lt FILENAME cat file 1 test 2 test 3 t
  • 可以作为命令行参数传递多少数据?

    在 Linux 下生成进程时可以发送多少字节作为命令行参数 gahooa 推荐了一篇好文章http www in ulm de mascheck various argmax http www in ulm de mascheck vari
  • SMP 上如何处理中断?

    SMP 对称多处理器 多核 机器上如何处理中断 内存管理单元是只有一个还是多个 假设两个线程 A 和 B 运行在不同的内核上 同时 访问页表中不存在的内存页面 在这种情况下 将会出现页面错误 并从内存中引入新页面 将会发生的事件的顺序是什么
  • 完整的 C++ i18n gettext()“hello world”示例

    我正在寻找完整的 i18ngettext 你好世界的例子 我已经开始了一个基于的脚本使用 GNU gettext 的本机语言支持教程 https web archive org web 20130330233819 http oriya s
  • 如何将 elf 解释器(ld-linux.so.2/ld-2.17.so)构建为静态库?

    如果我的问题不准确 我深表歉意 因为我没有太多 Linux 相关经验 我目前正在构建一个 Linux 从头开始 主要遵循 linuxfromscratch org 版本的指南 7 3 我遇到了以下问题 当我构建可执行文件时 获取一个称为 E
  • 为什么默认情况下不启用 arp 忽略/通告 [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 我有一个需要经验才能回答的具体问题 为什么 arp ignore arp announce 在 Linux 安装 例如 debian 上默认不启用 有
  • 运行 shell 命令并将输出发送到文件?

    我需要能够通过 php 脚本修改我的 openvpn 身份验证文件 我已将我的 http 用户设置为免通 sudoer 因为这台机器仅在我的家庭网络中可用 我目前有以下命令 echo shell exec sudo echo usernam
  • Visual Studio - X11:缺少 DISPLAY 环境变量

    我正在使用 Visual Studio 2019 Enterprise 开发跨平台 Windows Linux x64 GUI 应用程序 在这个 2019 版本中 我们可以使用 Visual Studio调试平台 Windows 本机 和
  • 如何使用 go1.6.2 构建 linux 32 位

    有没有任何组合GOARCH and GOOS我可以设置哪些值来构建 ELF 32 位二进制文 件 GOOS linux and GOARCH 386 更多示例 架构 32 bit gt GOARCH 386 64 bit gt GOARCH
  • 套接字发送调用被阻塞很长时间

    我每 10 秒在套接字上发送 2 个字节的应用程序数据 阻塞 但发送调用在下面的最后一个实例中被阻塞超过 40 秒 2012 06 13 12 02 46 653417 信息 发送前 2012 06 13 12 02 46 653457 信
  • Apache LOG:子进程 pid xxxx 退出信号分段错误 (11)

    Apache PHP Mysql Linux 注意 子进程 pid 23145 退出信号分段错误 11 tmp 中可能存在 coredump 但 tmp下没有找到任何东西 我怎样才能找到错误 PHP 代码中函数的无限循环导致了此错误
  • 查找并删除超过 x 天的文件或文件夹

    我想删除超过 7 天的文件和文件夹 所以我尝试了 17 07 14 email protected cdn cgi l email protection find tmp mindepth 1 maxdepth 1 ctime 7 exec
  • 从哪里获取 iostream.h

    我正在尝试在 Linux 中做一些事情 但它抱怨找不到 iostream h 我需要安装什么才能获取此文件 这个标准头的正确名称是iostream没有扩展名 如果您的编译器仍然找不到它 请尝试以下操作 find usr include na
  • Unix 中的访问时间是多少

    我想知道访问时间是多少 我在网上搜索但得到了相同的定义 读 被改变 我知道与touch我们可以改变它 谁能用一个例子来解释一下它是如何改变的 有没有办法在unix中获取创建日期 时间 stat结构 The stat 2 结构跟踪所有文件日期
  • C++ Linux GCC 应用程序中的 GUID

    我有很多服务器运行这个 Linux 应用程序 我希望他们能够生成一个碰撞概率较低的 GUID 我确信我可以从 dev urandom 中提取 128 个字节 这可能没问题 但是有没有一种简单易用的方法来生成与 Win32 更等效的 GUID

随机推荐

  • 高标准农田信息化管理平台概要设计

    1 综合信息一张图系统 通过一张图的形式 可视化直观展示地区土地分布 耕地质量 高标准农田建设情况 灌溉情况 设备分布情况及环境监测数据 农业管理者可在一张图上查看农田相关信息 及时了解农田情况 为农田管理者的精准管理和科学决策提供辅助支撑
  • Asp.Net&.Net Core 使用 SonarQube 踩坑记 (使用 MSBuild扫描器篇)

    使用dotnet 需要 搭建 ner core的运行环境 1 首先安装配置java运行环境 且javaJDK 必须是11以上 jdk版本必须大于11 等于11不行 2 java和java JDK后 记得配置java 和jdk建立连接和配置
  • formdata上传文件_关于multipart/formdata上传文件

    最近在做一个文件上传的开放接口 用到Content Type multipart form data这种请求类型 特地做了一些研究和记录 在最初的 http协议中 并没有上传文件方面的功能 RFC1867为 http协议添加了这个能力 常见
  • 深度学习笔试、面试题 二

    1 梯度爆炸问题是指在训练深度神经网络的时候 梯度变得过大而损失函数变为无穷 在RNN中 下面哪种方法可以较好地处理梯度爆炸问题 A 用改良的网络结构比如LSTM和GRUs B 梯度裁剪 C Dropout D 所有方法都不行 正确答案是
  • Linux-写USB键盘驱动(详解)

    1 首先我们通过上节的代码中修改 来打印下键盘驱动的数据到底是怎样的 先来回忆下 我们之前写的鼠标驱动的id table是这样 所以我们要修改id table 使这个驱动为键盘的驱动 如下图所示 然后修改中断函数 通过printk 打印数据
  • 算法与数据结构(七):优先队列

    博主会对算法与数据结构会不断进行更新 敬请期待 如有什么建议 欢迎联系 我们知道队列具有先进先出的特性 栈具有先进后出的特性 那么有没有一种数据结构可以根据自己的需求 以一定的规则从队列中弹出呢 优先队列就是实现这种目标的数据结构 一般情况
  • shell随机读取文件的一行

    bin bash a cat files txt wc l for i 0 i lt 5 i do b RANDOM a b b 1 sed n b p files txt done
  • 微信公众号内嵌H5网页授权步骤

    主要注意点就是回调地址 我是用vue框架开发的 所以单独做了个页面去授权回调 redirectToAuthPage const callbackURL encodeURIComponent https ad jfpays com wcpn
  • Ubuntu 22.04安装Visual Studio Code(VS Code)

    Ubuntu 22 04安装Visual Studio Code 一 下载 打开浏览器 访问VS Code的官方网址 https code visualstudio com 在首页的左侧有两个蓝色的按钮 点击左边的按钮 下载 deb格式的安
  • 全链路压测的“谜”

    前言 对于性能测试来说 全链路压测肯定跑不了的 在昨天上午的 GIAC全球互联网架构大会 上 网易云就进行了全链路压测的议题 对于有性能测试的公司来说 面试往往会被问到什么是全链路压测 如何有效的开展全链路压测等等 我今天也只是高屋建瓴 站
  • unity地形之splatalpha研究 地形贴图导出更换与绘制

    unity中的地图贴图的绘制常常使用的是paint texture里面的 但是这个方式往往费时很多 却只能做出很少的效果 这里要介绍的就是通过外部绘制splatalpha 来替换 达到unity中地形更强的效果 使用软件基本有worldma
  • 【yolo】实现一键yolov5数据处理(下)(划分数据集和验证集+构建yolo数据集结构+生成yaml文件)

    事先准备 所有训练所需的图像存于一个目录 所有训练所需的标签存于一个目录 图像文件与标签文件都统一的格式 图像名与标签名一一对应 两种模式可以选择 将文件按照划分输出直接输出到train val目录 或者 输出train txt val t
  • Python turtle 画圣诞树

    马上就要圣诞街了 作为一名程序猿的我们应该用代码表达一下程序猿的温柔呐 所以 改写了一段Python画圣诞树的代码 给你们的朋友们画一颗代码圣诞树吧 圣诞树一 import turtle as t as就是取个别名 后续调用的t都是turt
  • 配置阿里云yum源并启动nginx服务

    1 查看yum源仓库 ls etc yum repos d 2 查看CentOs Base repo文件 3 配置yum源 https opsx alibaba com mirror 找到这个网站 然后找到centos7 执行下载阿里云yu
  • 剑指Offer 53-Ⅱ.0~n-1中缺失的数字

    LeetCode 剑指Offer 53 o n 1中缺失的数字 一个长度为n 1的递增排序数组中的所有数字都是唯一的 并且每个数字都在范围0 n 1之内 在范围0 n 1内的n个数字中有且只有一个数字不在该数组中 请找出这个数字 示例 1
  • GDB调试子进程

    http blog sina com cn s blog 4e415c0b0100lum0 html 1 set follow fork mode
  • Ajax请求url的中文乱码问题

    Ajax请求路径中如果有中文出现在参数值中时 在IE浏览器中出现过乱码的情况 遇到这种问题那肯定就是编码的问题 这时我们就必须采用编码后再传参了 在后台接收时就必须进行解码操作 在js中进行编码操作有几个常用的方法escape encode
  • OpenHarmony在Amlogic A311D芯片平台的快速开发上手指南

    OpenHarmony是由开放原子开源基金会 OpenAtom Foundation 孵化及运营的开源项目 目标是面向全场景 全连接 全智能时代 搭建一个智能终端设备操作系统的框架和平台 促进万物互联产业的繁荣发展 目前最新发布的版本为Op
  • DOCKER UBUNTU 配置

    TensorRT 8 2 1 8 安装笔记 超全超详细 Docker 快速搭建 TensorRT 环境 知乎 1 DOCKER 深度学习环境基本要素 1 Docker部署深度学习服务器 CUDA cudnn ssh 铜锣烧阿南Anan的博客
  • Linux中的多线程剖析

    目录 1 前言 2 多线程理解 2 1 线程 2 2 通俗了解进程和线程 2 2 1 进程是资源分配的基本单位 2 2 2 Linux中的线程是一种轻量化进程 2 3 进程和线程详解 2 3 1 创建一个线程 pthread create