【RabbitMQ教程】“Hello World”工作队列模式

2023-11-05

目录

前言

“Hello World”工作队列模式介绍

消息模型

入门案例代码示例(自动ACK)

消息确认机制

自动ACK存在的问题

演示手动ACK


前言

1、将‘Hello World工作队列模式’单独抽出来细讲,目的是借助这个模式好好讲一下rabbitmq的‘自动ACK’和‘手动ACK’。

2、代码中的每一步,都演示了rabbitmq管理界面的变化;

“Hello World”工作队列模式介绍

rabbitmq六大工作模式架构图:

消息模型

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

入门案例代码示例(自动ACK)

 基于maven采用java原生写法。不需要写properties或者yml配置文件

新建一个maven工程,添加amqp-client依赖:

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
</dependency>

连接工具类:

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.1.103");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setUsername("kavito");
        factory.setPassword("123456");
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生产者:

public class Send {
 
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 1、获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 2、从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3、声明(创建)队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4、消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        
        //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        channel.close();
        connection.close();
    }
}

控制台:

web管理页面:服务器地址/端口号 (本地:127.0.0.1:15672,默认用户及密码:guest guest) 

点击队列名称,进入详情页,可以查看消息:

 消费者接收消息:

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        
        // 监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

控制台打印:

再看看队列的消息,已经被消费了

 我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印. 

消息确认机制

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  •     自动ACK:消息一旦被接收,消费者自动发送ACK
  •     手动ACK:消息接收后,不会发送ACK,需要手动调用

大家觉得哪种更好呢?

这需要看消息的重要性:

  •     如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  •     如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
     

我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // 手动进行ACK
                /*
                 *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
                 *  deliveryTag:用来标识消息的id
                 *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                 */
                
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数false,手动进行ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

 最后一行代码设置第二个参数为false

channel.basicConsume(QUEUE_NAME, false, consumer);

自动ACK存在的问题

修改消费者,添加异常,如下:

生产者不做任何修改,直接运行,消息发送成功:  

 运行消费者,程序抛出异常:

 管理界面:

消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。

演示手动ACK

重新运行生产者发送消息:

同样,在手动进行ack前抛出异常,运行Recv2

 再看看管理界面:

 消息没有被消费掉!

还有另外一种情况:修改消费者Recv2,把监听队列第二个参数自动改成手动,(去掉之前制造的异常) ,并且消费方法中没手动进行ACK

 

 生产者代码不变,再次运行:

运行消费者 :

但是,查看管理界面,发现:

停掉消费者的程序,发现:  

这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次变为Ready。

正确的做法是:

我们要在监听队列时设置第二个参数为false,代码中手动进行ACK

 再次运行消费者,查看web管理页面:

消费者消费成功!  

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【RabbitMQ教程】“Hello World”工作队列模式 的相关文章

  • 为什么我无法使用 python 建立与rabbitMQ的连接?

    我正在学习如何使用rabbitMQ 我正在 MacBook 上运行rabbit MQ 服务器并尝试与 python 客户端连接 我按照安装说明进行操作here http www rabbitmq com install homebrew h
  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 在 RabbitMQ 主题交换中路由与模式不匹配的消息

    两个队列绑定到具有以下路由键的主题交换 队列 A 与路由键模式匹配绑定 foo队列 B 与路由键模式匹配绑定 bar 我想向此交换添加第三个队列 该队列接收的消息都不是foo消息也不bar消息 如果我用一个绑定这个队列 路由密钥 我自然会得
  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • 何时使用 RabbitMQ 铲子以及何时使用 Federation 插件?

    对于我工作的公司 我们希望使用 RabbitMQ 作为我们的主要消息总线 我们的想法是 每个应用程序都使用自己的虚拟主机进行内部通信 并且通过 shovel 或联合插件 我们可以在多个虚拟主机 甚至可能是多台机器 非集群 之间共享某些类型的
  • 列出与rabbitmq java客户端API交换的绑定

    我似乎在文档中找不到任何信息 所以我想知道是否可以通过某种方式使用 java RabbitMQ API 获取与交换相关的所有绑定 我在查询 api bindings 时正在寻找类似 http api 结果的内容 api definition
  • RabbitMQ - 升级到新版本并收到很多“PRECONDITION_FAILED Unknown Delivery Tag 1”

    刚刚升级到新版本的 RabbitMQ 2 3 1 现在出现以下错误 PRECONDITION FAILED unknown delivery tag 1 随后通道关闭 这适用于较旧的 RabbitMQ 无需客户端更改 在应用程序行为方面 当
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 将 sensu-client 连接到服务器时 AMQP 连接的 bad_header

    我已经安装了 sensu 和厨师社区食谱 但是 sensu客户端无法连接到服务器 导致rabbitmq连接错误 尝试连接时消息超时 这是详细的客户端日志 来自 sensu client log 的日志 timestamp 2014 07 0
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • RabbitMQ:无法启动rabbitmq_management插件

    Version gt sudo rabbitmqctl status grep rabbit RabbitMQ rabbit RabbitMQ 3 5 6 Error gt sudo rabbitmq plugins enable rabb

