Java多种方式解决生产者消费者问题(十分详细)

2023-05-16

一、问题描述

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

示意图:
生产者消费者


二、解决方法

思路

  1. 采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

  2. 在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决问题的核心

   保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

  1. wait() / notify()方法

  2. await() / signal()方法

  3. BlockingQueue阻塞队列方法

  4. 信号量

  5. 管道


三、代码实现

1. wait() / notify()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

仓库Storage.java

import java.util.LinkedList;

public class Storage {

    // 仓库容量
    private final int MAX_SIZE = 10;
    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<>();

    public void produce() {
        synchronized (list) {
            while (list.size() + 1 > MAX_SIZE) {
                System.out.println("【生产者" + Thread.currentThread().getName()
		                + "】仓库已满");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName()
                    + "】生产一个产品,现库存" + list.size());
            list.notifyAll();
        }
    }

    public void consume() {
        synchronized (list) {
            while (list.size() == 0) {
                System.out.println("【消费者" + Thread.currentThread().getName() 
						+ "】仓库为空");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName()
                    + "】消费一个产品,现库存" + list.size());
            list.notifyAll();
        }
    }
}

生产者

public class Producer implements Runnable{
    private Storage storage;

    public Producer(){}

    public Producer(Storage storage){
        this.storage = storage;
    }

