Kafka Streams - 减少大型状态存储的内存占用

2024-04-25

我有一个拓扑(见下文),可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用量相当高,我正在寻找一些关于如何减少状态存储占用空间的建议(更多详细信息如下)。Note:我并不是想逃避国有商店,我只是认为可能有一种方法可以改善我的拓扑 - 见下文。

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc

更具体地说,我想知道是否流式传输OUTPUT_TOPIC因为 KTable 导致状态存储(REKEYED_STORE)比本地需要的要大。对于具有大量唯一键的变更日志主题,将它们作为流式传输会更好吗?KStream并进行窗口聚合?或者这不会像我想象的那样减少占用空间(例如,只有记录的子集 - 窗口中的记录,会存在于本地状态存储中)。

不管怎样,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:

  • 对于具有这种吞吐量级别的 Kafka Streams 应用程序,是否应该考虑任何配置选项、一般策略等?
  • 是否有关于单个实例的内存密集程度的指导原则?即使您有一个有点武断的指导方针,与他人分享也可能会有所帮助。我的一个实例当前使用 15GB 内存 - 我不知道这是否好/坏/无关紧要。

任何帮助将不胜感激!


以你目前的模式

stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)

您会得到两家内容相同的商店。一个为reduce()运算符和一个用于读取table()-- 不过,这可以减少到一个商店:

KTable rekeyedTable  = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely

这应该会显着减少你的内存使用量。

关于窗口化与非窗口化:

  1. 这是你所需的语义问题;如此简单地从非窗口化到窗口化缩减似乎是有问题的。

  2. 即使您也可以使用窗口语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即 key + currentAgg)。因此,对于单个密钥,两种情况的存储要求是相同的(单个窗口具有相同的存储要求)。同时,如果您使用 Windows,当您获得聚合专业密钥专业窗口时,您实际上可能需要更多内存(而在非窗口情况下您仅获得单个聚合专业密钥)。唯一可以节省内存的情况是“密钥空间”分布在很长一段时间内的情况。例如,您可能很长时间无法获取某些按键的输入记录。在非窗口情况下,这些记录的聚合将始终被存储,而对于窗口情况,键/聚合记录将被删除,并且如果稍后出现具有此键的记录,则将重新创建新条目再次打开(但请记住,在这种情况下您丢失了之前的聚合门 - 参见 (1))

最后但并非最不重要的一点是,您可能需要查看调整应用程序大小的指南:http://docs.confluence.io/current/streams/sizing.html http://docs.confluent.io/current/streams/sizing.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams - 减少大型状态存储的内存占用 的相关文章

  • Kafka Connect 进入重新平衡循环

    我刚刚部署了 Kafka Connect 我只使用连接源 MQTT 应用程序位于两个实例的集群上 2 个容器上 机器 现在它似乎进入了一种重新平衡循环 我一开始有一点数据 但没有新数据出现 这就是我在日志中得到的内容 2017 08 11
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • 如何连接Kafka和Elasticsearch?

    我是Kafka的新手 我使用kafka通过logstash收集netflow 可以 并且我想将数据从kafka发送到elasticsearch 但是存在一些问题 我的问题是如何将 Kafka 与 Elasticsearch 连接起来 net
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream

