生产者和消费者模型

2023-05-16

生产者和消费者模型

1. 什么是生产者和消费者模型

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

再具体一点:

  1. 生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
  2. 如果缓冲区已经满了,则生产者线程阻塞。
  3. 如果缓冲区为空,那么消费者线程阻塞。

2. 如何实现

实现生产者消费者模型有两种方式:

  1. 采用 wait—notify 方式实现生产者消费者模型(注意这里需要加同步锁 synchronized)
  2. 采用 阻塞队列 方式实现生产者消费者模式

3. wait-notify 方式

实现过程并不复杂,直接上代码:

这里设置了生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)。

缓冲区 BufferArea.java

public class BufferArea {

    // 当前资源数量的计数值
    private int currNum = 0;

    // 资源池中允许存放的资源数目
    private int maxSize = 10;

    /**
     * 从资源池中取走资源
     */
    public synchronized void get() {
        if (currNum > 0) {
            currNum--;
            System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件资源," + "当前缓冲区有" + currNum + "个");
            // 通知生产者生产资源
            notifyAll();
        } else {
            try {
                // 如果没有资源,则 Cosumer_ 进入等待状态
                System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 当前缓冲区资源不足,进入等待状态");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 向缓冲区中添加资源
     */
    public synchronized void put() {
        // 若当前缓冲区内的资源计数小于最大 size 数,才加
        if (currNum < maxSize) {
            currNum++;
            System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + currNum + "个");

            // 通知等待的消费者
            notifyAll();
        } else {
            // 若当前缓冲区的资源计数大于最大 size 数,则等待
            try {
                System.out.println(Thread.currentThread().getName() + "线程进入等待 << 当前缓冲区的资源计数大于最大 size 数");
                // 生产者进入等待状态,并释放锁
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

生产者 Producer.java

public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不断的生产资源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者 Consumer

public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不断的取出资源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试 Test.java

public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印结果如下:

ProducerThread-5生产一件资源,当前资源池有1个
ProducerThread-4生产一件资源,当前资源池有2个
ProducerThread-3生产一件资源,当前资源池有3个
ProducerThread-5生产一件资源,当前资源池有4个
ProducerThread-4生产一件资源,当前资源池有5个
ProducerThread-3生产一件资源,当前资源池有6个
ProducerThread-5生产一件资源,当前资源池有7个
ProducerThread-4生产一件资源,当前资源池有8个
ProducerThread-3生产一件资源,当前资源池有9个
ProducerThread-3生产一件资源,当前资源池有10个
ProducerThread-4线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-5线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-3线程进入等待 << 当前缓冲区的资源计数大于最大 size 数

>> 注释:3个生产者线程生产满了10个(maxSize)产品,然后就都进入了等待

Cosumer_Consumer_Thread-0消耗一件资源,当前缓冲区有9个
Cosumer_Consumer_Thread-1消耗一件资源,当前缓冲区有8个
Cosumer_Consumer_Thread-2消耗一件资源,当前缓冲区有7个

>> 注释:3个消费者消费了3个产品

ProducerThread-3生产一件资源,当前资源池有8个
ProducerThread-5生产一件资源,当前资源池有9个
ProducerThread-4生产一件资源,当前资源池有10个

>> 注释:生产者立马又生产3个

...

>> 然后一直循环往复这个过程

4. 阻塞队列方式

阻塞队列的特点:
  • 当队列元素已满的时候,阻塞插入操作
  • 当队列元素为空的时候,阻塞获取操作
不同的阻塞队列:

ArrayBlockingQueue 与 LinkedBlockingQueue 都是支持 FIFO (先进先出),但是 LinkedBlockingQueue 是无界的,而ArrayBlockingQueue 是有界的。

这里我们采用无界阻塞队列来演示生产者消费者模式。

演示

还是设置生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)

缓冲区 BlockQueueBufferArea.java

public class BlockQueueBufferArea {

    BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);

    public void  put() {
        try {
            System.out.println(Thread.currentThread().getName() + "产品池被放入了一个资源");
            mProductPoll.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void get() {
        try {
            System.out.println(Thread.currentThread().getName() + "产品池被取走了一个资源");
            mProductPoll.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

生产者 Producer.java

public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不断的生产资源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者 Consumer.java

public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不断的取出资源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试 Test.java

public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印结果如下:

Producer_Thread-5产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Consumer_Thread-0产品池被取走了一个资源
Consumer_Thread-1产品池被取走了一个资源
Consumer_Thread-2产品池被取走了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源

5. 参考

  • https://blog.csdn.net/noaman_wgs/article/details/66969905

  • https://blog.csdn.net/user11223344abc/article/details/80419177

  • https://blog.csdn.net/Code_shadow/article/details/81701708

转载于:https://www.cnblogs.com/weixuqin/p/11430981.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

生产者和消费者模型 的相关文章

随机推荐

  • [nginx]invalid number of arguments

    invalid number of arguments nginx出现以下的错误 xff0c 基本上错误的原因就是少了后面的分号导致 invalid number of arguments in 34 include 34 directiv
  • HDU 2246 考研路茫茫——考试大纲

    HDU 2246 考研路茫茫 考试大纲 聽說這題要打表999 43 就傻傻的從0 N一個個地貼在代碼上了 打了幾個文件 xff0c 一同學就說我錯了 xff0c 杯具 因為提交上去的代碼長度不能超64K 白打了 xff0c 不過提示我測試數
  • MariaDB简介

    一 什么是数据库 DB 与 DBMS xff1a DB xff08 DataBase xff09 即数据库 xff0c 存储已经组织好的数据的容器 DBMS xff08 DataBase Manage System xff09 是数据库管理
  • 面试问题之操作系统:动态链接库和静态链接库的区别

    动态链接库是一个可以被其它应用程序共享的程序模块 xff0c 其中封装了一些可以被共享的例程和资源 动态链接库文件名的扩展名一般是dll xff0c 也有可能是drv xff0c sys和fon xff0c 它和可执行文件 exe 非常类似
  • linux中使用Crontab定时执行java的jar包无法使用环境变量的问题

    1 crontab简单使用 cmd 其实就是5个星星的事情 xff0c 随便百度一下吧 5个时间标签用来标注执行的设定 比如每5分钟执行一次 5 cmd 要特别注意 2 有些命令在命令行里执行很好 xff0c 到了crontab里面不能正常
  • Linux内核版本介绍与查询

    Linux内核版本命名在不同时期有着不同的规范 xff0c 在涉及到Linux版本问题时经常容易混淆 xff0c 主线版本 xff0f 稳定版 xff0f 长期支持版本经常搞不清楚 xff0c 本文主要记录下内核版本命名的规则以及如何查看L
  • kvm介绍

    KVM Kernel Based Virtual Machines 是一个基于Linux内核的虚拟化技术 可以直接将Linux内核转换为Hypervisor xff08 系统管理程 序 xff09 从而使得Linux 内核能够直接管理虚拟机
  • linux安装Topicons Plus解决图标不显示问题

    安装TopIcons Plus地址 https extensions gnome org extension 1031 topicons 1 点击链接下载安装包 然后解压 2 把解压后的文件包 移动到此路径下 xff1a usr share
  • 图像缩放算法(最临近点插值算法、双线性内插值算法、双立方插值算法)

    1 最临近点插值算法 xff1a 当一张 xff08 N M xff09 大小的图像放大到 xff08 xff08 j N xff09 xff08 k M xff09 xff09 时 xff0c 那么两张图像之间的像素点存在对应关系 xff
  • C语言float是什么类型,float是什么数据类型?

    float是浮点型数据类型 float是C语言的基本数据类型中的一种 xff0c 表示单精度浮点数 C语言规定单精度浮点型在内存占用4个字节 xff0c 精度为7位 xff0c 取值范围为 xff1a 3 4 10 38 3 4 10 38
  • 服务器文件 修改,服务器文件修改

    服务器文件修改 内容精选 换一换 远程连接Linux云服务器报错 xff1a Module is unknown修改此问题需要重启进入救援模式 xff0c 请评估风险后进行操作 本节操作涉及云服务器重启操作 xff0c 可能会导致业务中断
  • linux 批量重启机器脚本,(Linux) 一键批量启动、停止、重启Jar包Shell脚本

    废话不多说 xff0c 直接上脚本 xff0c 我这里是以spring cloud项目做的示例 bin sh export EUREKA 61 family eureka 1 0 0 jar export GATEWAY 61 family
  • python用post提交数据_python通过post提交数据的方法

    本文实例讲述了python通过post提交数据的方法 分享给大家供大家参考 具体实现方法如下 xff1a coding cp936 import urllib2 import urllib def postHttp name 61 None
  • Linux shell flock详解,Linux shell:Flock简介

    简介 当多个进程操作同一份资源时 xff0c 为了避免损坏数据 xff0c 每个进程在运行时都要保证其它进程没有同时操作资源 xff0c 这时通过flock命令给资源加锁可以实现此需求 flock 在打开的文件上应用或删除咨询锁 命令flo
  • 如何学习计算机编程语言

    关于如何学习计算机编程语言 xff08 C C 43 43 Java Python PHP xff09 1 计算机编程语言是我们和计算机交流信息的载体 xff0c 我们通过它和计算机 说话 xff0c 计算机听到我们说的话 xff0c 领会
  • WebRTC音视频同步

    这两篇文章 xff0c 可以直接去看 xff1b WebRTC音视频同步机制实现分析 https www jianshu com p 3a4d24a71091 WebRTC音视频同步分析 https blog csdn net lincai
  • nginx编译,修改日志路径

    1 configure without http rewrite module 2 修改objs ngx auto config h ifndef NGX PID PATH define NGX PID PATH 34 var logs n
  • 什么是MySQL执行计划

    要对执行计划有个比较好的理解 xff0c 需要先对MySQL的基础结构及查询基本原理有简单的了解 MySQL本身的功能架构分为三个部分 xff0c 分别是 应用层 逻辑层 物理层 xff0c 不只是MySQL xff0c 其他大多数数据库产
  • SPSS详细操作:生存资料的Cox回归分析

    SPSS详细操作 xff1a 生存资料的Cox回归分析 一 问题与数据 某研究者拟观察某新药的抗肿瘤效果 xff0c 将70名肺癌患者随机分为两组 xff0c 分别采用该新药和常规药物进行治疗 xff0c 观察两组肺癌患者的生存情况 xff
  • 生产者和消费者模型

    生产者和消费者模型 1 什么是生产者和消费者模型 生产者消费者模型具体来讲 xff0c 就是在一个系统中 xff0c 存在生产者和消费者两种角色 xff0c 他们通过内存缓冲区进行通信 xff0c 生产者生产消费者需要的资料 xff0c 消