RabbitMQ重试机制

2023-11-14

1、RabbitMQ重试机制的简介

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。如下图:

注意事项:

如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ 支持消息确认-ACK。

如果忘记了消息确认,那么后果很严重。当 Consumer 退出时候,Message 会一直重新分发。然后 RabbitMQ 会占用越来越多的内容,由于 RabbitMQ 会长时间运行,因此这个"内存泄漏"是致命的。

RabbitMQ 重试机制核心配置:

spring:
  # 项目名称
  application:
    name: rabbitmq-consumer
  # RabbitMQ服务配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 5 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

 

2、RabbitMQ重试机制的实现

下面将通过示例来讲解 RabbitMQ 重试机制的实现。首先需要创建两个 SpringBoot 项目并整合 RabbitMQ 客户端。

2.1 实现消息发送端

(1)创建第一个 SpringBoot 项目( rabbitmq-provider 消息发送项目)。

在pom.xml配置信息文件中,添加相关依赖文件:

<!-- AMQP客户端 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务:

spring:
  # 项目名称
  application:
    name: rabbitmq-provider
  # RabbitMQ服务配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

(2)配置队列

在 rabbitmq-provider(消息发送项目)中,配置队列名称,并将队列交由 IoC 管理,代码如下:

package com.pjb.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 * @author pan_junbiao
 **/
@Configuration
public class RabbitMqConfig
{
    public static final String QUEUE_NAME = "queue_name"; //队列名称
    public static final String EXCHANGE_NAME = "exchange_name"; //交换器名称
    public static final String ROUTING_KEY = "routing_key"; //路由键

    /**
     * 队列
     */
    @Bean
    public Queue queue()
    {
        /**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */
        return new Queue(QUEUE_NAME, true, false, false, null);
    }

