Kafka 生产者 - 如何在不停机的情况下更改主题并保留消息顺序?

2023-11-29

这个问题是关于架构和kafka主题迁移的。

原来的问题:没有向后兼容性的架构演变。

https://docs.confluence.io/current/schema-registry/avro.html

我请求社区给我建议或分享文章,我可以从中获得启发,也许可以找到解决我的问题的方法。也许存在架构或流模式。没有必要给我特定于语言的解决方案;只是给我一个可以前进的方向...我的问题很大,对于后来想要的人来说可能会很有趣

  • a) 更改消息格式并将消息生成到新主题中。
  • b) 停止向一个主题生成消息并“立即”开始向另一个主题生成消息;换句话说,一旦有消息v2已生成,没有新消息附加到v1.

Problem

我正在更改消息格式,该格式与以前的版本不兼容。为了不破坏现有的消费者,我决定向新主题生成消息。

上施法者的想法

我读过有关上施法者的文章。

https://docs.axoniq.io/reference-guide/operations-guide/product-considerations/versioning-events

正式任务

Let v1 and v2成为话题。目前,我以以下格式生成消息format_v1进入主题v1。我想以以下格式生成消息format_v2进入主题v2。切换应该在我可以选择的某个时刻发生。

换句话说,在某个时刻,生产者的所有实例都停止向v1,并开始发送消息到v2;因此最后一条消息m1 in v1在第一条消息之前产生m2 in v2.

Details

我有一个想法,我可以生成针对该主题的消息v1有一个已订阅的 kafka steam up-casterv1并将转换后的消息推送到v2。假设变压器 (当然就我而言)能够转换消息format_v1 into format_v2没有错误。

正如上面关于 avro 模式演变的链接中所述,当我添加了一个向上转型者并将消息生成到v1,我的所有消费者v1改变成v2.

现在,这是一个棘手的部分。我们有两个要求:

1.无生产停机时间。

2.保留消息顺序。

它的意思是:

1)我们不允许丢失消息;客户可以随时使用我们的系统,因此我们的系统应该随时产生消息。

2)我们正在运行生产者的多个实例。在某个时刻,可能(可能)有生产者生成以下格式的消息format_v1进入主题v1,以及一些产生格式消息的实例format_v2进入主题v2.

众所周知,kafka不保证不同分区和主题的消息排序。

我可以通过使用与 v1 相同的分区选择器将消息写入 v2 来解决分区问题。或者现在,我可以想象我们只使用一个分区v1和一个分区v2.


我的简化和尝试

1)我想象当我想要更改生产者以将消息生成到新主题时,我有一个向上投射器(kafka流组件),它能够将消息从v1 into v2没有错误。这个 kafka 流组件是可扩展的。

2)我所有的消费者都已经切换到v2话题。他们不断收到来自v2。此时,我的生产者实例正在向主题生成消息v1向上脚轮的工作做得很好。

3)为了简化问题,我们现在假设format_v1 and format_v2没关系,它们是一样的。

4)假设我们有一个分区v1和一个分区v2.

现在我的问题是,如何立即切换给定时间点的所有生产者;所有实例都会将消息生成到主题 v2 中。

我的同事兼卡夫卡专家告诉我,只要停机就可以完成

如果您依赖分区中消息的顺序,则无法在不停机的情况下切换到新版本。为了最大限度地减少停机时间,我们可以执行以下操作。

Upcaster 组件必须将数据写入相同的分区,并且应尝试进行相同的偏移量。然而,这并不总是可能的,因为偏移量可能有间隙,因此必须保留旧偏移量和新偏移量之间的映射。没有所有记录,只有每个分区的最后一批记录。如果upcaster崩溃了,重新启动即可,生产者仍然不参与v2。

启动 v2 消费者。如果它以与 v1 相同的消费者组开始,则无需执行任何操作,如果它有新的消费者组,请根据新的偏移量更新 Kafka 中的偏移量。

现在生产者写入v1,upcaster转换数据,消费者从v2消费

时间到了。当upcaster的滞后接近0时,关闭v1生产者,等待upcaster转换其余记录,关闭upcaster,启动v2生产者,写入v2主题。

我想在数据库中手动操作(通过一些休息端点等)来更改标志;生产者在生成消息之前总是检查该标志。当旗帜说v2 or true,生产者将开始将消息写入v2。但是,如果在标志为 false 的时刻,生产者开始将消息生成到v1,然后标志已更改,并且另一个生产者已将消息发送到v2在前一个生产者完成生产之前v1.


你们可以接受只有一名制作人活跃吗?