随机推荐

  • 无法在后台任务中调用 Task.Run()

    我想在后台任务的线程中做一些事情 所以我尝试使用 Task Run 但它不起作用 任何人都可以向我展示另一种在后台任务中创建线程的方法 这是我的代码 public sealed class KatzBackgroundTask IBackg
  • 无法将属性与数字进行比较。错误:“‘AnsibleUnsafeText’和‘int’实例之间不支持”

    getent database passwd debug var getent passwd dict2items selectattr value 1 gt 1000 map attribute key list 输出是 TASK deb
  • Fortran 03/08(gfortran 编译器)中使用无限多态类型进行数组操作

    我想通过以下方式实现有用的数组操作 添加元素 删除元素 通过可分配 指针 二叉树结构实现不同的实现 class 特征 无限多态性 我使用 gfortran 5 0 应该可以处理这样的功能 我需要它 以免为我使用的每种类型重复相同的代码 这应
  • 如何在 Django 中创建 unique_for_field slug?

    姜戈有一个日期唯一 http docs djangoproject com en dev ref models fields unique for date您可以在将 SlugField 添加到模型时设置的属性 这会导致 slug 仅对于您
  • 像在eclipse中一样关闭intellij idea中未使用的模块

    据我所知 目前 intellij idea 中没有任何功能可以做到这一点 我不知道为什么 但他们不支持这样做 至少这是我通过所有研究发现的结果 也许我们中的一些人用不同的方式来解决这个问题 如何在 intellij 中使用多个模块 在处理多
  • 如何从 USB 加载 LUKS 密码,然后返回键盘?

    我想设置一台具有全磁盘加密功能的无头 Linux Debian Wheezy PC 能够使用 USB 驱动器或通过键盘输入密码来解锁磁盘 我的起点是使用 Debian 安装程序中基本的整个磁盘加密选项进行全新安装 该安装程序将 boot 之
  • 如何在 Square MockWebServer 中使用 SSL?

    我尝试启用 SSLSquare 的 MockWebServer https github com square okhttp tree master mockwebserver在测试下模拟我的 Android 应用程序中的所有 Web 服务
  • 如何使用 PowerShell 递归合并/“展平”文件夹结构

    我正在寻求帮助来重组许多子文件夹中的大量文件 示例来源 folderX aaa txt bbb txt folderY ccc txt folderZ ddd txt eee txt 理想结果 folderX aaa txt folderX
  • 自上一步以来进程或线程已更改

    我正在 Visual Studio 上调试一些代码 此代码属于我创建的自定义会话提供程序 我正在 Web 应用程序启动时对其进行调试 它开始初始化我的提供程序 并且在该函数上我有一个第一次成功命中的断点 但是 同一断点再次被击中 但它有一个
  • 带有自定义离线页面的 Angular PWA

    在 Angular 8 应用程序中 我想添加一个自定义离线页面 只是一个简单的 html 文件 我已将我的应用程序设置为 PWA 使用 angular pwa并配置了一切 以便它至少在在线时顺利工作 然而 我很难为 PWA 用户提供更新 因
  • unsafePerformIO 和 FFI 库初始化

    我正在为 C 中的库创建一个 FFI 模块 该模块希望在执行其他操作之前调用一个一次性 不可重入的函数 这个调用是幂等的 但是有状态的 所以我可以在每个 Haskell 调用中调用它 但它很慢 并且由于不可重入 可能会导致冲突 那么现在是使
  • 允许用户在 Android 应用程序中插入图像

    我的问题是 如何创建 imageButton 允许用户从手机上传图像并将其作为图片配置文件插入应用程序中 例如 像 Whatsapp 一样 它允许用户从手机中选择图像并将其设置为图片配置文件 Thanks 我的 XML 文件
  • 为什么 Func 与 Func> 不明确?

    这个问题让我很困惑 所以我想我会在这里问 希望 C 大师可以向我解释一下 为什么这段代码会产生错误 class Program static void Main string args Foo X the error is on this
  • Laravel 5.3 存储和读取文件目录

    目前正在尝试处理文件 但很难弄清楚将它们放在哪里以及如何在列表中读回它们 我尝试过将一些测试文件放入 files array dir opendir asset files open the cwd also do an err check
  • 如何使用 pyspark 从 s3 存储桶读取 csv 文件

    我正在使用 Apache Spark 3 1 0 和 Python 3 9 6 我正在尝试从 AWS S3 存储桶读取 csv 文件 如下所示 spark SparkSession builder getOrCreate file s3 b
  • 不获取AudioListenerInterruptionEnd触发器

    我对 OpenAl 和 MPMoviePlayerController 的组合有疑问 我在 OpenAl 设置过程中注册了 AudioInterruptionLister 当我开始播放视频时 侦听器会收到 AudioListenerInte
  • 离子 3 角度 4 动画不起作用

    我有一个组件 我正在尝试为手风琴列表设置动画 我已经进行了所有更改 例如包括import BrowserModule from angular platform browser and import BrowserAnimationsMod
  • std::unordered_set 迭代器遍历的复杂性

    我最近玩了一个std unordered set http en cppreference com w cpp container unordered set 我怀疑我的 STL 版本会跟踪某些 FILO 数据结构 看起来像列表 中的非空存
  • Android JSON解析并存储到数据库

    我正在制作一个具有数据库的应用程序 现在我正在尝试从中解析数据值
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认