MSK 不删除旧消息

2023-12-20

我有三个 MSK 集群;开发、非产品和产品。它们都具有以下集群配置 - 没有主题级别配置。

auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
log.retention.hours=100
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

Dev 和 Nonprod 正在清除超过 100 小时的消息,如log.retention.hours=100环境。

我们的生产集群有更多的流量,并且旧消息没有被删除。集群上仍有数十万条超过 400 小时的消息。我考虑过添加进一步的配置设置,例如

segment.bytes
segment.ms

为了更快地滚动段,因为可能一个段尚未滚动并且无法标记为删除 - 然而,相同的配置在其他集群中运行良好,尽管没有收到那么多流量。


因此,事实证明,这是生产者以美国日期格式而不是英国日期格式向 Kafka 发送消息的问题。因此,它创建的消息似乎会在未来加上时间戳 - 因此不会早于 100 小时且符合删除条件。

要删除我们设置的现有消息log.retention.bytes它会修剪消息,而不管log.retention.hours环境。这导致 kafka 主题被修剪并删除错误消息 - 然后我们取消设置log.retention.bytes.

接下来我们设置log.message.timestamp.type=LogAppendTime确保消息带有与文档时间相反的队列时间。这将防止生产者的错误日期将来再次导致此问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MSK 不删除旧消息 的相关文章

  • 谁在为kafka集群设置授权

    我有一个 3 节点 Kafka 集群和 2 个用于生产者和消费者的 kafka 客户端 我已启用 SSL 身份验证 我想为集群启用授权 我已在代理节点的 server properties 中添加了以下属性 authorizer class
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • python 脚本在 docker 内运行时无法导入 kafka 库 [重复]

    这个问题在这里已经有答案了 我有以下 python 脚本 可以从 twitter 中提取推文并将其发送到 kafka 主题 该脚本运行完美 但是当我尝试在 docker 容器内运行它时 它无法导入 kafka 库 它说 语法错误 语法无效
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • 如何使用不同的kafka主题配置Kubernetes部署的微服务的每个pod/进程?

    在我们的应用程序中 有多个不同 kafka 主题的消费者 例如 Cosumer C1 Cosumer C2 Cosumer C3 Cosumer C4 Cosumer C5 以及不同的 kafka 主题 例如主题 1 主题 2 主题 3 主
  • 使用kafka lib反序列化PRIMITIVE AVRO KEY

    我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 使用 AWS MSK 连接器连接到 AWS VPC 内的 MongoDB atlas

    我正在尝试使用MongoDB使用更改流Kafka 我选择 AWS MSK 是因为我的整个基础设施都位于 AWS 内 并且可以轻松与其他 AWS 服务集成 I created an AWS MSK cluster within the VPC
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re