在这种情况下,您可以将您的想法与标志一起使用:

  1. 关闭所有生产商p2,p3,...,pn except p1
  2. p1写信给v1 alone
  3. 将标志切换为v2, so p1结束最后一次写入v1并开始写信给v2
  4. 现在没有人写信给v1
  5. 启动你的其他生产者p2,p3,...,pn
  6. 每个生产者现在都会因为活动标志而写入v2仍然没有人v1
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 生产者 - 如何在不停机的情况下更改主题并保留消息顺序? 的相关文章

  • Kafka Producer:使用回调处理异步发送中的异常

    我需要捕获异步发送到 Kafka 时的异常 Kafka生产者API带有一个函数send ProducerRecord record Callback回调 但是当我针对以下两种情况进行测试时 卡夫卡经纪人宕机 主题未预先创建 回调没有被调用
  • SQLServer 如何向 vb.net 应用程序通知事件?

    有没有一种相对简单的方法可以通知我的 VB NET 应用程序新值已写入 SQL Server Express 2008 中的表 轮询不是一种选择 因为我需要每 10 秒不间断地进行一次轮询 看看让您的应用程序订阅查询通知 http msdn
  • 我们可以在 Spring Boot 中使用多个 kafka 模板吗?

    在我的 spring boot kafka 发布者应用程序中 我想提供对以 String json 或字节格式发布消息的支持 因为我想同时提供对 json 和 avro 的支持 但是 Spring Boot 中的 Kafka 模板让我们只定
  • 为什么将 Avro 与 Kafka 结合使用 - 如何处理 POJO

    我有一个 Spring 应用程序 它是我的 kafka 生产者 我想知道为什么 avro 是最好的选择 我读到了它以及它提供的所有内容 但为什么我不能序列化我用 jackson 自己创建的 POJO 并将其发送到 kafka 我这样说是因为
  • GoogleCloudMessaging - InstanceID.getInstance(),从客户端注册

    我对 Java 编程和一般编程还很陌生 现在我决定制作自己的应用程序 该应用程序应该利用 Google Cloud Messaging 不知怎的 我成功了 但后来我意识到我使用了方法 String regid gcm register PR
  • 为时间戳记录创建正确的 avro 架构

    我想知道对于这种格式的 json 到 avro 转换 正确的 avro 模式是什么 entryDate 2018 01 26T12 00 40 930 我的架构 type record name schema fields name ent
  • 卡夫卡连接|无法反序列化主题数据 |检索 id 的 Avro 键/值架构版本时出错 |未找到主题错误代码:40401

    首先感谢 OneCricketeer 迄今为止的支持 到目前为止我已经尝试了很多配置 我不知道还能尝试什么 使用汇合connect standalone worker properties sink properties访问外部流 连接正在
  • Kafka生产者读取数据文件

    我正在尝试在循环中加载数据文件 以检查统计信息 而不是 Kafka 中的标准输入 下载 Kafka 后 我执行了以下步骤 启动动物园管理员 bin zookeeper server start sh config zookeeper pro
  • 如何在storm中注册kryo序列化器实例?

    我拼命尝试配置序列化器实例以在我的风暴拓扑中使用 Storm 文档指出 有两种注册序列化器的方法 1 The name of a class to register In this case Storm will use Kryo s Fi
  • 卡夫卡领导者选举什么时候发生?

    Kafka High Level Producer 何时以及多久选举一次领导者 它是在发送每条消息之前执行还是仅在创建连接时执行一次 每个代理都有有关主题 和分区 及其领导者列表的信息 每当新领导者当选或分区数量发生变化时 动物园管理员都会
  • Kafka Streams - SerializationException:未知的魔术字节

    我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序 但出现以下错误 Exception in thread streams application c8031218 8de9 4d55 a5d0 81c30051
  • 如何用UML表示通信协议?

    在我的 UML 模型中 我有一个系统及其相互通信的子组件 例如 我有一台计算机和一个遥控机器人 它们通过蓝牙进行通信 目前图中的流程类似于 计算机 触发 遥控车 的 setVelocity 函数 在这一点上 我想通过说以下的话来完善沟通 计
  • Apache AVRO 与休息

    我正在评估将 Apache AVRO 用于我的 Jersey REST 服务 我将 Springboot 与 Jersey REST 结合使用 目前我接受 JSON 作为输入 并使用 Jackson 对象映射器将其转换为 Java Pojo
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 在 Mac OS X 中安装 Avro

    我正在查看 Avro RPC for Python 网址为https github com phunt avro rpc quickstart python https github com phunt avro rpc quickstar
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何将我的 json 字符串 avro 二进制编码为字节数组?

    我有一个实际的 JSON 字符串 我需要将其 avro 二进制编码为字节数组 在经历了Apache Avro 规范 http avro apache org docs 1 7 7 spec html 我想出了下面的代码 我不确定这是否是正确

