RabbitMQ(二)confirm/return机制

2023-11-15

程序用了1.5.3.RELEASE版本的spring-boot-starter-amqp依赖。

confirm确认机制

配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="rabbitConnectionFactory"
        username="${rabbit_username}"
        password="${rabbit_password}"
        host="${rabbit_host}"
        port="${rabbit_port}"
        publisher-confirms="true"/>
        <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="rabbitTemplate"  connection-factory="rabbitConnectionFactory"
                     exchange="exchangeTest"
                     confirm-callback="publishService"
                     mandatory="true"/>
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin connection-factory="rabbitConnectionFactory" />
        <!--定义queue -->
        <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
                <rabbit:bindings>
                         <rabbit:binding queue="queueTest"  key="queueTestKey"> </rabbit:binding>
                 </rabbit:bindings>
        </rabbit:direct-exchange>
        <!-- 消息接收者 -->
        <bean id="confirmListener" class="com.example.hello2.controller.ConfirmListener"></bean>
        <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual" >
            <rabbit:listener queues="queueTest" ref="confirmListener" />
        </rabbit:listener-container>
</beans>

重点是:
在这里插入图片描述
生产者:

@Service("publishService")
public class PublishService implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String exchange,String routingKey, Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("correlationId:"+correlationId);
        rabbitTemplate.convertAndSend(exchange, routingKey, message,correlationId);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("correlationId:"+correlationData);
        if (ack) {
            System.out.println("消息成功发送到exchange");
            System.out.println( correlationData.toString());
        } else {
            System.out.println("消息发送exchange失败:" + cause);
        }
    }
}

消费者:

@Service("confirmListener")
public class ConfirmListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        String messageStr = new String(message.getBody());
        System.out.println("消费者接收到信息 : " + messageStr);
    }
}

发送正确的消息:

http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey&message=hello

在这里插入图片描述
发送exchange正确,queue错误的请求:

http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey2&message=hello

只有confirm()中收到回调,但是消费者并没有接收到
在这里插入图片描述
发送exchange错误的请求:

http://localhost:8081/test/send?exchange=exchangeTest2&key=queueTestKey&message=hello

在这里插入图片描述
说明了confirm机制是只保证了消息到达exchange,并不保证消息可以路由到正确的queue

return机制

mandatory和immediate是AMQP协议中basic.publish方法中的两个标识位,
它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。

  • mandatory
    当标志位true:若交换机无法找到消息对应的队列,将会调用basic.return将消息返回给生产者。
    当标志位false:消息直接被丢弃

  • immediate(参数已不被支持)
    标志位为true:交换机将消息路由到队列,但是队列上没有消费者,调用basic.return将消息返回给生产者
    标志位为false:消息被丢弃

mandatory的实现:
配置文件
在这里插入图片描述
发送者:

@Service("publishService")
public class PublishService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsg(String exchange,String routingKey, Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("correlationId:"+correlationId);
        rabbitTemplate.convertAndSend(exchange, routingKey, message,correlationId);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("correlationId:"+correlationData);
        if (ack) {
            System.out.println("消息成功发送到exchange");
            System.out.println( correlationData.toString());
        } else {
            System.out.println("消息发送exchange失败:" + cause);
        }
    }
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println( " 消息没有成功发送到消费者队列");
    }
}

发送exange正确,queue不正确的消息:

http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey2&message=hello

