kafka 主题中的唯一消息检查

2024-02-14

我们使用 Logstash,希望从 Oracle 数据库读取一张表并将这些消息(如下所示)发送到 Kafka:

Topic1: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"2019-07-30"}
        message4: {"name":"name-4", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}

请注意message1 and message4是重复项same ID number.

现在,我们希望确保所有消息都是唯一的,那么我们如何过滤topic1并且所有消息都是唯一的,然后发送到topic2?

我们想要的最终结果:

Topic2: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"

这被称为一次性处理.

您可能对第一部分感兴趣卡夫卡常见问题解答 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactly-oncemessagingfromKafka?描述了如何避免数据生产重复的一些方法(即从生产者方面):

Exactly Once 语义有两部分:避免数据重复 生产并避免数据消费过程中的重复。

有两种方法可以在数据期间获取恰好一次语义 生产:

  1. 每次获得网络时,每个分区都使用单写入器 错误检查该分区中的最后一条消息,看看您的最后一条消息是否 写入成功
  2. 在中包含主键(UUID 或其他) 消息并对消费者进行重复数据删除。

如果您执行其中一项操作,Kafka 托管的日志将是 无重复。然而,无重复的阅读取决于一些 也来自消费者的合作。如果消费者定期 检查其位置,如果失败并重新启动,它将 从检查点位置重新启动。因此,如果数据输出和 检查点不是以原子方式写入的,因此可以获得 此处也重复。这个问题是你的存储所特有的 系统。例如,如果您正在使用数据库,您可以提交 这些一起在一个交易中。 LinkedIn 推出的 HDFS 加载器 Camus write 对 Hadoop 负载做了类似的事情。另一种选择 不需要事务的方法是将偏移量存储在 使用主题/分区/偏移量加载数据并进行重复数据删除 组合。

我认为有两个改进可以让这变得更容易:

  1. 生产者幂等性可以自动完成并且成本更低 通过选择在服务器上集成对此的支持。
  2. 现有的 高级消费者不会暴露很多更细粒度的信息 控制偏移(例如重置您的位置)。我们将工作 很快

另一个选择(这不完全是您正在寻找的)是日志压缩。假设您的重复消息具有相同的密钥,那么当日志压缩策略有效时,日志压缩最终将删除重复项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka 主题中的唯一消息检查 的相关文章

  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • Logstash删除类型并保留_type

    我有一个logstash 客户端和服务器 客户端将带有logstash的udp输出的日志文件发送到服务器 服务器也运行logstash来获取这些日志 在服务器上 我有一个 json 过滤器 它会在实际日志的字段中提取 json 格式的消息
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties

