RabbitMQ保证消息的一致性解决方案

2023-10-31

RabbitMQ保证消息的一致性

一、采用confirm消息确认机制及return返回机制 确保消息发送成功

二、将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在

三、手动确认接收消息方式 消息处理失败拒收重回队列


1. yml配置

spring:
  rabbitmq:
    host: ip
    port: 5672
    username: guest
    password: guest
    ##消息发送确认回调
    publisher-confirms: true
    #采用confirm以及return机制 发送返回监听回调
    publisher-confirm-type: correlated
    publisher-returns: true
listener:
      type: simple
      simple:
        #手动接收消息方式
        acknowledge-mode: manual

2. RabbitMQ配置类

@Configuration
@Slf4j
@AllArgsConstructor
public class RabbitmqConfig {
    private final ConnectionFactory connectionFactory;
    private final RabbitLogsMapper rabbitLogsMapper;
 
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //confirm确认
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            String msgId = correlationData.getId();
            if (ack) {
                //发送成功
                log.info("消息成功发送 , msgId: {}," ,msgId);
                //状态更新  消息发送成功
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_SUCCESS.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            } else {
                //发送失败
                log.error("消息发送失败, {}, cause: {}, msgId: {}", correlationData, cause, msgId);
                //状态更新  消息发送失败
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            //触发回调  只有交换机找不到队列时才会触发
            log.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            //状态更新 消息发送失败
            String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
            BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
            biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
            rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
        });
        return rabbitTemplate;
    }
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

说明:

  • confirm机制只是确保了消息是否成功发送到交换机
  • Return机制确保了消息是否从交换机发送到指定的队列

- - ConfirmCallback则根据状态判断发送成功还是失败 进行更新日志表记录状态
  • ReturnCallback则根据收到消息就是未找到队列发送失败,未收到消息就是发送成功 进行更新日志表记录状态

3. 声明的队列一定要将队列持久化

public String createQueue(String queueName) {
        BiddingQueueConfig biddingQueueConfig = queueMapper.selectOne(Wrappers.lambdaQuery(BiddingQueueConfig.class).eq(BiddingQueueConfig::getQueue, queueName));
        if (biddingQueueConfig == null) {
            biddingQueueConfig = new BiddingQueueConfig();
            biddingQueueConfig.setCreatetime(new Date());
            biddingQueueConfig.setQueue(queueName);
            biddingQueueConfig.setStatus("1");
            int insert = queueMapper.insert(biddingQueueConfig);
            //将队列持久化
            rabbitAdmin.declareQueue(new Queue(queueName,true));
            return queueName + "队列创建成功";
        }
        return queueName + "队列创建失败";
}

4. 发送消息 将发送的消息设置为持久化

发送消息前首先将发送的数据插入数据库,状态变为发送中在这里插入图片描述

5. 消费者监听队列

  • 如果根据消息id查询日志表为空的话那么是没有发送消息,消息自动接收,发送成功消息后日志表会有数据
  • 判断是否重复消费 根据状态是否成功消费以及失败重试次数判断
  • 处理业务逻辑,如果成功消息接收 状态更新
  • 如果处理业务逻辑失败报错则会拒收,消息重回队列重新处理此条消息,当处理次数超过3次处理失败则消息改为接收
