Kafka Streams stateStores 容错一次?

2023-12-08

我们正在尝试使用 Kafka Streams 实现重复数据删除服务。 总体而言,它将使用它的rocksDB状态存储来在处理过程中检查现有的密钥。

如果我错了,请纠正我,但为了使这些 stateStore 也具有容错能力,Kafka Streams API 将透明地复制 Kafka 主题内的 stateStore 中的值(称为更改日志)。 这样,如果我们的服务出现故障,另一个服务将能够根据 Kafka 中找到的 ChangeLog 重建其 stateStore。

但这向我提出了一个问题,这个“StateStore -->changelog”本身是否恰好是一次? 我的意思是,当服务更新其 stateStore 时,它​​也会以一次的方式更新变更日志..? 如果服务崩溃,另一个服务将承担负载,但我们能否确定它不会错过崩溃服务的 stateStore 更新?

Regards,

Yannick


简短的回答是肯定的。

使用事务 - 原子多分区写入 - Kafka Streams 确保,当执行偏移提交时,状态存储也会刷新到代理上的更改日志主题。上述操作是原子的,因此如果其中一个操作失败,应用程序将从先前的偏移位置重新处理消息。

您可以在以下博客中阅读有关恰好一次语义的更多信息https://www.confluence.io/blog/enabling-exactly-kafka-streams/。有一节:How Kafka Streams Guarantees Exactly-Once Processing.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams stateStores 容错一次? 的相关文章

  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 无法找到任何实现 Connector 且名称与 io.debezium.connector.mysql.MySqlConnector 匹配的类,可用的连接器有

    使用 Kafka MySQL 和 Debezium 设置数据流管道 我是这个版本的 Kafka 3 4 0 MySQL 8 Debezium 2 2 1 Java 11 目标 我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kaf
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer

