发现这两个问题:here and here,但我还是不太明白。
我仍然有(意外的?)行为。
我尝试使用此配置来记录紧凑的 kafka 主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test1 --config "cleanup.policy=compact" --config "delete.retention.ms=1000" --config "segment.ms=1000" --config "min.cleanable.dirty.ratio=0.01" --config "min.compaction.lag.ms=500"
然后我发送这些消息,每条消息至少有1秒的间隔
A: 3
A: 4
A: 5
B: 10
B: 20
B: 30
B: 40
A: 6
我期望的是几秒钟后(配置为 1000?),当我运行时kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic test1 --from-beginning
,我应该得到
A: 6
B: 40
相反,我得到:
A: 5
B: 40
A: 6
如果我发布另一条消息B:50
并运行消费者,我得到:
B: 40
A: 6
B: 50
而不是预期的
A: 6
B: 50
- 实际上,如何配置日志压缩呢?
- From 卡夫卡文档 : 日志压缩确保 Kafka 始终至少保留单个主题分区的数据日志中每个消息键的最后一个已知值
这是否意味着我只能对单个分区的主题使用日志压缩?
基本上,您自己已经提供了答案。正如 Kafka 文档中所述,“日志压缩确保 Kafka 始终保留至少是最后已知的单个主题分区的数据日志中每个消息键的值”。因此,不能保证您始终拥有一个键对应的一条消息。
如果我正确理解日志压缩,那么它并不适用于像您在非常有效的问题中提出的用例。相反,它的目的是最终达到主题中每个键仅存在一条消息的阶段。
日志压缩是一种提供更细粒度的每条记录保留的机制,而不是提供更粗粒度的基于时间的保留。这个想法是有选择地删除具有相同主键的最新更新的记录。这样可以保证日志至少具有每个键的最后状态。
如果您计划仅保留每个键的最新状态,并希望处理尽可能少的旧状态(非压缩主题的情况,取决于基于时间/大小的保留,压缩主题是正确的选择) )。据我所知,日志压缩的用例是保存最新的地址、手机号码、数据库中的值等。这些值不会每时每刻都在变化,并且通常有很多键。
从技术角度来看,我猜您的情况发生了以下情况。
当涉及到压缩时,日志被视为分为两部分
-
Clean:之前已经压缩过的消息。此部分仅包含每个键的一个值,该值是上一次压缩时的最新值。
-
Dirty:上次压缩后写入的消息。
生成消息后B: 40
(A: 5
已经生产出来了)clean
日志的一部分是空的并且dirty/active
部分包含A: 5
and B: 40
。消息A: 6
还不是日志的一部分。产生新消息A: 6
将开始对日志的脏部分(因为你的比率非常低)进行压缩,但是排除新消息本身。如前所述,没有更多内容需要清理,因此新消息将仅添加到主题中,并且现在位于日志的脏部分中。与您在生产时观察到的情况相同B: 50
.
此外,压实将never发生在您的活跃段上。所以,即使你设置了segment.ms
只是1000 ms
它不会产生新的段,因为产生后没有新数据传入A: 6
or B: 50
.
为了解决您的问题并遵守您需要生成另一条消息的期望C: 1
生产后A: 6
or B: 50
。这样,清洁工可以再次比较原木的干净部分和脏部分,并将其清除A: 5
or B: 40
.
同时,看看这些段在 Kafka 日志目录中的行为如何。
从我的角度来看,日志压缩的配置完全没问题!这并不是观察预期行为的正确用例。但对于生产用例,请注意您当前的配置会尝试非常频繁地启动压缩。根据数据量,这可能会变得相当 I/O 密集型。默认比率设置为是有原因的0.50
log.roll.hours 通常设置为 24 小时。此外,您通常希望确保消费者有机会在压缩之前读取所有数据。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)