多线程之生产者消费者模式

2023-05-16

文章目录

    • 基本组成
    • 阻塞队列
      • 有界队列与无界队列
        • ArrayBlockingQueue
        • LinkedBlockingQueue
        • SynchronousQueue
    • 流量控制与信号量(Semaphore)
    • 双缓冲与Exchanger

基本组成

生产者:生产者的任务是生产产品,产品可以是数据,也可以是任务。(将产品存入传输通道的线程被称为生产者线程)

消费者:消费者的主要职责是消费的产品。(从传输通道中取出产品进行消费的线程被称为消费者线程)

传输通道:生产者和消费者是并发运行在各自的线程中,这就意味着可以使程序原本串行的处理得以并发化。但线程之间无法像函数调用那样通过参数直接传递数据,因此生产者和消费者之间需要一个用于传递产品的传输通道,通道相当于中间的缓冲区,生产者每生产一个产品就将其放入到传输通道,消费者则不断的从传输通道中取出产品进行消费。因为生产者和消费者都可以各自运行在一个或多个线程中,所以传输通道一般使用线程安全的队列。

阻塞队列

有界队列与无界队列

当传输通道为空的时候消费者无法取出产品,此时消费者进行等待,直到传输通道非空;当传输通道存储空间满的时候生产者无法存入新的产品,此时生产者线程进行等待,直到传输通道非满。生产者线程向传输通道成功存入产品后会唤醒等待传输通道非空的消费者线程,消费者线程从传输通道取出一个产品后会唤醒等待传输通道非满的生产者线程,这种方式被称为阻塞式(Blocking)。一个方法或操作能够导致其执行的线程被暂停(生命状态变为WAITING或BLOCKED),这种方法就被称为阻塞方法(Blocking Method),常见的阻塞方法有ReentrantLock.lock、申请内部锁等。相反,如果一个方法或操作并不会导致其执行线程被暂停,那么相应的方法就被称为非阻塞方法(Non-blocking Method)。

阻塞队列按照其存储空间是否受限制来划分,可分为有界队列(Bounded Queue)和无界队列(Unbounded Queue),有界队列的存储容量限制是由程序指定的,无界队列的最大容量为Integer.MAX_VALUE个元素。
往队列中存入一个元素的操作被称为put操作,从队列中取出一个元素的操作被称为take操作。

当消费者处理能力低于生产者处理能力时,这会导致队列中的产品积压,由此导致队列中的产品所占用的内存空间越来越多,如果想要限制传输通道的存储容量,可以使用有界阻塞队列作为传输通道。
有界队列的另外一个好处是可以“反压”:当消费者能力跟不上生产者的生产能力时,队列中的产品会逐渐积压变满,此时生产者会被暂停,直到消费者消费了部分线程而使队列非满。这在一定程度上给了消费者跟上步伐的机会,但生产者会进行上下文切换。

ArrayBlockingQueue

有界队列可以使用 ArrayBlockingQueue 或者 LinkedBlockingQueue ,ArrayBlockingQueue内部使用一个数组作为存储空间,是预先分配好的,因此它的put和take操作不会增加垃圾回收的负担。但其缺点是在执行put、take操作时使用的是同一把锁,多个生产者或消费者情况下会导致过多的上下文切换。

LinkedBlockingQueue

LinkedBlockingQueue 既可以实现无界队列,也可以实现有界队列。它的优点是内部在实现take、put操作的时候使用两个锁(putLock和takeLock),这样就降低了锁竞争,减少了上下文切换,但其内部实现是一个链表,链表节点所需的存储空间是动态分配的,因此会增加垃圾回收的负担。除此之外,因为LinkedBlockingQueue使用的是两把锁,其维护当前队列长度的时候无法使用int变量,需要使用原子变量AtomicInteger,因而也增加了额外的开销。

SynchronousQueue

SynchronousQueue 是一种特殊的无界队列,当生产者线程执行put操作时,如果没有消费者线程执行take,则该生产者线程会被暂停;当消费者线程执行take时,如果没有生产者执行put,则消费者线程会被暂停,也就是说,以SynchronousQueue作为传输通道的话,生产者和消费者只能“交换”一个产品,就像是一手交钱,一手交货。因此,SynchronousQueue 适用于生产者和消费者处理能力差不多的情况下,否则,当生产者执行put,但消费者没有执行take(反过来也是一样),就会进行较多的等待。

