Kafka连接消费者引用偏移量并存储在消息中

2024-03-04

如果我使用 kafka-connect 来消费消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件负载一起存储?我希望使用这些数据对消息进行一些排序,并检查是否存在任何间隙或检查我收到的消息中是否有重复项。 (例如,如果我的消费者偏移量被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写一个自定义订阅者?


根据有关的文档插入字段 https://docs.confluent.io/current/connect/transforms/insertfield.html#id1转换,你可以使用offset.field:

Name            Description
offset.field    Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

总体而言,您的单消息转换 (SMT) 配置如下所示:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"

如果这不是您想要的,那么总是可以选择创建您的定制 https://docs.confluent.io/current/connect/transforms/custom.html#custom-transform转变

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka连接消费者引用偏移量并存储在消息中 的相关文章

  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • 如何在java程序中获取kafka消耗滞后

    我写了一个java程序来消费来自kafka的消息 我想监控消费延迟 如何通过java获取它 顺便说一句 我用
  • python 脚本在 docker 内运行时无法导入 kafka 库 [重复]

    这个问题在这里已经有答案了 我有以下 python 脚本 可以从 twitter 中提取推文并将其发送到 kafka 主题 该脚本运行完美 但是当我尝试在 docker 容器内运行它时 它无法导入 kafka 库 它说 语法错误 语法无效
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • Kafka服务器未远程连接zookeeper服务器

    我正在尝试将 kafka 服务器 在 Windows 系统上 连接到 Zookeeper 服务器 我面临着 Opening socket connection to server 10 160 10 25 10 160 10 25 2181
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream

随机推荐

  • Bootstrap 3 删除内联输入之间的空间

    我正在做一个简单的内联表单 如何调整每个输入表单之间的空白 我想让输入基本上接触 div class row div
  • 如何导入带有回调函数的外部JS?

    我正在使用 Google API 根据他们的链接我必须将以下脚本放入 HTML 文件中 client js加载成功后正在加载自定义回调函数 function callback var ROOT https your app id appsp
  • SyntaxError:IncomingMessage 处 JSON.parse () 处的 JSON 输入意外结束。

    我正在尝试制作一个天气应用程序 并且正在使用天气 API 来获取信息 但是当我尝试解析 JSON 数据时会出现此错误 SyntaxError Unexpected end of JSON input at JSON parse
  • java中Iterator()的时间复杂度

    我是 Java 新手 我有一个关于java iterator 的时间复杂度的问题 Set
  • pandas 中的 read_table 和 read_csv 有区别吗?

    我已经对其进行了测试 并检查了文档 没有发现明显的差异 无论哪种方式 我都想问以防万一 您是否认为 read csv 应该仅用于 csv 即使它适用于其他类型 而 read table 有什么作用 它们存在时是否相同 您可以使用其中之一来处
  • 使用 Google+ 环聊 API

    如果有人添加 电子邮件受保护 cdn cgi l email protection到他的 Google Hangout 联系人并向其发送一条 Google Hangout Chat 消息 我想从我的服务器设置自动回复他的聊天 有没有办法做到
  • C# 遍历二维数组

    for int k 0 k lt odds GetLength 1 k 上面的代码行应该迭代 Double 类型的二维数组 但不断抛出以下异常 索引超出范围异常 有哪位好心人能解释一下原因并提供解决方案吗 非常感谢 您正在将无效索引传递给G
  • 是否可以为条形图中的每个类别自定义标签?

    最近 我收到一个要求 需要创建一个显示每个项目数据的条形图 这是一个例子 如您所见 Category是项目的名称 并且Series是该项目中不同类型的数据 但是 由于系统不保证项目名称的唯一性 将其用作类别可能会导致问题 并且我将无法使用项
  • CakePHP Cookie 被打乱 - Suhosin 相关

    由于某种原因 在设置后 我无法在任何页面上读取 CakePHP 应用程序中的任何 cookie 唯一返回的是乱码文本 我的代码很简单 this gt Cookie gt write Region test reg this gt Cooki
  • Linux 系统调用的内部结构

    当线程通过引发中断 80 进行系统调用时会发生什么 详细 Linux 对线程的堆栈和其他状态做了什么工作 对处理器进行了哪些更改才能将其置于内核模式 运行中断处理程序后 控制权如何恢复到调用进程 如果系统调用无法快速完成怎么办 例如从磁盘读
  • 有没有办法让 SQLAlchemy 不在 BEGIN 和 COMMIT 中包装 SQL 写入?

    我在 Pylons 1 0 框架上使用 SQLAlchemy 0 6 4 我已经尝试了将 autoflush 和 autocommit 设置为 True 和 False 的所有排列 但我发现 SQLAlchemy 想要包装所有 SQL 会话
  • R 包不存在

    我收到了可怕的包裹R不存在 它要了我的命 代码很好 我在市场上有运行该代码的应用程序 现在效果很简单webview应用程序 的 0 R string app name我的有错吗main xml and string xml很好 我很困惑 我
  • Libgdx - 支持 OpenGL 4+

    是否可以仅使用 Libgdx 进行桌面开发并从 Libgdx 抽象访问较新版本的 Opengl 我的意思是 如果你想开发移动应用程序 你将必须使用 Opengl ES 的某个版本 它不支持经典 Opengl 那样的很多功能 例如绘制线框等
  • Django-channels:ChatConsumer 仅向一个用户发送消息,而不是向两个用户发送消息

    我正在使用 django channels 和 redis 在 django 和 Angular 中实现聊天应用程序 套接字已连接并正常工作 但我面临的问题是 当两个用户在线并使用相同的线程 url 连接同一个聊天室时 它会连接 但任何用户
  • 使用包含单行分区的 Cassandra 表是一种不好的做法吗?

    假设我有一张这样的桌子 CREATE TABLE request transaction id text request date timestamp data text PRIMARY KEY transaction id 据我了解 tr
  • PHP 包含绝对路径

    我的网站上有一个名为 basePath 的变量 其设置为 basePath Systems dgw 我在所有 css js 和图像标签上使用它 为了更好的可见性而缩短 我对这些包含没有任何问题 它们在我所在的任何文件和文件夹中都可以正常工作
  • pip 安装错误:SyntaxError:语法无效

    尝试在 CentOS6 上安装 virtualenv requests 或 pex 时 pip install 不起作用 我使用的是 python2 6 和 pip 9 0 1 谁能告诉我为什么会发生这种情况 pex build root
  • 如何获取点击了哪个按钮?

    我想在单击特定按钮时检查某些条件 该怎么做 document ready function var prm Sys WebForms PageRequestManager getInstance prm add initializeRequ
  • Android 数据绑定构建错误:[数据绑定插件]:无法设置数据绑定

    我正在使用 Google 的 Android 数据绑定库 我曾经使用过该库 现在 似乎 某些东西 发生了变化 并且给数据绑定库带来了问题 我被一个非常普遍的错误所困扰 该错误被证明很难调试 data binding plugin faile
  • Kafka连接消费者引用偏移量并存储在消息中

    如果我使用 kafka connect 来消费消息并存储到 s3 使用 kafka connect s3 连接器 我是否可以将消息偏移量与事件负载一起存储 我希望使用这些数据对消息进行一些排序 并检查是否存在任何间隙或检查我收到的消息中是否