RabbitMQ学习笔记2-Work queues

2023-05-16


接下来学习第二种模型,Work queues模型,如图所示:


该模型描述的是:一个生产者(P)向队列发送一个消息,然后多个消费者(P)接受消息,每条消息只能被一个消费者接收。以下示例可根据学习笔记1里面的代码稍作修改,因此部分细节不再说明。


生产者发送消息到队列中

创建连接和通道

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

声明一个队列

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
注意:此时第二个参数设置为true。作用是, 当RabbitMQ退出或崩溃时,它会丢失队列和消息,。需要两件事来确保消息不会丢失:我们需要将队列和消息设置为持久化。

发送消息到队列

String message = "Hello World......";
// PERSISTENT_TEXT_PLAIN: Content-type "text/plain", deliveryMode 2 (persistent), priority zero
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
注意:参数3MessageProperties.PERSISTENT_TEXT_PLAIN作用是使消息持久化。

关闭通道和连接

channel.close();
connection.close();

源代码如下:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * @author YKun
 * @date 2017年4月24日 下午10:37:20
 * @describe
 */
public class NewTask {

	private static final String TASK_QUEUE_NAME = "task_queue";

	public static void main(String[] args) throws IOException, TimeoutException {

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		// param1: 队列名 param2: 是否持久化
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

		String message = "Hello World......";
		
		// PERSISTENT_TEXT_PLAIN: Content-type "text/plain", deliveryMode 2 (persistent), priority zero
		channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
		
		System.out.println(" [x] Sent '" + message + "'");

	    	channel.close();
	    	connection.close();
	}

}


运行结果如下:




消费者获取消息

创建连接和通道

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

声明一个队列

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

定义一个默认的消费者

Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");

		        System.out.println(" [x] Received '" + message + "'");
		        try {
		          doWork(message);
		        } finally {
		          System.out.println(" [x] Done");
		          channel.basicAck(envelope.getDeliveryTag(), false);
		        }
			}
		};
