同一 Kinesis 流的多个不同消费者

2024-05-10

我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,对于给定的主题/流,具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。

最初,我为所有消费者和生产者使用相同的应用程序名称。然而,一旦我启动了多个消费者,我就开始收到以下错误:

com.amazonaws.services.kinesis.model.InvalidArgumentException:StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 在帐户下流 PackageCreated 中的分片 shardId-000000000000 上的 GetShardIterator 中使用************ 无效,因为它不是来自此流。 (服务:AmazonKinesis;状态代码:400;错误代码:InvalidArgumentException;请求 ID:..)

这似乎是因为消费者使用相同的应用程序名称时与他们的检查点发生冲突。

从阅读文档来看,使用检查点进行发布/订阅的唯一方法似乎是为每个消费者应用程序创建一个流,这要求每个生产者了解所有可能的消费者。这比我想要的更紧密地耦合;这实际上只是一个队列。

看来 Kafka 支持我想要的:任意消费给定的主题/分区,因为消费者完全控制自己的检查点。如果我想要带有检查点的发布/订阅,我唯一的选择是迁移到 Kafka 或其他替代方案吗?

我的 RecordProcessor 代码在每个消费者中都是相同的:

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  for {
    record <- processRecordsInput.getRecords
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } yield subscriber ! msg
  processRecordsInput.getCheckpointer.checkpoint()
}

该代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以看到在 AWS Kinesis 仪表板上发送的消息,但没有发生读取,大概是因为每个应用程序都有自己的 AppName 并且看不到任何其他消息。


支持您想要的模式,即一个发布者到一个 Kinesis 流的多个消费者的模式。您不需要每个消费者都有一个单独的流。

你是怎样做的?您需要为每个消费者提供不同的应用程序名称。这样,一个消费者的检查点信息就不会与另一个消费者的信息发生冲突。

检查对此的第一反应:https://forums.aws.amazon.com/message.jspa?messageID=554375 https://forums.aws.amazon.com/message.jspa?messageID=554375

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

同一 Kinesis 流的多个不同消费者 的相关文章

