32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

2023-11-15

1. 消费者(红包系统)丢失消息的问题

前面两章中,阐述了如何确保订单系统发送出去的消息一定会到达MQ中,而且也能确保了如果消息到达了MQ如何确保一定不会丢失。

在整个消息的生产消费中,就剩下消费者这一端的问题了。

红包系统(消费者)拿到消息后,一定可以成功的派发红包吗?

如果红包系统已经拿到了这条消息,但是消息目前还在它的内存里,还没执行派发红包的逻辑,此时他就直接提交了该消息的offset到broker去说自己已经处理过了。

 在以上的场景中,一旦红包系统在执行派发红包逻辑之前就崩溃了,内存里的消息就没了,红包也没派发出去,结果Broker已经收到它提交的消息offset了,还以为它已经处理完这条消息了。

等红包系统(消费者)重启的时候,就不会再次消费这条消息了。

这里说明了,消费者在获取到消息之后还是可能会丢失消息的。

2.Kafka消费者的数据丢失问题

RocketMQ的各种技术思想,在Kafka等中间件中也是适用的。

在Kafka中,由于Kafka的消费者采用的消费的方式跟RocketMQ不同,如果按照Kafka的消费模式,就会产生数据丢失的风险。

Kafka消费者在拿到一批消息,还没来得及处理时,就提交offset到broker去了,一旦在业务逻辑执行前消费者系统就挂掉了,这批消息就再也没机会处理了,因为它重启后不会再次获取提交过offset的消息。

3.RocketMQ消费者的不同之处

RocketMQ的消费者和Kafka的消费者有较大不同。

下列是RocketMQ消费者的代码:

重点看这里的小块内容:

 RocketMQ的消费者中会注册一个监听器,也就是 MessageListenerConcurrently 这个东西,当你的消费者获取到一批消息之后,就会回调这个监听器函数,让你来处理这一批消息。

然后当你处理完毕之后,才会返 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了。

解决消费者丢失消息的思路是,只要你的红包系统(消费者)是在这个监听器的函数中先处理一批消息,基于这批消息都派发完了红包,然后返回了消费成功的状态,接着才会去提交这批消息的offset到broker去。

在这种情况下,当你对一批消息都处理完毕了,然后再提交消息的offset给broker,接着红包系统(消费者)崩溃了,此时是不会丢失消息的。

 在这种机制下,如果消费者获取到一批消息之后,还没处理完,也就是没返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 这个状态,同时也就是说没提交这批消息的offset给broker的时候,消费者突然挂了。

在上面说的那种情况下,消费者对这一批消息都没提交它的offset给broker的话,broker就不会认为你已经处理完了这批消息,此时消费者的一台机器宕机了,broker会感知到你的红包系统(消费者)机器作为一个Consumer挂了。

就会把你没处理完的那批消息叫个红包系统的其他机器去进行处理,所以在这种情况下,消息是绝对不会丢失的。

4. 关键点:不能异步消费消息

 只有在同步模式下,才能是必须处理完一批消息了,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态表示消息表示处理结束了,去提交offset到broker去。

而在异步模式下,如果在代码中对消息进行异步的处理,比如开启了一个子线程去处理该批消息,然后启动线程之后,就直接返回了ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了。而此时子线程可能还没处理完业务,却已经返回标识状态并提交了offset。此时一旦红包系统(消费者)宕机了,MQ会认为已经消费完成,而红包系统由于未执行完成派发红包业务,就会导致消息的丢失。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

