使用RabbitMQ实现延时队列

2023-11-09

之前公司是一个类电商公司,会有用户下单后未支付取消订单的场景,解决方案是使用RabbitMQ的死信队列来实现一个延时队列:下单时,将订单丢进消息队列,设置过期时间(订单失效时间),然后到时候检查订单状态,如果未支付则取消订单。

1.什么是死信
“死信”(Dead Letter)是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用basic.reject 或者 basic.nack,并且此时requeue 属性被设置为false
  • 消息在队列的存活时间超过设置的TTL(TimeToLive)时间
  • 消息队列的消息数量已经超过最大队列长度

那么该消息将成为“死信”,如果配置了死信队列,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。死信交换机也是一种普通交换机,只不过是接受死信队列而已

2.如何实现死信队列
死信队列与普通队列一样有三种模式:Direct、Topic、Fanout
Direct:需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
Topic:只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Fanout:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。

实现死信队列步骤:

  • 声明死信队列
  • 声明死信交换机
  • 绑定死信队列与死信交换机

  • 声明业务队列(配置死信交换机和死信key)
  • 声明业务交换机
  • 绑定业务队列与业务交换机

这样当业务队列中的message变成dead letter时,就会被投递到死信队列中,然后可以通过监控死信队列,拿到业务message

就是这2个参数:
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

3.具体实现
新建Springboot项目,使用SpringAMQP+SpringWeb

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>RabbitMQDemo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

#rabbitmq
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #消息确认机制
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息未被接收时返回 而不是丢弃
    publisher-returns: true
    #确认消息
    template:
      mandatory: true
    #消费端手动ack
    listener:
      simple:
        acknowledge-mode: manual

