RabbitMQ解决消息幂等性问题

2023-10-31

前言

关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解。

1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?
使用重试机制,RabbitMQ默认开启重试机制。
实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
    如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)
    配置:
spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本


总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

4. 如何解决消费者幂等性问题,防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

在这里插入图片描述
伪代码:
生产者核心代码:

请求头设置消息id(messageId)

@Component
public class FanoutProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;

	public void send(String queueName) {
		String msg = "my_fanout_msg:" + System.currentTimeMillis();
		//请求头设置消息id(messageId)
		Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
				.setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
		System.out.println(msg + ":" + msg);
		amqpTemplate.convertAndSend(queueName, message);
	}
}

消费者核心代码:

@RabbitListener(queues = "fanout_email_queue")
	public void process(Message message) throws Exception {
		// 获取消息Id
		String messageId = message.getMessageProperties().getMessageId();
		String msg = new String(message.getBody(), "UTF-8");
		//② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
		//从redis中获取messageId的value
		String value = redisUtils.get(messageId)+"";
		if(value.equals("1") ){ //表示已经消费
			return; //结束
		}
		System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
		JSONObject jsonObject = JSONObject.parseObject(msg);
		// 获取email参数
		String email = jsonObject.getString("email");
		// 请求地址
		String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
		JSONObject result = HttpClientUtils.httpGet(emailUrl);
		if (result == null) {
			// 因为网络原因,造成无法访问,继续重试
			throw new Exception("调用接口失败!");
		}
		System.out.println("执行结束....");
		//① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
	}


5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

2.消费者增加代码:
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack
channel.basicAck(deliveryTag, false);手动签收

//邮件队列
@Component
public class FanoutEamilConsumer {
	@RabbitListener(queues = "fanout_email_queue")
	public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
		System.out
				.println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
						+ ",messageId:" + message.getMessageProperties().getMessageId());
		// 手动ack
		Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
		// 手动签收
		channel.basicAck(deliveryTag, false);
	}
}