32 Consumer消息零丢失方案:手动提交offset + 自动故障转移 的相关文章

  • Java - 因内存不足错误而关闭

    关于如何最好地处理这个问题 我听到了非常矛盾的事情 并且陷入了以下困境 OOME 会导致一个线程崩溃 但不会导致整个应用程序崩溃 我需要关闭整个应用程序 但不能 因为线程没有剩余内存 我一直认为最佳实践是让它们离开 这样 JVM 就会死掉
  • Guice 忽略注入构造函数参数上的 @Nullable

    我正在使用 Guice v 3 0 并且有一个值被注入到构造函数中 该值可以为 null 因此我在构造函数中使用 Nullable 来自 javax annotations 注释了该参数 public MyClass Parameter1
  • Reactive Spring 不支持 HttpServletRequest 作为 REST 端点中的参数?

    我创建了一个 RestController 如下所示 RestController public class GreetingController RequestMapping value greetings method RequestM
  • Logback:SizeAndTimeBasedRollingPolicy 不遵守totalSizeCap

    我正在尝试以一种方式管理我的日志记录 一旦达到总累积大小限制或达到最大历史记录限制 我最旧的存档日志文件就会被删除 当使用SizeAndTimeBasedRollingPolicy在 Logback 1 1 7 中 滚动文件追加器将继续创建
  • Android 中 localTime 和 localDate 的替代类有哪些? [复制]

    这个问题在这里已经有答案了 我想使用从 android API 获得的长值 该值将日期返回为长值 表示为自纪元以来的毫秒数 我需要使用像 isBefore plusDays isAfter 这样的方法 Cursor managedCurso
  • 如何在java Spring Boot中实现通用服务类?

    我有许多具有重复代码的服务 我想知道如何实现通用服务 以便我的所有服务都可以扩展它 服务接口示例 重复代码 Service public interface IUserService List
  • 为自定义驱动程序创建 GraphicsDevice

    我正在开发一个在嵌入式系统中使用 Java 的项目 我有用于屏幕和触摸输入的驱动程序 以及用于文本输入的虚拟键盘 我的屏幕驱动程序有一个Graphics2D您可以绘制的对象和repaint Rectangle 更新方法 类似地 触摸驱动器能
  • 为什么 MOVE CURSOR 在 OS X Mountain Lion 上不显示?

    我正在做一个项目 想看看 Swing 提供的每个光标是什么样子的 public class Test public static void main String args JFrame frame new JFrame frame set
  • Spring数据中的本机查询连接

    我有课 Entity public class User Id Long id String name ManyToMany List
  • 当 minifyEnabled 为 true 时 Android 应用程序崩溃

    我正在使用多模块应用程序 并且该应用程序崩溃时minifyEnabled true in the installed模块的build gradle 以下是从游戏控制台检索到的反混淆堆栈跟踪 FATAL EXCEPTION Controlle
  • 通过 appassembler-maven-plugin 生成的脚本无法在 Spring Boot 应用程序中找到主类

    我使用 appassembler maven plugin 生成的启动脚本有问题 我有一个基本的 spring boot 应用程序 只有一个类 SpringBootApplication public class ScriptDemoApp
  • 用于缓存的 Servlet 过滤器

    我正在创建一个用于缓存的 servlet 过滤器 这个想法是将响应主体缓存到memcached 响应正文由以下方式生成 结果是一个字符串 response getWriter print result 我的问题是 由于响应正文将不加修改地放
  • 如何通过 Android 按钮单击运行单独的应用程序

    我尝试在 Android 应用程序中添加两个按钮 以从单独的两个应用程序订单系统和库存系统中选择一个应用程序 如图所示 我已将这两个应用程序实现为两个单独的 Android 项目 当我尝试运行此应用程序时 它会出现直到正确选择窗口 但是当按
  • 无法在 Java/Apache HttpClient 中处理带有垂直/管道栏的 url

    例如 如果我想处理这个网址 post new HttpPost http testurl com lists lprocess action LoadList 401814 1 Java Apache 不允许我这么做 因为它说竖线 是非法的
  • Java - 从 XML 文件读取注释

    我必须从 XML 文件中提取注释 我找不到使用 JDOM 或其他东西来让它们使用的方法 目前我使用 Regex 和 FileReader 但我不认为这是正确的方法 您可以使用 JDOM 之类的东西从 XML 文件中获取注释吗 或者它仅限于元
  • 使用Java绘制维恩图

    我正在尝试根据给定的布尔方程绘制维恩图 例如 a AND b AND c我想在 Android 手机上执行此操作 因此我需要找到一种使用 Java 来执行此操作的方法 我找到了一个完美的小部件 它可以完成我在这方面寻找的一切布尔代数计算器
  • Hadoop NoSuchMethodError apache.commons.cli

    我在用着hadoop 2 7 2我用 IntelliJ 做了一个 MapReduce 工作 在我的工作中 我正在使用apache commons cli 1 3 1我把库放在罐子里 当我在 Hadoop 集群上使用 MapReduceJob
  • 源值 1.5 的错误已过时,将在未来版本中删除

    我使用 scala maven plugin 来编译包含 scala 和 java 代码的项目 我已经将源和目标设置为1 7 但不知道为什么maven仍然使用1 5 这是我在 pom xml 中的插件
  • 记录类名、方法名和行号的性能影响

    我正在我的 java 应用程序中实现日志记录 以便我可以调试应用程序投入生产后可能出现的潜在问题 考虑到在这种情况下 人们不会奢侈地使用 IDE 开发工具 以调试模式运行事物或单步执行完整代码 因此在每条消息中记录类名 方法名和行号将非常有
  • 即使调整大小,如何获得屏幕的精确中间位置

    好的 这个问题有两部分 当我做一个JFrame 并在其上画一些东西 即使我将宽度设置为 400 并使其在一个项目击中它时 当然 允许项目宽度 它会反弹回来 但由于某种原因 它总是偏离屏幕约 10 个像素 有没有办法解决这个问题 或者我只需要

