使用 Kafka Streams DSL 进行两步窗口聚合

2023-11-29

假设我有一个流“stream-1”,每秒由 1 个数据点组成,我想计算一个派生流“stream-5”,其中包含使用 5 秒的跳跃窗口和另一个流“stream-10”的总和它基于包含使用 10 秒跳跃窗口的总和的“stream-5”。需要分别对每个键进行聚合,我希望能够在不同的进程中运行每个步骤。如果stream-5和stream-10包含相同密钥/时间戳的更新,这本身不是问题(所以我不一定需要如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?)只要最后的值是正确的。

有没有一种(简单的)方法可以使用高级 Kafka Streams DSL 来解决这个问题?到目前为止,我还没有找到一种优雅的方法来处理由于聚合而在stream-5 上产生的中间更新。

我知道中间更新可以通过某种方式控制cache.max.bytes.buffering and commit.interval.ms设置,但我认为任何设置都不能保证在所有情况下都不会产生中间值。另外,我可以尝试使用密钥的时间戳部分在读取时将“stream-5”转换为 KTable,但似乎 KTable 不支持像 KStreams 那样的窗口操作。

这是我到目前为止所拥有的,由于 Stream-5 上的中间聚合值而失败

Reducer<DataPoint> sum = new Reducer<DataPoint>() {                                                                           
    @Override                                                                                                                 
    public DataPoint apply(DataPoint x, DataPoint y) {                                                                        
        return new DataPoint(x.timestamp, x.value + y.value);                                                                 
    }                                                                                                                         
 };                                                                                                                           

 KeyValueMapper<Windowed<String>, DataPoint, String> strip = new 
           KeyValueMapper<Windowed<String>, DataPoint, String>() {      
      @Override                                                                                                               
      public String apply(Windowed<String> wKey, DataPoint arg1) {                                                            
          return wKey.key();                                                                                                  
      }                                                                                                                       
 };                                                                                                                           

KStream<String, DataPoint> s1 = builder.stream("stream-1");                                                                      

s1.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(5000).advanceBy(5000))                                                                     
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-5");                                                                                                          

KStream<String, DataPoint> s5 = builder.stream("stream-5");                                                                      

s5.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(10000).advanceBy(10000))                                                                   
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-10");      

现在如果stream-1包含输入(键只是KEY)

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}

Stream-5 包含正确的(最终)值:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}

但stream-10是错误的(最终值应该是10.0),因为它还考虑了stream-5上的中间值:

KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}

问题在于,所有聚合的结果都是 KTable,这意味着为其输出主题生成的记录代表了一个变更日志。但是,当您随后将它们作为流加载时,下游聚合将重复计数。

相反,您需要将中间主题加载为表,而不是流。但是,您将无法对它们使用窗口聚合,因为它们仅在流上可用。

您可以使用以下模式来完成表而不是流的窗口聚合:

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

如果您想在单独的进程中运行每个步骤,您可以调整它,只需记住使用 builder.table() 而不是 builder.stream() 加载中间表。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Kafka Streams DSL 进行两步窗口聚合 的相关文章

  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk

