rabbitTemplate 让setConfirmCallback执行完成后再去发送消息给消费者

2023-11-12

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;

public class MySender {
    private static final String EXCHANGE_NAME = "exchange";
    private static final String ROUTING_KEY = "routingKey";
    
    private final RabbitTemplate rabbitTemplate;
    private final CountDownLatch latch;

    public MySender() {
        ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        latch = new CountDownLatch(1);
        
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("Message confirmed: " + correlationData.getId());
                    latch.countDown(); // 标记确认回调执行完成
                } else {
                    System.out.println("Message not confirmed: " + correlationData.getId() + " - " + cause);
                    // 在 handleNack 中进行消息重发等处理
                    // ...
                }
            }
        });
    }

    public void send(String message) throws MyException {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        try {
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
            latch.await(); // 等待确认回调执行完成
            System.out.println("Message sent to consumer: " + message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MyException("Message send failed: " + correlationData.getId(), e);
        } catch (AmqpException e) {
            System.out.println("Message send failed, correlationData: " + correlationData);
            // 在 convertAndSend 方法中发生异常时,也需要进行消息重发等处理
            // ...
            throw new MyException("Message send failed: " + correlationData.getId(), e);
        }
    }

    public static class MyException extends Exception {
        public MyException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

使用了 CountDownLatch 类来实现等待确认回调函数执行完成后再发送消息给消费者的功能。具体实现方式如下:

在 MySender 构造函数中创建 CountDownLatch 对象,并将初始计数器设置为 1。
在 setConfirmCallback 回调函数中,如果消息得到了确认,我们调用 CountDownLatch 对象的 countDown 方法来将计数器减一,标记确认回调执行完成。
在 send 方法中,我们先调用 convertAndSend 方法来发送消息,并使用 latch.await() 方法来等待确认回调函数执行完成。
当确认回调函数执行完成后,latch.await() 方法返回,我们就可以将消息发送给消费者。

在这个示例代码中,我们在 send 方法中捕获了 InterruptedException 和 AmqpException 异常,并将它们封装到自定义异常 MyException 中抛出。同时,我们在 InterruptedException 捕获块中重新设置中断标志,以便上层调用者能够知道线程被中断了。

如果消息发送失败,我们抛出 MyException 异常,并在异常消息中包含消息的 CorrelationData ID,以便后续的错误处理能够定位到具体的消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rabbitTemplate 让setConfirmCallback执行完成后再去发送消息给消费者 的相关文章

随机推荐

  • git查看日志

    目录 引言 git查看该项目提交记录 查看指定条数的记录 显示提交的差异 提交的简略信息 按行显示提交信息 按照指定格式显示记录 指定文件的提交记录 指定字符串或函数的提交记录 示例 引言 有时需要对之前所做的一些修改查看记录 这里是查看g
  • STM32F407ZGT6控制舵机(采用高级定时器8)

    前言 32单片机给舵机供电不足 会出现不稳定的情况 舵机鬼畜 所以要外加电源给舵机供电 利用12v锂电池 通过稳压模块降压到5 5v 提供给舵机 稳压电路的gnd一定要接上32单片机的gnd 不共地虽然能供电但数据线无法传输数据 stm32
  • 以太坊ERC-20协议详解

    区块链学习 https github com xianfeng92 Love Ethereum ERC20是以太坊定义的一个 代币标准 https github com ethereum EIPs blob master EIPS eip
  • 计算机网络综合选择题

    计算机网络综合选择题 TCP IP体系结构中的TCP和IP所提供的服务分别为 A 运输层服务和网络层服务 B 运输层服务和应用层服务 C 链路层服务和网络层服务 D 网络层服务和运输层服务 答案 A 2 对于无序接收的滑动窗口协议 若序号位
  • JAVA中的异常处理机制

    JAVA中的异常处理机制 java异常处理中的关键字 try catch finally throw throws return try 检测代码块 在此代码块中一旦检测到异常就会自动跳转到相应的catch try 检测代码块 catch
  • 21_pre_access 阶段

    文章目录 限制每个客户端的并发连接数 limit conn 指令 示例配置 限制每个客户端的每秒处理请求数 limit req 指令 限制每个客户端的并发连接数 ngx http limit conn module 生效阶段 ngx htt
  • Java实现FTP的上传和下载!

    java实现连接FTP服务器 实现文件的上传和下载 一 FTP服务器 FTP服务器 File Transfer Protocol Server 是在互联网上提供文件存储和访问服务的计算机 它们依照FTP协议提供服务 FTP协议是一种专门用来
  • 一文简单了解RPMB

    不知道大家对于RPMB有所了解吗 最近在看这些存储介质的介绍的时候 在推荐里面看到了这个东西 又因为对安全本身就有所涉及学习 所以这里来看看这个东西 学习的内容都是来自前辈们的blog 会在文末附注 1 Flash是什么 关于存储的种类有很
  • 解决mysql忘记密码无法登陆问题

    当我们忘记mysql密码的时候我们不仅无法访问数据库 也无法修改密码 这是个很头疼的问题 下面是跳过用户验证登陆数据库的小技巧 第一步 打开我们安装mysql的目录 复制 D PhpStudy PHPTutorial MySQL bin 地
  • 同一端口有2个前端应用应该如何配置nginx.conf

    需求 业务系统中有2种完全不同角色 页面没有相同模块拆分成了2个应用A和B 但后端是同一个后端 部署的时候要求A和B在同一端口下 问 如何配置nginx 首先我们将A B前端包放到 opt app jhscf deploy html下 这样
  • 电脑如何打开虚拟化设置?

    当你开启Vmware中的虚拟机时 如果出现以上提示 说明你的虚拟化没有打开 在计算机中 虚拟化 英语 Virtualization 是一种资源管理技术 是将计算机的各种实体资源 如服务器 网络 内存及存储等 予以抽象 转换后呈现出来 打破实
  • R语言-随机前沿分析法--SFA

    3 1介绍 生产函数模型 lnqi x i b vi ui 随机生产前沿函数 qi 产出变量向量 x i 投入变量向量 b 变量参数估计 vi 统计噪声的对称随机误差 ui 无效效应 3 2度量技术效率的方法 SFA 参数 DEA 非参数
  • Fortran 90学习之旅(一)Visual Fortran 6.5 的安装与第一个例子

    转载请标明是引用于 http blog csdn net chenyujing1234 源码 http www rayfile com zh cn files e5f02f0a 8799 11e1 b6a2 0015c55db73d 高尔夫
  • Java 中封装JDBC连接到JDBCUtils工具类的详解

    博主前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住也分享一下给大家 点击跳转到网站 前言 在JDBC操作中 获取连接和释放资源是经常使用到的 可以将其封装成到一个工具类JDBCUtils中 JDBCUtils中有两个方法
  • .numpy()、.item()、.cpu()、.clone()、.detach()及.data的使用 && tensor类型的转换

    文章目录 numpy item cpu clone detach 及 data的使用 item cpu numpy clone detach data data和 detach 不同点 Tensor类型的转换 numpy item cpu
  • STM32定时器中断

    目录 一 关于STM32定时器中断 1 定时器分类 2 通用定时器的功能特点 3 定时器中断的触发 4 定时时钟计算方法 二 CubeMX初始化配置 1 芯片选型 我们这里运用的STM32F103C8T6 编辑 2 时钟配置 3 TIM2中
  • 经纬恒润OTA仿真测试解决方案为汽车智能化发展保驾护航

    OTA技术是汽车实现完整网联化 智能化体验的基础 自被引用汽车以来 广受研发人员 市场用户的关注 近来 国家有关部门也陆续出台了相应政策 对汽车企业OTA技术的应用进行了约束和规范 因此 OTA技术在量产车型的应用落地 离不开完整的测试验证
  • SpringSecurity中授权时fastjson序列化问题

    最近在复习Spring Security 复习的鉴权的时候出现问题 26 封装权限信息 哔哩哔哩 bilibili 如果是从B站中看到 直接说问题可能出现的原因 可能是private List
  • XML建模(简单易学)

    目录 XML建模步骤 1 什么是建模 2 导入jar包 3 创建XML文件 4 根据XML文件中的元素创建模型类 ConfigModel类 ActionModel类 ForwardModel类 4 工厂类的编写 XML建模步骤 1 什么是建
  • rabbitTemplate 让setConfirmCallback执行完成后再去发送消息给消费者

    import org springframework amqp AmqpException import org springframework amqp rabbit connection CachingConnectionFactory