结果:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ(二)confirm/return机制 的相关文章

  • 没有连接的 AMQP/RabbitMQ 通道什么时候会死亡?

    我有一个简单的 RabbitMQ 测试程序 随机将消息排队 另一个读取它们 所有这些都使用 Spring AMQP 如果消费者死亡 例如 在没有机会关闭其连接或通道的情况下终止进程 则它尚未确认的任何消息似乎将永远保持未确认状态 我看过很多
  • 我应该在 Django 项目中使用 Celery 还是 Carrot?

    我有点困惑我应该使用哪一个 我认为两者都可以 但其中一个比另一个更好或更合适吗 http github com ask carrot tree master http github com ask carrot tree master ht
  • 如何根据条件限制并发消息消耗

    场景 我已经简化了事情 许多最终用户可以从前端 Web 应用程序 生产者 开始工作 繁重的工作 例如渲染大型 PDF 这些作业被发送到单个持久的 RabbitMQ 队列 许多工作应用程序 消费者 处理这些作业并将结果写回到数据存储中 这个相
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • rabbitmq 通道因 PRECONDITION_FAILED 关闭 - 快速回复消费者不存在

    当我们从 Spring Boot 服务向rabbitmq 发布消息时 出现以下错误 而且这是间歇性的 我们无法重现这一点 AMQP 连接 123 11 xxx xx 5672 错误 org springframework amqp rabb
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • ECONNREFUSED:无法连接到集群内默认端口上的 RabbitMQ pod

    我的本地集群中有一个运行 RabbitMQ 的 pod 我已经将其配置为 apiVersion v1 kind Service metadata name service rabbitmq spec selector app service
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不
  • RabbitMQ 中 Pub/Sub 与工作队列的混合

    我正在评估使用 RabbitMQ 作为消息队列 消息总线 并一直在查看示例教程 https www rabbitmq com getstarted html在 RabbitMQ 页面上 我正在寻找教程中未涵盖的特定场景 并且我不确定是否以及
  • 具有重新排队功能的 BasicReject 实际上去了哪里?

    这似乎是一个简单的问题 但我很难找到明确的答案 如果在 RabbitMQ 3 6 1 中我有一个如下所示的队列 5 4 3 2 1 lt head 我使用消息 1 然后执行以下操作 channel BasicReject ea Delive

