Rabbitmq消息的有序性、消息不丢失、不被重复消费

2023-11-20

如何保证消息的顺序性

如图所示,RabbitMQ保证消息的顺序性,就是拆分多个 queue,每个 queue 对应一个 consumer(消费者),就是多一些 queue 而已,确实是麻烦点;

或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证消息不丢失?

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

Channel channel = connection.createChannel();
channel.confirmSelect();      // 开启confirm模式

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中。

rabbitmq的持久化分为队列持久化、消息持久化和交换器持久化。一般三个都要设置持久化

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以手动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

如何保证消息不被重复消费? (幂等性)

方式1: 消息全局 ID 或者写个唯一标识(如时间戳、UUID 等) :每次消费消息之前根据消息 id 去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局 ID 作为数据库表的主键,防止重复)

方式2: 利用 Redis 的 setnx 命令:给消息分配一个全局 ID,消费该消息时,先去 Redis 中查询有没消费记录,无则以键值对形式写入 Redis ,有则不消费该消息。

死信队列

生产者生产一条1分钟后超时的订单信息到正常交换机exchange-a中,消息匹配到队列queue-a,但一分钟后仍未消费。

消息会被投递到死信交换机dlx-exchange中,并发送到私信队列中。

死信队列dlx-queue的消费者拿到信息后,根据消息去查询订单的状态,如果仍然是未支付状态,将订单状态更新为超时状态。

如何处理消息堆积情况?

场景题:几千万条数据在MQ里积压了七八个小时。

1、出现该问题的原因:

消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致的。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反复复重试造成的,也有可能是消费端出了问题,导致不消费了或者消费极其慢。比如,消费端每次消费之后要写mysql,结果mysql挂了,消费端hang住了不动了,或者消费者本地依赖的一个东西挂了,导致消费者挂了。

所以如果是 bug 则处理 bug;如果是因为本身消费能力较弱,则优化消费逻辑,比如优化前是一条一条消息消费处理的,那么就可以批量处理进行优化。

2、临时扩容,快速处理积压的消息:

