MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理

2023-11-08

详细介绍了MQ消息队列重复消费的原因,以及通过保证幂等性来避免重复消费带来的问题。

1 至少一次

消息领域有一个对消息投递的QoS定义(Quality of Service,服务质量),分为:最多一次(At most once)、至少一次(At least once)、仅一次( Exactly once)。

目前火热的几款MQ,比如RocketMQ、Kafka、RabbitMQ、ActiveMQ等,都是保证的至少一次(At least Once),它指每个消息必须投递一次。既然是至少一次,那么他们就避免不了重复消费的问题,因此这个问题最终还是要靠业务代码来解决。

2 重复消费的原因

重复消费的的原因大概可以分为两个,一个是生产者发送消息的时候发送了重复的消息,另一个是消费者消费的时候消费了重复的消息。

生产者发送消息的时候,如果我们只管发送,而不需要等待Broker响应消息发送成功,那么发往Broker的消息是不会重复的,除非自己的业务出错了(当然这也是一个原因)。

但是为了保证消息的可靠性,我们不可能调用发送接口就完事儿了,必须还得等待Broker的响应,这时就有可能出现问题了,如果某次发送消息之后,Broker的响应由于网络波动一时没有收到,那么当这个响应超出时间之后,通常生产者会因为没有收到响应而认为这条消息没有发送成功,此时生产者又会重复发送一次,最终导致两条消息都发送成功了,消息队列有两条重复的消息,这样就导致了消息的重复。

消费者消费消息的时候,如果业务逻辑已经走完了,那么就需要进行commit提交offset,如果此时消费者挂了,那么被消费但是没有被提交的消息将被认为是没有消费成功的消息而被分发到其他消费者上,导致一条消息被重复消费。

以上两个原因都是可能导致消息重复消费的原因,我们也能知道,网络波动或者服务挂掉等原因是不可能被100%避免的,因此消息的重复消费也不可能被避免。另外,RocketMQ消费者Rebalance的时候,可能出现拉取了消息还没有被成功消费并提交的时候,该队列被分配给了其他消费者,导致多个消费者消费同样的消息。

RocketMQ、Kafka等消息队列的说明中已经明确表示,消息队列本身不能处理和避免重复消费的情况,这需要业务人员自己处理。

既然无法避免重复消费,并且消息队列也无法处理,那么从业务或者说代码的角度处理重复消息呢?关键点就是幂等性。

3 幂等性处理重复消费

重复消费会带来的问题呢?如果消费者的业务逻辑是执行一个查询的逻辑,那么无论消费几次,这对于业务来说是没有太大的影响的,但如果消费者的业务逻辑是向数据库中插入一条数据,那么重复消费将会导致至少插入两条相同的数据,自然导致了数据的异常。

那么幂等是什么意思呢?幂等(idempotent、idempotence)实际上是一个数学与计算机学概念,在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同,通俗点说就是:同样的参数或者数据去调用同一个接口,无论重复调用多少次,总能保证数据的正确性,不能出错,这就是接口的幂等性。这里“数据的正确性”和具体的业务相关,不同的业务,对于幂等性的定义是不一样的。

基于幂等性的要求,我们需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。怎么改造呢?这得结合具体的业务来考虑:

  1. 如果一个业务是只读业务,或者是更新的业务,那么多次读取或者多次更新相同的数据基本上都没什么问题。
  2. 如果业务需要插入数据,但是插入的数据中有唯一key能够区分(比如订单id,或者生产者生成的token),那么业务逻辑就可以变成在插入前先查询这个唯一key是否在数据库中,如果消费方的业务表不需要存储这个key,那么消费方也可以单独建立一张唯一key表,插入唯一key表和插入业务表的sql逻辑一定要都在一个事务中,在插入前判断这个唯一key是否在数据库的唯一key表中,如果存在说明此前已经成功消费了这条消息,不再消费,否则就是没有成功消费,继续消费。
  3. 如果业务需要插入数据,但是插入的数据中没有唯一key能够区分,这种是无法完全避免因为多条重复的消息或者一条消息多次重复消费带来的问题。因此最好是让发送消息的同事向消息内容中添加一个唯一的字段,即使消费者方的业务不需要也没关系,因为这样就能的防止重复的消息和重复消费了。
  4. 上面通过数据库的方式来防止重复消费的都属于“强校验”类型,会一定程度上影响数据库性能,通常涉及到金钱的都需要强校验,如果不需要强校验,那么使用Redis来代替也行,比如生产者将id或者token作为key存入Redis,消费者消费时先判断是否存在id或者token,如果存在则消费,消费完毕之后消除key。很多的业务都不需要强校验,比如获取登陆短信验证码的时候,多发送两条,或者发送失败都没关系,我们都遇到过。

