Kafka中的延迟消息消费

2024-01-16

如何使用 Apache Kafka 生成/消费延迟消息?似乎标准 Kafka(和 Java kafka-client)功能没有此功能。我知道我可以用标准的等待/通知机制自己实现它,但它看起来不太可靠,因此任何建议和良好实践都值得赞赏。

Found 相关问题 https://stackoverflow.com/questions/31775003/kafka-delayed-queue-implementation-using-high-level-consumer,但这没有帮助。 正如我所看到的:Kafka 基于从文件系统的顺序读取,只能用于直接读取主题,保持消息顺序。我对吗?


事实上,kafka 最低的结构是一个分区,它是队列中具有增量偏移量的顺序事件 - 您不能将日志插入到生成日志时的末尾以外的任何位置。没有延迟消息的概念。

您到底想实现什么目标?

您的情况有一些可能性:

  • 您想要在特定时间推送消息(例如,事件“开始作业”)。在这种情况下,使用计划任务(不是来自 kafka,在您的操作系统/语言/自定义应用程序/其他上使用某种标准方式)在给定时间发送消息 - 消费者将在适当的时间收到它们。

  • 您现在想要发送一个事件,但消费者现在不应该考虑该事件。在这种情况下,您可以使用自定义结构,在其有效负载中包含“时间”。消费者必须了解这个领域并进行定制处理来应对。例如:“在 2017-12-27T20:00:00Z 开始工作”。您也可以为此使用标头,但目前并非所有客户端都支持标头。

  • 您可以更改发送消息的时间戳。在内部,它仍然会按顺序读取,但是一些暗示时间的函数会以不同的方式工作,并且消费者可以使用消息的时间戳来执行其操作 - 这有点像前面的命题,除了时间戳是事件的一个元数据,而不是事件有效负载本身。我个人不会使用这个 - 我只在代理某些事件时处理时间戳。

对于你的最后一个问题:基本上是的,但有一些注意事项:

  • 主题实际上是在分区中分割的,并且顺序仅在分区中保留。所有具有相同密钥的消息都发送到同一分区。
  • 大多数时候,您只从内存中读取,除非您读取旧事件 - 在这种情况下,因为这些事件是从磁盘顺序读取的,所以速度非常快
  • 您可以选择从哪里开始读取 - 给定的偏移量或给定的时间 - 甚至可以在运行时更改它
  • 您可以跨进程并行读取 - 多个消费者可以读取相同的主题,并且不会两次读取相同的消息(每次读取不同的分区,请参阅消费者组)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka中的延迟消息消费 的相关文章

  • 如何使用不同的kafka主题配置Kubernetes部署的微服务的每个pod/进程?

    在我们的应用程序中 有多个不同 kafka 主题的消费者 例如 Cosumer C1 Cosumer C2 Cosumer C3 Cosumer C4 Cosumer C5 以及不同的 kafka 主题 例如主题 1 主题 2 主题 3 主
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • 当记录处理时间超过“max.poll.interval.ms”时,在消费过程中记录/消息会发生什么?

    我的消费者设置如下 auto offset reset earliest enable auto commit true default value session timeout ms 10000 default value max po
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 如何连接Kafka和Elasticsearch?

    我是Kafka的新手 我使用kafka通过logstash收集netflow 可以 并且我想将数据从kafka发送到elasticsearch 但是存在一些问题 我的问题是如何将 Kafka 与 Elasticsearch 连接起来 net
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导