---------------------------------------------- 纸上得来终觉浅,绝知此事要躬行 ------------------------------------------
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ解决消息幂等性问题 的相关文章

  • 如何防止mq的消息丢失

    分为两种情况 1 主要在消费方 当消息从mq传到消费者时 消费者可能消费消息失败 这时mq中的消息已经自动删除了 导致消息的丢失 需要使用ack机制来保证消息不会丢失 当消费者从mq中拿到消息时 这个消息在mq中不删除 而是消费者对消息进行
  • RabbitMQ--交换机、队列、消息

    交换机 autoDelete 如果设置为true 唯一的一个交换机或者队列解绑 那么该队列将会被自动删除 交换机类型 faout 经过交换机的消息直接转到所有与这个交换器绑定的队列中 无视rounting key的存在 direct 经过交
  • Return消息机制

    Return Listener 用于处理一些不可路由的消息 消息生产者通过制定一个Exchane和RoutingKey 把消息送达到某一个队列中去 消费者监听队列进行消费处理 但是在某些情况下 发送消息的时候 当前Exchange不存在或制
  • mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给

    mq topic持久化订阅者 topic queue的producer setDeliveryMode DeliveryMode PERSISTENT 是指的mq服务 queue的消费者不在也会给他保留 topic只有持久化订阅者会保留 1
  • 《消息队列高手课》主题和队列有什么区别?

    如果你研究过超过一种消息队列产品 你可能已经发现 每种消息队列都有自己的一套消息模型 像队列 Queue 主题 Topic 或是分区 Partition 这些名词概念 在每个消息队列模型中都会涉及一些 含义还不太一样 为什么出现这种情况呢
  • 【MQ】kafka(四)——kafka消费者如何消费的?如何防止重复消费?如何顺序消费?

    一 前言 前面博客小编向大家分享了 kafka如何保证消息不丢失 基本是从producer和broker来分析的 producer要支持重试和acks producer要做好副本和及时刷盘落地 这篇博客呢 就跟大家一起聊一下 kafka 消
  • rocketmq消息重试和死信队列

    1 消息重试 若Consumer消费某条消息失败 则RocketMQ会在重试间隔时间后 将消息重新投递给Consumer消费 若达到最大重试次数后消息还没有成功被消费 则消息将被投递至死信队列 消息重试只针对集群消费模式生效 广播消费模式不
  • 《消息队列高手课》内存管理:如何避免内存溢出和频繁的垃圾回收?

    不知道你有没有发现 在高并发 高吞吐量的极限情况下 简单的事情就会变得没有那么简单了 一个业务逻辑非常简单的微服务 日常情况下都能稳定运行 为什么一到大促就卡死甚至进程挂掉 再比如 一个做数据汇总的应用 按照小时 天这样的粒度进行数据汇总都
  • kafka生产者幂等与事务

    目录 前言 幂等 事务 总结 参考资料 前言 Kafka 消息交付可靠性保障以及精确处理一次语义的实现 所谓的消息交付可靠性保障 是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺 常见的承诺有以下三
  • 【RocketMQ】设计理念与核心概念扫盲

    RocketMQ 设计理念与核心概念扫盲 文章目录 RocketMQ 设计理念与核心概念扫盲 一 RocketMQ的设计理念和目标 1 1 设计理念 1 2 设计目标 二 RocketMQ的核心概念扫盲篇 2 1 部署架构 2 1 1 Na
  • JAVA消息(第一篇)JMS 很重要!!!!包教包会!!不闹!!!下一篇-AMQP(wire-level protocol)

    如果看完 进入第二篇AMQP 首先大致讲一下 java 消息模块 消息 个人理解分为两种 1 同步消息 RPC调用 2 异步消息 本篇讲解部分 一 同步消息java提供了多种方案 最新比较常用的方式就是spring Http invoker
  • rabbitmq简介

    开发十年 就只剩下这套Java开发体系了 gt gt gt 1 AMQP AMQP协议是一个高级抽象层消息通信协议 RabbitMQ是AMQP协议的实现 它主要包括以下组件 1 1 Server broker 接受客户端连接 实现AMQP消
  • 《消息队列高手课》传输协议:应用程序之间对话的语言

    传输协议就是应用程序之间对话的语言 设计传输协议 并没有太多规范和要求 只要是通信双方的应用程序都能正确处理这个协议 并且没有歧义就好了 这节课 我们就来说一下设计高性能传输协议的一些方法和技巧 如何 断句 既然传输协议也是一种语言 那么在
  • RabbitMQ MQTT集群方案官方说明

    RabbitMQ MQTT 官方网说明 官方地址 https www rabbitmq com mqtt html 从3 8开始 该MQTT插件要求存在一定数量的群集节点 这意味着三分之二 五分之三 依此类推 该插件也可以在单个节点上使用
  • rabbitmq重试机制

    1 应答模式 NONE 可以称之为自动回调 即使无响应或者发生异常均会通知队列消费成功 会丢失数据 AUTO 自动检测异常或者超时事件 如果发生则返回noack 消息自动回到队尾 但是这种方式可能出现消息体本身有问题 返回队尾其他队列也不能
  • 事务提交后发送MQ消息

    前言 本文主要介绍关于MQ使用过程中 通过场景分析为什么要使用事务控制 以及事务如何实现 场景分析 为什么我们在使用MQ的时候需要考虑结合事务 试想一下 我们平时使用Mq发送消息的通用场景是不是 生产者和MQ集群建立连接 并发送消息 消费者
  • RocketMQ的消息优先级

    有些场景 需要应用程序处理几种类型的消息 不同消息的优先级不同 RocketMQ是个先入先出的队列 不支持消息级别或者Topic级别的优先级 业务中简单的优先级需求 可以通过间接的方式解决 下面列举三种优先级相关需求的具体处理方法 第一种
  • MQ的基本原理是怎样的?MQ的优点有哪些?MQ的核心概念包括哪些?MQ的消息传递模式有哪些?

    1 MQ的基本原理是怎样的 MQ 消息队列 是一种面向消息的中间件 用于实现不同系统之间的异步通信 其基本原理如下 生产者 Producer 将消息发送到消息队列中 而不是直接发送给消费者 Consumer 消息队列将接收到的消息存储在内部
  • 如何清除 MassTransit 队列?

    我想在集成测试设置例程中删除队列中的所有消息 如何实现 谷歌搜索 智能感知暴力没有运气 如果重要的话 我使用 RabbitMq 作为传输 无法从 MassTransit 内的队列中 删除 对于测试 您可以通过使用临时的随机队列 URIrab
  • JMS MQ绑定问题

    我在使用 MDB 与 MQ 队列绑定 JMS 时遇到了这个奇怪的问题 消息驱动的 EJB CrbEventMessageAsynchronousService无法连接到 JMS 目标 queue contratto crb input pu