随机推荐

  • 解决“The debugger has set two breakpoints at the same address 0x08xxxxx”问题

    今天来分享一个前段时间做项目适合遇到的一个bug 正好今天有空就拿出来跟大家分享一下 错误 首先 大家直接来看这个错误提示 这个错误是我在使用J Link调试时候出现的 上面的意思是 调试器在同一地址设置了两个断点 但是我检查了整个工程也没
  • 数据结构编程回顾(五)交通咨询系统设计

    题目五 交通咨询系统设计 设计要求 设计一个咨询交通系统 能让旅客咨询从任一个 城市到另一个城市之间的最短路径 里程 最低费用或者 最少时间等问题 对于不同的咨询要求 可以输入城市间路 程 所需时间或者所需费用 设计分3 个部分 1 建立交
  • QT connect第五个参数

    一 介绍 1 Qt AutoConnection 默认连接 连接类型在信号发出时确定 如果接收者和发送者在同一个线程 使用Qt DirectConnection类型 如果接收者和发送者不在一个线程 则使用Qt QueuedConnectio
  • 管理概论笔记

    前言 本文章属于在听课时做的笔记 第一周 管理导论 来源 管理概论 浙江大学 邢以群 MOOC 学习理论的目的是为了能够做没有学过的人做不了的事情或者比他们做得更好 一 管理及其功能 介绍什么是管理以及为什么需要管理 观念决定行为 行为决定
  • GBDT的正则化及与XGBOOST区别

    1 GBDT的正则化 和Adaboost一样 我们也需要对GBDT进行正则化 防止过拟合 GBDT的正则化主要有三种方式 第一种是和Adaboost类似的正则化项 即步长 learning rate 定义为 对于前面的弱学习器的迭代 fk
  • Shiro中Session和Cache

    Session是一种状态保持机制 参考文章Session是什么可知Session和Web服务也没有必然关系 Shiro本身的Security Manager也可以脱离Servlet自己管理Session 根据Security Manager
  • 13-3 动态链接库的编译和使用

    1 静态链接库与动态链接库 由于静态链接库不能共享 且依赖的符号的对应目标文件与主程序文件需要一同编译 故静态链接库内存空间占用较大 而动态链接库具有共享性质 通过特定路径即可引用 可以有效减少内存空间的占用 此外 可使用 ldd 命令查看
  • ubuntu16.04.1安装xrdp实现远程桌面访问

    之前测试过xfce4桌面 但是其实ubuntu16 04 1默认的unity桌面也是可以的 首先需要安装 tigervncserver 1 6 80 wget c http www c nergy be downloads tigervnc
  • python刷题第七周

    以下是有所收获的题目 第一题 第5章 2 图的字典表示 20 分 图的字典表示 输入多行字符串 每行表示一个顶点和该顶点相连的边及长度 输出顶点数 边数 边的总长度 比如上图0点表示 O A 2 B 5 C 4 用eval函数处理输入 ev
  • 服务器网站5m带宽在线多少人?

    同时访问一个网站的人数是在线的 这由许多因素决定 包括服务器带宽 质量和同时访问您的网站的人数有关的网站类型 若使用的是独享5M带宽 即5Mbit s 相应云服务器的数据最高传输速度应为5Mbit s x 1024 8 640KB 1分钟流
  • 面试官问:Redis 分布式锁如何自动续期?

    资深面试官 你们项目中的分布式锁是怎么实现的 老任 基于redis的set命令 该命令有nx和ex选项 资深面试官 那如果锁到期了 业务还没结束 如何进行自动续期呢 老任 这个 面试官 您上个问题是啥来着 资深面试官 你们项目中分布式锁是怎
  • springboot之乐观锁和悲观锁

    适用场景 悲观锁 比较适合写入操作比较频繁的场景 如果出现大量的读取操作 每次读取的时候都会进行加锁 这样会增加大量的锁的开销 降低了系统的吞吐量 乐观锁 比较适合读取操作比较频繁的场景 如果出现大量的写入操作 数据发生冲突的可能性就会增大
  • python_mysql

    pymysql模块 pip3 install pymysql pymysql使用流程 1 建立数据库连接 db pymysql connect 2 创建游标对象 cur db cursor 3 游标方法 cur execute insert
  • 网络安全-跨站请求伪造(CSRF)的原理、攻击及防御

    目录 简介 原理 举例 漏洞发现 链接及请求伪造 CSRF攻击 不同浏览器 未登录状态 登录状态 代码查看 工具 防御 用户 程序员 简介 跨站请求伪造 Cross site request forgery 也被称为 one click a
  • 二进制、八进制、十进制、十六进制之间的相互转换

    一 二进制 八进制 十六进制转换为十进制 方法 位权求和法 二进制用符号 B 表示 十进制用符号 D 表示 八进制用符号 O 表示 十六进制用符号 H 表示 100101 10111 B 1 2 5 0 2 4 0 2 3 1 2 2 0
  • OpenGL渲染字体的批处理操作

    一 问题描述 在OpenGL中 绘制字体通过纹理贴图的方式 一个场景中有200个单词 按照正常做法 一个单词生成一个贴图 指定Quad四个顶点纹理坐标 最后把数据传给OpenGL 进行绘制 OpenGL顶点数组是客户端 服务器模式 客户端是
  • 删除报错不能删除myeclipse或者eclipse项目方法

    当在myeclipse创建了项目 想删除的时候 发现删除不了 终极的解决方法如下 一 删除myeclipse或者eclipse上的java项目工程 1 找到对应myeclipse工作空间 使用强力删除 粉碎文件 删除成功 2 接着回到mye
  • Ubuntu下安装egg

    http blog csdn net flydirk article details 8506463 用easy install安装就可以了 安装之前需要python setuptools sudo apt get install pyth
  • 数字图像散斑计算Matlab连续处理1/2

    数字图像散斑计算Matlab连续处理 1 数字散斑相关测量法原理 2 打开 All m 文件 设置路径 3 运行程序 输入参考图像序号 4 框选高对比度区域 下图左图 双击以结束 结果后为下图右图 5 回到命令行 输入高对比度区域裁剪位置
  • RabbitMQ(二)confirm/return机制

    程序用了1 5 3 RELEASE版本的spring boot starter amqp依赖 confirm确认机制 配置文件