随机推荐

  • 类型“[String,AnyObject?]”不符合协议 AnyObject?:为什么?

    我试图理解为什么会出现编译错误 类型 String AnyObject 不符合 AnyObject 协议 var cars String AnyObject model Ferrari var JSON String AnyObject c
  • 在 Python 中使用带有线程的全局字典

    访问 更改字典值是线程安全的吗 我有一本全球词典foo以及带有 id 的多个线程id1 id2 idn 是否可以访问和更改foo如果已知每个线程仅使用其 id 相关值 例如线程 则无需为其分配锁id1只会与foo id1 假设 CPytho
  • 再次休息并获取...

    一般来说 REST 社区似乎不喜欢 GET 请求中的复杂数据 我想知道这背后是否有一个好的原则 或者只是具体化了 GET 字典的 任意 url 长度 限制 我对 url 和资源之间的对应关系感到满意 但为什么我的 GET 请求不能在请求正文
  • Android Kudan - 扩展 ARActivity 将停止系统相机手电筒的工作(闪光灯)

    我正在开发 kudan SDK 用于使用标记构建 3D 模型增强现实对象 所有这些都工作得很好 但是当我在同一个 ARActivity 上构建相机手电筒时 闪光灯将停止工作 甚至它会停止系统闪光灯手电筒的工作 开 关手电筒 来自系统小部件
  • 从 Neo4j 删除所有节点时出现 ConstraintViolationTransactionFailureException

    当尝试从 Neo4j 图形数据库中删除所有节点时 我过去已经在较小的数据集上成功完成过多次 我一直遇到Error undefined undefined运行此查询后 MATCH n DETACH DELETE n 我认为我尝试一次删除的节点
  • 使用什么对称密码来加密消息?

    我对加密一无所知 但我需要它 如何 假设您有一个节点系统 节点通过异步消息在网络上相互通信 节点不维护有关其他节点的会话信息 这是设计限制 假设您想确保只有您的节点可以读取正在发送的消息 我相信加密是解决这个问题的方法 由于节点不维护会话
  • 终端上的“快速构建”抛出“错误:找不到根清单”

    我想在终端上运行我的快速编程 所以我cd我的项目的根文件夹 然后运行 swift build 但是出现了错误 error root manifest not found有什么帮助吗 如果其他人偶然发现同样的问题 我的解决方案是 cd 到我的
  • 在 Windows 上运行 Django 时出现“WinError 10013”

    自从我遇到这个问题以来已经快一个月了 我非常感谢您的帮助 尝试登录我的 Django Web 应用程序时 我在 accounts login 处遇到 OSError 我能够登录 127 0 0 1 8000 admin 但不能登录 acco
  • findDOMNode 与 getElementById 对于普通 DOM 元素

    我不太确定这个问题有真正的答案 但我想知道是否最好使用以下命令在 React 应用程序中查找常规 DOM 元素 A refs 和 ReactDOM findDOMNode or b 普通旧 document getElementById 我
  • 导入而不执行类 - python

    我的问题是我有一个包含类的文件 并且在这个类内有一堆代码将被执行 所以每当我导入该文件时它就会执行 无需创建该类的对象 这是例子 FILE X class d def init self print print this will NOT
  • Gradle 和 Android 支持库

    几乎在每个 Android 应用程序中 我们都需要一些库项目 例如 ABS HoloEverywhere 等 其中大部分都在 Maven Central 中 这很好 不好的是 它们中的大多数都依赖于支持库 并且自然地指向 Maven Cen
  • 在双变量中获取数字的问题

    我的 java 程序中需要的函数出现了一些问题 我想检查 双精度 变量的总位数 例如 5 应该返回 1 5 0034 应该返回 5 2 04 应该返回 3 我的函数是这样的 private int getDoubleLength doubl
  • WPF Datagrid 绑定自定义列标题

    我试图弄清楚如何使用 MVVM 模式将 WPF DataGrid 的列标题和主要数据绑定到数据源 我正在寻找的结果如下所示 source vallelunga com http brian vallelunga com files data
  • 浮点按位运算的用处

    我注意到浮点存在 SSE 指令 这让我想知道 您可以对 fp integer union 中的标量执行相同的操作 我突然想到 如果对浮点数数组的各个分量进行按位或运算 则可以通过查看结果的符号位来快速确定它们中是否有任何一个为负数 浮点值的
  • 获取 TWIG 模板中的控制器名称

    我正在学习 symfony2 3 当我尝试在 twig 模板中获取控制器名称时出现错误 控制器 namespace Acme AdminBundle Controller use Symfony Bundle FrameworkBundle
  • Mac OS-X Mountain Lion 上的 GCC-4.2 错误,无法使用 pip / virtualenv 安装某些软件包

    我看到一个非常烦人的错误 我真的不知道如何处理 这似乎很常见 我几乎尝试了所有能找到的解决方案 但都无济于事 我正在尝试使用 pip 安装库 gevent psycopg2 和 greenlet 都遇到过这个问题 问题似乎是我的计算机找不到
  • 如何从服务器在 ViewPager 内的 VideoView 上播放视频

    我尝试开发一个应用程序从服务器检索视频并在 viewpager 内的 videoview 上播放 原始文件夹中的视频工作正常 但有两个问题 1 部分视频无法播放 或黑色活动显示 2 页面滚动时视频不停止 那么如何使用 URL 而不是 and
  • 如何在打开另一个 Javascript 下拉菜单时关闭另一个下拉菜单

    我不太熟悉 JavaScript 我希望能得到一些帮助来解决我似乎无法解决的问题 目前我的网站上有 2 个下拉菜单 一种是用于导航的下拉菜单 单击汉堡菜单图标时会激活该菜单 第二个下拉列表用于显示我网站上的类别 目前 当我单击一个下拉列表时
  • 检测浏览器缓存是否已满

    我们发现浏览器缓存已满是导致酒店后台出现问题的原因 它只影响我们的一小部分用户 但我们希望提醒他们该问题 并为他们提供一些如何自行解决问题的指导 我们希望使用与 GMail 使用的系统类似的系统 当它检测到您的浏览器缓存已满表现不正常时 它
  • Kafka中的延迟消息消费

    如何使用 Apache Kafka 生成 消费延迟消息 似乎标准 Kafka 和 Java kafka client 功能没有此功能 我知道我可以用标准的等待 通知机制自己实现它 但它看起来不太可靠 因此任何建议和良好实践都值得赞赏 Fou