// 启动自动创建队列
@RabbitListener(queuesToDeclare = { @Queue("queue_work_dontask") })
@RabbitHandler
@SneakyThrows
public void receiveDonTask(String data, Message message, Channel channel){
    //消息id
    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    //根据消息id查询BiddingRabbitLogs日志表
    BiddingRabbitLogs biddingRabbitLogs = remoteLogsService.get(msgId, SecurityConstants.FROM_IN).getData();
    if (biddingRabbitLogs == null) {
        log.error("消息ID查询 biddingRabbitLogs:null");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    //状态:1.消息发送中 2.消息发送成功 3.消息发送失败 4.消费成功 5.消费失败
    if (SendStatus.CONSUME_SUCCESS.getValue().equals(biddingRabbitLogs.getStatus()) || SendStatus.SEND_FAILD.getValue() == String.valueOf(biddingRabbitLogs.getTryTimes())) {
        //重复消费
        log.info("消息ID:{},重复消费",msgId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    try {
        //处理业务逻辑
        Map map = JSON.parseObject(data, Map.class);
        String dataString = (String) map.get("data");
        String username = (String) map.get("username");
        Integer tenantId = (Integer) map.get("tenantId");
        ApproveParam approveParam = JSON.parseObject(dataString, ApproveParam.class);
        R<String> stringR = doneTask(approveParam,username,tenantId);
        //处理成功  更新状态
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_SUCCESS.getValue());
        biddingRabbitLogs.setSuccesstime(new Date());
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
        log.info("消费成功,消息ID:{}",msgId);
    } catch (Exception e) {
        e.printStackTrace();
        if (biddingRabbitLogs.getTryTimes() >= Integer.parseInt(SendStatus.TRY_TIMES.getValue())) {
            //多次消费不成功 自动接收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.error("多次消费失败,消息ID:{}",msgId);
        } else {
            //消费失败 拒收 重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            log.error("消费失败,消息ID:{}",msgId);
        }
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_FAILD.getValue());
        biddingRabbitLogs.setTryTimes(biddingRabbitLogs.getTryTimes()+1);
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ保证消息的一致性解决方案 的相关文章

  • 如何获取枚举的子集

    大多数情况下 包含所有元素的枚举显示在用户界面的下拉列表中 我们只需要在用户界面中显示 5 个字段中的 2 个 通过某种方式利用可用于枚举的相同函数来获取此数据的更简单方法是什么 enum Color RED GREEN BLACK BLU
  • 哪个类调用了我的静态方法?

    假设我有一个带有静态方法的 Java 类 如下所示 class A static void foo Which class invoked me 进一步假设 A 类有任意数量的子类 class B extends A class C ext
  • 使用 Spring MVC 返回 PDF 文件

    实际上 我有这个功能 我有一个框架 可以在其中设置 URL ip port birt preview report report rptdesign format pdf parameters 并且该框架呈现 PDF 文件 但我想隐藏该网址
  • Eclipse 自动完成更改变量名称

    只是一个愚蠢的问题 但很难搜索 因为有很多关于 Eclipse 自动完成的主题 而且很难找到与我的问题匹配的内容 所以问题是 如果我写 MyClass MyVarName 然后按空格键 添加 new MyClass Eclipse 自动添加
  • 隐藏类的 System.out.print 调用

    我正在使用 java 库 jar 文件 该文件的作者放入了一堆System out print and System out printlns 有没有办法隐藏特定对象的这些消息 编辑 看起来jar文件似乎正在创建一堆线程 并且每个线程都有它
  • 在 Java 中填充布尔数组

    作为一名相当新手的 Java 程序员 我给自己设定了一个艰巨的挑战 尝试编写一个简单的文本冒险 不出所料 我已经遇到了困难 我试图为我的 Location 类提供一个属性来存储它包含的退出 我为此使用了一个布尔数组 本质上保存代表每个出口的
  • 适用于 Solaris 的 Java 8 中缺少 javaws

    看起来 Oracle 从 Java 8 for Solaris 中删除了 Java Web Start javaws 在 Java 8u51 中不再可用 来自兼容性指南 http www oracle com technetwork jav
  • 根据哈希值确认文件内容

    我需要 检查完整性 content文件数量 文件将写入 CD DVD 可能会被复制多次 这个想法是识别正确复制的副本 在从 Nero 等中删除它们之后 我对此很陌生 但快速搜索表明Arrays hashCode byte http down
  • 确定代码是否在 App Engine 运行时 (Java) 上运行

    如何确定某些代码 Serv let 或简单的类 是否正在 Google App Engine 运行时 Java 上运行 以便决定是否使用 App Engine 的特定库 是否有一些可靠的运行时环境 ID 您可以检查com google ap
  • Java 反射:如何检索匿名内部类?

    我在另一个类中有一个匿名内部类 SomeClass Both SomeClass class getClasses and SomeClass class getDeclaredClasses 返回空数组 我在中找不到一些关于此的提示Cla
  • 给定一个单词列表 - 在 java 中完成单词的好的算法是什么?权衡:速度/效率/内存占用

    我正在探索潜在的免费 付费应用程序的硬件 软件要求 最终目标是移动 Java 应用程序 该应用程序将从这个简单的目标开始 给定数据库中相关单词的列表 能够对单个字符串输入进行单词补全 换句话说 我已经知道数据库的内容 但算法的内存占用 速度
  • 使用 CrudRepository 进行自定义查询

    我想使用 CrudRepository 自定义查询 这是我的代码 Repository public interface CustomerRepository extends CrudRepository
  • 在java中是否可以使用反射创建没有无参数构造函数的“空白”类实例?

    我有一个没有默认构造函数的类 我需要一种方法来获取此类的 空白 实例 空白 意味着实例化后所有类字段都应具有默认值 如 null 0 等 我问这个问题是因为我需要能够序列化 反序列化大对象树 而且我无法访问该对象类的源 并且类既没有默认构造
  • 我们必须将 .class 文件放在 Tomcat 目录中的位置

    我必须把我的 class文件在 Tomcat 目录中 在我的 Java Complete Reference 书中 他们告诉将其放入C Program Files Apache Tomcat 4 0 webapps examples WEB
  • 如何告诉 cxf 将包装类型保留在方法中?

    在我的 WSDL 中我有一个操作
  • 在Spring-Boot中,我们如何在同一个项目中连接两个数据库(Mysql数据库和MongoDB)?

    我正在尝试创建一个 Spring Boot 项目 其中我有一个要求 我想连接到不同的数据库 MySql 和 MongoDB 我是否需要做一些特殊的事情来连接到这两个数据库 或者 spring boot 会自动计算出自己连接到这两个数据库 我
  • 使用链接列表插入优先级队列的方法

    首先 我觉得我应该提到这是一项作业 我并不是在寻找直接的代码答案 只是为了指出正确的方向 我们被要求在链表中实现优先级队列 我正在努力编写 insert 函数的第一部分 在代码中我尝试检查是否head包含任何内容 如果没有则设置为head
  • Apache HttpClient TCP Keep-Alive(套接字保持活动)

    我的 http 请求需要太多时间才能被服务器处理 大约 5 分钟 由于连接闲置 5 分钟 代理服务器将关闭连接 我正在尝试在 Apache DefaultHttpClient 中使用 TCP Keep Alive 来使连接长时间处于活动状态
  • Checkstyle - 方法按修饰符排序

    是否可以添加到 checkstyle 规则以按修饰符对类中的方法进行排序 我的意思是开头的公共方法和最后的私有方法 MethodsOrderCheck做这个工作 检查文档 https www qulice com qulice checks
  • 升级到 Tomcat 8 时出现 ClassNotFoundException

    我最近将 NetBeans IDE 从 v7 3 升级到 v8 突然我的应用程序在连接到数据库时在服务器启动时抛出异常 这两个版本的 IDE 之间的唯一区别是后者使用 Tomcat 8 异常日志 javax naming NamingExc

随机推荐

  • 线程及线程的同步互斥

    目录 1 线程的简单介绍 2 同步互斥的概念 3 为什么要进行线程的同步互斥 4 信号量 5 互斥量 6 条件变量 1 线程的简单介绍 1 进程 在讲到线程之前 我们应该先了解一下进程的概念 进程 Process 是指计算机中已运行的程序
  • FRP内网穿透(linux->windows)

    使用背景 由于内网环境所在的电脑无法通过公网暴露访问 而使用类似于向日葵等其他代理工具 又存在一定的延迟卡顿 因此 决定待用Frp的内网穿透的功能 来实现借由公网服务器代理访问内网所在的电脑 原理 frp 主要由 客户端 frpc 和 服务
  • u盘魔术师给服务器装系统,U盘魔术师怎么装系统 U盘魔术师USM制作PE启动盘方法...

    U盘魔术师是一个很好用的装系统的工具 并且可以利用USM制作PE启动盘 很多用户都不太了解具体的方法 其实也非常的简单 下面小编就来给大家介绍一下U盘魔术师怎么装系统 赶紧来看看吧 U盘魔术师怎么装系统 U盘魔术师体积较大1G多如果是小水管
  • MyBatis学习笔记

    MyBatis MyBatis Mapper代理开发 MyBatis是一款优秀的持久层框架 用于简化JDBC MyBatis 持久层 负责把数据保存到数据库的那一层 JavaEE三层架构 表现层 页面展示 业务层 逻辑处理 持久层 对数据持
  • 在外SSH远程连接macOS服务器【cpolar内网穿透】

    文章目录 前言 1 macOS打开远程登录 2 局域网内测试ssh远程 3 公网ssh远程连接macOS 3 1 macOS安装配置cpolar 3 2 获取ssh隧道公网地址 3 3 测试公网ssh远程连接macOS 4 配置公网固定TC
  • 状态机的置位与复位

    1 状态机的异步置位与复位 异步置位与复位是与时钟无关的 当异步置位与复位到来时它们立即分别置触发器的输出为1或0 不需要等到时钟沿到来才置位或复位 把它们列入always块的事件控制括号内就能触发always块的执行 因此 当它们到来时就
  • Linux设置所有用户环境变量

    Linux中每个用户都要指定各自的环境变量 这样会比较麻烦 那么如何配置一个环境变量 所有的用户都可以使用呢 比如说我想把Linux默认语言由en US UTF 8修改为zh CN UTF 8 那么我需要设置环境变量 LANG 百度很多方法
  • Conda 配置 Python 环境

    文章目录 前言 一 Conda 是什么 二 如何获取 三 使用 Conda 命令配置多环境 1 创建新环境 2 激活新环境 3 配置新环境 4 退出新环境 5 检查所有环境 6 检查所有安装的包 7 删除某环境 8 重命名某环境 四 使用
  • Crontab配置任务定时执行

    一 每奇数周的周一执行 16 0 1 date W 2 eq 1 gt dev null sh data1 test sh 具体地 1 分钟字段 Minute field 16 2 小时字段 Hour field 0 3 日期字段 Day
  • 亚马逊首席技术官Werner Vogels:2023年及未来五大技术趋势预测

    近年来 随着我们经历的数次全球危机 如何借助技术解决人类棘手问题至关重要 如今 我们获取数据的来源比以往任何时候都多 包括可穿戴设备 医疗设备 环境传感器 视频捕获和其他联网设备 当这些数据与计算机视觉 机器学习和模拟仿真等云技术相结合时
  • OpenWrt目录之target

    target目录下主要是和平台有关的代码 最主要的是linux文件夹 linux文件夹的ramips中 ramips应该指的是对应cpu的架构 ramips文件夹下的就是不同系列的cpu对应的芯片的型号 进行试验一下 首先在根目录下运行ma
  • IDEA工具实用开发快捷键

    选中new ArrayList lt gt 或者光标放在new前面 按ctrl alt v 选中new ArrayList lt gt 或者光标放在new后边面 按ctrl alt 空格 ideal 工具没识别maven项目的话 右键pom
  • uni-app开发微信小程序,button通过数组的length判断disabled无效(数组length === 0写法无效)

    错误写法
  • caffe特征提取/C++数据格式转换

    Caffe生成的数据分为2种格式 Lmdb 和 Leveldb 它们都是键 值对 Key Value Pair 嵌入式数据库管理系统编程库 虽然lmdb的内存消耗是leveldb的1 1倍 但是lmdb的速度比leveldb快10 至15
  • 国产操作系统进入被彻底抛弃的时代

    当倪光南正在不断呼喊支持国产操作系统的时候 国产操作系统却迎来了噩梦 国产操作系统接连倒闭 国产操作系统进入一个被国家彻底抛弃的时代 红旗linux梦断国产操作系统 今年2月中科红旗linux因为缺钱倒闭解散了 一直以来做得最好的国产操作系
  • 图形学数学基础之基本蒙特卡罗尔积分(Monte Carlo Integration)

    作者 i dovelemon 日期 2017 07 29 来源 CSDN 主题 Monte Carlo Integration 引言 好久没有写博客了 最近一直在忙于工作 同时GLB库中关于PBR的渲染算法 一直卡住 无法实现下去 不过在这
  • dd大牛的《背包九讲》

    P01 01背包问题 题目 有N件物品和一个容量为V的背包 第i件物品的费用是c i 价值是w i 求解将哪些物品装入背包可使这些物品的费用总和不超过背包容量 且价值总和最大 基本思路 这是最基础的背包问题 特点是 每种物品仅有一件 可以选
  • 【HCNP路由交换学习指南】学习笔记丨第07章 BGP

    07 BGP BGP 的基本概念 BGP 对等体关系类型 IBGP 水平分割原则 路由黑洞问题及 BGP 同步规则 路由通告 Router ID 报文类型及格式 Open 报文 Update 报文 Keepalive 报文 Notifica
  • PaddleOCR使用笔记之模型训练

    目录 简介 模型训练 步骤一 文本检测模型 detection 1 准备训练数据集 2 下载预训练模型 模型介绍 下载预训练模型 3 开始训练 断点训练 4 模型评估 5 模型测试 6 训练模型转inference模型 步骤二 文本识别模型
  • RabbitMQ保证消息的一致性解决方案

    RabbitMQ保证消息的一致性 一 采用confirm消息确认机制及return返回机制 确保消息发送成功 二 将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在 三 手动确认接收消息方式 消息处理失败拒收重回队列 1 y