RabbitMQ消息可靠性(二)-- 消费者消息确认

2023-10-30

一、消费者消息确认是什么?

在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入队列,等待其他消费者继续处理。

消费者消息确认机制能够保证消息不会因为消费者宕机或其他原因而丢失,从而保证了消息的可靠性和稳定性。

RabbitMQ 支持两种消费者消息确认机制:自动确认和手动确认。在自动确认模式下,消费者在接收到消息后,RabbitMQ 会自动将该消息标记为已经确认。在手动确认模式下,消费者需要向 RabbitMQ 显式地发送确认信息,才能完成消息的确认。

二、代码实现

1.修改application.yml 配置

spring:
  rabbitmq:
    listener:
      simple:
        # RabbitMQ开启手动确认
        acknowledge-mode: manual

而SpringAMQP则允许配置三种确认模式:

  1. manual:手动ack,需要在业务代码结束后,调用api发送ack。
  2. auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  3. none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

2.消费者确认

生产者发送一笔需要消费的订单到Direct Exchange直连交换机

@GetMapping("/sendDirectMessage")
    @ApiOperation(value = "sendDirectMessage")
    @ApiOperationSupport(order = 1)
    public String sendDirectMessage(@RequestParam String orderNo){
        //设置消息唯一ID
        String uniqueId = "MQ"+ DateUtils.dateTimeNow("yyyyMMddHHmmss")+ RandomUtil.randomNumbers(4);
        CorrelationData correlationData = new CorrelationData(uniqueId);
        log.info("------生产者发送消息,消息唯一id {},订单编号 {}-------",uniqueId,orderNo);
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",orderNo,correlationData);
        return "ok";
    }

下面是消费者的处理逻辑
这里的消息序号是系统自动生成的,还需要注意的是,在手动确认模式下,如果消费者在处理消息时发生了异常或错误的时候

需要确保将该消息重新加入队列或者删除队列之后将该信息保存至数据库中记录下来,否则该消息将被认为已经成功处理并确认。因此,在编写消费者代码时,需要谨慎处理异常情况,避免因为异常而导致消息丢失或重复处理等问题。

/**
 * 消费者,用于消费队列信息
 */
@Component
@Slf4j
public class DirectConsumer {

    @Resource
    RedisService redisService;