随机推荐

  • VS 2015导入和导出设置,消失的窗口

    帖子已更新 03 05 2017 人们在这里讨论这个问题 VS 2010 导入导出设置 https stackoverflow com questions 3624073 vs 2010 import export settings VS2
  • 在 JavaScript 中解码 HTML 实体? [复制]

    这个问题在这里已经有答案了 转换示例 amp gt gt gt gt 有什么小的库函数可以处理这个问题吗 我的实用腰带上总是有这个小功能 function htmlDecode input var e document createElem
  • 在 C/C++ 中读/写半字节(无位字段)

    有没有一种简单的方法可以在不使用位字段的情况下读取 写入字节中的半字节 我总是需要读取两个半字节 但需要单独写入每个半字节 Thanks 使用面膜 char byte byte byte 0xF0 nibble1 0xF write low
  • 限制 WAMP 文件系统访问

    我的 PC 上安装了 WAMP 堆栈 昨天 我正在使用 PHP 处理文件系统 并注意到我可以访问硬盘中的任何目录 甚至可以访问网站文档根目录之上的目录 这是一个我想避免的明显的安全问题 目前 我在 WAMP 堆栈中使用多个虚拟主机以及使用主
  • 使用 -deprecation 运行 SBT

    我的 project build scala 文件中似乎有警告 不在我的 Scala 项目中 如何配置 SBT 来运行 deprecation flag Does not help so do not suggest it scalacOp
  • PHP Curl 输出缓冲区未收到响应

    我有一个协议 其中 file1 phpcurl 运行 file2 php file2 php 是一个长时间运行的文件 但它发送 或应该发送 一个响应回 file1 php 然后继续执行它的代码 我正在使用输出缓冲区来尝试发送此数据 但问题是
  • 列出 COMobject 中的所有方法

    是否可以 内容如下 import win32com client ProgID someProgramID com object win32com client Dispatch ProgID for methods in com obje
  • 如何使用 jQuery 在选择框中显示 JSON 数据?

    这是我的 JSON 数据 ACT Australian Capital Territory NSW New South Wales NT Northern Territory QLD Queensland SA South Australi
  • 列表列表的总和

    我正在寻找 python 中的方法来对仅包含整数的列表进行求和 我看到了这个方法sum 仅适用于列表 不适用于列表的列表 有什么适合我的吗 谢谢 您可以使用sum 这里有一个生成器表达式 In 18 lis 1 2 3 4 5 6 In 1
  • SELECT 语句中表达式的执行顺序

    我想知道表达式的执行顺序SELECT声明总是发生于左到右 SET a 0 SELECT a AS first a a 1 AS second a a 1 AS third a a 1 AS fourth a a 1 AS fifth a a
  • Qt Creator 代码编辑期间 CPU 为 100%

    我有 Qt Creator 项目 它用boost and Point Cloud library 当我编辑包含这些库中的内容的文件时 Qt Creator 在每次代码更改 添加行 更改变量类型等 后挂起大约 30 秒 TaskManager
  • “AsyncThunkAction”类型的参数不可分配给“AnyAction”类型的参数

    store ts export const store configureStore reducer auth authReducer middleware export type AppDispatch typeof store disp
  • git 子模块 Visual Studio 2017 未下载

    我们已经开始利用git submodules作为跨项目重用公共代码的一种方式 我们在工作中有一个简单的规则 当人们get你的代码应该build第一次 您的代码应该建立在可视化在线基础上 而不需要太多麻烦 如果我通过添加共享代码git sub
  • 使用 React + Flux 时,操作或存储是否应该负责转换数据?

    使用 React 和 Flux 时 标准做法是从操作中进行 API 调用 然后将结果数据存储在 Store 类中 但是 谁应该负责存储数据后对其进行转换呢 示例 我有一个EntryStore保存代表购物清单项目的对象 我有许多可以应用于它们
  • 如何在 Android 中使用设备管理员应用程序阻止安装其他应用程序

    如何使用 Android 中的设备管理员应用程序阻止安装其他应用程序 我们如何对其进行编码以限制使用设备管理应用程序安装应用程序 请帮帮我 据我所知 对于 ICS
  • 推送到heroku的应用程序仍然显示标准索引页面

    我完成了安装 git 和 heroku gem 的步骤 并成功将我的应用程序推送到了 heroku 问题是 它显示了一个标准的 You re Riding Ruby on Rails 页面 即使我的本地应用程序已将路由设置为根到某个控制器
  • 如何查看查询中的参数?

    为了调试我的代码 我想查看执行的显式 sql 查询 我创建查询createQueryBuilder 我实现的最明确的事情是使用以下原始查询 qb gt getQuery gt getSQL 问题是我看到的不是参数 而是持有者 我在网上找到了
  • 使用 awk 打印文件中的重复行

    我需要打印文件中的所有重复行uniq D选项不支持 所以我正在考虑使用 awk 打印重复行的另一种方法 我知道 我们在 awk 中有一个选项 如下所示 测试文件 txt apple apple orange orange cherry ch
  • 如何在 ASCII 表中以科学记数法显示数字?

    我试图在 ASCII 表中显示极小的数字 library stargazer example lt data frame parameter letters value runif 26 min 1E 14 max 5E 14 starga
  • kafka 主题中的唯一消息检查

    我们使用 Logstash 希望从 Oracle 数据库读取一张表并将这些消息 如下所示 发送到 Kafka Topic1 message1 name name 1 id fbd89256 12gh 10og etdgn1234njF si