Kafka 日志压缩始终显示同一键的最后两条记录

2023-12-12

发现这两个问题:here and here,但我还是不太明白。 我仍然有(意外的?)行为。

我尝试使用此配置来记录紧凑的 kafka 主题

kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test1 --config "cleanup.policy=compact" --config "delete.retention.ms=1000" --config "segment.ms=1000" --config "min.cleanable.dirty.ratio=0.01" --config "min.compaction.lag.ms=500"

然后我发送这些消息,每条消息至少有1秒的间隔

A: 3
A: 4
A: 5
B: 10
B: 20
B: 30
B: 40
A: 6

我期望的是几秒钟后(配置为 1000?),当我运行时kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic test1 --from-beginning,我应该得到

A: 6
B: 40

相反,我得到:

A: 5
B: 40
A: 6

如果我发布另一条消息B:50并运行消费者,我得到:

B: 40
A: 6
B: 50

而不是预期的

A: 6
B: 50
  1. 实际上,如何配置日志压缩呢?
  2. From 卡夫卡文档 : 日志压缩确保 Kafka 始终至少保留单个主题分区的数据日志中每个消息键的最后一个已知值
    这是否意味着我只能对单个分区的主题使用日志压缩?

基本上,您自己已经提供了答案。正如 Kafka 文档中所述,“日志压缩确保 Kafka 始终保留至少是最后已知的单个主题分区的数据日志中每个消息键的值”。因此,不能保证您始终拥有一个键对应的一条消息。

如果我正确理解日志压缩,那么它并不适用于像您在非常有效的问题中提出的用例。相反,它的目的是最终达到主题中每个键仅存在一条消息的阶段。

日志压缩是一种提供更细粒度的每条记录保留的机制,而不是提供更粗粒度的基于时间的保留。这个想法是有选择地删除具有相同主键的最新更新的记录。这样可以保证日志至少具有每个键的最后状态。

如果您计划仅保留每个键的最新状态,并希望处理尽可能少的旧状态(非压缩主题的情况,取决于基于时间/大小的保留,压缩主题是正确的选择) )。据我所知,日志压缩的用例是保存最新的地址、手机号码、数据库中的值等。这些值不会每时每刻都在变化,并且通常有很多键。

从技术角度来看,我猜您的情况发生了以下情况。

当涉及到压缩时,日志被视为分为两部分

  • Clean:之前已经压缩过的消息。此部分仅包含每个键的一个值,该值是上一次压缩时的最新值。
  • Dirty:上次压缩后写入的消息。

生成消息后B: 40 (A: 5已经生产出来了)clean日志的一部分是空的并且dirty/active部分包含A: 5 and B: 40。消息A: 6还不是日志的一部分。产生新消息A: 6将开始对日志的脏部分(因为你的比率非常低)进行压缩,但是排除新消息本身。如前所述,没有更多内容需要清理,因此新消息将仅添加到主题中,并且现在位于日志的脏部分中。与您在生产时观察到的情况相同B: 50.

此外,压实将never发生在您的活跃段上。所以,即使你设置了segment.ms只是1000 ms它不会产生新的段,因为产生后没有新数据传入A: 6 or B: 50.

为了解决您的问题并遵守您需要生成另一条消息的期望C: 1生产后A: 6 or B: 50。这样,清洁工可以再次比较原木的干净部分和脏部分,并将其清除A: 5 or B: 40.

同时,看看这些段在 Kafka 日志目录中的行为如何。

从我的角度来看,日志压缩的配置完全没问题!这并不是观察预期行为的正确用例。但对于生产用例,请注意您当前的配置会尝试非常频繁地启动压缩。根据数据量,这可能会变得相当 I/O 密集型。默认比率设置为是有原因的0.50log.roll.hours 通常设置为 24 小时。此外,您通常希望确保消费者有机会在压缩之前读取所有数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 日志压缩始终显示同一键的最后两条记录 的相关文章

  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • 使用 Kafka Streams 在输出中设置时间戳无法进行转换

    假设我们有一个变压器 用 Scala 编写 new Transformer String V String V var context ProcessorContext override def init context Processor
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153

随机推荐