Kafka以相反的顺序消费消息

2024-01-16

我使用Kafka 0.10,我有一个主题logs我的物联网设备将日志发布到其中,我的消息的关键是device-id,所以同一设备的所有日志都在同一个分区。

我有一个 API/devices/{id}/tail-logs需要显示呼叫时某台设备的最后 N 条日志。

目前,我以一种非常低效的方式(但有效)实现它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。

一种更有效的方法是,如果我可以获得当前的最新偏移量,然后向后消费消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息)

可以用kafka来做吗?如果不是,如何解决这个问题? (我看到的一个更重的解决方案是将kafka-connect链接到弹性搜索,然后查询elasticsearch,但为此再增加2个组件似乎有点矫枉过正......)


由于您使用的是 0.10.2,我建议编写一个 Kafka Streams 应用程序。应用程序将是有状态的,并且状态将保存每个的最后 N 条记录/日志device-id-- 如果新数据写入输入主题,Kafka Streams 应用程序将仅更新其状态(无需重新读取整个主题)。

此外,该应用程序还可以满足您的请求(“api/devices/{id}/tail-logs" using 交互式查询 http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries特征。

因此,我不会构建一个必须重新计算每个请求的答案的无状态应用程序,而是构建一个有状态应用程序,它为所有可能的请求(即,对于所有请求)急切地计算结果(并始终自动更新结果)device-ids) 并在请求到来时返回已经计算的结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka以相反的顺序消费消息 的相关文章

  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • 嵌入式 Kafka 测试随机失败

    我使用 EmbededKafka 实现了一系列集成测试 以测试使用 spring kafka 框架运行的一个 Kafka 流应用程序 流应用程序正在从 Kafka 主题读取消息 将其存储到内部状态存储中 进行一些转换并将其发送到另一个微服务

