用于重复数据删除的 Kafka 压缩

2023-12-01

我试图了解 Kafka 压缩的工作原理,并有以下问题:kafka 是否保证启用压缩的主题中存储的消息的键的唯一性?

Thanks!


简短的回答是否定的。

Kafka 不保证启用主题保留时存储的密钥的唯一性。

在 Kafka 中有两种类型cleanup.policy:

  • delete- 这意味着在配置的时间之后消息将不可用。有几个属性可以用于此目的:log.retention.hours, log.retention.minutes, log.retention.ms。默认情况下log.retention.hours is set 168。这意味着,消息older超过 7 天将deleted
  • compact- 对于每个键,至少有一条消息可用。在某些情况下可能是一个,但在大多数情况下会更多。压缩处理会定期在后台运行。它通过删除重复项并仅保留最后一个值来复制日志部分。

如果您只想读取每个键的一个值,则必须使用KTable<K,V>抽象自卡夫卡流.

关于键和压缩的最新值的相关问题:Kafka只订阅最新消息?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

用于重复数据删除的 Kafka 压缩 的相关文章

  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

    当我使用 kafka 运行以下命令时0 9 0 1 我收到这些警告 1 你能告诉我我的主题有什么问题吗 我正在与在 ec2 中运行的 kafka 经纪人交谈 kafka console consumer sh new consumer bo
  • 带有 Kafka 消费者的 Spring Boot 作业调度程序

    我正在开发一个 POC 我想使用来自 Kafka 主题 用户 的消息 尝试实现消费者应该从 Kafka 主题读取消息 一旦 spring boot 调度程序在预定时间或 cron 时间触发 那么我们应该开始从 kafka 主题中一一消费现有
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues

随机推荐

  • Android 和 Java:减少服务循环上的内存使用

    我有一个 Android 服务 它使用此线程每秒更新一个通知 评论并不真正相关 thread new Thread Override public void run Preparando la notificaci n de Swap No
  • 字符串 s = 新字符串(“xyz”)。这行代码执行后创建了多少个对象?

    这个面试问题的普遍同意的答案是代码创建了两个对象 但我不这么认为 我写了一些代码来确认 public class StringTest public static void main String args String s1 a Stri
  • 如何在MySQL中记录记录的顺序集合

    假设我有一张桌子 里面有某种类型的记录 比如烹饪说明 比如 Fold the melted chocolate into the egg whites 该表包含唯一 ID 字段和字符串 我想为食谱构建另一个表 每个表都有一个唯一的 ID 和
  • 在对象内部使用“this”和对象名称进行引用有什么区别?

    如果我有以下代码 var obj x 34 init function alert this x alert obj x 两个警报都显示 34 但是有什么区别 一个比另一个更好吗 http jsfiddle net 4scz435q 我在j
  • 如何在 C++ 中编写正确的哈希表析构函数

    我正在写一个 C 哈希表 这是我的析构函数 HashMap HashMap for int i 0 i
  • 如何从 kdeplot 获取半高全宽 (FWHM)

    我在一些数据上使用了seaborn的kdeplot import seaborn as sns import numpy as np sns kdeplot np random rand 100 是否可以从创建的曲线返回 fwhm 如果不是
  • 教义 FindBy 方法与“OR 条件”?

    是否可以使用OR教义中的声明findBy method 我希望输出是这样的 SELECT FROM friends WHERE userId 1 OR FriendId 1 现在的代码 user repository gt findBy a
  • tf.where 的 TensorFlow 梯度在不应该返回 NaN 时返回 NaN

    下面是可重现的代码 如果运行它 您将看到在第一次 sess 运行中 结果为 nan 而第二种情况给出了正确的梯度值 0 5 但根据指定的 tf where 和条件 它们应该返回相同的值 我也根本不明白为什么 tf where 函数梯度在 1
  • 显示日志文件更新时的内容

    我有外部程序 例如 ffmpeg 和 gstreamer 在后台运行并写入日志文件 我想用我的 Flask 应用程序显示此日志的内容 以便用户可以观看日志更新 例如tail f job log会在终端做 我尝试使用指向日志文件 但未能显示数
  • pyspark中的DataFilter是什么?

    我看到一个叫做DataFilter在我的查询执行计划中 FileScan parquet product id 12 price 14 Batched true DataFilters isnotnull product id 12 For
  • 计时器:如何在后台保持计时器处于活动状态

    在我的 iPhone 定时器应用程序中 其中计时器应在后台运行 所以 我已经在 appdelegate 中设置了通知 它工作得很好 这样我就可以从视图控制器调用方法 这使得计时器处于活动状态 看一些代码 应用程序委托 void applic
  • h2混合模式连接问题

    我在 servlet 上下文侦听器中启动 h2 数据库 public void contextInitialized ServletContextEvent sce org h2 Driver load String apprealPath
  • 如何使用 proguard 获取发布构建 apk 文件

    我正在尝试使用ProGuard为了为我的项目制作发布 apk 文件 显然我正在使用许多第三方库 我只需要使用其中的几个类 我真的很想得到一些关于此的解释 我的调试版本超过20 MB 所以我想通过使用来减少它shrinking用于progua
  • NDB 查询 fetch() 和 ContextOptions

    我想仅在我的一个查询中禁用上下文缓存 我想我可以这样做 MyModel query ancestor user key fetch 100 options ContextOptions use cache False use memcach
  • HTML5 的 History.js - 需要进行黑客攻击才能不破坏 IE7

    我的目标是仅支持 HTML5 浏览器的 AJAX 历史记录 但是 我希望我的网站能够使用 HTML4 浏览器 但没有 AJAX 历史记录 许多 History js 示例在执行任何操作之前都包含以下检查 if History enabled
  • mailto链接多条正文线

    无法在 mailto 链接中使多行正常工作 就我而言 我正在使用 Outlook 默认邮件阅读器对其进行测试 以下内容放入锚点 href 中 mailto email protected subject test body type 20y
  • 如何防止 XmlSerialzer 转义“嵌套 XML”?

    我正在使用 XmlSerializer 来序列化 反序列化复杂对象 一个属性包含一个 XML 字符串 应将其写入字符串属性而不进行反序列化 示例 可在 LinqPad 中执行 XmlRoot RootObject Serializable
  • 从日期时间获取日期名称

    如何从 Python 中的日期时间对象获取日期名称 例如星期一 星期二 星期三 星期四 星期五 星期六和星期日 所以 举例来说 datetime 2019 9 6 11 33 0 应该给我 Friday import datetime no
  • 覆盖单个文件的编译标志

    我想使用一组全局标志来编译项目 这意味着我在顶级 CMakeLists txt 文件中指定了 ADD DEFINITIONS Wall Weffc pedantic std c 0x 但是 对于子目录中的特定文件 假设为 foo cpp 我
  • 用于重复数据删除的 Kafka 压缩

    我试图了解 Kafka 压缩的工作原理 并有以下问题 kafka 是否保证启用压缩的主题中存储的消息的键的唯一性 Thanks 简短的回答是否定的 Kafka 不保证启用主题保留时存储的密钥的唯一性 在 Kafka 中有两种类型cleanu