RabbitMq结合springBoot实现延时任务

2023-11-11

简介

rabbitMQ延时任务的实现思想:

rabbitmq实现延时任务本质就是使用  "过期时间"  和   "死信队列"实现的,首先定义一个死信队列和死信队列的消费者,这个死信队列用来接收延时队列过期的消息,死信队列消费者用来接收到过期消息后就进行消费,在定义一个延时队列存储我们的信息,这个延时队列没有消费者,所以消息一直存在这个队列中,当我们设置了过期时间为24小时后,每个消息24小时候就会过期,进入到死信队列中,死信队列中有消费者,只要一过期就会被死信队列的消费者消费,就实现了延时任务

准备工作

这里不介绍如何安装rabbitmq和erlang了,如果有需要可以跳转下面的

windows环境下安装RabbitMQ(超详细)_windows安装rabbitmq_luckySnow-julyo的博客-CSDN博客

创建springBoot项目,下载依赖

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

实现

方式一:队列设置过期时间(建议使用该方式)

生产者

我们首先创建一个死信队列

package com.qtt.mq.create.scheduled;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DeadLetter {

    //创建死信的队列
    @Bean
    public Queue  createDeadLetterQueue1(){
        //queue(队列名,是否持久化)
        return new Queue("dead-letter-queue-1",true);
    }

    //创建死信的交换机
    @Bean
    public DirectExchange createDeadLetterExchange1(){
        //DirectExchange(交换机名称,是否持久化,是否自动删除)
        return new DirectExchange("dead-letter-exchange-1",true,false);
    }

    //队列和交换价绑定
    @Bean
    public Binding createDeadLetterBinding1(){
        return BindingBuilder.bind(createDeadLetterQueue1())
                .to(createDeadLetterExchange1())
                //routing-key
                .with("111");
    }
}

创建一个延时用的队列(该队列没有消费者,且要设置过期时间)

package com.qtt.mq.create.scheduled;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class DelayedQueue {

    //创建队列,且指定队列的死信队列
    @Bean
    public Queue createDelayedQueue(){

        //创建map用于定义队列的一些属性
        Map<String, Object> map = new HashMap<>();

        //指定队列的死信队列的交换机(x-dead-letter-exchange是固定的)
        map.put("x-dead-letter-exchange","dead-letter-exchange-1");

        //指定死信接收队列绑定的routing-key(x-dead-letter-routing-key)
        map.put("x-dead-letter-routing-key","111");

        //设置队列消息过期时间(如果我们通过消息设置过期时间,这个可以不定义,但建议使用队列过期)
        map.put("x-message-ttl",60000);

        //Queue(队列名称,是否持久化,不知道,不知道,队列属性)
        return new Queue("delayed-send-queue",true,false,false,map);
    }

    //创建交换机
    @Bean
    public DirectExchange createDelayedExchange(){
        //DirectExchange(交换机名称,是否持久化,是否自动删除)
        return new DirectExchange("delayed-send-exchange",true,false);
    }

    //创建绑定
    @Bean
    public Binding  createDelayedBinding(){
        return BindingBuilder.bind(createDelayedQueue())
                .to(createDelayedExchange())
                //routing-key
                .with("222");
    }

}

将消息发送到我们的延时队列中

package com.qtt.mq.controller;

import com.qtt.mq.send.IsendMQService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)   //test自动装配为空的问题解决
public class mqController {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  delayedTest(){
        rabbitTemplate.convertAndSend("delayed-send-exchange","222","测试延时任务");
    }

}

消费者

给我们延时任务的死信队列设置消费者进行消费

package com.qtt.mq.reception;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//一定要加入到spring容器中
@Component
public class DelayedReception {

    //定义的监听队列(该处是监听的死信息队列)
    @RabbitListener(queues = {"dead-letter-queue-1"})
    public void  delayedTest(String mess){
        System.out.println(mess);
    }

}

方式二:消息设置过期时间

和上面的思想一样,只是使用的方式不同

1. 延时队列的过期时间去掉(注意:如果已经指定了死信队列,要删除该队列,在重新创建,因为指定过死信队列后不能通过代码修改属性)

不去掉也行,但默认会使用最小的一个过期时间,所以可能会冲突

2. 定义消息时定义消息过期时间

package com.qtt.mq.controller;

