我有三个 MSK 集群;开发、非产品和产品。它们都具有以下集群配置 - 没有主题级别配置。
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
log.retention.hours=100
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
Dev 和 Nonprod 正在清除超过 100 小时的消息,如log.retention.hours=100
环境。
我们的生产集群有更多的流量,并且旧消息没有被删除。集群上仍有数十万条超过 400 小时的消息。我考虑过添加进一步的配置设置,例如
segment.bytes
segment.ms
为了更快地滚动段,因为可能一个段尚未滚动并且无法标记为删除 - 然而,相同的配置在其他集群中运行良好,尽管没有收到那么多流量。
因此,事实证明,这是生产者以美国日期格式而不是英国日期格式向 Kafka 发送消息的问题。因此,它创建的消息似乎会在未来加上时间戳 - 因此不会早于 100 小时且符合删除条件。
要删除我们设置的现有消息log.retention.bytes
它会修剪消息,而不管log.retention.hours
环境。这导致 kafka 主题被修剪并删除错误消息 - 然后我们取消设置log.retention.bytes
.
接下来我们设置log.message.timestamp.type=LogAppendTime
确保消息带有与文档时间相反的队列时间。这将防止生产者的错误日期将来再次导致此问题。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)