随机推荐

  • 如何在 Node.js 中从 URL 进行请求

    是否有标准方法要求 Node 模块位于某个 URL 而不是本地文件系统上 就像是 require http example com nodejsmodules myModule js 目前 我只是将文件提取到临时文件中 并要求这样做 您可以
  • Windows 资源监视器使用哪个 API?

    Windows 资源监视器显示 除其他外 当前哪些进程正在访问磁盘上的哪些文件 它是实时进行的 How 我知道它可能使用 ETW 并且我可以使用 xperf 等工具生成跟踪 但是如何在无需启动 停止和解析跟踪文件的情况下获取实时信息呢 我需
  • 在Android中向多列GridView添加页脚视图?

    是否可以将页脚视图添加到 GridView 具有多列 其行为类似于 ListView 的页脚 那么这个页脚视图 例如分页视图 仅在用户滚动到 GridView 底部时才会出现 并且它具有整个屏幕的宽度 而不仅仅是 1 个网格元素 不 对不起
  • “num - 1”与“num -= 1”

    在第 4 行中 为什么我们必须在 后面添加 num 5 if num gt 2 print num num 1 print num num 1 产生减去 1 的结果num num没有改变 num 1 减一num并存储该结果 相当于num n
  • 小程序无法从 jar 加载类

    一些用户抱怨小程序不再工作 当他们查看 java 控制台时 他们会遇到 java lang noClassDefFoundError 并检查我的访问日志 我发现他们已经下载了包含该类的 jar 文件 然后发出 get对特定类别的请求 不同的
  • 如何用图像中每个像素的颜色绘制图形?

    我正在研究图像颜色识别 因此我将 RGB 图像转换为 Lab 因为它是最接近人类视觉的颜色空间 之后 我获取实验室的 3 个通道中的每一个 并希望在 3D 图形中绘制我在转换图像中识别出的颜色变化 如何使用图像的颜色绘制图形 import
  • iphone MGTwitterEngine - 使用我的应用程序签名发布推文

    我在 iPhone 应用程序中使用 MGTwitterEngine 效果非常好 我想要更改的一件事是 Twitter 上出现的 从 MGTwitterEngine 发布 显然 我希望它说 来自 MyCoolApp 并链接到该应用程序的网站
  • 特使过滤器拦截上游响应

    我已经为 envoy 编写了 ext authz 过滤器 并且对 envoy 过滤器的工作原理有基本的了解 但现在我想过滤从上游返回的响应 具体来说 我想处理两件事 在 Envoy 发送回下游之前 拦截来自上游的 data jsonBody
  • jquery验证插件,如何在自定义方法中添加多个自定义消息

    我在用jquery 验证插件 我使用添加了一个自定义方法添加方法这又调用另一个方法来检查是否有效UK telephone number 这是我的代码 简化 html
  • Apache PDFBOX - 使用 split(PDDocument 文档)时出现 java.lang.OutOfMemoryError

    我正在尝试使用 Apache PDFBOX API V2 0 2 拆分一个 300 页左右的文档 尝试使用以下代码将 pdf 文件拆分为单页时 PDDocument document PDDocument load inputFile Sp
  • 在 D3 中通过上下文缩放和画笔移动散点图圆圈

    我正在尝试基于 d3 示例创建 Focus Context Tooltip 图http bl ocks org 1667367 我已经使基本图表正常工作 但是当我尝试使用焦点图放大某个区域时 我计划用于工具提示的 圆圈 不会移动 这是我的代
  • IOS:使用图案图像作为背景-内存泄漏

    好的 我会寻找答案 也许我自己就能找到 我有一个坏习惯 就是自己回答问题 无论如何 我有一个设计为相当容易 换肤 的应用程序 作为其中的一部分 我在特定于变体的静态类中隔离了方法 这些静态方法为主应用程序提供特定于变体的图像 颜色和设置 h
  • 为什么 `False in pandas.Series([True,True])` 返回 True?

    False in True True False in pd Series True True 第一行代码返回False 第二行代码返回True 我想我一定是在这里做错了什么或者错过了什么 当我检查该系列是否为 0 时 我得到了同样的结果
  • 无法下载jmeter插件:Json Path Extractor

    我无法下载 apache jmeter Json Path Extractor 加载器保持加载 url https jmeter plugins org search jpgc json 安装 JMeter 插件的最佳方法是使用插件管理器
  • 如何使用输入字段更新页面的 url?

    我尝试将搜索页面与 React Router v5 集成到我的应用程序中 如何使用搜索框更新网址的查询参数 当我刷新应用程序时 我会丢失搜索结果和搜索字段的值 我使用 redux 来管理搜索字段和搜索结果的值的状态 我认为遍历 url 的参
  • 由于“single Cursor HandlerTouchEvent -getEditableSupport FALSE”,Phonegap 按钮不会触发

    在最新的 Android 4 0 3 中 phonegap 按钮不会触发 我收到调试消息 singleCursorHandlerTouchEvent getEditableSupport FASLE 注意拼写错误的 FALSE 之后按钮不会
  • 通过 SSL 的 C# BinaryWrite

    我正在尝试使用存储在 MSSQL varbinary MAX 字段中的 PDF 回复客户端 该响应在我的本地主机和通过 http 连接的测试服务器上工作 但在通过 https 连接的生产服务器上不起作用 我只使用一个简单的 BinaryWr
  • htaccess(无重定向)[REWRITEURL](index.php 的文件夹)

    我只是想获得快速的 htaccess 重定向 IE 域名 com subfolderGreen gt 域名 com index php folder subfolderGreen 请注意 子文件夹Green实际上存在 我一直在尝试但无法获得
  • 如何对线程使用静态生命周期?

    我目前正在为 Rust 1 0 的生命周期而苦苦挣扎 尤其是在通过通道传递结构时 我如何编译这个简单的例子 use std sync mpsc Receiver Sender use std sync mpsc use std thread
  • Kafka Streams stateStores 容错一次?

    我们正在尝试使用 Kafka Streams 实现重复数据删除服务 总体而言 它将使用它的rocksDB状态存储来在处理过程中检查现有的密钥 如果我错了 请纠正我 但为了使这些 stateStore 也具有容错能力 Kafka Stream