随机推荐

  • 调用 ffmpeg.c 的 main 两次导致应用程序崩溃

    使用 FFmpeg 4 0 2 并调用它ffmpeg c s main函数两次导致 Android 应用程序崩溃 使用 FFmpeg 共享库和 JNI A libc Fatal signal 11 SIGSEGV code 1 fault
  • 使用通用参数作为端口数组长度

    我想做的事 entity FIRfilter is generic NTAPS integer port h in array 0 to NTAPS 1 of std logic vector 15 downto 0 end FIRfitl
  • 基于数据库数组PHP自动检查复选框

    在我的页面的 用户设置 选项卡中 我希望用户确定特定用户发布的帖子类型 表格如下
  • Spark独立模式和本地模式有什么区别?

    Spark独立模式和本地模式有什么区别 Spark Standalone是一个可以在集群上工作的资源管理器 它只是内置的资源管理器 而不是像纱线这样的外部资源管理器 Spark本地运行无需任何资源管理器 一切都在单个jvm中运行 您可以决定
  • Java 同步方法...不同步

    对于我当前的 java 练习 我必须从 2 个不同的 Gmail 帐户获取邮件 我通过创建 Gmail 类的新实例来完成此操作 gmail 类扩展了线程 其中有一个同步方法 readMail 用于获取邮件并打印它 这个 readMail 方
  • 使用 NLog 将记录器名称写入 Excel 文件

    感谢 Rolf 在这个问题中的评论 NLog 在 C 中具有严重性和类别 我能够将日志消息的类别 例如 热 或 数据库 或 机械 记录到文本文件中 我只需将名称传递给 GetLogger 方法即可完成此操作 public MainWindo
  • Mongoimport 带有字符串 _id 和 upsert 的 csv 文件

    我正在尝试使用 mongoimport 来更新插入 id 中带有字符串值的数据 由于 id 看起来像整数 即使它们在引号中 因此 mongoimport 将它们视为整数并创建新记录 而不是更新插入现有记录 我正在运行的命令 mongoimp
  • Android GPU 分析 - OpenGL 动态壁纸速度很慢

    我正在使用 OpenGL ES 3 0 开发动态壁纸 我已经根据优秀教程进行了设置http www learnopengles com how to use opengl es 2 in an android live wallpaper
  • 如何在 NestJS 中处理 RpcException

    我正在尝试构建一个包含多个微服务的 NestJS 后端和一个作为与微服务通信的网关的 REST API 对于网关和微服务之间的通信 我使用 gRPC 简单的通信已经可以工作 但现在我想在微服务中实现错误处理 NestJS 文档指出 这可以通
  • RecyclerView 查看项目

    我想在 RecyclerView 中显示 2 列 但它们显示在 1 列中 如下图所示 如何在两列中显示我的视图 我在我的代码中尝试了两列 rcv pro setLayoutManager new GridLayoutManager this
  • 如何在无需用户交互且仅通过客户端 ID 和密码的情况下验证我的 Quickbook Intuit api 访问?

    我正在开发一个项目 其中后台 crons 创建发票 我想将它们添加到我在后端创建的 Quickbook 帐户中 所以问题是我想仅使用客户端 ID 和秘密参与来访问 api 如何在无需用户交互且仅通过客户端 ID 和密码的情况下验证我的 Qu
  • 有没有办法在使用 ES6 简写方法表示法的方法中使用词法 `this` ?

    关于SO的第一个问题 我希望我没有重复任何内容 我看过other 问题并认为我的不同足以值得询问 基本上 有没有办法让this它位于使用速记符号编写的方法的方法主体中 或者是词法的 或者是绑定到特定值的 这样做的动机来自于我在实现时想要使用
  • 如何指定 JSON 对象应采用哪一个 oneOf 项?

    使用Python和jsonschema我正在尝试验证分配ObjA or ObjB等等beta test json alpha beta ObjA 在我的架构中 testschema json beta is oneOf多个项目 每个项目定义
  • Selenium-IDE:如何验证/断言页面刷新

    我的页面上有一个链接 单击该链接会刷新此页面 如何使用 Selenium IDE 验证页面是否确实已刷新 重新加载 我通过断言页面上最初存在的元素在刷新后不存在于页面上来解决这个问题 然后等到页面完全刷新 并断言该元素再次存在 刷新并等待
  • React router dom 中的链接不会加载页面,仅 url 浏览器导航会更改

    React router dom v5 和 React 16 我的加载应用程序组件包含 ReactDOM render
  • 如何通过 Android 应用程序编辑日历事件

    我如何通过 Android 应用程序编辑日历中的日历事件 任何人都知道如何在日历应用程序中打开议程活动 从日历中读取数据后 试试这个 将单次事件添加到日历 要将条目添加到特定日历 我们需要使用 ContentValues 配置要插入的日历条
  • unicodecsv 读取器从 unicode 字符串无法正常工作?

    我在将 unicode CSV 字符串读入 python unicodescv 时遇到问题 gt gt gt import unicodecsv StringIO gt gt gt f StringIO StringIO u gt gt g
  • 在sqlite3数据库中插入1000000行

    我想向数据库中插入 10 00 000 行 但是插入的时间太长了 例如现在我正在尝试 2055 行 需要 3 分钟才能将这些数据上传到数据库中 对于 2055 个条目来说 这个时间太多了 以下是我将数据插入数据库的方法 public voi
  • numpy var() 和 pandas var() 之间的区别

    最近遇到的一件事让我注意到numpy var and pandas DataFrame var or pandas Series var 给出不同的值 我想知道它们之间有什么区别吗 这是我的数据集 Country GDP Area Cont
  • 使用 Kafka Streams DSL 进行两步窗口聚合

    假设我有一个流 stream 1 每秒由 1 个数据点组成 我想计算一个派生流 stream 5 其中包含使用 5 秒的跳跃窗口和另一个流 stream 10 的总和它基于包含使用 10 秒跳跃窗口的总和的 stream 5 需要分别对每个