阻塞队列也可以支持非阻塞操作,可以使用BlockingQueue接口定义的offer和poll来替代put和take。offer返回false表示队列已满或入队失败,poll返回null表示队列为空。

提示:
1、LinkedBlockingQueue 适合在生产者线程和消费者线程之间并发程度比较大的情况下使用。
2、ArrayBlockingQueue 适合在生产者线程和消费者线程之间并发程度较低的情况下使用。
3、SynchronousQueue 适合在生产者线程和消费者线程之间处理能力相差不大的情况下使用。

流量控制与信号量(Semaphore)

使用无界队列的一个好处是不会导致线程被阻塞。但消费者的消费能力跟不上生产者的生产能力时,会导致数据的积压,因此,在使用无界队列作为传输通道的时候一般会限制生产者的速率,即进行流量控制。

我们可以使用jdk 1.5中的Semaphore来实现限流。我们把代码所访问的特定资源或者执行的特定操作的机会统一看作一种资源,这种资源被称为虚拟资源。Semaphore 相当于虚拟资源配额管理器,它可以用来控制同一时间内对虚拟资源的访问次数。只有当线程获取到配额,才能访问资源,并在访问完资源后进行配额的释放。 acquire、release 分别对应获取配额和释放配额。如果当前配额不足,则执行acquire会进行阻塞,直到配额不为0。Semaphore内部会维护一个等待队列用于存储这些被暂停的线程,在执行acquire时会将配额减1,在执行release时会将配额加1,并随机唤醒等待队列中的一个线程。

下面实现一个demo来对上面所有的内容进行一个回顾:

队列的抽象接口:

public interface Channel<P> {

    //向传输通道中放入一个产品
    void put(P product);

    //从传输通道中取出一个产品
    P take();
   
}

队列的实现类:

public class SemaphoreDemo<P> implements Channel<P> {

    private final BlockingQueue<P> queue;

    private final Semaphore semaphore;


    public SemaphoreDemo(BlockingQueue<P> queue, int flowLimit) {
        this(queue, flowLimit, false);
    }

    public SemaphoreDemo(BlockingQueue<P> queue, int flowLimit, boolean isFair) {
        this.queue = queue;
        this.semaphore = new Semaphore(flowLimit, isFair);
    }


    @SneakyThrows
    @Override
    public void put(P product) {
        //申请一个配额
        semaphore.acquire();
        try {
            //访问虚拟资源
            queue.put(product);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //返回一个配额
            semaphore.release();
        }

    }

    @SneakyThrows
    @Override
    public P take() {
        return queue.take();
    }
}

客户端:

public class Demo {

    public static void main(String[] args) throws InterruptedException {
        //使用无界队列,最多只能有两个线程同时执行
        SemaphoreDemo<String> semaphoreDemo = new SemaphoreDemo<>(new LinkedBlockingQueue<String>() , 2);

        //5个线程同时put
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                StopWatch stopWatch = new StopWatch(String.valueOf(finalI));
                stopWatch.start(String.valueOf(finalI));
                semaphoreDemo.put("product" + finalI);
                stopWatch.stop();
                System.out.println(stopWatch.prettyPrint());
            }).start();
        }

        Thread.sleep(1000);

        for (int i = 0; i < 5; i++) {
            String product = semaphoreDemo.take();
        }

    }
}

下面是打印的结果,可以看出有两个线程一个开始执行时间是1ms,说明没有被阻塞,后面的线程执行时间是2ms,是被阻塞的,也就是说,同时只能满足两个线程获取资源。

StopWatch '0': running time (millis) = 2
-----------------------------------------
ms     %     Task name
-----------------------------------------
00002  100%  0

StopWatch '3': running time (millis) = 2
-----------------------------------------
ms     %     Task name
-----------------------------------------
00002  100%  3

StopWatch '2': running time (millis) = 1
-----------------------------------------
ms     %     Task name
-----------------------------------------
00001  100%  2

StopWatch '1': running time (millis) = 1
-----------------------------------------
ms     %     Task name
-----------------------------------------
00001  100%  1

StopWatch '4': running time (millis) = 2
-----------------------------------------
ms     %     Task name
-----------------------------------------
00002  100%  4

双缓冲与Exchanger

多线程环境下,有时候我们会使用两个或者更多的缓冲区来实现数据从数据源到使用方的移动。其中一个缓冲区填充满来自数据源的数据后可以被数据使用方进行消费,另外一个空的或已经使用过的缓冲区则用来填充数据源的新数据。负责填充缓冲区的是生产者线程, 负责消费一个已经填充了缓冲区的线程是消费者线程。因此,当消费者线程消费了一个已经填充的缓冲区时,另外一个缓冲区可以由生产者进行填充,从而实现了数据生成与消费的并发。这种缓冲技术被称为双缓冲(Double Buffering)。