    @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    public void process(Message message, Channel channel) {
        // 消息序号
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //取出消息唯一标识
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        // 取出订单编码
        String orderNo = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("------消费者收到消息,消息唯一id {},订单编号 {}-------",messageId,orderNo);
        try {
            //消费者在消费消息之前,先去redis中查看消息状态是否已被消费
            if (redisService.setCacheMapIfAbsent("rabbit-tag", messageId, Boolean.FALSE)){
                //删除过期订单.......
                //消费完消息后,设置key的值为true
                redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.TRUE);
                channel.basicAck(deliveryTag,false);
                log.info("------订单处理完毕,订单编号 {}--------", orderNo);
            }else {
                //如果从redis中获取消息的value是TRUE,表示已消费,直接发送确认信号,避免重复消费
                if (Boolean.TRUE.equals(redisService.getCacheMapValue("rabbit-tag",messageId))) {
                    /**
                     * TODO 手动确认消息
                     * tag:消息序号
                     * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
                     */
                    channel.basicAck(deliveryTag, false);
                    log.info("--------订单已经被消费过了,订单编号 {}-------", orderNo);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                /**
                 * TODO 消费者消费消息异常,手动否认信息,将消息退回到队列中
                 * tag:消息序号
                 * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
                 * requeue:是否要退回到队列
                 */
                channel.basicNack(deliveryTag, true, false);
                redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.FALSE);
                log.error("------------订单消费失败,已从队列删除.订单编号 {}, 原因 {}--------",orderNo, e.getMessage());
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        log.info("------消费者处理完毕-------");
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ消息可靠性(二)-- 消费者消息确认 的相关文章

  • 每次发送消息时是否需要重建RabbitMQ连接

    我有一个 Spring 3 应用程序 它通过非 RabbitMQ 接收器接收消息 处理它们并通过 RabbitMQ 转发 每次发送消息时都会建立一个新的 RabbitMQ 连接 这似乎有点浪费 我只是想知道这是否真的有必要 或者是否有原因导
  • RabbitMQ 中的并发

    经过一周的编码和搜索论坛后 似乎是时候问 我有一个 C 应用程序 它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息 我想同时处理多个消息 因此我在同一连接上实例化了几个通道 本例中为 8 个 每个通道都
  • amqp 与 amqplib - 哪个 Node.js amqp 客户端库更好?

    这些 amqp 客户端库之间有什么区别 哪一款最值得推荐 主要区别是什么 我会推荐amqp node https github com squaremo amqp node and bramqp https github com bakke
  • 在 RabbitMQ 主题交换中路由与模式不匹配的消息

    两个队列绑定到具有以下路由键的主题交换 队列 A 与路由键模式匹配绑定 foo队列 B 与路由键模式匹配绑定 bar 我想向此交换添加第三个队列 该队列接收的消息都不是foo消息也不bar消息 如果我用一个绑定这个队列 路由密钥 我自然会得
  • 组在 RabbitMQ 中接收消息,最好使用 Spring AMQP?

    我正在从服务 S 接收消息 该服务将每个单独的属性更改作为单独的消息发布到实体 一个人为的例子是这样的实体 Person id 123 name Something address 如果姓名和地址在同一交易中更新 则 S 将发布两条消息 P
  • 如何在 celery 内为每个用户生成队列?

    因此 我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列 我对消息传递和发布 订阅也很陌生 用户将数据推送到那里并进行处理 稍后用户会收到相关通知 我为此做了一个 celery 设置 发现它不能满足我为每个用户分配自己的任务的专用队列
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 在 Red Hat 上安装 RabbitMQ - 错误的 Erlang 版本

    我正在尝试按照以下说明在 Red Hat Enterprise Linux 7 64 位工作站版本 的评估虚拟机上安装 RabbitMQhttps www rabbitmq com install rpm html https www ra
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 无法在Windows上启用rabbitmq管理插件

    所以 这就是我所做的 在我的 Windows x64 位机器上安装了 Erlang 安装 RabbitMQ 启动 RabbitMQ 服务 这一步我没有任何错误 但是 当我尝试启用rabbitmq management时 我在控制台中收到一些
  • Spring AMQP RabbitMQ 如何直接发送到Queue而不需要Exchange

    我正在使用 Spring AMQP 和 Rabbitmq 模板 如何直接将消息发送到队列而不使用Exchange 我该怎么做 我该怎么做 你不能 发布者不知道队列 只是交换和路由密钥 但是 所有队列都绑定到默认交换器 以队列名称作为其路由键
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt

随机推荐

  • 自动化接口测试-第02天-接口用例设计思路、单接口用例、业务场景用例、postman

    更多功能测试以及全套学习路线图均在专栏 戳进去领取 系列文章目录 身为开发必知必会的Linux Linux远程连接 命令的使用 Linux命令大全 唯一以案例详解文 持续更新中 Linux命令大全以及数据库 唯一以案例详解文 已完结 Web
  • yield和join方法的使用。

    join方法用线程对象调用 如果在一个线程A中调用另一个线程B的join方法 线程A将会等待线程B执行完毕后再执行 yield可以直接用Thread类调用 yield让出CPU执行权给同等级的线程 如果没有相同级别的线程在等待CPU的执行权
  • 【笔记】sass

    sass入门 1 可以定义变量 2 嵌套 3 导入其他sass文件 最后编译为一个CSS文件 4 mixin定义一些代码片段 且可传参数 include 5 extend组合代码声明 6 运算 7 颜色函数color darken link
  • 存储卡的使用方法大全

    存储卡的使用方法大全 现在购买诺基亚手机 尤其是其智能手机的朋友是越来越多了 与其他品牌的机型相比 诺基亚的手机有一个最大的优点 就是支持储存卡内存扩充的机型比较多 这让大家在使用中不必再担心手机容量的问题 这也体现了诺基亚 科技以人为本
  • 关于 ubuntu18.04 机械革命 RTX2060 解决分屏显示问题

    本人小白 如有错漏 欢迎批评指正 同时感谢之前发表过以及以后会在CSDN上发表自己学习经验的同志们 问题背景 本人使用的是 机械革命笔记本电脑 安装双系统 win10 以及 ubuntu18 04 在使用过程中 遇到了 分屏黑屏 将鼠标移动
  • NAT穿透的工作原理

    一 引言 1 1 背景 IPv4地址短缺 引入NAT 全球IPv4地址早已不够用 因此人们发明了NAT 网络地址转换 来缓解这个问题 简单来说 大部分机器都使用私有IP地址 如果它们需要访问公网服务 那么 出向流量 需要经过一台NAT设备
  • Pytest+Unittest+Git+Jenkins企业级CICD自动化测试平台建设方案

    随着持续集成的引入 项目中的自动化测试用例越来越多 每轮执行所消耗的时间也越来越久 要提高自动化测试用例执行的效率 以下几点是需要考虑的根本点 1 公司项目的交付策略如何 首先 测试团队服务于公司项目 因此我们必须根据公司项目的交付策略做对
  • SSL证书有什么用?

    SSL证书提供了一种在互联网上身份验证的方式 是用来标识和证明双方身份的数字信息文件 使用SSL证书的网站 可以保证用户和服务器间信息交换的保密性具有不可窃听 不可更改 不可否认 不可冒充的功能 45 113 203 1 45 113 20
  • 怎么用EF框架进行增删改查

    EF框架 1 EF是什么 2 EF的框架模式 3 EF框架优缺点 4 进行增删改查 4 1 新增 4 1 1 直接新增 4 1 2 改变状态新增 4 1 3 批量新增 4 2 删除 4 2 1 删除 先查询后删除 删除ID为6的 4 2 2
  • java jsch_java 利用jsch端口转发 建立连接

    其他代码都一样 参照 使用JSch完成 ssh隧道建立 只有获取连接的部分不相同 代码如下 端口转发 链接两层 public boolean connect throws JSchException try session ssh getS
  • Android NDK C++开发注意事项总结

    相信Android开发者都喜欢用C 编写一些高效的应用 有关Android NDK的C 开发相关知识总结如下 从Android NDK r5开始支持了STL Port 在这个版本开始就可以使用部分STL库的功能了 比如说vector str
  • thumbnailator压缩图片并存至Excel单元格代码

    文章目录 依赖 压缩图片工具类 存至excel的转化器 Excel导入导出的数据类 将数据库实体转化为excel实体 依赖
  • Python 基于 Django 的学生成绩管理系统,可视化界面

    1简介 对于学生成绩管理系统 充分运用现代化的信息技术手段 对于学生成绩信息管理发展的趋势就是信息化 信息化时代下的信息管理 需要深化信息管理体制与手段的改革 充分运用信息化手段来全方位的进行学生成绩管理系统工作 构建学生成绩管理系统 实现
  • Unity 中检测射线穿过的所有的物体

    在开发中 有个需求 射线要检测所有穿过的物体 代码如下 using UnityEngine public class HitCollider MonoBehaviour public float raycastDistance Mathf
  • 线程与线程池的理解

    1 什么是线程 线程和进程的区别是什么 线程 程序执行流的最小执行单位 是行程中的实际运作单位 进程简单来说 一个应用程序的运行就可以被看做是一个进程 而线程 是运行中的实际的任务执行者 进程中包含了多个可以同时运行的线程 2 线程的生命周
  • LeetCode子域名访问计数-Python3.7<五>

    上一篇 LeetCode 键盘行 lt 四 gt 题目 https leetcode cn com problems subdomain visit count description 一个网站域名 如 discuss leetcode c
  • Python数据分析之——数据可视化(折线图)

    matplotlib的pyplot子库提供了和matlab类似的绘图API 方便用户快速绘制2D图表 首先我们先来看看效果图 然后 是数据 接着是代码 coding utf 8 import numpy as np import matpl
  • c语言入门----详解分支语句(if语句)

    文章目录 一 前言 二 顺序结构 三 为什么会有分支语句 四 if语句 五 if语句形式 1 if的基本形式 2 有关if的例子 3 有关if的易错提醒 六 if else语句 1 为什么会有if else语句 2 if else的基本形式
  • Canvas和SVG有什么区别

    在项目开发中也许会涉及到图形 经常用到的就是svg和canvas两种画图方式 下面就让我们看一看他们两者的区别 svg绘制出来的每一个图形的元素都是独立的DOM节点 能够方便的绑定事件或用来修改 canvas输出的是一整幅画布 svg输出的
  • RabbitMQ消息可靠性(二)-- 消费者消息确认

    一 消费者消息确认是什么 在这种机制下 消费者在接收到消息后 需要向 RabbitMQ 发送确认信息 告知 RabbitMQ 已经接收到该消息 并已经处理完毕 如果 RabbitMQ 没有接收到确认信息 则会将该消息重新加入队列 等待其他消