如何删除/清除 Kafka Streams 中的状态存储?

2023-12-21

我有一个习惯Transformer在我的 kafka-streams DSL 的末尾实现,并带有持久的变更日志KeyValueStore绑定到它。

几周以来,我在商店里放了太多的数据。现在,每当我加载应用程序时,它就会消耗太多的内存。

然而,应用程序本身只是一个原型,所以我不介意完全清理商店。

我可以重命名kafka.application.idstate-store-name但这是一个临时解决方法(并且相应的数据/主题不会被删除)。

我该如何彻底清除它?


合流的文档建议 https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html#step-2-reset-the-local-environments-of-your-application-instances使用 KafkaStreams.cleanUp(),或手动删除目录/var/lib/kafka-streams/<application.id>(配置参数state.dir).

您还需要使用以下命令重置应用程序使用的所有主题专用重置工具 https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html - bin/kafka-streams-application-reset:

bin/kafka-streams-application-reset --application-id my-streams-app \
                                  --input-topics my-input-topic \
                                  --intermediate-topics rekeyed-topic

This post https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/关于重置状态非常有趣。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何删除/清除 Kafka Streams 中的状态存储? 的相关文章

  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 无法找到任何实现 Connector 且名称与 io.debezium.connector.mysql.MySqlConnector 匹配的类,可用的连接器有

    使用 Kafka MySQL 和 Debezium 设置数据流管道 我是这个版本的 Kafka 3 4 0 MySQL 8 Debezium 2 2 1 Java 11 目标 我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kaf
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153