(1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;

(2)临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;

(3)接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据

(4)等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。

这种做法相当于临时将 queue 资源和 consumer 资源扩大 N 倍,以正常 N 倍速度消费。

4、MQ长时间未处理导致MQ写满的情况如何处理:

如果消息积压在MQ里,并且长时间都没处理掉,导致MQ都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低MQ的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据

5 短时间内无法扩容或者扩容无法完全解决问题的话:

可以尝试降低一些非核心业务的消息处理,其次可以通过监控排查,优化消费者端的业务代码或者查看是否存在一些消息被重复消息的情况。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rabbitmq消息的有序性、消息不丢失、不被重复消费 的相关文章

随机推荐

  • 如何在 Ubuntu 22.04 上使用 uWSGI 和 Nginx 为 Flask 应用程序提供服务

    介绍 在本指南中 您将使用FlaskUbuntu 22 04 上的微框架 本文的大部分内容将讨论如何设置uWSGI应用服务器以及如何启动应用程序和配置Nginx充当前端反向代理 先决条件 在开始本指南之前 您应该 安装了 Ubuntu 22
  • 如何使用 Nginx、Let's Encrypt 和 Docker Compose 保护容器化 Node.js 应用程序

    介绍 有多种方法可以增强您的灵活性和安全性Node js应用 用一个反向代理 like Nginx为您提供负载平衡请求 缓存静态内容和实施传输层安全 TLS 在服务器上启用加密 HTTPS 可确保与应用程序之间的通信保持安全 在容器上使用
  • Python 中的 numpy.append()

    Python numpyappend 函数用于合并两个数组 该函数返回一个新数组 原数组保持不变 NumPy append 语法 函数语法为 numpy append arr values axis None The arr可以是类似数组的
  • 如何使用 Apache 和 Nginx 创建临时和永久重定向

    介绍 HTTP 重定向或 URL 重定向是一种将一个域或地址指向另一个域或地址的技术 重定向有很多用途 并且需要考虑几种不同类型的重定向 每当站点需要将请求一个地址的人定向到另一个地址时 就会使用重定向 当您创建内容和管理服务器时 您经常会
  • eclipse.ini vm 参数 - eclipse.ini 文件位置 Mac、Windows

    eclipse ini是用于控制Eclipse启动的配置文件 我们可以使用 Xms Xmx 参数配置 Eclipse VM 参数 例如要使用的 JDK eclipse ini vm permgen 空间 最大和最小堆大小 eclipse i
  • 如何在 Java 中将字符串转换为数组

    有时我们不得不分开字符串到数组基于分隔符或某些正则表达式 例如 读取 CSV 文件行并解析它们以将所有数据获取到字符串数组 在本教程中 我们将学习如何在 Java 程序中将字符串转换为数组 Java 中的字符串到数组 字符串类split S
  • Java优先级队列

    我们时不时地需要以特定的顺序处理队列中的项目 优先级队列是一种完成这项工作的数据结构 Java优先级队列与 普通 不同queue 它不是 先进先出 而是按优先级顺序检索项目 Java优先级队列 The java util PriorityQ
  • Java 中的 CallableStatement 示例

    java中的CallableStatement用于从java程序中调用存储过程 存储过程是我们在数据库中为某些任务编译的一组语句 当我们处理复杂场景的多个表时 存储过程很有用 我们可以将所需的数据发送到存储过程 并在数据库服务器本身中执行逻
  • Docker 详解:如何容器化并使用 Nginx 作为代理

    Status 已弃用 本文已弃用 不再维护 Reason 本文中的技术已经过时 可能不再反映 Docker 最佳实践 请参阅 Docker 生态系统 常用组件简介 如何在 Ubuntu 14 04 上的 Docker 容器中运行 Nginx
  • 如何在 Ruby 中使用字符串方法

    介绍 Ruby strings有许多内置方法可以轻松修改和操作文本 这是许多程序中的常见任务 在本教程中 您将使用字符串方法来确定字符串的长度 索引和拆分字符串以提取子字符串 添加和删除空格和其他字符 更改字符串中字符的大小写以及查找和替换
  • Python 字符串包含

    Python String 类有 contains 我们可以使用它来检查它是否包含另一个字符串的函数 Python 字符串包含 Python字符串 contains 是一个实例方法 根据字符串对象是否包含指定的字符串对象 返回布尔值 Tru
  • 如何在 Ubuntu 14.04 上使用 Docker 和 Docker Compose 配置持续集成测试环境

    文章来自Docker 介绍 持续集成 CI 是指开发人员的实践整合尽可能频繁地编写代码 并且每次提交在合并到共享存储库之前和之后都会经过测试自动构建 CI speeds up your development process and min
  • Android BroadcastReceiver 示例教程

    今天我们将讨论并实现Android BroadcastReceiver 它是Android Framework中非常重要的组件 Android 广播接收器 Android BroadcastReceiver是android的一个休眠组件 用
  • 蟒蛇集

    In this tutorial we are going to learn Python Set In our previous article we learnt about Python String You can learn it
  • 如何在 Ubuntu 12.10 上使用 Python 创建 Nagios 插件

    介绍 Python 是 Linux 上默认提供的流行命令处理器 我们之前已经介绍过如何在Ubuntu 12 10 x64上安装Nagios监控服务器 这次 我们将扩展这个想法并使用 Python 创建 Nagios 插件 这些插件将在客户端
  • 如何编辑 Sudoers 文件

    介绍 权限分离是 Linux 和类 Unix 操作系统中实现的基本安全范例之一 普通用户以有限的权限进行操作 以减少对自己环境 而不是更广泛的操作系统 的影响范围 一个特殊的用户 称为root has 超级用户特权 这是一个管理帐户 没有普
  • JSTL 教程、JSTL 标签示例

    JSTL 代表JSP 标准标签库 JSTL 是标准标记库 它提供标记来控制 JSP 页面行为 JSTL 标签可用于迭代和控制语句 国际化 SQL 等 我们将在本 JSTL 教程中详细研究 JSTL 标签 之前我们看到了如何使用JSP EL
  • 了解 Python 3 中的类继承

    介绍 面向对象编程创建可重用的代码模式 以减少开发项目中的冗余 面向对象编程实现可回收代码的一种方法是通过继承 此时一个子类可以利用另一个基类的代码 本教程将介绍 Python 中继承的一些主要方面 包括父类和子类如何工作 如何重写方法和属
  • 电商java 面试题_JAVA电商项目面试题(一)

    需要按照功能点把系统拆分 拆分成独立的功能 单独为某一个节点添加服务器 需要系统之间配合才能完成整个业务逻辑 叫做分布式 集群 同一个工程部署到多台服务器上 优点 1 把模块拆分 使用接口通信 降低模块之间的耦合度 2 把项目拆分成若干个子
  • Rabbitmq消息的有序性、消息不丢失、不被重复消费

    如何保证消息的顺序性 如图所示 RabbitMQ保证消息的顺序性 就是拆分多个 queue 每个 queue 对应一个 consumer 消费者 就是多一些 queue 而已 确实是麻烦点 或者就一个 queue 但是对应一个 consum