随机推荐

  • i.MX6ULL - 问题解决:NFS挂载失败 - VFS: Unable to mount root fs on unknown-block(2,0)

    i IMX6ULL 问题解决 NFS挂载失败 VFS Unable to mount root fs on unknown block 2 0 开发环境 移植的linux5 4 7 0 ubuntu1804 x64 arm linux gn
  • 毕业设计-机器视觉深度学习的视频去水印算法

    目录 前言 课题背景和意义 实现技术思路 实现效果图样例 前言 大四是整个大学期间最忙碌的时光 一边要忙着备考或实习为毕业后面临的就业升学做准备 一边要为毕业设计耗费大量精力 近几年各个学校要求的毕设项目越来越难 有不少课题是研究生级别难度
  • MFC视频教程(孙鑫)学习笔记2-掌握C++

    这一集中 主要总结了C 经典语法与应用 1 C 的三大特性 封装 继承 多态 2 C 中提供了一套输入输出流类的对象 它们是cin cout和cerr 对应c语言中的三个文件指针stdin stdout stderr 分别指向终端输入 终端
  • Ubuntu下网页打开速度缓慢的解决方法

    Ubuntu下网页打开速度缓慢的解决方法 网速较慢可能是网络配置的原因导致 解决步骤如下 以下指令均在Ubuntu终端输入执行 一 查看Ubuntu版本信息 lsb release a 二 使用pdnsd软件为本机搭建DNS代理服务器 1
  • Redis第二十讲 Redis哨兵自动故障转移与优缺点

    sentinel哨兵是特殊的redis服务 不提供读写服务 主要用来监控redis实例节点 哨兵架构下client端第一次从哨兵找出redis的主节点 后续就直接访问redis的主节点 不会每次都通过sentinel代理访问redis的主节
  • ES 聚合函数的用法

    1 ES聚合分析是什么 聚合分析是数据库中重要的功能特性 完成对一个查询的数据集中数据的聚合计算 如 找出某字段 或计算表达式的结果 的最大值 最小值 计算和 平均值等 ES作为搜索引擎兼数据库 同样提供了强大的聚合分析能力 对一个数据集求
  • K和KB的区别

    来源 综合自己和网上的观点 问题1 K与KB之间有什么区别 我在做一道解时 就是 某计算机字长16位 它的存储容量是1MB 按字编址 这经的寻址范围是 A 512K B 1M C 512KB 答案给的是A 我很不解 为什么512K与512K
  • (error) CROSSSLOT Keys in request don‘t hash to the same slot 解决方法

    Redis 哈希槽基本概念 哈希槽 hash slot 是来自Redis Cluster的概念 但在各种集群方案都有使用 哈希槽是一个key的集合 Redis集群共有16384个哈希槽 每个key通过CRC16散列然后对16384进行取模来
  • Python opencv 机器学习 5.knn pca降维 ocr识别数字 mnist数据集

    coding utf 8 from numpy import import numpy as np import struct import matplotlib pyplot as plt import operator 定义一个全局特征
  • 轻松获取在线媒体:视频下载工具推荐

    ytdl org youtube dl Stars 121 0k License Unlicense youtube dl 一个命令行程序 可以从YouTube com和其他视频网站下载视频 基于 Python 实现 你可以在Unix Wi
  • 上班之路 华为OD真题 200

    public class Main public static char map 地图 public static int t 转弯次数 public static int c 路障个数 public static int n 地图行数 p
  • Android 项目必备(十一)--> 轮询操作

    文章目录 前言 实战 前言 什么叫轮询请求 简单理解就是 App 端每隔一定的时间重复请求的操作就叫做轮询请求 比如 App 端每隔一段时间上报一次定位信息 App 端每隔一段时间拉去一次用户状态等 这些应该都是轮询请求 为何不用长连接代替
  • ibm服务器开机后显示器闪烁,IBM E50彩色显示器,开机后电源指示灯闪烁,机内有“咔嗒”声,黑屏...

    经观察发现 咔嗒 声是消磁继电器断开 闭合的声音 经测量 该继电器13V供电电压时有时无 该故障现象特别 不易判断 但从屏幕无显示这一故障现象入手 可初步判定故障范围可能在电源电路和行扫描电路 首先 不开机直观检查相关电路 未见异常 然后测
  • 由bibtex生成引用文献字符串

    word 文档写引用文献 用 mendeley 的插件生成的效果似乎一般 用法见 1 2 而且自己改格式的那个网页令人火大 可能我网速问题 用 python 写了个脚本 通过解析 bibtex 来生成 格式自编 项目页见 4 Code 目前
  • C++的函数_默认参数详解

    案例 int func int a int b 10 return a b int main func 10 return 0 注意事项 1 实现的函数中参数有默认参数 调用时如再传入 默认参数的值被替换为传入的值 func 10 20 r
  • vue报错

    启动vue项目时报如下错误 可是我的代码里并没有matched 后来才发现是路由引入错误 在 main js 文件中 上面的 import router from router 这个语句的前面 router 中的 R 必须的小写 不然就会出
  • 《substrate 快速入门与开发实战》

    视频地址 https www bilibili com video BV1C4411U7Rv substrate的升级过程 编写的runtime代码 gt 编译后 得到runtime的wasm二进制文件 gt 通过链上的治理模块发送升级ru
  • 关于nodejs中使用fluent-ffmpeg模块、ffmpeg工具的使用心得

    类人猿Blog 欢迎来到我的博客 您好 这是本人第一次写博客 请多多指教 nodejs中使用 fluent ffmpeg 详细方法和系统配置 适应于 windows和 linux 特别是在 redhat6 x中得以验证通过 简介 我们都知道
  • 对于c++中模板函数的一点体会

    何为模板函数 从字面上就可以看出来模板函数必须具备通用性 举个简单却很实用的例子 交换两个值的函数Swap 交换两个字符型void Swap char a char b 交换两个整型void Swap int a int b 交换两个浮点型
  • 【RabbitMQ教程】“Hello World”工作队列模式

    目录 前言 Hello World 工作队列模式介绍 消息模型 入门案例代码示例 自动ACK 消息确认机制 自动ACK存在的问题 演示手动ACK 前言 1 将 Hello World工作队列模式 单独抽出来细讲 目的是借助这个模式好好讲一下