Kafka-MongoDB Debezium 连接器:分布式模式

2024-01-10

我正在研究 debezium mongodb 源连接器。我可以通过将 kafka bootstrap 服务器地址提供为远程计算机(部署在 Kubernetes 中)和远程 MongoDB url 来在本地计算机上以分布式模式运行连接器吗?

我尝试了这个,我看到连接器成功启动,没有错误,只有几个警告,但没有数据从 mongodb 流出。

使用以下命令运行连接器

./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka/connect-mongodb-source.properties

如果不是,我还能如何实现这一点,我不想像大多数教程所建议的那样安装本地 kafka 或 mondoDB。我想为此使用我们的测试服务器。

为此遵循以下教程: https://medium.com/tech-that-works/cloud-kafka-connector-for-mongodb-source-8b525b779772 https://medium.com/tech-that-works/cloud-kafka-connector-for-mongodb-source-8b525b779772

以下是该问题的更多详细信息 连接器工作正常,我在连接器日志末尾看到以下几行

 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)

我还在 /etc/kafka/connect-mongodb-source.properties 中定义了 MongoDB 配置,如下所示

name=mongodb-source-connector 
connector.class=io.debezium.connector.mongodb.MongoDbConnector 
mongodb.hosts=/remoteserveraddress:27017 
mongodb.name=mongo_conn 
initial.sync.max.threads=1 
tasks.max=1

但数据并没有在 MongoDB 和 Kafka 之间流动。我还发布了有关 Kafka-MongoDB Debezium Connector 的单独问题:分布式模式

任何指针表示赞赏


connect-distributed只接受单个属性文件。

您必须使用 REST API 在分布式模式下配置 Kafka Connect。

https://docs.confluence.io/current/connect/references/restapi.html https://docs.confluent.io/current/connect/references/restapi.html

注意:默认情况下,消费者将读取主题之外的最新数据,而不是现有数据。

您可以将其添加到connect-avro-distributed.properties要解决这个问题

consumer.auto.offset.reset=earliest
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka-MongoDB Debezium 连接器:分布式模式 的相关文章

  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • Kafka中如何实现强一致性?

    尝试了解 Kafka 中的一致性维护 请找出场景并帮助理解 Number of partition 2 Replication factor 3 Number of broker in the cluster 4 那么 为了实现强一致性 需
  • 通过API服务端点消费来自Kafka主题的消息

    目前 我有一个 API 服务端点 用 netcore6 C 编写 它将接受消息对象并将其保存到数据库 然后将该消息发布到 kafka topic 2 如何利用我的 API 服务端点始终监听 观看并连接到 kafka topic 1 一旦新消
  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 当记录处理时间超过“max.poll.interval.ms”时,在消费过程中记录/消息会发生什么?

    我的消费者设置如下 auto offset reset earliest enable auto commit true default value session timeout ms 10000 default value max po
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 使用 AWS MSK 连接器连接到 AWS VPC 内的 MongoDB atlas

    我正在尝试使用MongoDB使用更改流Kafka 我选择 AWS MSK 是因为我的整个基础设施都位于 AWS 内 并且可以轻松与其他 AWS 服务集成 I created an AWS MSK cluster within the VPC
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream

随机推荐

  • 关于类成员函数指针的sizeof[重复]

    这个问题在这里已经有答案了 假设我们有一个 A 类 class A 和这些 typedef typedef void A a func ptr void typedef void func ptr void 我的问题是为什么 sizeof
  • 请讨论什么是 portlet 以及为什么使用 portlet

    为什么我要在 tomcat 和 gwt 之上使用 java portlet Portlet 是否会减少或不需要我使用 jsp 和 jsf Jboss 是否已成为 Portlet 演化文化的一部分 Jboss 是否满足 portlet jsr
  • 无法解析类型“jint”,以及 JNIEnv、jclass

    尝试使用 jni c 代码构建一个简单的 helloWorld android java 应用程序 我在 Windows 7 上使用 Eclipse Indigo 在非空间路径中安装了 ndk r8 最终使用 ndk build cmd 构
  • Linux 获取开机以来的系统时间

    我需要找到系统时间 因为我的 C 代码中的 Linux 机器已通电 time 和 gettimeofday 等函数返回自纪元以来的时间 而不是开机以来的时间 如何查找自开机以来的时间或时钟滴答数 提前致谢 该信息是通过以下方式提供的 pro
  • 清单中的 Android 抽象活动

    对于我的应用程序 我将创建各种扩展 android app Activity 和 android app Service 类的抽象类 当我对抽象类进行子类化时 如何将它们添加到 Android 清单中 我是否需要将抽象类和我的子类都添加到清
  • 使用 Jsoup 获取网页元素

    我正在尝试使用Jsoup从名为 Morningstar 的网站获取股票数据 我查看了其他论坛 但无法找出问题所在 我正在尝试进行更高级的数据报废 但我似乎甚至无法获得价格 我要么返回 null 要么什么也没有返回 我知道其他语言和 API
  • Doctrine – 如何在两个实体之间建立一对一的关系

    我有两个表 用户和联系人 Users id username Contacts id user id 电子邮件 我简化了结构 那么 如何正确设置条令实体呢 ORM Entity ORM Table name users class User
  • 从sql server 2005迁移到2008对应用程序的影响

    我们正在将 ASP NET Web 应用程序的后端从 sql server 2005 升级到 sql server 2008 或 2012 您能告诉我这对整个应用程序有什么影响吗 所有这些改变我们都必须做一次彻底成功的转型 我们也在考虑将前
  • 如何正确扩展WCF返回的类?

    我在我的项目中使用 WCF 服务 该服务返回一个名为 Store 的类 我创建了一个继承自 Store 的新本地类 我的课程名为 ExtendedStore 我的 ExtendedStore 看起来像这样 class ExtendedSto
  • 仅在 Linux 上通过命令行将 xlsx 转换为文本 CSV [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 简单的问题 目前是否可以从命令行调用 LibreOffice 以打开 xlsx 并将其转换 另存为 csv 或者 如果这是不可能的 当前通过命令行执行
  • 使用 Javascript 生成 SVG 路径的库?

    我在用着Rapha l http raphaeljs com 满足我的 SVG 渲染需求 但我发现 Path 语法有点低级 那么有谁知道一个很好的 Javascript 包装器 库 它允许这样的事情 var pathStr move 10
  • 指针条件 while(*s1++=*s2++)

    int main char str1 Overflow char str2 Stack char s1 str1 s2 str2 while s1 s2 printf s str1 return 0 当这个条件被打破时 while s1 s
  • Python 并行计算 - Scoop

    我正在尝试熟悉 Scoop 库 此处的文档 https media readthedocs org pdf scoop 0 7 scoop pdf https media readthedocs org pdf scoop 0 7 scoo
  • ASP.NET 代码隐藏类中的静态方法是非线程安全的吗?

    我可以用吗static我的 ASP NET 中的方法Pages and UserControls如果类不使用任何实例成员 IE protected void gridView PageIndexChanging object sender
  • ggplot 未绘制正确的颜色[重复]

    这个问题在这里已经有答案了 gb lt read csv results gradient boosting csv p lt ggplot gb geom point aes x pred y y alpha 0 4 fill darkg
  • 本地 Laradock Nginx 项目上的 SSL 证书

    我需要你的帮助来在我的本地计算机上使用 Nginx 和 SSL 假 证书设置我的 Laradock 带有 Docker 我不知道如何设置它 请你帮助我好吗 Thanks 要使用当前版本的 laradock 截至 2019 年 11 月 使用
  • Ruby 连接字符串并添加空格

    我有 4 个字符串变量name quest favorite color speed那可能是空的 我想将它们连接在一起 在非空的之间添加空格 代码的简单性 即查看和理解的简单程度 比速度更重要 So name Tim quest destr
  • 如何在 CUDA 内核启动之间使用共享内存?

    我想在同一内核的多次启动中使用共享内存中的值 我可以这样做吗 不 你不能 共享内存具有线程块生命周期 存储在其中的变量可以被属于一组的所有线程访问 global 函数调用
  • 如何获取 Quartz 用于描边 NSBezierPath 的路径

    我使用这段代码用一条宽的黑色虚线来描画 NSBezierPath c and strForBezier在其他地方定义 NSGlyph glyph for n 0 n lt len n glyph font glyphWithName str
  • Kafka-MongoDB Debezium 连接器:分布式模式

    我正在研究 debezium mongodb 源连接器 我可以通过将 kafka bootstrap 服务器地址提供为远程计算机 部署在 Kubernetes 中 和远程 MongoDB url 来在本地计算机上以分布式模式运行连接器吗 我