    @Override
    public void run(){
        while(true){
            try{
                Thread.sleep(1000);
                storage.produce();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

消费者

public class Consumer implements Runnable{
    private Storage storage;

    public Consumer(){}

    public Consumer(Storage storage){
        this.storage = storage;
    }

    @Override
    public void run(){
        while(true){
            try{
                Thread.sleep(3000);
                storage.consume();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

主函数

public class Main {

    public static void main(String[] args) {
        Storage storage = new Storage();
        Thread p1 = new Thread(new Producer(storage));
        Thread p2 = new Thread(new Producer(storage));
        Thread p3 = new Thread(new Producer(storage));

        Thread c1 = new Thread(new Consumer(storage));
        Thread c2 = new Thread(new Consumer(storage));
        Thread c3 = new Thread(new Consumer(storage));

        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

运行结果

【生产者p1】生产一个产品,现库存1
【生产者p2】生产一个产品,现库存2
【生产者p3】生产一个产品,现库存3
【生产者p1】生产一个产品,现库存4
【生产者p2】生产一个产品,现库存5
【生产者p3】生产一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【消费者c1】消费一个产品,现库存7
【生产者p3】生产一个产品,现库存8
【消费者c2】消费一个产品,现库存7
【消费者c3】消费一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【生产者p3】生产一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p2】仓库已满
【生产者p3】仓库已满
【生产者p1】仓库已满
【消费者c1】消费一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p3】仓库已满
。。。。。。以下省略

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

注意:

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

2. await() / signal()方法

在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

在这里只需改动Storage类

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {

    // 仓库最大存储量
    private final int MAX_SIZE = 10;
    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<Object>();
    // 锁
    private final Lock lock = new ReentrantLock();
    // 仓库满的条件变量
    private final Condition full = lock.newCondition();
    // 仓库空的条件变量
    private final Condition empty = lock.newCondition();

    public void produce()
    {
        // 获得锁
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【生产者" + Thread.currentThread().getName()
		             + "】仓库已满");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生产者" + Thread.currentThread().getName() 
				 + "】生产一个产品,现库存" + list.size());

        empty.signalAll();
        lock.unlock();
    }

    public void consume()
    {
        // 获得锁
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【消费者" + Thread.currentThread().getName()
		             + "】仓库为空");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消费者" + Thread.currentThread().getName()
		         + "】消费一个产品,现库存" + list.size());

        full.signalAll();
        lock.unlock();
    }
}

运行结果与wait()/notify()类似

3. BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

import java.util.concurrent.LinkedBlockingQueue;

public class Storage {

    // 仓库存储的载体
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

    public void produce() {
        try{
            list.put(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName()
                    + "】生产一个产品,现库存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public void consume() {
        try{
            list.take();
            System.out.println("【消费者" + Thread.currentThread().getName()
                    + "】消费了一个产品,现库存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

4. 信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Storage {

    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<Object>();
	// 仓库的最大容量
    final Semaphore notFull = new Semaphore(10);
    // 将线程挂起,等待其他来触发
    final Semaphore notEmpty = new Semaphore(0);
    // 互斥锁
    final Semaphore mutex = new Semaphore(1);

    public void produce()
    {
        try {
            notFull.acquire();
            mutex.acquire();
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName()
                    + "】生产一个产品,现库存" + list.size());
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notEmpty.release();
        }
    }

    public void consume()
    {
        try {
            notEmpty.acquire();
            mutex.acquire();
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName()
                    + "】消费一个产品,现库存" + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notFull.release();
        }
    }
}

5. 管道

一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

1. PipedInputStream / PipedOutputStream (操作字节流)

Producer

import java.io.IOException;
import java.io.PipedOutputStream;

public class Producer implements Runnable {

    private PipedOutputStream pipedOutputStream;

    public Producer() {
        pipedOutputStream = new PipedOutputStream();
    }

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());
            }
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Consumer

import java.io.IOException;
import java.io.PipedInputStream;

public class Consumer implements Runnable {
    private PipedInputStream pipedInputStream;

    public Consumer() {
        pipedInputStream = new PipedInputStream();
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {
        int len = -1;
        byte[] buffer = new byte[1024];
        try {
            while ((len = pipedInputStream.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, len));
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

主函数

import java.io.IOException;

public class Main {

    public static void main(String[] args) {
        Producer p = new Producer();
        Consumer c = new Consumer();
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(c);
        try {
            p.getPipedOutputStream().connect(c.getPipedInputStream());
            t2.start();
            t1.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
2. PipedReader / PipedWriter (操作字符流)

Producer

import java.io.IOException;
import java.io.PipedWriter;

public class Producer implements Runnable {

    private PipedWriter pipedWriter;

    public Producer() {
        pipedWriter = new PipedWriter();
    }

    public PipedWriter getPipedWriter() {
        return pipedWriter;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                pipedWriter.write("This is a test, Id=" + i + "!\n");
            }
            pipedWriter.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Consumer

import java.io.IOException;
import java.io.PipedReader;

public class Consumer implements Runnable {
    private PipedReader pipedReader;

    public Consumer() {
        pipedReader = new PipedReader();
    }

    public PipedReader getPipedReader() {
        return pipedReader;
    }

    @Override
    public void run() {
        int len = -1;
        char[] buffer = new char[1024];
        try {
            while ((len = pipedReader.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, len));
            }
            pipedReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

主函数

import java.io.IOException;

public class Main {

    public static void main(String[] args) {
        Producer p = new Producer();
        Consumer c = new Consumer();
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(c);
        try {
            p.getPipedWriter().connect(c.getPipedReader());
            t2.start();
            t1.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

想查看上面几种方式的完整代码,请点击这里:生产者消费者问题的一些实验


参考

《Java多线程编程核心技术》 高洪岩
生产者/消费者问题的多种Java实现方式
Producer–consumer problem – Wikipedia
Semaphore的一种使用方法
Semaphore实现的生产者消费者程序

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java多种方式解决生产者消费者问题(十分详细) 的相关文章

随机推荐

  • 关于SubSonic3.0生成的表名自动加复数(s)的“用户代码未处理SqlException,对象名‘xxxs‘无效”异常处理

    关于SubSonic3 0生成的表名自动加复数 xff08 s xff09 的 用户代码未处理SqlException xff0c 对象名 39 xxxs 39 无效 异常处理 参考文章 xff1a xff08 1 xff09 关于SubS
  • 互联网金融风控面试算法知识(三)

    资料来源于网络搜集和汇总 xff0c 把算法知识的总结放在业务知识后面也是为了说明实际工作业务落地应用的重要性大于算法创新 面试题依然是适用于3年经验以内的初学者 xff0c 希望大家在学习算法的同时不要一心只研究算法而脱离了业务 xff0
  • wiki树莓派安装ubuntu mate 和 ros

    两大步骤 1 安装ubuntu mate 2 安装ros 一 安装ubuntu mate 下载ubuntu mate 18 04 img 并制作系统盘 首先要说的就是树莓派支持的系统是很多样的 xff0c 但是针对ros xff0c 我们只
  • npm 的工作原理

    包 Package 和模块 Module 如何定义一个Package 满足如下条件都可以称为一个包 xff1a 一个文件夹包含应用程序 xff0c 使用package json来描述它 a 一个用gzip压缩的文件夹 xff0c 满足 a
  • 2023最全Postman安装使用详解

    一 Postman背景介绍 用户在开发或者调试网络程序或者是网页B S模式的程序的时候是需要一些方法来跟踪网页请求的 xff0c 用户可以使用一些网络的监视工具比如著名的Firebug等网页调试工具 今天给大家介绍的这款网页调试工具不仅可以
  • cmake 设置 debug release模式

    1 通过命令行的方式 cmake DCMAKE BUILD TYPE 61 Debug 2 set CMAKE BUILD TYPE Debug CACHE STRING 34 set build type to debug 34 或者 s
  • 华为笔试题(4)

    一 计算n x m的棋盘格子 xff08 n为横向的格子数 xff0c m为竖向的格子数 xff09 沿着各自边缘线从左上角走到右下角 xff0c 总共有多少种走法 xff0c 要求不能走回头路 xff0c 即 xff1a 只能往右和往下走
  • 安装RedisBloom插件

    前言 安装RedisBloom模块会遇到很多坑 xff0c 希望你不要和我一样踩的这么全 x1f60f 如果觉得编译麻烦 xff0c 我也上传了我编译的so文件 xff0c 可以直接加载使用 https download csdn net
  • ROS Catkin 教程之 CMakeLists.txt

    1 概览 CMakeLists txt 是用 CMake 构建系统构建 ROS 程序包的输入文件 任何兼容 CMake 的包都包含一个或多个 CMakeLists txt 文件 xff0c 用以描述怎样构建和安装代码 catkin 项目采用
  • Xsens Mti-g-710 IMU driver在Ubuntu18.04 ROS melodic中的安装使用

    Ubuntu18 04下安装的ROS melodic 如何使用Xsens Mti g 710 IMU driver xff1f 这里给出一个详细步骤说明 这里的IMU是USB接口 1安装 首先插入IMU的USB口 命令行运行 gt lsus
  • PYTHON -MYSQLDB安装遇到的问题和解决办法

    PYTHON MYSQLDB安装遇到的问题和解决办法 参考文章 xff1a xff08 1 xff09 PYTHON MYSQLDB安装遇到的问题和解决办法 xff08 2 xff09 https www cnblogs com gaosh
  • 位姿估计Robot_pose_efk的配置和使用

    Robot pose efk 用于融合里程计 xff0c 惯性测量单元和视觉里程计的传感器输出 xff0c 从而减少测量中的总体误差 了解ROS的robot pose ekf软件包中扩展卡尔曼滤波器的用法 xff1a robot pose
  • linux录屏和截图软件

    linux下的录屏和截图软件有很多 xff0c kazam集成了录屏和截图两个功能 xff0c 而且十分轻量级 xff0c 比较好用 如果是在VirtualBox虚拟机中跑linux的话 xff0c virtualbox本身就提供录屏和截图
  • APM 学习 6 --- ArduPilot 线程

    ArduPilot 学习之路 6 xff0c 线程 英文原文地址 xff1a https ardupilot org dev docs learning ardupilot threading html 理解 ArduPilot 线程 线程
  • nginx 配置多个vue,环境部署

    1 最近项目要上线 xff0c 需要通过nginx作为代理 xff0c 要发布2个VUE前端项目 xff0c 记录一下nginx conf配置文件 亲自验证 xff0c 特此记录一下 xff0c 希望能帮助向我一样 小白的人 user ro
  • freertos源码分析(1)--初始篇

    代码下载地址 xff1a https www freertos org 部分转载参考 FreeRTOS基础知识 xff1a RTOS全称为 xff1a Real Time OS xff0c 就是实时操作系统 xff0c 强调的是 xff1a
  • nginx服务占用百分之百

    一 当nginx达到100 时 xff0c 也就是服务器负载突然上升 1 利用top命令查看cpu使用率较高的php cgi进程 PID USER PR NI VIRT RES SHR S CPU MEM TIME 43 COMMAND 1
  • Gazebo教程(使用roslaunch 启动Gazebo,world以及urdf模型)

    Gazebo教程 xff08 使用roslaunch 启动Gazebo xff0c world以及urdf模型 xff09 关于如何学习ROS可以参考古月居的这篇文章 1 https www zhihu com question 35788
  • dispatch_queue_create---创建队列

    dispatch queue create span class hljs keyword const span span class hljs keyword char span label dispatch queue attr t a
  • Java多种方式解决生产者消费者问题(十分详细)

    一 问题描述 生产者消费者问题 xff08 Producer consumer problem xff09 xff0c 也称有限缓冲问题 xff08 Bounded buffer problem xff09 xff0c 是一个多线程同步问题