配置消息

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);


        //消息发送到exchange后回调
        rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
            System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
            System.out.println("ConfirmCallback:     "+"确认情况:"+b);
            System.out.println("ConfirmCallback:     "+"原因:"+s);
        });

        //从exchange发送到队列失败后回调
        rabbitTemplate.setReturnsCallback((callback) -> {
            System.out.println("ReturnCallback:     "+"消息:"+callback.getMessage());
            System.out.println("ReturnCallback:     "+"回应码:"+callback.getReplyCode());
            System.out.println("ReturnCallback:     "+"回应信息:"+callback.getReplyCode());
            System.out.println("ReturnCallback:     "+"交换机:"+callback.getExchange());
            System.out.println("ReturnCallback:     "+"路由键:"+callback.getRoutingKey());
        });

        return rabbitTemplate;
    }

    /**
     * 声明死信队列
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead-queue",true);
    }

    /**
     * 声明死信交换机
     */
    @Bean
    public DirectExchange deadExchane(){
        return new DirectExchange("dead-exchange");
    }

    /**
     * 绑定死信队列到死信交换机
     */
    @Bean
    public Binding deadBing() {
        return BindingBuilder.bind(deadQueue()).to(deadExchane()).with("dead");
    }

    /**
     * 声明订单队列
     */
    @Bean
    public Queue orderQueue(){
        Map<String, Object> map = new HashMap<>(3);
        //死信交换机,对应上面的死信队列
        map.put("x-dead-letter-exchange","dead-exchange");
        //死信key,对应上面的死信key
        map.put("x-dead-letter-routing-key","dead");
        //可以在这里统一设置过期时间,也可以在发送消息到队列的时候设置
         //map.put("x-message-ttl",5 * 1000);
        return new Queue("order-queue",true,false,false,map);
    }

    /**
     * 声明订单交换机
     */
    @Bean
    public DirectExchange orderExchane(){
        return new DirectExchange("order-exchange");
    }

    /**
     * 绑定订单队列到订单交换机
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchane()).with("order");
    }
}

监听死信队列,这里没有手动确认会一直在队列中

package com.example.demo.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DeadMessageReceiver {

    @RabbitListener(queues = "dead-queue")
    public void process(Message message, Channel channel, Map map) {
        System.out.println("收到死信队列消息:" + message.toString());
    }
}

发送消息

package com.example.demo.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendOrderMessage")
    public String sendOrderMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageInfo = "Order message";

        String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map map = new HashMap();
        map.put("messageId",messageId);
        map.put("messageInfo",messageInfo);
        map.put("time",time);

        //将消息发送到订单队列
        rabbitTemplate.convertAndSend("order-exchange", "order",map,message -> {
            message.getMessageProperties().setExpiration(5000 + "");
            System.out.println("5秒后将过期");
            return message;
        });
        System.out.println("发送消息成功");
        return "success";
    }
}

测试:
发送消息,5秒后过期到死信队列中
在这里插入图片描述
可以看到order-quueue的消息到dead-queue中去了
在这里插入图片描述
参考:
https://blog.csdn.net/qq_41389354/article/details/111352242

https://www.cnblogs.com/mfrank/p/11184929.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用RabbitMQ实现延时队列 的相关文章

随机推荐

  • 深入浅出python系列(三):逻辑判断语句

    深入浅出python系列 深入浅出python系列 一 基本数据类型 深入浅出python系列 二 运算符 版权申明 未经博主同意 谢绝转载 请尊重原创 博主保留追究权 本博客的内容来自于 深入浅出python系列 三 逻辑判断语句 学习
  • 如何基于 Git 设计合理的多人开发模式

    本文转自 Java高性能架构 一个企业级项目是由多人合作完成的 不同开发者在本地开发完代码之后 可能提交到同一个代码仓库 同一个开发者也可能同时开发几个功能特性 这种多人合作开发 多功能并行开发的特性如果处理不好 就会带来诸如丢失代码 合错
  • 信安大佬真的用kali吗?

    Kali只是现在网络安全和kali比较火的一个操作系统 下面我为大家讲讲kali系统都有那些优点 Kali介绍Kali Linux是基于Debian的Linux发行版 设计用于数字取证操作系统 面向专业的渗透测试和安全审计 集成化 预装超过
  • 【Linux】2、systemd、journalctl 超详细介绍

    文章目录 一 背景 二 系统管理 2 1 systemctl 2 1 1 State degraded 2 2 systemd analyze 2 3 hostnamectl 2 4 localectl 2 5 timedatectl 2
  • 微信小程序项目实战-电影票订票系统的毕业设计(附源码+论文)

    大家好 我是职场程序猿 感谢您阅读本文 欢迎一键三连哦 当前专栏 微信小程序毕业设计 精彩专栏推荐 安卓app毕业设计 Java毕业设计 电影票订票小程序软件 java 演示 源码下载地址 https download csdn net d
  • 电工学习笔记——示波器交直流耦合的区别

    一 概述 示波器的输入耦合方式的意思是输入信号的传输方式 耦合是指两个或两个以上的电路元件或电网络等的输入与输出之间存在紧密配合与相互影响 并通过相互作用从一侧向另一侧传输能量的现象 示波器的输入耦合属于信号直接耦合 一般有两种方式 分别是
  • spark报错:The current account does not have the permission of database

    异常信息 22 01 18 20 21 34 main WARN HiveSessionCatalog The current account does not have the permission of database adm It
  • 关于软件产品化的几点思考【转】

    关于软件产品化的几点思考 转自 汉捷咨询 国内很多软件企业尤其是行业软件企业是从开发一 二个软件项目起家的 而且项目规模和复杂度也不大 依赖其中一两个高手 他们能够在客户适度满意的状态下成功完成项目 基于以往研究 成功的主要因素是项目具备以
  • 各种语言的简写代码

    中文 zh CN 英语 en 中文 繁体 zh TW 越南语 vi 阿尔巴尼亚语 sq 阿拉伯语 ar 阿塞拜疆语 az 爱尔兰语 ga 爱沙尼亚语 et 白俄罗斯语 be 保加利亚语 bg 冰岛语 is 波兰语 pl 波斯语 fa 布尔文
  • Matlab - Solidworks 机器人建模(6)——使用rigidBodyTree构建机器人模型

    前言 本文适用对象 没有机器人的Solidworks模型自己又懒得画的童鞋 没有机器人URDF模型的童鞋 如果你在Matlab帮助里面搜索rigidBody 你大概率会看到matlab自带的例程 链接在这里 教你怎么用rigidBody建立
  • 腾讯会议——录制的视频如何正常观看(转为MP4格式)

    1 打开腾讯会议 2 点击历史会议 3 点击你录制的会议 4 点击录制详情 5 点击转码 完成这5步 即可将所保存的视频转为MP4格式 便于观看
  • 游戏开发unity插件Cinemachine系列:制作摄像机沿路径移动的动画

    可以参看 https blog csdn net zhenghongzhi6 article details 104885429
  • 初级软件测试工程师需要具备那些知识与技能

    哈喽 大家好 今天我们来聊聊如何成为一名初级软件测试工程师 需要必备那些知识和技能 什么是软件测试 软件测试的经典定义是 在规定的条件下对程序进行操作 以发现程序错误 衡量软件品质 并对其是否能满足设计要求进行评估的过程 软件测试的现实定义
  • iOS安全之ipa 包重签名的3种方法

    重签名的意义 ipa 重签名最大的用处是 不必重新打包 和配置其它第三方获取 appkey 等操作 直接重签名之后依然可以拥有这些功能 更快的发布测试或者灰度版本 方法一 终端命令 sigh resign 1 明白两个东西 想要重签名的证书
  • Unity笔记--Canvas渲染

    参考 五 UGUI源码分析之Rebuild 布局重建 图形重绘 网格重建 网格重建大体包括布局重建和图形重建两部分 canvas更新过程可分为布局 渲染两部分 共六阶段 public enum CanvasUpdate Prelayout
  • C++类和对象——引用作为函数形参

    问题 1 如果函数的形参为普通函数 那么调用函数时形参对象会被构造 函数调用结束形参对象还需要被销毁 2 为了避免形参对象这种 临时对象 的创建 我们可以将形参设计成引用 着重理解下边的代码 include
  • 牛客网--HJ1 字符串最后一个单词的长度

    文章目录 前言 一 题目内容和牛客网的链接 二 话不多说 引入代码 1 引入库 2 读入数据 总结 前言 题目的分析 一 题目内容和牛客网的链接 牛客网题目链接 二 话不多说 引入代码 1 引入库 代码如下 示例 include
  • Origin常见问题

    1 在绘图时 常常移动一个图 其他的图也跟着缩放 这是由于图层关联导致 取消即可 如下 图中所示 默认是图层2关联到了图层1 所以取消关联就可以了
  • C语言数组指针和指针数组实例演示

    一 数组指针 1 简介 数组指针就是指向数组的指针 定义方式 int p len NULL 示例 include
  • 使用RabbitMQ实现延时队列

    之前公司是一个类电商公司 会有用户下单后未支付取消订单的场景 解决方案是使用RabbitMQ的死信队列来实现一个延时队列 下单时 将订单丢进消息队列 设置过期时间 订单失效时间 然后到时候检查订单状态 如果未支付则取消订单 1 什么是死信