随机推荐

  • Fish Redux系列学习之认识view、action

    继续上一篇文章 Fish Redux系列学习之新建page以及认识state 如上图 现在我们学习的是buildview这个组件 说白了 buildView是我们写页面的地方 跟写普通flutter的page页面一样 我们将页面都写在这里面
  • Date互转String和时间戳

    Date转字符串 private static String convertDateToStr Date date String pattrn if date null return StringUtils EMPTY Instant in
  • 数据类型 -- uint32_t 类型

    整型的每一种都有无符号 unsigned 和有符号 signed 两种类型 float和double总是带符号的 在默认情况下声明的整型变量都是有符号的类型 char有点特别 如果需声明无符号类型的话就需要在类型前加上unsigned 无符
  • QT5 动态链接库的创建和使用(QT自己做动态库给自己使用)

    记录一下QT5 动态链接库的创建和使用 在文章的最后有完成的代码供下载 1 创建动态链接库 先新建一个库项目 选择chose进入下一下页面 类型选择共享库 输入一个名称 我输入的是sld 再点击下一步到 如果这里我们需要QtGui所以也勾选
  • osgEarth的Rex引擎原理分析(一一五)tif文件分辨率的计算

    目标 一一四 中的问题202 maxX为右经度 minX为左经度 maxY为上纬度 minY为下纬度 double resolutionX maxX minX double warpedDS gt GetRasterXSize double
  • cuBLAS的使用

    cuBLAS包含了三部分 cuBLAS API 从cuda6 0开始 CUBLASXT API 从cuda6 0开始 cuBLASLt API 从cuda10 1开始 想要使用cuBLAS API 必须按照以下步骤 在GPU端建立矩阵或向量
  • 资源有限的大型语言模型的全参数微调

    文章目录 摘要 1 简介 2 相关工作 3 方法 3 1 重新思考optimizer的功能 3 1 1 使用SGD 3 1 2 隐式BatchSize 3 2 LOMO 低内存优化 3 3 使用LOMO稳定训练 3 3 1 梯度归一化和裁剪
  • android layout 界面开发,Android开发之CoordinatorLayout使用详解一

    官网描述为 CoordinatorLayout是一个增强版的FrameLayout 继承自ViewGroup 用途 1 作为应用的顶层视图 2 作为一个可以指定子View之间相互作用的容器 通过给CoordinatorLayout的子Vie
  • 协程框架的堆栈大小陷阱

    昨晚和同事联调我们的开放平台 由于基于协程框架的网关服务器总是在接受两个消息后发生段错误 Core Dump掉 让我们百思不得其解 查看Dump文件 没有任何有效的调试信息 gdb设置断点调试 程序总是在接受到第二条消息之前 没到断点就崩溃
  • Qt连接Oracle数据库详细介绍(QOCI)

    Qt连接Oracle数据库详细介绍 1 前提条件 1 本地安装了Oracle数据库或者oracle instant client 2 已编译成功所需要的lib文件QOCI lib 这部分等我稍后补上 2 实现代码 1 包含lib文件 QtS
  • vue电商项目(三)——开发search页面

    目录 一 页面分析 二 获取数据到组件 1 获取数据 2 通过仓库getter简化数据 3 根据参数返回数据 1 将请求封装成一个方法 2 准备一个响应式数据 4 在发送请求之前准备好数据 三 渲染组件内容 1 完成子组件searchSel
  • 做一个FSK的收发试验 之一

    这里使用我们之前写好的简易的DDS模块 我们先回顾一下用到的这个my dds模块 my dds my dds clk rst clr cnt step sin cos module my dds input clk rst clr inpu
  • element 树型结构表格的合并问题

    完成以上的树型结果的表格 需要掌握以下几点 首先 我们先看比较简单 总价单元格的合并的问题 这里总价是合并了两列 他实际上其实就是用到element 中的show summary 同时定义了table中的refs 实际上就是获取到了表格的元
  • apifox图片验证码显示

    添加后置脚本 脚本内容如下 var resp response pm response json let img resp response data let template img src img pm visualizer set t
  • vue中如果解决列表删除最后一页暂无数据bug

    bug 当删除数据的时候 页码变了但是数据没有变化 页面显示暂无数据 是因为你删除了当前的数据之后瞬间发了一个请求 异步请求请求刷新列表 列表刷新的时候需要传一个当前页 这里的当前页没有改变还是之前的当前页导致数据没有变 解决 就是当前页减
  • 【css】css自定义div的滚动条宽度

    需要通过对应浏览器的伪元素来修改 点击这里查看 主流浏览器对应伪元素简介链接地址 示例代码 针对google类webkit内核浏览器 div class scrollDiv div scrollDiv max height 300px ov
  • apt-get自动补全

    sudo apt get install bash completion source etc bash completion
  • 房屋价格预测

    机器学习 房屋价格预测 点击链接查看文档代码 一 项目概述及计划 项目背景 影响房屋价格的因素众多 如房屋面积 房屋层数 配套设施等等 项目要求 利用竞赛提供的数据 通过分析影响房屋价格的诸多因素来对房屋价格进行预测 项目数据 项目数据分成
  • 男子英文名释义

    AARON 希伯来 启发的意思 AARON被描绘为不高但英俊的男人 诚实刻苦具有责任感 是个有效率个性沉静的领导者 ABEL 希伯来 呼吸 的意思 为ABELARD的简写 大部份的人认为ABEL是高大 强壮的运动员 能干 独立 又聪明 有些
  • 32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

    1 消费者 红包系统 丢失消息的问题 前面两章中 阐述了如何确保订单系统发送出去的消息一定会到达MQ中 而且也能确保了如果消息到达了MQ如何确保一定不会丢失 在整个消息的生产消费中 就剩下消费者这一端的问题了 红包系统 消费者 拿到消息后