随机推荐

  • CVE-2022-24112 Apache APISIX 命令执行漏洞复现

    CVE 2022 24112 Apache APISIX 命令执行漏洞 Apache APISIX 是 Apache 软件基金会下的云原生 API 网关 它兼具动态 实时 高性能等特点 提供了负载均衡 动态上游 灰度发布 金丝雀发布 服务熔
  • 如何去编写一个C++程序

    如何去编写一个C 程序 1 防卫式声明 2 写class的头 3 考虑复数准备什么数据 4 考虑复数准备哪些函数 5 类外定义函数 成员函数 6 类外定义函数 非成员函数即全局函数 7 调用函数 学习侯捷老师讲授课程C 面向对象高级开发 总
  • 在浏览器中输入URL中会发生什么

    面试官问 在浏览器中输入URL中会发生什么 我们首先分析下这个问题 这是一个很宽泛 细节非常非常多的一个问题 如果要展开来细说我们可以直接从应用层的协议 讲到传输层 网络层 再到数据链路层 可是我们这里不建议大家一开始就深挖协议中的细节 原
  • Unity 视频播放

    Unity 视频播放 前言 在Unity引擎做视频播放的方式有很多种 这里介绍两种 一种是使用RawImage组件的纹理进行视频播放 将视频每一帧的画面复制在RawImage的纹理贴图中 实现视频的播放展示 另外一种是使用插件Av Pro进
  • node.js系统学习4-sync&&async

    async异步 sync同步 阻塞 非阻塞 https www runoob com nodejs nodejs callback html
  • PASCAL VOC 2012 数据集解析

    目录 一 Introduction Classification Detection Competitions Segmentation Competition Action Classification Competition Image
  • IDEA创建第一个spring boot项目提示cannot resolve xxx等错误

    在学习spring boot课程 精通spring boot 42讲 时 根据课程安装好maven和jdk之后 准备开始使用IDEA创建第一个spring boot项目 使用插件spring assistant创建好项目之后 却发现包org
  • python爬虫时报错

    我用的是Python3 66版本 最近写了一段爬虫代码 在DEBUFG模式运行时没有报错 但是普通模式运行时就报下面这个错误 wrap socket argument 1 must be socket socket not SSLSocke
  • (附源码)springboot垃圾自动分类管理系统 毕业设计 160846

    摘 要 随着现在网络的快速发展 网络的应用在各行各业当中它很快融入到了许多分类管理之中 他们利用网络来做这个垃圾自动分类的网站 随之就产生了 垃圾自动分类管理系统 这样就让垃圾自动分类管理系统更加方便简单 对于本垃圾自动分类管理系统的设计来
  • Matlab绘图?学会一招就够了——源代码

    hello 欢迎来到我的博客 你应该是b站来的小伙伴吧 谢谢你的关注 这篇博客的内容是以下这个视频的源代码 Matlab绘图 学会一招就够了 代码如下 clc clear all close all 生成图窗 clc 清楚command窗口
  • 双向卷积神经网络_基于双向特征融合卷积神经网络的液晶面板缺陷检测算法

    基于双向特征融合卷积神经网络的液晶面板缺陷检测算法 彭大芹 刘恒 许国良 邓柯 摘 要 摘要 针对手机液晶面板生产工业中缺陷检测面临的精度低的问题 提 出了一种基于深度学习的液晶面板缺陷检测算法 该算法在传统单向特征融合 的基础上提出了双向
  • Windows上运行Caffe自带的mnist例子

    环境要求 win10 VS2013 CUDA7 5 Caffe的windows版下载地址 https github com Microsoft caffe 配置Caffe工程 1 将Caffe源码的windows目录下CommonSetti
  • 第十三届蓝桥杯大赛软件赛决赛(Java 大学A组)

    蓝桥杯 2022年国赛真题 Java 大学A组 试题 A 火柴棒数字 试题 B 小蓝与钥匙 试题 C 内存空间 试题 D 斐波那契数组 试题 E 交通信号 试题 F 数组个数 试题 G 六六大顺 试题 H 选素数 试题 I 图书借阅 试题
  • GFS浅谈

    GFS Google File System Big Table Map Reduce作为google的三宝技术 是Google诸多服务的基石 我想就自己的理解对GFS文件系统作一个分析 GFS首先是属于分布文件系统 但作为Google公司
  • PAT C入门题目练习-7-90 螺旋方阵 (20 分)

    7 90 螺旋方阵 20 分 所谓 螺旋方阵 是指对任意给定的N 将1到N N的数字从左上角第1个格子开始 按顺时针螺旋方向顺序填入N N的方阵里 本题要求构造这样的螺旋方阵 输入格式 输入在一行中给出一个正整数N lt 10 输出格式 输
  • C#中的一些基本方式总结

    目录 1 委托delegate c 中的将方法作为参数传递 2 堆和栈 3 override重写 4 new关键字 如果子类声明了和父类同样的方法 但用new声明了 会隐藏掉父类的方法 5this和base 6 sealed类 7 泛型方法
  • python中的sort排序_python的sort()怎么排序

    python中的sort 函数只能应用在列表list上 而sorted可以对所有可迭代的对象进行排序的操作 sort方法会在原list上直接进行排序 不会创建新的list 而sorted方法不会对原来的数据做任何改动 排序后的结果是新生成的
  • JavaSE + bluecove 蓝牙连接

    最近公司准备将旧系统的 NET部分翻版 项目除了有后台的还有个与设备对接的客户端用蓝牙连接的 所有这周对相关技术做了一个验证 搜了一下Java 蓝牙相关信息 我去资料也太少了 少也就算了连bluecove库也是有问题的 经过艰难的查找 最终
  • 内核编译出错 [arch/arm/boot/compressed/piggy.lzo] Error 1

    项目场景 linux4 x 内核编译出现错误 问题现象 编译打印输出 arch arm boot compressed Makefile 180 recipe for target arch arm boot compressed pigg
  • RabbitMQ解决消息幂等性问题

    前言 关于MQ消费者的幂等性问题 在于MQ的重试机制 因为网络原因或客户端延迟消费导致重复消费 使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解 1 RabbitMQ自动重试机制 消费者在消费消息的时候 如果消费者业