import com.qtt.mq.send.IsendMQService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)   //test自动装配为空的问题解决
public class mqController {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  delayedTest(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000");
        Message message = new Message("这是一个消息测试死信队列".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("delayed-send-exchange","222",message);
    }

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMq结合springBoot实现延时任务 的相关文章

  • 每次发送消息时是否需要重建RabbitMQ连接

    我有一个 Spring 3 应用程序 它通过非 RabbitMQ 接收器接收消息 处理它们并通过 RabbitMQ 转发 每次发送消息时都会建立一个新的 RabbitMQ 连接 这似乎有点浪费 我只是想知道这是否真的有必要 或者是否有原因导
  • 在使用 FromEventPattern 订阅之前捕获事件

    我正在使用 Rx 框架编写消息监听器 我面临的问题是 我正在使用的库使用一个消费者 每当消息到达时就会发布事件 我已经设法通过以下方式消费传入的消息Observable FromEventPattern但我对服务器中已有的消息有疑问 目前我
  • 发送消息到任意虚拟主机/与 RabbitMQ / Spring AMQP 交换

    I use RabbitMQ and Spring AMQP发送消息 我有这个片段 rabbitTemplate convertAndSend exchange key object 当我对单个操作进行操作时 这有效VHOST 我必须从 1
  • 在点网核心应用程序中使用 RabbitMQ 跳过 MassTransit 中的队列

    我有三个项目 一个是Dot net core MVC 两个是API项目 MVC 正在调用一个 API 来获取用户详细信息 当询问用户详细信息时 我通过 MassTransit 向队列发送消息 我看到跳过队列 第三个项目中有消费者 即API项
  • 如何触发 IModel.BasicAcks?

    我第一次使用 RabbitMQ 的 NET API 我想出了一个对我来说似乎合理的用例 我想创建发布消息并在消息被确认后执行某些操作的发布者 IModel BasicAcks 事件似乎是了解这一点的好方法 所以 我给出版商写了一封信 pri
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样

随机推荐

  • Vscode快速入门、 插件安装、插件位置、修改vscode默认引用插件的路径、在命令行总配置code、快捷键

    Vscode快速入门 这里写目录标题 安装 插件安装 Ctrl Shift X 插件商店 插件位置 默认位置 修改默认路径 修改vscode默认引用插件的路径 在命令行总配置code 基本操作 安装 Visual Studio Code 官
  • RxJava2+Retrofit2+RxLifecycle3+OkHttp3网络请求封装(动态演示)

    入职公司后 公司要求组件化开发 经过讨论后我将网络请求框架单独进行了封装 不过当时框架里将常用的 util 和 ui 均放入到了共同的 Common 包下 导致里面部分代码耦合 后来为了降低耦合性又将 Common 拆分为了lib comm
  • Python 解决百钱买百鸡问题

    我国古代数学家张丘建在 算经 一书中曾提出过著名的 百钱买百鸡 问题 该问题叙述如下 鸡翁一 值钱五 鸡母一 值钱三 鸡雏三 值钱一 百钱买百鸡 则翁 母 雏各几何 翻译过来 意思是公鸡一个五块钱 母鸡一个三块钱 小鸡三个一块钱 现在要用一
  • Django开发员工管理系统(Part I)

    文章目录 1 准备工作 1 1 创建django项目 1 2 创建app 1 3 配置settings py文件 完成app注册 2 设计数据库表结构 3 在MySQL中生成表 3 1 创建数据库 3 2 修改配置文件 连接MySQL数据库
  • Nature:为啥室温超导支棱不起来

    克雷西 发自 凹非寺量子位 公众号 QbitAI 引发全球热议的LK 99风波告一段落后 Nature的一篇资讯头条再次提及了 室温超导 尽管对超导的热情一直不减 但随着一次又一次被证伪 人们很难不对 室温超导 慎之又慎 Nature的这篇
  • GitHub开源:狗屁不通文章生成器

    万字申请 废话报告 魔幻形式主义大作怎么写 GitHub开源狗屁不通文章生成器了解一下 只要输入一句话 系统就会给你一篇万字长文 查看源代码编写风格清新脱俗 并且毫无算法 简单暴力 直接在关键语句前后加上废话 名人名言 GitHub Git
  • 关系数据库中连接池的机制是什么?

    前提 为数据库连接建立一个缓冲池 1 从连接池获取或创建可用连接 2 使用完毕之后 把连接返回给连接池 3 在系统关闭前 断开所有连接并释放连接占用的系统资源 4 能够处理无效连接 限制连接池中的连接总数不低于或者不超过某个限定值 其中有几
  • 模式识别之分类器

    常见分类器介绍 1 SVM分类器 监督学习分类器 答 训练样本必须先标识不同类别 然后进行训练 SVM算法就是找一个超平面 对于已经被标记的训练样本 SVM训练得到一个超平面 使得两个类别训练集中距离超平面最近的样本之间的垂直距离要最大 也
  • Java--Map和HashMap基础

    一 Map常用方法 1 Map集合在 java util Map 包下 Map集合以键值对 key和value 的方式存储数据 key和value都是引用数据类型 都是存储对象的内存地址 2 Map接口中常用方法 V put K key V
  • W3C?什么是W3C相关标准?

    什么是W3C标准 什是W3C标准 不是一个标准 而是万维网联盟制定的一系列标准 网页主要由三部分组成 结构 Structure 表现 Presentation 和行为 Behavior 对应的标准也分三方面 结构化标准语言主要包括XHTML
  • 数据采集+数据可视化练习(2022-1-6)

    任务书3 赛题说明 竞赛内容分布 竞赛时长 任务一 Spark 组件部署管理 Standalone 模式 15 任务二 数据采集 20 任务三 数据清洗与分析 30 任务四 数据可视化 20 任务五 综合分析 10 团队分工明确合理 操作规
  • Python-Django毕业设计信息安全风险评估系统设计与实现(程序+Lw)

    项目运行 环境配置 Jdk1 8 Tomcat7 0 Mysql HBuilderX Webstorm也行 Eclispe IntelliJ IDEA Eclispe MyEclispe Sts都支持 项目技术 SSM mybatis Ma
  • CentOs yum源配置

    yum 的理念是使用一个中心仓库 repository 管理一部分甚至一个distribution 的应用程序相互关系 根据计算出来的软件依赖关系进行相关的升级 安装 删除等等操作 减少了Linux 用户一直头痛的dependencies
  • 通过opencv与神经网络对滑动验证码的一次深入学习

    好久没写博客了 人到中年 有点儿犯懒 从信息安全行业 又去了IT合规领域 与信息安全结合还是两手抓 两手都不硬 由于工作原因 需要获取一个token来请求接口 奈何没有现成的接口 需要在web端登录才可以获取 既然如此 想要实现这个功能肯定
  • 【LeetCode解题报告】《算法基础009_算术基本定理》- Java

    目录 一 507 完美数 1 题目 2 分析 3 代码 二 263 丑数 1 题目 2 分析 3 代码 一 507 完美数 1 题目 507 完美数 对于一个 正整数 如果它和除了它自身以外的所有 正因子 之和相等 我们称它为 完美数 给定
  • PHP通过URL远程下载图片到本地

    一 业务场景 我们需要远程将微信提供接口生成的临时二维码图片下载下来之后使用本地服务器去访问图片并存储在服务器数据库作为记录 此方法同样可以帮助你爬取网上一些公开数据 如cdnjs css 所需参数 公网能够访问到的图片 文件 地址 二 代
  • Source Insight给Linux内核创建工程

    所有文档请关注公众号 一口Linux 后台回复 ubuntu linux驱动视频同步更新到 https live bilibili com 22719960 一 Source Insight安装 1 预先准备好 Source Insight
  • ubuntu 文件删除后磁盘没有释放

    磁盘满了 删除文件后df发现没有变化 base root xddz df h Filesystem Size Used Avail Use Mounted on udev 32G 0 32G 0 dev tmpfs 6 3G 2 8M 6
  • 在JDBC连接池中启动Oracle RAC的TAF

    Oracle RAC 的一个负责故障切换处理的主要组件是透明应用程序故障切换 TAF 选件 下面列举通过JDBC实现的透明应用程序故障切换的代码 more Tele zhou Class forName oracle jdbc driver
  • RabbitMq结合springBoot实现延时任务

    简介 rabbitMQ延时任务的实现思想 rabbitmq实现延时任务本质就是使用 过期时间 和 死信队列 实现的 首先定义一个死信队列和死信队列的消费者 这个死信队列用来接收延时队列过期的消息 死信队列消费者用来接收到过期消息后就进行消费