随机推荐

  • DevPay和Mfa是互斥的授权方式

    我尝试使用以下命令通过 AWS cli 将 MFA 删除添加到我的 S3 存储桶 aws s3api put bucket versioning bucket
  • 2D 软体:凝胶状和可塑性?

    我正在使用 Matter js 物理学来尝试创建软体 我能够创建这样的身体 但我不确定这是否是我想要的 软体 确实 这个物体并不完全是刚性的 并且在碰撞和被拖动时具有弹性的感觉 我一直在寻找与凝胶有相似之处的身体 这张图片可能在视觉上有助于
  • Pandas 分割错误

    由于内存不足 以下代码行未成功执行 import pandas as pd import datetime as dt u cols remote host dummy1 dummy2 date timezone get status by
  • PDO 错误:一般错误:2031 [重复]

    这个问题在这里已经有答案了 当我执行代码时出现此错误 我知道这已经在这里讨论过几次了 但我无法通过阅读那里提供的解决方案来解决我的问题 这是我得到的错误 致命错误 未捕获异常 PDOException 消息为 SQLSTATE HY000
  • 对于软件开发人员来说,学习如何对微控制器进行编程有多难?

    我是一名软件开发人员 我使用高级语言进行编程已有几年了 我想知道如何迈出硬件编程的第一步 不是什么疯狂复杂的东西 但也许是一些普通的 CE 设备 假设我不需要将 PCB 与各种组件放在一起 而只是对微型 cpu 进行编程 我要到多低的级别
  • 该算法的复杂度(Big-O)是多少?

    我对算法分析相当熟悉 并且可以说出我使用的大多数算法的大体 但我已经被困了几个小时 无法为我编写的这段代码想出 Big O 基本上 它是一种生成字符串排列的方法 它的工作原理是使字符串中的每个字符成为第一个字符 并将其与子字符串减去该字符的
  • 我可以在摘要式身份验证中使用已 MD5 编码的密码吗

    我在数据库中有密码的 MD5 哈希值 我想将其用于 HTTP AUTH DIGEST 但在阅读文档时 摘要哈希看起来包含用户名 领域和明文密码的哈希 在这种情况下有什么办法可以使用密码的 MD5 哈希吗 不 如果他们需要的哈希是这样生成的
  • Spark 结构化流:多个接收器

    我们使用结构化流从 Kafka 进行消费 并将处理后的数据集写入 s3 我们还想将处理后的数据写入 Kafka 是否可以通过同一个流查询来完成此操作 火花版本2 1 1 在日志中 我看到流式查询进度输出 并且我有来自日志的示例持续时间 JS
  • 分割字符串后将值插入表中

    我想将值插入员工表中 这些值是字符串格式的 分开 E g AA B 123 我使用以下函数分割它 CREATE FUNCTION db owner FN Split String varchar 8000 Delimiter char 1
  • 在异步方法中使用反射获取方法名称不会返回预期结果

    以下是我编写的一小段代码 用于演示此问题的基础知识 Code private async void Form1 Load object sender EventArgs e var result await TestAsyncMethodN
  • 查找鼠标相对于面板的位置

    我试图获取鼠标在面板中的位置 如面板左上角 x y 0 0 我目前所拥有的给出了整个屏幕上的位置 因此根据面板 位于框架中 在屏幕上的位置 坐标是不同的 我想你可以添加 x y 坐标来解决这个问题 但这似乎是一个混乱的解决方案 有人可以帮忙
  • 错误的身份验证数据 QuickBlox - Android

    我正在研究QuickBlox SDK http quickblox com developers SimpleSample users android Sign In 26 Social authorization 使用这个 SDK 我尝试
  • 可以使用 boost::threads 中的 std::this_thread* 函数吗?

    可以混合搭配来自的东西吗 boost thread and std thread 或者应该为每个函数使用一组函数 我问是因为我的代码使用boost threads 但我发现boost this thread sleep for设置系统时间时
  • 如何检查远程 git 存储库 URL 的有效性?

    在 bash 脚本中 验证 git URL 是否指向有效的 git 存储库以及脚本是否有权读取它的最简单方法是什么 应该支持的协议有git https and git 卷曲失败git 协议 email protected cdn cgi l
  • 如何导致 ldap_simple_bind_s 超时?

    最近 我们的测试 LDAP 服务器遇到了问题 它挂起并且无法响应请求 结果 我们的应用程序在尝试绑定时永远挂起 这仅发生在 Unix 机器上 在 Windows 上 ldap simple bind s大约 30 秒后呼叫超时 我不知道是不
  • 将 UTF-8 文本转换为 wchar_t

    我知道这个问题已经被问过很多次了 我确实阅读了一些答案 但是有一些建议的解决方案 我试图找出其中最好的解决方案 我正在编写一个 C99 应用程序 它基本上接收以 UTF 8 编码的 XML 文本 它的部分工作是复制和操作该字符串 查找子字符
  • 从因子变量中删除特定因子水平

    我有一个数据框 其中包含多个具有 5 个因子水平的变量 我只想删除其中一个级别 首先 我将该级别的所有实例分配给 NA 然后使用droplevels命令摆脱空的水平 然而 对于我的数据框中的一个变量 我不想删除的级别之一没有任何观察结果 有
  • Boost:序列化/反序列化通过 ZeroMQ 拉套接字传递的自定义 C++ 对象

    描述 我有一个名为的 C 类通用消息它仅保存一个 id 和数据作为其成员 请参阅下面的代码片段 1 GenericMessage hxx 我的目的是序列化此类的实例并通过实现推送模式的 ZeroMQ 套接字发送它 序列化和发送任务已实现类
  • Python:如何对自定义 HTTP 请求处理程序进行单元测试?

    我有一个自定义 HTTP 请求处理程序 可以简化为如下所示 Python 3 from http import server class MyHandler server BaseHTTPRequestHandler def do GET
  • 如何删除/清除 Kafka Streams 中的状态存储?

    我有一个习惯Transformer在我的 kafka streams DSL 的末尾实现 并带有持久的变更日志KeyValueStore绑定到它 几周以来 我在商店里放了太多的数据 现在 每当我加载应用程序时 它就会消耗太多的内存 然而 应