随机推荐

  • 从活动目录读取objectGUID

    我正在尝试使用 node js 从 AD 获取信息 我试过了activedirectory and ldapauth fork一般来说 代码可以工作 但如果我需要一些octetstring数据如objectGUID 我在对象中看到了垃圾字符
  • 如何让 Terraform 将空值替换为默认值?

    Terraform 文档表明这应该已经发生 https www terraform io docs language expressions types html https www terraform io docs language e
  • 如何匹配和删除包含特定字符串的任何行?

    我的网站上有一个巨大的 URL 目录列表 例子 folder folder2 folder3 page htm folder folder2 folder3 page2 htm folder folder2 folder3 page3 ht
  • 为什么 Parsec 的 sepBy 停止并且不解析所有元素?

    我正在尝试解析一些逗号分隔的字符串 该字符串可能包含也可能不包含具有图像尺寸的字符串 例如 hello world 300x300 good bye world 我写了下面的小程序 import Text Parsec import qua
  • R igraph - 保存布局?

    我想知道是否可以 保存 igraph 网络的布局 以便其他人能够重现相同的图表 目前 Fruchterman Reingold 算法总是创建一个新的网络 par mfrow c 1 2 g lt erdos renyi game 100 1
  • 移动网络的 max-device-width 和 max-width 有什么区别?

    我需要为iphone android手机开发一些html页面 但是它们之间有什么区别max device width and max width 我需要针对不同的屏幕尺寸使用不同的CSS media all and max device w
  • 列出其他人(即不是我)所做的提交?

    是否有一种标准方法可以列出 git 存储库中其他人 即不是我自己 所做的所有提交 I tried git log not author username 但看起来 not仅适用于修订版 的联机帮助页git log似乎没有提供反转谓词的方法
  • 在(每个)Web API 操作之前执行代码

    我有一个 Web API 接口 我正在尝试适应多租户架构 以前 我们采用 WCF 模式 将参数 客户端 ID 传递给服务 然后服务将其存储起来以供稍后在代码中使用 这意味着客户端 ID 不必是传递给每个调用的第一个参数 我想对 Web AP
  • Outlook Application_NewMailEx 在启动时不工作

    我正在使用一个Application NewMailEx处理收到的所有电子邮件 它适用于 Outlook 打开时收到的电子邮件 然而在启动时 Application NewMailEx不会因收到的电子邮件而被呼叫 我尝试使用Applicat
  • 使用指定的用户名和密码运行 mstsc.exe

    我意识到在 Windows 7 中 不可能为同一主机保存不同的凭据 但我需要一些解决方法 我可以在代码中手动提供用户名和密码吗 将它们存储在临时 rdp 文件中 Process rdcProcess new Process rdcProce
  • 如何随着 ViewPager 位置偏移量的变化对视图进行动画处理

    我们希望创建一个带有动画的应用程序介绍 用户可以在其中滚动页面 并且当用户滚动时 视图会动画化并遍历所有幻灯片 动画视图应该随着用户滚动而移动 因此如果用户滚动得更快 动画视图应该移动得更快 如果用户滚动回到上一页 动画视图应该向后移动 这
  • React Native 检查平板电脑或屏幕是否以英寸为单位

    我为平板电脑和移动设备建立了不同的渲染逻辑 我想知道是否有办法获取屏幕尺寸 以英寸为单位 或者甚至可能是任何模块自动检测设备是否是平板电脑 我没有直接使用尺寸 API 来获取屏幕分辨率的原因是 许多 Android 平板电脑的分辨率低于许多
  • 为 DownloadManager 的 BroadcastReceiver 设置附加功能 [重复]

    这个问题在这里已经有答案了 有一种方法可以添加额外内容DownloadManager已登记行动意图DownloadManager ACTION DOWNLOAD COMPLETE 例如 接收一个在意图中设置为额外的布尔值 这就是我创建请求的
  • 数组到字符串转换注意事项。为什么?

    为什么我得到 Applications MAMP htdocs test2 php 第 11 行中的数组到字符串转换 注意 users array aa a b bb cc c foreach users as usr var htmlsp
  • 使用 Financial Modeling Prep (Python) 访问指定时间间隔的所有历史加密数据

    Financial Modeling Prep 是一个免费的 API 可用于访问各种财务指标 例如股票价格和加密货币数据 API 文档概述了如何通过 Python 等编程语言访问数据 特别是对于加密货币数据 https financialm
  • 如何在操作表中添加日期选择器?

    IBAction showCatPicker if self catList nil self catList nil catList release self catList NSMutableArray alloc init self
  • 在 solr 8 中的 fl 中使用父过滤器时获取“当架构嵌套时不应发送父过滤器”

    我正在尝试使用子文档获取父文档 但得到 当模式嵌套时不应发送父过滤器 error 附上下面我尝试过但无法得到解决方案的查询 q parent which content type person fl child parentFilter c
  • 在源代码管理中管理我的数据库

    由于我正在处理一个新的数据库项目 在 VS2008 中 而且我从未从头开始开发数据库 因此我立即开始研究如何在源代码管理 在本例中为 Subversion 中管理数据库 我找到了一些关于SO的信息 包括这篇文章 保持多个环境中的开发数据库同
  • 在 jinja2 模板中转义 jinja2 语法

    我在 Flask 中提供来自 Jinja2 模板的动态页面 现在 我在脚本标记内定义客户端模板 比如 Jinja2 clone Nunjucks 问题是 客户端模板的语法如下 that Flask sJinja2 解释器可以解释而不是渲染v
  • 同一 Kinesis 流的多个不同消费者

    我有一个 Kinesis 生产者 它将单一类型的消息写入流 我想在多个完全不同的消费者应用程序中处理这个流 因此 对于给定的主题 流 具有单个发布者的发布 订阅 我还想利用检查点来确保每个消费者处理写入流的每条消息 最初 我为所有消费者和生