随机推荐

  • 查看 NSUserDefaults 文件内容

    有什么办法可以看到 NSUserDefaults 的内容吗 我可以从终端使用 pico 打开 plist 文件 但它显示奇怪的字符 我实际上看不到内容文件内容 有什么办法可以在 Xcode 中打开该文件吗 基本上 我希望能够查看和编辑 NS
  • iOS 上的 AWSS3TransferUtilityErrorDomain 代码=2

    AWSS3TransferUtilityErrorDomain Code 2 在 iOS 上上传达到 100 时出现此错误 而 Android 则工作正常 我在用react native s3 但这似乎是 sdk 或我的存储桶策略的问题 但
  • NameError:名称“self”未在 EXEC/EVAL 中定义

    我正在编码一些东西 并且有一个部分出现错误 但我找不到发生错误的原因 代码 示例 类似于错误部分 class Test def init self a 0 self x a self l 2 x for x in range a lt se
  • 使用 pip 安装 VTK

    我在 Arch Linux 上使用 Python 3 7 我一直在尝试用 pip 安装 Mayavi 但在安装 vtk 时总是失败 所以我发现即使尝试通过 pip 自行安装 vtk 应该有效 那个vtk确实没有安装 我收到此错误 sudo
  • 如何检测照片的拍摄角度,并像桌面应用程序在查看时自动旋转网站显示?

    如果我用相机拍照 它会存储设备的方向 角度 因此当我使用良好的应用程序在 PC 上查看图像时 它会显示自动旋转到 0 但是当我上传到网站时 它显示的是原始角度 所以图像看起来不太好 我怎样才能用 PHP 检测到这一点并旋转图像 并从它的元信
  • 如何在 Android 中用谷歌地图 v2 上的我的图标替换蓝点?

    我正在尝试用 Google 地图 v2 上我自己的图标替换蓝点 在地图上显示当前位置 我在下面尝试过 但没有成功 Android Maps API v2 更改我的位置图标 地图 V2 myLocation 蓝点回调 在 Google Map
  • 如何检测用户是否已登录 Firebase?

    我在 javascript 文件中使用 firebase node api 进行 Google 登录 firebase initializeApp config let provider new firebase auth GoogleAu
  • 具有多个 sql_variant 参数的 SQLCLR 自定义聚合

    Hy 几个月前我发布了一个关于 CLR 用户定义聚合的问题post 这就像一个魅力 但现在我想使用 sql variant 类型的两个参数来实现完全相同的功能 就像我之前的文章一样 这两个函数是 sMax 和 sMin 并且将根据第二个值返
  • 单线程同步与异步混淆

    Assume makeBurger 需要 10 秒 在同步程序中 function serveBurger makeBurger makeBurger console log READY Assume takes 5 seconds to
  • 如何正确使用CALLER_IS_SYNCADAPTER

    不知怎的 我不理解查询参数CALLER IS SYNCADAPTER的工作概念 它的默认值为 false 如果设置 则不会自动设置 DIRTY 标志 那么它到底意味着什么呢 根据我的理解 联系人的每次更改都会导致将脏标志设置为 1 同步适配
  • 具有模块导入的命名空间

    我正在学习Python 尽管我已经学习了大约一年 但我仍然是一个初学者 我正在尝试编写一个在主模块中调用的函数模块 被调用模块中的每个函数都需要数学模块才能运行 我想知道是否有一种方法可以在不将数学模块导入被调用模块内的情况下执行此操作 这
  • 使用 Gradle 构建 uberjar

    我想构建一个 uberjar 又名 fatjar 其中包含项目的所有传递依赖项 我需要添加哪些行build gradle 这就是我目前所拥有的 task uberjar type Jar from files sourceSets main
  • 使用 jni 库构建 AOSP 应用程序

    我正在尝试在 AOSP 内构建 Android 应用程序 我已经定义了Android bp文件如下 cc prebuilt library shared name libPrintString target android arm srcs
  • 获取文件的绝对路径

    如何在 Unix 上将相对路径转换为 C 中的绝对路径 有没有方便的系统功能 在 Windows 上有一个GetFullPathName函数可以完成这项工作 但我在 Unix 上没有找到类似的东西 Use 真实路径 The realpath
  • “在此上下文中需要子类型标记”到底是什么?

    I get Subtype mark required in this context at 子类型掩码到底是什么 为什么它在这里抱怨 main adb Open Route Route 1 3 others gt new Location
  • buildTypes 无法应用于 groovy.lang.Closure

    我在我的项目 gradle 文件中收到此警告 警告 16 5 buildTypes 无法应用于 groovy lang Closure 我的 buildTypes 部分是 buildTypes debug debuggable true r
  • ffmpeg 在单个命令中剪切视频并刻录字幕

    我想从视频中剪下一段并刻录该段的字幕 我可以分三步完成 剪切视频 ffmpeg ss 25 00 to 26 00 i vid mp4 c copy out mp4 剪掉副标题 ffmpeg i sub srt ss 25 00 to 26
  • 如何隐藏实际的下载文件夹位置

    我心里有一个问题 如何mod rewrite增加安全性 我有一个 php 文件 它在线显示 pdf 文件 例如www exaple com id 234它会查询数据库并获取实际的文件夹位置 实际文件夹位置是uploads 我正在使用类似的东
  • 在新的 Android 模拟器上禁用首次运行的欢迎程序

    我正在编写一个测试 需要直接从启动器启动应用程序 因为我无法通过意图启动来正确模拟它 问题是 当我在新的模拟器上运行测试时 我使用的是 Travis CI 但它可以在我的家用 PC 上轻松重现 模拟器会以 首次运行 问候语覆盖层开始 这会阻
  • Kafka 生产者 - 如何在不停机的情况下更改主题并保留消息顺序?

    这个问题是关于架构和kafka主题迁移的 原来的问题 没有向后兼容性的架构演变 https docs confluence io current schema registry avro html 我请求社区给我建议或分享文章 我可以从中获