上面的都是通用的办法,但处理重复消费的问题,始终要根据具体业务来考虑自己的解决办法,比如我们公司有很多监听一张表的binlog日志然后将操作同步到另一张表上的场景,这种情况下,某些消费者直接将原始表的id作为同步表的id,这样插入的时候,如果id重复肯定是插入不了的,天然的就保证了消息的幂等性。但有时候,同步的表是使用的自己的自增id,此时就需要在插入之前通过其他唯一的业务字段判断此数据是否已被消费过,如果被消费过,则不执行插入。

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理 的相关文章

  • 使用 Tabula 通过 Python 读取 pdf 时出现 Java 错误

    我已经安装了 tabula 库 用于使用 python 将 pdf 读取到 pandas 数据框中 但是当我运行代码时 import tabula df tabula read pdf sample1 pdf pages 1 我得到了例外
  • java.lang.ClassNotFoundException:javax.mail.MessagingException

    我想使用 eclipse 将电子邮件从我的 gmail 帐户发送到另一个邮件帐户 我使用 apache tomcat 7 0 34 作为我的 Web 服务器 并使用端口 8080 作为 apache 服务器 HTTP 1 1 并使用 JRE
  • 与 Eclipse 中的 Java Content Assist 交互

    作为我的插件项目的一部分 我正在考虑与 Eclipse 在 Java 文件上显示的内容辅助列表进行交互 我正在尝试根据一些外部数据对列表进行重新排序 我看过一些有关创建新内容辅助的教程 但没有看到有关更改现有内容辅助的教程 这可能吗 如果是
  • 如何在 JPQL 或 HQL 中进行限制查询?

    在 Hibernate 3 中 有没有办法在 HQL 中执行相当于以下 MySQL 限制的操作 select from a table order by a table column desc limit 0 20 如果可能的话 我不想使用
  • 如何在 Java 中向时间戳添加/减去时区偏移量?

    我正在使用 JDK 8 并且玩过ZonedDateTime and Timestamp很多 但我仍然无法解决我面临的问题 假设我得到了格式化的Timestamp在格林威治标准时间 UTC 我的服务器位于某处 假设它设置为Asia Calcu
  • Android中如何使用JNI获取设备ID?

    我想从 c 获取 IMEIJNI 我使用下面的代码 但是遇到了未能获取的错误cls 它总是返回NULL 我检查了环境和上下文 它们都没有问题 为什么我不能得到Context班级 我在网上搜索了一下 有人说我们应该使用java lang Ob
  • Jframe 内有 2 个 Jdialogs 的 setModal 问题

    当我设置第一个选项时 我遇到了问题JDialog模态 第二个非模态 这是我正在尝试实现的功能 单击 测试对话框 按钮 一个JDialog有名字自定义对话框 主要的将会打开 如果单击 是 选项自定义对话框主 其他JDialog named 自
  • 从 MATLAB 调用 Java?

    我想要Matlab程序调用java文件 最好有一个例子 需要考虑三种情况 Java 内置库 也就是说 任何描述的here http docs oracle com javase 6 docs api 这些项目可以直接调用 例如 map ja
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • 将非 Android 项目添加到 Android 项目

    我在 Eclipse 中有三个项目 Base Server 和 AndroidClient Base和Server是Java 1 7项目 而AndroidClient显然是一个android项目 基础项目具有在服务器和 Android 客户
  • Android 无法解析日期异常

    当尝试解析发送到我的 Android 客户端的日期字符串时 我得到一个无法解析的日期 这是例外 java text ParseException 无法解析的日期 2018 09 18T00 00 00Z 位于 偏移量 19 在 java t
  • Java继承,扩展类如何影响实际类

    我正在查看 Sun 认证学习指南 其中有一段描述了最终修饰符 它说 如果程序员可以自由地扩展我们所知的 String 类文明 它可能会崩溃 他什么意思 如果可以扩展 String 类 我是否不会有一个名为 MyString 的类继承所有 S
  • Spring @Cacheable 和 @Async 注解

    我需要缓存一些异步计算的结果 具体来说 为了克服这个问题 我尝试使用 Spring 4 3 缓存和异步计算功能 作为示例 我们采用以下代码 Service class AsyncService Async Cacheable users C
  • 部署 .war 时出现 Glassfish 服务器错误:部署期间发生错误:准备应用程序时出现异常:资源无效

    我正在使用以下内容 NetBeans IDE 7 3 内部版本 201306052037 爪哇 1 7 0 17 Java HotSpot TM 64 位服务器虚拟机 23 7 b01 NetBeans 集成 GlassFish Serve
  • 手动设置Android Studio的JDK路径

    如何为 Android Studio 使用自定义 JDK 路径 我不想弄乱 PATH 因为我没有管理员权限 是否有某个配置设置文件允许我进行设置 如果您查看项目设置 您可以从那里访问 jdk 在标准 Windows 键盘映射上 您可以在项目
  • 列表过滤器内的 Java 8 lambda 列表

    示例 JSON id 1 products id 333 status Active id 222 status Inactive id 111 status Active id 2 products id 6 status Active
  • java XMLSerializer 避免复杂的空元素

    我有这个代码 DocumentBuilderFactory factory DocumentBuilderFactory newInstance DocumentBuilder builder factory newDocumentBuil
  • 抛出 Java 异常时是否会生成堆栈跟踪?

    这是假设我们不调用 printstacktrace 方法 只是抛出和捕获 我们正在考虑这样做是为了解决一些性能瓶颈 不 堆栈跟踪是在构造异常对象时生成的 而不是在抛出异常对象时生成的 Throwable 构造函数调用 fillInStack
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • Java 和/C++ 在多线程方面的差异

    我读过一些提示 多线程实现很大程度上取决于您正在使用的目标操作系统 操作系统最终提供了多线程能力 比如Linux有POSIX标准实现 而windows32有另一种方式 但我想知道编程语言水平的主要不同 C似乎为同步提供了更多选择 例如互斥锁

随机推荐