随机推荐

  • 如何将 Bamboo 变量从 Bamboo 脚本发送到 docker 容器?

    我正在为bamboo 使用 Docker 插件 我需要在 docker 容器中执行一个脚本 sh 脚本包含 echo ini source path bamboo ini source path 如果我将此行直接放入容器命令中 则 bamb
  • 删除或重置 Cookie

    我正在设置一个cookieRequest Cookies TemplateName value在我的申请的其中一页 第 3 页 上 现在我可以从第 3 页导航到第 4 页和第 2 页 并保留 cookie 的值 但是现在当我注销并再次登录时
  • RxJS 更新,类型“typeof Observable”上不存在属性“merge”

    我更新了我的材质角度项目 以在表中包含可扩展的详细信息行 为此 我需要升级到 rsjx 6 现在我收到以下错误 我对角度完全陌生 所以不幸的是我不知道如何解决这个问题 Property merge does not exist on typ
  • 发送带有授权标头的 axios get 请求

    我尝试使用 vue js 发送 axios get 请求 当不需要发送标头时它工作得很好 但是 当需要发送授权 jwt 时 我收到 CORS 错误 对预检请求的响应未通过访问控制检查 请求的资源上不存在 Access Control All
  • 通过地理位置获取用户的状态

    获取美国用户所在州的最有效方法是什么 HTML5 地理定位是否是一种无需涉及谷歌地图的选项 这里有几个 JavaScript 和 JSON 的例子 在jQuery http jquery com 使用IP查找方法 借助IPinfoDB ht
  • UITableView - 使用 Swift 注册类

    其他人在使用时遇到问题吗tableView registerClass方法与斯威夫特 它不再为我提供代码补全 如果手动键入 我也不能使用它 但它仍然在标题中 它对我来说非常有效 self tableView register UITable
  • CGAL:继承和内核

    CGAL问题 我正在尝试向点类添加一个属性 我想第一步是继承一个内核并用我自己的从 CGAL 继承的点类替换点类 但只是想迈出这小小的第一步 我就遇到了麻烦 编辑 根据下面的评论 我将继承更改为手册中描述的方式 下面的代码给出了以下编译错误
  • 程序集 32 位打印显示在 qemu 上运行的代码,无法在真实硬件上运行

    我已经用 x86 汇编语言编写了一小段在裸硬件上运行的代码 此时 它已启用受保护的 32 位模式 然而 我遇到了与屏幕打印有关的问题 我读到 要在不中断的情况下执行此操作 可以将字符加载到特殊的内存区域 即 RAM 地址 0xb8000 知
  • C++ new、delete 和函数

    这对我来说有点不清楚 所以 如果我有一个函数 char test int ran char ret new char ran process return ret 然后多次调用它 for int i 0 i lt 100000000 i c
  • 搜索过滤器:最少字符数

    这是我用于至少 3 个字符检查的简单代码 如果查询是全数字的 我想例外 代码还可以选择按案例 ID 进行搜索 案例 ID 少于 3 个字符 感谢帮助 用这个 if strlen POST Search gt 3 is numeric POS
  • AutoMapper:将接口映射到抽象类 - 这可能吗?

    我在用着自动映射器 http automapper codeplex com在我的应用程序的不同层之间映射对象 一方面 我有一个如下所示的界面 public interface MyRepo IEnumerable
  • 如何获得 OxyPlot 中的所有颜色?

    我有一个问题 我在 WPF C 中使用 OxyPlot 我需要将所有颜色设置为系列的 MarkerType 和 MarkerStroke 我怎样才能获得所有颜色 Green IndianRed 等是静态 OxyColors 类中的静态字段
  • javax.el.PropertyNotFoundException:在 JSP 中使用 JSTL [重复]

    这个问题在这里已经有答案了 我有一个 JSP 我尝试使用 JSTL 标记来显示类的内存实例中的数据 该数据由一系列字符串组成 其中每个字符串都是 RSS 提要的地址 在 JSP 中 我有以下代码 table border 1 tr tr t
  • view.invalidate() 无法重绘 imageview

    好吧 伙计们 这可能听起来很愚蠢 但我已经用头撞键盘有一段时间了 试图找出为什么这不会刷新 基础知识 我有一个小示例应用程序 我正在测试它是否可以将图像围绕一个点旋转 X 度 并一次显示一个度以制作平滑的动画 所以我有一个很棒的示例 我发现
  • 如何根据键名合并2个数组并根据合并后的值进行排序?

    假设我有两个列表 const listA apple 100 banana 50 pearl 10 cherry 5 kiwi 3 const listB peach 30 apple 15 kiwi 10 mango 5 问题是如何将两个
  • 为什么导航在版本 2.4.1 的导航抽屉活动模板中不起作用?

    使用Android Studio 2021 1 1 使用导航抽屉活动创建一个新项目 使用导航抽屉活动模板创建了一个默认的 Android 应用程序 在项目中添加了一个设置片段来测试action settings菜单和配置菜单项 被覆盖onO
  • Python“‘模块’对象不可调用”

    我正在尝试制作一个情节 from matplotlib import import sys from pylab import f figure figsize 7 7 但是当我尝试执行它时出现此错误 File mratio py line
  • 我可以在 git config 中设置推送选项(git push -o "...")吗?

    Git 2 10 引入了git推送选项 https git scm com docs git push git push o git push o my string 许多命令行选项都是可配置的 我想知道是否也可以这样做 我没能找到它git
  • 无法使用自定义 DelegateProxy 和协议接收事件

    我尝试将 DifficultyViewDelegate 的委托迁移到可观察的 这是我的 DifficultyViewDelegate objc protocol DifficultyViewDelegate class func level
  • MSK 不删除旧消息

    我有三个 MSK 集群 开发 非产品和产品 它们都具有以下集群配置 没有主题级别配置 auto create topics enable false default replication factor 3 min insync repli