jdk 1.5中的Exchanger可以实现双缓冲,Exchanger相当于只有两个参与方的CyclicBarrier,Exchanger.exchange 相当于CyclicBarrier.await(如果有关CyclicBarrier使用不明确的可参考我的这篇博客这篇博客)

初始状态下,生产者和消费者各自创建一个空的缓冲区,消费者线程执行Exchanger.exchange时将参数指定一个空的或者已经使用过的缓冲区,生产者执行Exchanger.exchange时将参数指定一个已经填充完毕的缓冲区。只有当二者都执行完毕之后,才进行下一步操作。Exchanger.exchange的参数是对方需要的,返回值是自己需要的(也就是对方所指定的参数)。因此,这也可以看成是SynchronousQueue。

下面通过Exchanger实现一个一手交钱一手交货的小demo:

@Slf4j
public class ExchangerDemo {

    private static final Exchanger<String> exchanger = new Exchanger<String>();

    public static void main(String[] args) {
        
        new Thread(()->{
            String money = "money";
            try {
                String product = exchanger.exchange(money);
                log.info("I get the {}",product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        new Thread(()->{
            String product = "product";
            try {
                String money = exchanger.exchange(product);
                log.info("I get the {}",money);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

打印结果如下:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多线程之生产者消费者模式 的相关文章

  • 正则在小偷程序中的应用(续)

    获取资源信息 content 61 file get contents 34 http list sososteel com qg list html pg 61 1 amp h 61 34 time 对抓取的信息进行处理 取class为l
  • mysql密码过期问题解决方案

    mysql密码过期问题解决方案 问题再现 xff1a 密码过期 旧电脑许久没有用 xff0c 今天打开发现数据库连接不上了 提示密码过期 xff0c 请修改密码 ERROR 1862 HY000 Your password has expi
  • 安装MITIE的条件

    1 版本 在安装rasa xff08 1 1 8 xff0c 其它版本应该也行 xff09 时候必须安装mitie xff0c 下面是我安装mitie时的环境 xff1a 1 VS2017 装没装忘记了 xff0c 但是电脑中有Micros
  • Linux重启命令 / 查看、重启、禁用网卡命令 / 修改IP / 操作防火墙命令

    重启命令 1 reboot 2 shutdown r now 立刻重启 root用户使用 3 shutdown r 10 过10分钟自动重启 root用户使用 4 shutdown r 20 35 在时间为20 35时候重启 root用户使
  • Centos下rpm离线下载与安装命令

    xff08 1 xff09 安装yum即相关插件 sudo apt get install yum sudo apt get install yum utils sudo yum install yum plugin downloadonl
  • Windows下搭建Vxworks开发环境,VMware虚拟机实现

    这是本人自己整理的笔记 xff0c 参考了两篇文章 xff0c 大同小异 参考的第一篇 xff1a http hi baidu com hezhe1008 blog item 26e5cd53364eb314377abe01 html 参考
  • 使用opencv下的sift进行特征匹配(python)

    1 安装opencv 首先确保安装了opencv包和vs2015 xff0c 命令为 xff1a pip install opencv python 也可以直接下载安装文件 xff0c 测试代码 import cv2 import nump
  • CentOS7.0上用命令安装Chrome浏览器。

    1 配置yum下载源 xff1a 在目录 etc yum repos d 下新建文件 google chrome repo 并且在该文件中添加如下内容 xff1a 1 google chrome 2 name 61 google chrom
  • centos7安装anaconda

    步骤一 xff1a 删除centos7中自带有python2 7 xff08 可选 xff09 xff08 1 xff09 强制删除已安装python及其关联 rpm qa grep python xargs rpm ev allmatch
  • new释放内存理解c++

    释放内存后 xff0c 本身抄指针的内容是不会改变的即指针指向没有变 xff0c 但是它里面保存的地址所对应的内存在系统里标记为未使用的即指向的内存区域已经被系统收回了 xff0c 这块内存随时可能会被分配给其他进程或进程里面的变量使用 x
  • archlinux BIOS+BMR 安装记录

    声明 xff01 xff01 本安装指南采用的是传统的BIOS 43 BMR分区方案 43 GRUB引导 适用于早期的电脑 现在大多电脑基本都是UEFI 43 GPT xff0c 这种方案 安装前准备 制作启动盘 1 xff0c 下载arc
  • MAC Please reinstall Android Studio form screatch

    遇到问题的流程 xff0c 是误操作关闭了Kotlin插件 xff0c 重启的时候提示这个信息 解决办法 xff1a 资源库 Preferences AndroidStudio4 0 下面找到disabled plugins txt文件 删
  • 关于zookeeper启动一闪而过的情况

    问题描述 笔者在虚拟机启动zk cmd无论怎么启动都是闪退的情况 解决方案 在cmd文件加个pause查看问题 如下图所示 64 echo off REM Licensed to the Apache Software Foundation
  • jupyter notebook error: FileNotFoundError: [WinError 2]

    文章目录 问题现象解决方法 问题现象 Traceback most recent call last File 34 D Anaconda3 envs robot env lib site packages tornado web py 3
  • Hp 4200添加为网络打印机的步骤

    物理连接 1 只要连接电源 网线即可 设置打印机IP 1 在打印机控制面板上按下 键 xff0c 进入菜单 xff1b 2 按下 或 找到 CONFIGURE DEVICE xff08 配置设备 xff09 选项 xff0c 按下 键 xf
  • Qt内置浏览器引擎WebEngine调试和分析方法

    问题背景 H5和JS代码写好了 xff0c 在浏览器里调试也一切OK xff0c 然后嵌入到Qt中 xff0c 发现各种问题 xff0c 这时候 xff0c 如何在Qt框架下调试Web的代码呢 xff1f 调试方法 在Qt帮助文档中 xff
  • IOS开发之——网络-视频播放-JSON解析(11)

    一 概述 解析JSON结果到OC类通过视频播放地址构成视频播放器并播放显示网络请求 xff0c 返回视频播放列表 xff0c 点击列表中的一项进行播放 二 Main storyboard 三 解析JSON结果到OC类 3 1 JSON数据
  • 华为交换机 ssh和Telnet远程登录配置命令

    华为交换机 ssh和Telnet远程登录配置命令 联系方式 QQ481715271 简单拓扑 LSW4 vlan 20配置地址的原因是SSH Telnet 都可以登录 LSW3 配置 vlan batch 10 20 interface V
  • CentOS7配置yum本地源时报错Repository ‘centos7-media‘: Error parsing config

    错误如下 xff1a 已加载插件 xff1a fastestmirror Repository 39 centos7 media 39 Error parsing config Error parsing 34 gpgkey 61 39 f
  • 深入理解Tomcat虚拟目录

    我们知道 xff0c Web网站中的内容 xff08 包括网页 xff0c 图片 xff0c 音频文件等 xff09 一般都存放在App的目录下 但随着网站内容的不断丰富 xff0c 用户需要把不同层次的内容组织成网站的子目录 我们通常的做

随机推荐

  • Spring Security 实现身份认证

    Spring Security可以运行在不同的身份认证环境中 xff0c 当我们推荐用户使用Spring Security进行身份认证但并不推荐集成到容器管理的身份认证中时 xff0c 但当你集成到自己的身份认证系统时 xff0c 它依然是
  • Spring Security身份认证之UserDetailsService

    zhiqian我们采用了配置文件的方式从数据库中读取用户进行登录 虽然该方式的灵活性相较于静态账号密码的方式灵活了许多 xff0c 但是将数据库的结构暴露在明显的位置上 xff0c 绝对不是一个明智的做法 本文通过Java代码实现UserD
  • 基于Apache OLTU的OAuth2.0授权解决方案

    Apache OLTU实现了OAuth 2 0的规范 xff0c 是一种可靠的Java授权解决方案 但是 xff0c 官方文档实在是太惨不忍睹了 本文参考了开涛的 OAuth 2 0集成Shiro文章 模拟了OAuth2 0的认证流程 技术
  • Couch的MapReduce查询

    1 MapReduce介绍 传统的关系型数据库中 xff0c 只要你的数据是结构化的 xff0c 你可以进行任何类型的查询 Apache Couch与此相反 xff0c 它使用MapReduce xff08 预定义的map和的reduce方
  • Java遍历读取文件目录结构

    Java读取计算机目录 xff0c 并打印 public class ReadDirectory 文件所在的层数 private int fileLevel 生成输出格式 64 param name 输出的文件名或目录名 64 param
  • Java实现数字水印

    数字水印有可见不可见之分 xff0c 可见的比如课件上印有学校校徽 xff0c 微博发图片会水印上上传者的信息及微博logo等 用java实现可见的数字水印 xff0c 草人主要是用到了java awt包中的AlphaComposite类
  • 程序员应该如何去设计需求

    刚出道的程序员 xff0c 在做需求分析的时候 xff0c 总是经常挨批 xff0c 客户说他们不能按照客户的要求去设计原型 xff0c 领导说他们不用心去与客户沟通交流 程序员总是感到自己很冤枉 xff0c 明明客户没有给出一点建设性建议
  • Android硬件开发之——使用Android Beam传输文本

    前言 本文主要讲述使用Android Beam传输文本 xff0c 内容包含 xff1a Android Beam的基本理念Android Beam API实例 Android Beam Android Beam的基本理念 Android
  • 小小程序员的一周日报

    工作依旧在有条不紊的进行着 xff0c 一周的时间很快就会过去 xff0c 正如今天李哥所说的 xff0c 这一周还没有感觉怎么过呢 xff0c 就结束了 是啊 xff0c 这就是我们的工作 xff0c 程序员的工作 xff0c 软件设计师
  • 项目空间都有啥

    项目空间是什么 xff0c Workplace 答案是 xff1a No 项目空间是由项目负责人提出的实施某项目方案的一种流程 项目空间是XX海油ERP管理系统下的一个业务 xff0c 项目负责人通过创建项目名称 项目负责人 使用资源 所属
  • 你不要瞧不起Ctrl+C

    曾经 xff0c 在我未参加工作之前 xff0c 我认为靠 Ctrl 43 C 来完成工作的人 xff0c 肯定是懒惰的程序员 xff0c 但是现在我发现我错了 xff0c 而且是彻底的错了 能够通过 Ctrl 43 C 来完成工作的人 x
  • 文档交接说明书(模板)

    因为同事的离职 xff0c 我的入职 xff0c 要从同事手中交接过来一些项目 公司里只有一些开发文档相关的模板 xff0c 并没有文档交接相关的模板 xff0c 所以交接文档的模板也就由我们自己来定 我结合自己在工作中的经验 xff0c
  • Java如何打印输出九九乘法表

    Java如何打印输出九九乘法表 打印乘法表的方法 1 使用双重for循环打印九九乘法表 2 使用do while 实现打印九九乘法表 双重for循环的使用 打印结果如图示 xff1a Java程序源代码如下 xff1a span class
  • 服务器出现大量 TIME_WAIT,如何解决

    经常在服务器发现一些连接出现 TIME WAIT 状态 xff0c 那么为什么会有 TIME WAIT状态 xff0c 它是如何产生的 xff1f 大量的 TIME WAIT 有什么危害 xff1f 如何排查 xff1f 如何优化 xff1
  • 【Mariadb/Mysql】利用JSON函数巧妙实现行列置换提高系统效率

    在现实生活中 xff0c 往往我们需要创建一些One to Many或Many to Many的关系数据表 例如 xff1a 按照Configuration Solution的设计 xff0c 产品与产品参数表的关系 xff0c 往往一种产
  • msvcp120.dll丢失的最新解决方法

    下载msvcp120 dll打开浏览器后在顶部输入 dll修复程序 site 按下电脑键盘的回车键打开下载msvcp120 dll系统文件 msvcp120 dll文件下载完成点击解压安装包 然后右键打开修复安装包文件电脑提示找不到msvc
  • VirtualBox Windows下开机自启动

    背景 我们本地使用Virtualbox虚拟机的时候 xff0c 有些虚拟机需要开机自启动 方案 STEP 1 编写start bat脚本 34 D Program Files Oracle VirtualBox VBoxManage exe
  • js模拟C#Matches方法。

    水平有点低 xff0c 感觉应该这样写 var str 61 34 今年是2013年7月24日12 00 12 xff0c 我打算在2023年7月24日12 01 10登月 34 function Matches inputStr reg
  • MFC在其他线程中刷新主对话框中EDIT等控件数据

    为了程序稳定性 xff0c 在数据处理线程中 xff0c 所有与主UI线程有关的控件数据刷新应该到主UI线程中处理 也就是数据处理线程发消息 xff0c 让界面UI去更新控件 例如在项目中用到EDIT控件需要实时更新数据 xff0c 用Po
  • 多线程之生产者消费者模式

    文章目录 基本组成阻塞队列有界队列与无界队列ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 流量控制与信号量 Semaphore 双缓冲与Exchanger 基本组成 生产者 xf