重试来自 Kafka 死信队列的消息的最佳实践是什么

2024-03-27

我们使用 Kafka 作为微服务之间的消息传递系统。我们有一个 kafka 消费者监听一个特定的主题,然后将数据发布到另一个主题中,由 Kafka Connector 接收,Kafka Connector 负责将其发布到某些数据存储中。

我们使用 Apache Avro 作为序列化机制。

我们需要启用 DLQ 来为 Kafka Consumer 和 Kafka Connector 添加容错能力。

由于多种原因,任何消息都可能转移到 DLQ:

  1. 格式错误
  2. Bad Data
  3. 对大量消息进行限制,因此某些消息可能会移至 DLQ
  4. 由于连接问题,发布到数据存储失败。

对于上述第 3 点和第 4 点,我们想再次从 DLQ 重试消息。

同样的最佳实践是什么。请指教。


仅推送导致不可重试错误的 DLQ 记录,即:示例中的点 1(格式错误)和点 2(数据错误)。对于 DLQ 记录的格式,一个好的方法是:

  • 将与原始记录完全相同的 kafka 记录值和密钥推送到 DLQ,不要将其包装在任何类型的信封内。这使得在故障排除期间使用其他工具重新处理变得更加容易(例如使用新版本的解串器等)。
  • add a bunch of Kafka header to communicate meta-data about the error, a few typical examples would be:
    • 该记录的原始主题名称、分区、偏移量和 Kafka 时间戳
    • 异常或错误消息
    • 未能处理该记录的应用程序的名称和版本
    • 错误发生时间

通常,我为每个服务或应用程序使用一个 DLQ 主题(不是每个入站主题使用一个 DLQ 主题,也不是跨服务共享的主题)。这往往会使事情保持独立且易于管理。

哦,您可能想对 DLQ 主题的入站流量进行一些监控和警报;)

恕我直言,第 3 点(高容量)应该通过某种自动缩放来处理,而不是通过 DLQ 来处理。尝试始终高估(稍微)输入主题的分区数量,因为您可以启动服务的最大实例数量受此限制。过多的消息不会使您的服务超载,因为 Kafka 消费者在决定时会显式轮询更多消息,因此他们请求的消息永远不会超出应用程序可以处理的范围。如果出现消息高峰,会发生什么,它们会继续堆积在上游 kafka 主题中。

应直接从源主题重试第 4 点(连接),而不涉及任何 DLQ,因为错误是暂时的。将消息丢弃到 DLQ 并拾取下一条消息并不能解决任何问题,因为连接问题仍然存在,并且下一条消息也可能会被丢弃。读取或不读取来自 Kafka 的记录并不会使其消失,因此存储在那里的记录以后很容易再次读取。您可以对服务进行编程,使其仅在成功将结果记录写入出站主题时才前进到下一个入站记录(请参阅 Kafka 事务:读取主题实际上涉及到write操作,因为新的消费者偏移量需要被持久化,所以你可以告诉你的程序将新的偏移量和输出记录作为同一原子事务的一部分持久化)。

Kafka 更像是一个存储系统(只有 2 个操作:顺序读取和顺序写入),而不是消息队列,它擅长持久化、数据复制、吞吐量、规模……(……还有炒作;))。它往往非常适合将数据表示为事件序列,如“事件溯源”。如果此微服务设置的需求主要是异步点对点消息传递,并且如果大多数场景宁愿支持超低延迟并选择丢弃消息而不是重新处理旧消息(如列出的 4 点所示),也许像Redis队列这样的有损内存队列系统更合适吗?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

重试来自 Kafka 死信队列的消息的最佳实践是什么 的相关文章

  • CakePHP:控制台命令中的自定义错误报告

    我们当前使用 CakePHP 2 4 7 和自定义错误处理程序 自定义错误处理程序适用于通过 HTTP 或 CronDispatcher 发出的每个请求 不幸的是 当向我们的控制台命令之一发出控制台请求时 错误处理程序被忽略 请参见以下示例
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 从PHP7.4升级到PHP8,是否可以忽略某些错误

    我们有一个巨大的代码库 在访问未定义的变量或数组键时 我们忽略了 php7 的通知 例如 somethingThatMayNotExist REQUEST somethingThatMayNotExist PHP8 现在会抛出错误 我知道我
  • (定义一个宏)方便OpenGL命令调试?

    有时插入条件打印和检查需要很长时间glGetError 使用二分搜索的形式来缩小范围 其中第一个函数调用是 OpenGL 首先报告错误 我认为如果有一种方法可以构建一个宏 我可以包装所有可能失败的 GL 调用 并有条件地调用 那就太酷了gl
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 警告:implode() [function.implode]:传递的参数无效

    我收到以下错误 警告 implode function implode 在第 1335 行的 wp content themes mytheme functions php 中传递的参数无效 at function my get tags
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • PHP 警告:模块“imagick”已在第 0 行未知加载

    我有一个简单的测试文件 其中唯一的 php 代码是 if isset REQUEST back back back REQUEST back filename images back jpg file file exists filenam
  • 如何重定向到外部404页面Python Flask

    我正在尝试将 404 重定向到外部 URL 如下所示 app route 404 def http error handler error return flask redirect http www exemple com 404 404
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 如何在node-mysql查询后获取警告

    如何获取查询执行后识别的相应警告 如下所示 connection query squery function err rows search for OkPacket in 2 dimension array var warningCoun
  • 隐藏错误报告窗口

    我有以下问题 我的 ASP Net 应用程序接收简单控制台程序的 C 源代码 使用 cl exe 命令行 VC 编译器 对其进行编译 并使用 System Diagnostics Process 运行它 ASP Net应用程序运行在PC上
  • web请求超时处理?

    HttpWebRequest request HttpWebRequest WebRequest Create url request Timeout 20000 using WebResponse response request Get
  • PHP 构造函数返回 NULL

    我有这个代码 是否有可能User对象构造函数以某种方式失败 以便 this gt LoggedUser被分配了一个NULL构造函数返回后值和对象被释放吗 this gt LoggedUser NULL if SESSION verbiste
  • 将 PHP 错误处理程序限制为特定命名空间

    PHP 有没有办法只为特定的命名空间设置错误处理程序 我正在构建一个小型框架 我希望能够通过设置自定义错误处理程序并抛出异常来尝试捕获其名称空间内的所有错误 警告 通知消息 在此特定名称空间之外触发的错误应该以常规方式表现 用PHP可以完成
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 如何处理 Primefaces 延迟加载中的错误?

    我无法让用户知道发生的异常PrimeFaces http primefaces org LazyDataModel load方法 我正在从数据库加载数据 当引发异常时 我不知道如何通知用户 我尝试添加FacesMessage to Face
  • 在 try 中使用零合并运算符? for 抛出并返回可选值的函数

    我想在以下两种情况下使用 nil coalescing 运算符设置默认值 函数抛出错误 函数返回 nil 请看一下下面的代码片段 我有以下问题 为什么 item1 为零 item1和item2的初始化有什么区别 enum VendingMa

随机推荐