    /**
     * Direct交换器
     */
    @Bean
    public DirectExchange exchange()
    {
        /**
         * 创建交换器,参数说明:
         * String name:交换器名称
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 绑定
     */
    @Bean
    Binding binding(DirectExchange exchange, Queue queue)
    {
        //将队列和交换机绑定, 并设置用于匹配键:routingKey
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

(3)创建发送者

在 rabbitmq-provider(消息发送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息,代码如下:

package com.pjb;

import com.pjb.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * RabbitMq测试类
 * @author pan_junbiao
 **/
@SpringBootTest
public class RabbitMqTest
{
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage()
    {
        String message = "您好,欢迎访问 pan_junbiao的博客";
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message);
        System.out.println("消息发送成功!");
    }
}

2.2 实现消息接收端

(1)创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目)。

在pom.xml配置信息文件中,添加相关依赖文件:

<!-- AMQP客户端 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务,这里需要配置 RabbitMQ 重试机制:

spring:
  # 项目名称
  application:
    name: rabbitmq-consumer
  # RabbitMQ服务配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 5 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

(2)创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建创建接收者,注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。

接收者接收到消息后,打印输出消息,然后程序抛出运行时异常,观察现象。代码如下:

package com.pjb.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
@RabbitListener(queues="queue_name")
public class Receiver
{
    @RabbitHandler
    public void process(String message)
    {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收消息: " + message + " 接收时间:" + sdf.format(new Date()));
        throw new RuntimeException();
    }
}

特别注意:

如果在消息接收端的 application.yml 配置文件中没有添加 RabbitMQ 重试机制的相关配置,当接收端收到消息后程序抛出异常,那么发送端将得不到消息确认(ACK),此时发送端将会循环的发送消息,最终导致内存溢出。

执行结果:

从上述执行结果来看,当接收端重试5次后,将消息确认(ACK)。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ重试机制 的相关文章

  • RabbitMQ 中的并发

    经过一周的编码和搜索论坛后 似乎是时候问 我有一个 C 应用程序 它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息 我想同时处理多个消息 因此我在同一连接上实例化了几个通道 本例中为 8 个 每个通道都
  • 在使用 FromEventPattern 订阅之前捕获事件

    我正在使用 Rx 框架编写消息监听器 我面临的问题是 我正在使用的库使用一个消费者 每当消息到达时就会发布事件 我已经设法通过以下方式消费传入的消息Observable FromEventPattern但我对服务器中已有的消息有疑问 目前我
  • 为什么我无法使用 python 建立与rabbitMQ的连接?

    我正在学习如何使用rabbitMQ 我正在 MacBook 上运行rabbit MQ 服务器并尝试与 python 客户端连接 我按照安装说明进行操作here http www rabbitmq com install homebrew h
  • 发送消息到任意虚拟主机/与 RabbitMQ / Spring AMQP 交换

    I use RabbitMQ and Spring AMQP发送消息 我有这个片段 rabbitTemplate convertAndSend exchange key object 当我对单个操作进行操作时 这有效VHOST 我必须从 1
  • 如何根据条件限制并发消息消耗

    场景 我已经简化了事情 许多最终用户可以从前端 Web 应用程序 生产者 开始工作 繁重的工作 例如渲染大型 PDF 这些作业被发送到单个持久的 RabbitMQ 队列 许多工作应用程序 消费者 处理这些作业并将结果写回到数据存储中 这个相
  • 如何在 celery task.apply_async 中使用优先级

    我有一个testcelery 中的队列 我为它定义了一个任务 celery app task queue test ignore result True def priority test priority print priority 它
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • rabbitmq-erlang-client,使用 rebar 友好的 pkg,在开发环境上工作在 rebar 版本上失败

    我成功地将rabbitmq erlang client的rebar友好包用于一个简单的Hello World rebarized和OTP 兼容 应用程序 并且在开发环境中工作正常 我能够启动 erl 控制台并执行我的操作applicatio
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt

随机推荐

  • cygwin的git vscode中的使用

    背景 需要用到cygwin 编辑器是vscode 版本 vscode 版本1 55 cygwin版本2 11 2 1 cygcheck c cygwin cygwin的git2 17 cygcheck c git vscode报错 open
  • 算法篇-------贪心2

    文章目录 题目1 活动选择 题目2 无重叠区间 题目3 最多可以参加的会议数目 题目4 去除重复字母 题目5 移掉K位数字 题目6 拼接最大数 题目1 活动选择 有n个需要在同一天使用同一个教室的活动a1 a2 an 教室同一时刻只能由一个
  • linux指令_龙红云

    一 基础指令 1 ls指令 ls 列出当前目录下的所有文件 文件夹的名字 ls root 列出root下的所有文件 文件夹的名字 ls l 以详细列表的形式展示 ls la ls a 显示所有文件 文件夹 包含了隐藏文件 文件夹 ls lh
  • Spring学习总结

    因为是学习总结 所以参考了很多资料做的博客 如果有侵权 请联系我 写的不对的欢迎指出 Spring 开源的轻量级框架 Spring核心 IOC 控制反转控 制反转还有一个名字叫做DI Dependency Injection 中文意思叫依赖
  • Python轻量级Web框架Flask(9)——图书馆项目

    1 项目要求 创建一个项目 用来说明出版社 书籍和作者的关系 作者和书籍之间的关系 1对多 一本书由一个作者完成 一本书可以有多个创作者 出版社和书籍之间的关系 多对多 一个出版社可以出版多本书 一本书可以由多个出版社出版 要求 1 在书籍
  • React项目开发常用API

    记录一下React项目开发常用API 作为后续开发的参考 路由配置 配置文件形式 router index js import React lazy Suspense from react Suspense配合lazy实现懒加载 const
  • Golang 微框架 Gin 简介

    框架一直是敏捷开发中的利器 能让开发者很快的上手并做出应用 甚至有的时候 脱离了框架 一些开发者都不会写程序了 成长总不会一蹴而就 从写出程序获取成就感 再到精通框架 快速构造应用 当这些方面都得心应手的时候 可以尝试改造一些框架 或是自己
  • element-ui的table动态渲染表头

  • ruoyi管理系统+微信小程序登录解决

    原料 ruoyi管理系统项目模板 基于springboot 微信小程序项目 阿里云服务器 有域名 有ssl证书 阿里云服务器的ssl证书可以买一个仅限一年免费的那个证书 注意事项 1 微信小程序调用云服务器接口需要https csdn ne
  • Android:usb转232串口通信

    准备工作 首先得adb进入盒子root模式 将 dev ttys1这个文件改为777 使得所有用户可操作 adb root adb remount adb shell 进入设备的root模式 执行 chmod 777 dev ttys1 执
  • 项目 谷粒学院Day16-18

    Day 16 07 13 统计分析模块 后台 准备工作 创建统计表 创建service statistics模块 使用代码生成器生成代码 创建启动类 SpringBootApplication ComponentScan basePacka
  • 《Hadoop权威指南》书摘-HDFS概述

    转载请注明出处 独立博客 http wangnan tech 简书 http www jianshu com u 244399b1d776 知乎 https zhuanlan zhihu com c 121958856 设计 超大文件 已经
  • pthread_cond_timedwait

    pthread cond timedwait pthread cond signal
  • Keepalived的权值问题(priority与weight)

    在Keepalived集群中 其实并没有严格意义上的主 备节点 虽然可以在Keepalived配置文件中设置 state 选项为 MASTER 状态 但是这并不意味着此节点一直就是Master角色 控制节点角色的是Keepalived配置文
  • 【深度解析→博文总结】李宏毅机器学习2023作业02Classification(Framewise Phoneme Prediction)

    文章目录 系列文章 简要说明 视频分享 作业详情 调参记录 Simple Baseline 0 49798 Medium Baseline 0 66440 Stong Baseline 0 74944 Boss Baseline 0 830
  • oracle rac 性能影响参数 - MTU

    在项目中遇到了数据导入特别缓慢的问题 或者是建立索引特别慢 在优化数据库无果之后 主机方面找到了问题的原因 root rac1 netstat nai Kernel Interface table Iface MTU Met RX OK R
  • R2dbc连接操作mysql

    R2dbc操作mysql 依赖
  • 公网IP查询方法

    1 我们平时在家用的网络就是公网 直接在命令行运行ipconfig查询即可 2 如果是公司内或者学校内的局域网 要查询公网IP有以下方式 1 查自己本机IP 打开http ip cn 显示的IP就是本机IP 2 打开 http site i
  • Java 构建 HashCode 相同的字符串

    在查看 hashcode 源码的时候 想模拟红黑树的生成 发现不知如何构建 HashCode 相同的值 从源码看HashCode 的生成 jdk1 8 为例 都知道 Object 有一个通用的 hashcode 方法 但该方法被 nativ
  • RabbitMQ重试机制

    1 RabbitMQ重试机制的简介 RabbitMQ 不会为未确认的消息设置过期时间 它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开 这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久