随机推荐

  • 在 asp.net 中上传文件之前如何检查文件类型?

    我们如何在不使用文件扩展名的情况下检查文件类型 例如jpg等格式 上传它们使用 asp net 和 c 我正在使用 vs 2008 asp net c TELERIK 控件 RadUpload 想象一下有人将文本文件扩展名更改为 jpg 并
  • Haskell 中类型表达式的 Lambda?

    Haskell 或特定的编译器是否有类似类型级 lambda 的东西 如果这甚至是一个术语 详细说明一下 假设我有一个参数化类型Foo a b并想要Foo b成为 Functor 的一个实例 有没有什么机制可以让我做类似的事情 instan
  • 如何在安装了 goclipse 的 eclipse 中运行 GO 项目

    我已经在 eclipse 中安装了 goclipse 并创建了一个新的 go 项目 现在这就是我所拥有的 我的 hello go 看起来像这样 package main import fmt func main fmt Println He
  • 何时在 Makefile 中使用空格或制表符?

    我正在创建一个使用条件 if 和 ifneq 的 makefile 我注意到 如果我使用 if 下一行应该用空格缩进 if d d then
  • 如何在 gdb 中使用带有 FS 或 GS​​ 基址的逻辑地址?

    gdb 提供了读取或写入特定的功能线性地址 例如 gdb x 1wx 0x080483e4 0x80483e4
  • Spark 2.0 DataSets groupByKey 和 除法操作以及类型安全

    我对 Spark 2 0 DataSets 非常满意 因为它的编译时类型安全 但这里有几个我无法解决的问题 我也没有找到很好的文档 问题 1 对聚合列进行除法运算 考虑下面的代码 我有一个 DataSet MyCaseClass 我想对 c
  • 如何在android中使网格视图水平滚动而不是垂直滚动?

    我有一个动态网格视图 意味着其内容有所不同 因此 如果项目数量增加 则会进行垂直滚动 我想把它做成水平滚动 请为此提出一些解决方案
  • Python pandas / matplotlib 在条形图列上方注释标签[重复]

    这个问题在这里已经有答案了 如何添加要在条形图中的条形上方显示的值的标签 import pandas as pd import matplotlib pyplot as plt df pd DataFrame Users Bob Jim T
  • 使用“wait_variable()”时无法退出 tkinter 应用程序

    我有一个 python 代码 其中包括tkinter窗口和其他正在运行的任务 我一直在尝试绑定 WM DELETE WINDOW 当我关闭窗口但无法实现该功能时 该事件会退出我的 python 代码 这就是我尝试的 def on exit
  • 如何在 postgresql 上使用 sqlalchemy 进行正确的更新插入?

    我想使用 sqlalchemy 核心使用 postgresql 9 5 添加的 新 功能进行更新插入 虽然它已实现 但我对语法感到非常困惑 它无法适应我的需求 这是我希望能够执行的示例代码 from sqlalchemy ext decla
  • 仅在提供后才计算下载次数

    我们有以下代码可供下载 public class downloadRelease IHttpHandler public void ProcessRequest HttpContext context snip context Respon
  • Flex-wrap 具有不同高度的行

    我正在实现带有哈希标签链接的纯 CSS 选项卡 我非常非常接近 但无法完全让柔性包装正常工作 为了让一切按照我想要的方式工作 target 我之前已经使用单选按钮完成了此操作 这提供了更多的灵活性 我需要所有选项卡和所有部分都处于同一级别
  • 如何解决 AWS Cloudformation 中的循环依赖关系

    我创建了一个 AWS Cloudformation 模板 但在克服循环依赖项时遇到问题 我正在创建一个 EC2 实例和一个负载均衡器 负载均衡器依赖于 EC2 实例 因为它在其实例属性中引用它 一切都工作正常 直到我必须在 EC2 实例 I
  • 如何更新datagridview中的单元格?

    我有连接到我的数据库 访问 的 datagridview 如果我停留在任何单元格上并更改值 则会看到该值已更改 但是当我进行刷新时 我看到该值又回到了原始值 我如何更新这个单元格 没有sql查询 我将数据集绑定到 datagridview
  • 在 Spring Boot JPA 中,如何正确 POST 其实体表示与不同实体具有外键关联的对象?

    如果我有一个包含另一个类的对象的实体 例如Book其内部有一个实体Publisher关联的实体如下 ManyToOne JoinColumn name PUB CODE referencedColumnName PUB CODE priva
  • 如何反转 Groovy 集合的排序?

    我正在根据多个字段对列表进行排序 sortedList sort it getAuthor it getDate 这工作正常 但我想要逆转日期并且reverse 不起作用 如何按升序对作者排序 但按降序 反向 顺序对日期排序 我想要的示例
  • 如何以干净的方式分叉现有的 Meteorite 包?

    我正在尝试找出在项目中分叉 Atmosphere 上现有包的最佳 最干净的方法 我遇到过一些情况 现有的包需要一些修改 我被迫分叉它 据我所知 存在以下选项 不幸的是 所有这些都有自己的问题 我还没有找到完美的解决方案 我会用meteor
  • 使用水豚测试内容顺序(序列)

    我尝试过使用以下语法 page body index 姓名 但问题是 如果同一页面上有多个具有相同内容的字符串 则无法检查特定字符串的索引 对于前 页面有内容 姓名 和 电话 3次 那么如何验证具体内容的顺序 请建议我们是否可以使用 CSS
  • Chrome 不支持 getUserMedia()

    我正在尝试使用 getUserMedia 使用我自己的网站 使用我自己的 IP 地址运行 来访问我的网络摄像头 它工作正常 直到我再次尝试我的网站 我尝试过其他演示站点 给出的错误是 getUserMedia 不受支持 Chrome版本v4
  • Kafka以相反的顺序消费消息

    我使用Kafka 0 10 我有一个主题logs我的物联网设备将日志发布到其中 我的消息的关键是device id 所以同一设备的所有日志都在同一个分区 我有一个 API devices id tail logs需要显示呼叫时某台设备的最后