private static void doWork(String task) {
		for (char ch : task.toCharArray()) {
			if (ch == '.') {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException _ignored) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

函数doWork的作用是跟据获取的消息,若消息内容存在"."则程序休眠一秒,channel.basicAck的作用是在处理完消息之后返回应答状态,第二个参数false表示关闭RabbitMQ的自动应答,改为手动应答。

监听队列,并且设置成手动返回

channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
参数2,autoAck设置为false,表示不自动删除


Message acknowledgment 消息应答

在现实场景中经常会发生一种情况,其中一个消费者在执行任务过程中有可能会挂掉或者出现异常,可是RabbitMQ默认一旦将任务发送给消费者之后就会将该任务从内存中删除,因此极大可能会出现任务丢失的情况,所以当其中一个消费者宕机之后应该把这个任务转发给下一个消费者进行处理。为确保消息永不丢失,RabbitMQ支持消息确认,既消费者处理完任务后发送一个确认信息给RabbitMQ,RabbitMQ收到确认信息后方可删除该任务。若消费者出现宕机而不发送确认信息,RabbitMQ则会将任务重新排队交由下一个消费者处理。因此才会出现以下代码
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

Fair dispatch 公平派遣

当消息进入RabbitMQ时,RabbitMQ只会分派消息,而且是固定的将第n个消息分发给第n个消费者。可能会出现一种情况,例如假设有两个消费者,三个任务,第一个任务需要耗费20秒的时间进行处理,第二个任务只需5秒,第三个任务也只需5秒,则会出现以下情况,第一第三个任务都会被分派给第一个消费者,而第二个消费者可能处理完第二个任务之后就空闲下来,但第一个消费者连第一个任务都还没处理完。最佳的情况是当两个消费者有任一一个消费者空闲下来就应该处理掉还没被消费的任务,既第二个消费者应该将第三个任务处理掉。 我们可以使用 basicQos 方法与  prefetchCount  =  1 设置。这告诉RabbitMQ不要一次给一个工作者多个消息。或者换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它将发送到下一个还不忙的工作。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount);
源代码如下:
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * @author YKun
 * @date 2017年4月24日 下午10:56:34
 * @describe
 */
public class Worker {

	private static final String TASK_QUEUE_NAME = "task_queue";

	public static void main(String[] args) throws IOException, TimeoutException {

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		System.out.println(channel.hashCode());
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		channel.basicQos(1);
		
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");

		        System.out.println(" [x] Received '" + message + "'");
		        try {
		          doWork(message);
		        } finally {
		          System.out.println(" [x] Done");
		          channel.basicAck(envelope.getDeliveryTag(), false);
		        }
			}
		};
		
		// param2: autoAck设置为false,表示不自动删除
		channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
	}

	private static void doWork(String task) {
		for (char ch : task.toCharArray()) {
			if (ch == '.') {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException _ignored) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

}



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ学习笔记2-Work queues 的相关文章

  • [零刻]EQ12迷你主机设置来电开机自启教程

    EQ12 来电自启需通过BIOS设置 断电前开机状态则来电后自动开机 xff0c 关机状态则不会自动开机 操作步骤 xff1a 1 首先关闭主机 xff0c 按下电源后 xff0c 当屏幕出现logo后 xff0c 迅速按下键盘上的Del键
  • C语言习题:字符串操作函数练习题目

    1 将包含字符数字的字符串分开 使得分开后的字符串前一部分是数字后一部分是字母 例如 h1ell2o3 gt 123hello span class token macro property span class token directi
  • Java单元测试实践-08.Stub、Replace、Suppress静态方法

    Java单元测试实践 00 目录 xff08 9万多字文档 43 700多测试示例 xff09 https blog csdn net a82514921 article details 107969340 1 Stub Replace S
  • [零刻]EQ12安装PVE虚拟机教程

    PVE虚拟机简介 Proxmox VE是一个运行虚拟机和容器的平台 基于Debian Linux xff0c 完全开源 为了获得最大的灵活性 xff0c 实现了两种虚拟化技术 基于内核的虚拟机 KVM 和基于容器的虚拟化 LXC 一个主要的
  • [零刻]EQ12&EQ12Pro安装OpenWRT软路由教程

    OpenWRT系统安装 安装前准备 1 U盘一个 2 WePE写盘工具 3 Openwrt固件 4 Img镜像写盘工具 安装步骤 xff1a 1 首先下载WePE写盘工具 xff0c 制作一个PE系统安装环境 xff0c 启动软件后 xff
  • [零刻]EQ12&EQ12Pro安装原厂系统教程

    有些小伙伴购买的准系统的版本因为不带内存和硬盘 xff0c 需要自己进行安装系统 xff0c 对于小白来说还是比较困难的 xff0c 在这里推荐大家使用原厂的系统 xff0c 里面自带驱动安装好后就可以直接使用 使用原厂系统包安装步骤 xf
  • Esxi8.0安装Ubuntu系统教程

    本篇教程主要教大家怎么在ESXi8 0虚拟机上安装Ubuntu系统 xff0c 首先安装Ubuntu需要准备一个ISO系统镜像文件 xff0c 我们可以去Ubuntu官网下载 Ubuntu官网 xff1a https ubuntu com
  • [零刻]EQ12&EQ12Pro调整风扇转速教程

    调整 CPU 风扇转速可以有不同的用途 xff0c 具体取决于您的计算机和使用情况 降低噪音 xff1a 如果您的风扇的噪音很大 xff0c 可以通过降低 CPU 风扇的转速来减少噪音 这可以通过在 BIOS 或中设置 CPU 风扇转速控制
  • [零刻]EQ12&EQ12Pro调整最低功耗教程

    在使用EQ12作为软路由的时候 xff0c 因为要不间断工作 xff0c 功耗就非常关键了 xff0c 所以并不需要太高的功耗 xff0c 所以接下来我教你怎么设置以最低功耗运行 具体操作步骤 xff1a 1 先关闭电脑 xff0c 按下电
  • 动态更新阿里云DDNS解析记录的IPv6地址,随时随地用域名远程访问自己的电脑【如何远程访问家里的电脑】

    远程访问电脑 日志简介要求1 IPv6网络1 1检查光猫是否支持IPv61 2检查路由器是否支持并开启IPv6 xff08 没有路由器的跳过这一步 xff09 1 3配置电脑防火墙1 3 1允许ICMPv6协议通过防火墙1 3 2文件共享S
  • 一个好用的js压缩加密网站

    js加密网站 我是vue的js xff0c 用了好几个加密网站得出的结果会有错 xff0c 这个挺好
  • alibaba pc safe service无法删除,一直在后台运行怎么办?

    对付流氓软件 xff0c 应当使用师夷长技以制夷的办法 xff0c 下载一个腾讯电脑管家 xff0c 然后下载里面的文件粉碎机 xff0c 在任务管理器找到对应的alibaba pc safe service服务 xff0c 点击进入具体的
  • APP流量变现之穿山甲广告平台接入

    1 首先百度搜索 穿山甲广告投放 xff0c 第一个出现的链接就是开发者官网 xff0c 截图如下 xff1a 2 进入之后点击注册 xff0c 然后登陆 xff08 这一步穿山甲超级简单 xff0c 如果不着急提现收益的话 xff0c 可
  • spss统计分析基础教程(上)--自学

    64 TOC 目录 xff09 第一章 四种窗口 数据窗口 输出窗口 语法窗口 脚本窗口 菜单 1 文件 xff1a 新建 打开 保存 另存为 将文件标记为只读 xff1a 如果之后保存文件 xff0c 则只能重命名并另存 重新命名数据集
  • OpenWrt 内的阿里云盘 WebDAV 做磁盘使用

    最近在玩OpenwWrt的时候 xff0c 在刷的固件里看到预装的阿里云盘 WebDAV xff0c 加上最近刚刚开始用阿里云 xff0c 不限速 xff0c 非常快 xff0c 通过这个服务 xff0c 可以直接把阿里云的文件架挂载在本地
  • VS2022中使用Copilot

    Copilot可以自动帮你写代码 1 打开vs2022 点击扩展 xff0c 在里面搜索copilot安装 2 安装完成后 xff0c 左下角有个小图标就是copilot 3 点击登录 会弹框 点击确定后 xff0c 跳转到网站 xff0c
  • CSS/SCSS/LESS和自适应布局/响应式布局详解

    在开发前端的时候 xff0c 界面布局尤为重要 xff0c 要布局的非常合理 xff0c 好看 xff0c css是必不可少的 xff0c 然后是各种布局 xff0c 使用这些布局 xff0c 进行混合搭配 xff0c 最终的目的都是开发一
  • 响应式布局之viewport-超级简单

    之前文章CSS布局之详解 故里2130的博客 CSDN博客 上面的文章可以实现响应式布局 xff0c 根据浏览器的大小变化而变化 xff0c 但是相对于viewport来说 xff0c 之前的还是有点复杂 xff0c 而使用viewport
  • .net6API使用AutoMapper和DTO

    AutoMapper xff0c 是一个转换工具 xff0c 说到AutoMapper时 xff0c 就不得不先说DTO xff0c 它叫做数据传输对象 Data Transfer Object 通俗的来说 xff0c DTO就是前端界面需
  • 手机/移动端的UI框架-Vant和NutUI

    下面推荐2款手机 移动端的UI框架 其实还有很多的框架 xff0c 各个大厂都有UI框架 目前 xff0c 找来找去 xff0c 只有腾讯的移动端是setup语法写的TDesign xff0c 其他大厂 xff0c 虽然都是VUE3写的 x

随机推荐

  • 使用uniapp创建小程序和H5界面

    uniapp的介绍可以看官网 xff0c 接下来我们使用uniapp创建小程序和H5界面 xff0c 其他小程序也是可以的 xff0c 只演示创建这2个 xff0c 其实都是一套代码 xff0c 只是生成的方式不一样而已 uni app官网
  • 使用NutUI创建小程序和H5界面

    做开发的时间长了 xff0c 技术都是通用的 xff0c 创建小程序和H5界面有很多的UI xff0c 本章节演示使用NutUI来创建 xff0c 官网 xff0c NutUI 移动端 Vue3 小程序组件库 1 使用HBuilder X创
  • 如何开发微信小程序呢

    也许很多人对小程序 xff0c H5程序 xff0c Vue xff0c 网页程序 xff0c PC端程序认识比较模糊 xff0c 因为这些跨度非常的大 xff0c 很少人会一次性全部接触 xff0c 甚至只是听说过 xff0c 并不了解其
  • .NET6中使用GRPC详细描述

    Supported languages gRPC xff0c 官网 至于原理就不说了 xff0c 可以百度原理之后 xff0c 然后再结合代码 xff0c 事半功倍 xff0c 就能很好理解GRPC了 目录 一 简单使用 二 实际应用 一
  • spss统计分析基础教程(下)--自学

    目录 xff09 第十二章分布类型的检验12 1假设检验的基本思想12 2正态分布检验K S检验的原理 12 3二项分布检验12 4游程检验12 5蒙特卡罗方法 第十三章连续变量的统计推断 xff08 一 xff09 t检验13 1t检验概
  • uniapp学习记录

    目录 1 布局使用flex布局 2 rpx和界面自适应 xff0c 设计稿是750rpx 3 首页不显示tabBar 4 跳转页面 启动跳转页面 5 uniapp中页面生命周期 传值 6 颜色使用 7 字体使用 8 SCSS CSS中获取j
  • uniapp中调用.net6 webapi

    使用uniapp开发程序时 xff0c 不管是小程序 xff0c 还是H5界面 xff0c 它们只是一个显示界面 xff0c 也就是只充当前台界面 xff0c 那么我们后台使用 net6 webapi写业务逻辑 xff0c 然后前端访问后端
  • vue3中前端处理不同数据结构的JSON

    有时候 xff0c 后端返回的JSON数据格式 xff0c 是前端不需要的格式类型 xff0c 这时 xff0c 要么让后端修改 xff0c 你要什么格式 xff0c 那么让后端大哥哥给你返回什么格式 但是有时候不尽人意 xff0c 后端大
  • 在vue3中Element Plus切换主题

    一共2种方法 目录 第一种 第二种 第一种 暗黑模式 xff0c 使用useDark xff0c 可以不用安装Element Plus xff0c 只切换页面的背景颜色 xff0c 不改变Element Plus控件的颜色 xff0c 本案
  • IIS发布.net6 api+微信小程序/H5真机调试接口的流程

    我们创建 net6 api程序 xff0c 然后使用SqlSugar连接MySQL数据库 xff0c 再使用iis发布 xff0c 当然使用其他的也行 再开发一个微信小程序 xff0c 手机运行小程序 xff0c 手机运行H5 xff0c
  • 全栈开发小作品展示(有声音)

    不积跬步 xff0c 无以至千里 xff1b 不积小流 xff0c 无以成江海 目录 1 客户端 2 网站 3 小程序 4 H5演示 1 客户端 PC桌面 xff0c WPF 43 prism框架 xff0c 前后端分离 xff0c 主要是
  • rust学习

    Installation The Rust Programming Language https doc rust lang org book ch01 01 installation html 一 安装 安装了几个组件 curl prot
  • visual studio 2017出现MSB8020,MSB8036等SDK版本选择的错误

    1 xff0c 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 MSB8020 无法找到 v140 的生成 xff1b 2 xff0c 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 MSB8036 找不到 Windows
  • C++中循环include问题的讨论

    问题 C语言中未避免头文件的重复引用 xff0c 一般都会使用include guard 如pragma once或 ifndef等 xff0c 但这样做以后并不是万事大吉了 循环使用include可能会出现一些意想不到的错误 如果代码较为
  • flask+gevent+gunicorn+supervisor+nginx异步高并发部署

    背景 flask是一款同步阻塞框架 xff0c 在调用外部http服务时 xff0c 当前进程将阻塞 多进程模式下 xff0c 无法响应其他用户的请求 xff0c 本文则是研究的是如何利用gevent提升flask的并发能力 xff0c 以
  • 小白学SAS--自学笔记

    64 TOC 目录 xff09 第一章初识SAS 数据集的命名 数据导入 建立永久数据集 用菜单新建文件夹 xff0c 并与电脑上已有文件夹关联 用libname语句指定文件夹名 xff0c 并与电脑上已有文件夹关联 用data语句直接指定
  • http协议常用请求头与响应头

    请求头 xff1a Accept 用于告诉服务器 xff0c 客户机支持的数据类型 Accept Charset xff1a 用于告诉服务器 xff0c 客户机所采用的编码 Accept Encoding xff1a 用于告诉服务器 xff
  • mysqlId 不能自启的问题(错误代号2003)

    计算机服务里看下有没有mysql的服务 xff0c 如果有 xff0c 把服务的启动类型改为自动 xff1b 如果没有 xff0c 则要进入安装目录的bin文件夹双击mysqld exe启动mysql 然后 cmd到 Mysql的安装目录的
  • RabbitMQ学习笔记1-"Hello World!"simple模型

    simple模型是RabbitMQ队列模型中最简单的一个模型 如图 xff1a P 是我们的生产者 xff08 producer xff09 xff0c C 是我们的消费者 xff08 consumer xff09 中间的红色框是队列 xf
  • RabbitMQ学习笔记2-Work queues

    接下来学习第二种模型 xff0c Work queues模型 xff0c 如图所示 xff1a 该模型描述的是 xff1a 一个生产者 xff08 P xff09 向队列发送一个消息 xff0c 然后多个消费者 xff08 P xff09