Kafka-MongoDB Debezium 连接器:分布式模式

2024-01-10

我正在研究 debezium mongodb 源连接器。我可以通过将 kafka bootstrap 服务器地址提供为远程计算机(部署在 Kubernetes 中)和远程 MongoDB url 来在本地计算机上以分布式模式运行连接器吗?

我尝试了这个,我看到连接器成功启动,没有错误,只有几个警告,但没有数据从 mongodb 流出。

使用以下命令运行连接器

./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka/connect-mongodb-source.properties

如果不是,我还能如何实现这一点,我不想像大多数教程所建议的那样安装本地 kafka 或 mondoDB。我想为此使用我们的测试服务器。

为此遵循以下教程: https://medium.com/tech-that-works/cloud-kafka-connector-for-mongodb-source-8b525b779772 https://medium.com/tech-that-works/cloud-kafka-connector-for-mongodb-source-8b525b779772

以下是该问题的更多详细信息 连接器工作正常,我在连接器日志末尾看到以下几行

 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)

我还在 /etc/kafka/connect-mongodb-source.properties 中定义了 MongoDB 配置,如下所示

name=mongodb-source-connector 
connector.class=io.debezium.connector.mongodb.MongoDbConnector 
mongodb.hosts=/remoteserveraddress:27017 
mongodb.name=mongo_conn 
initial.sync.max.threads=1 
tasks.max=1

但数据并没有在 MongoDB 和 Kafka 之间流动。我还发布了有关 Kafka-MongoDB Debezium Connector 的单独问题:分布式模式

任何指针表示赞赏


connect-distributed只接受单个属性文件。

您必须使用 REST API 在分布式模式下配置 Kafka Connect。

https://docs.confluence.io/current/connect/references/restapi.html https://docs.confluent.io/current/connect/references/restapi.html

注意:默认情况下,消费者将读取主题之外的最新数据,而不是现有数据。

您可以将其添加到connect-avro-distributed.properties要解决这个问题

consumer.auto.offset.reset=earliest
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka-MongoDB Debezium 连接器:分布式模式 的相关文章

  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • Kafka Connect Confluence S3 Sink 连接器:找不到类 io.confluence.connect.avro.AvroConverter

    使用此 Kafka Connect 连接器 https www confluence io hub confluenceinc kafka connect s3 https www confluent io hub confluentinc
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache

随机推荐

  • 关于类成员函数指针的sizeof[重复]

    这个问题在这里已经有答案了 假设我们有一个 A 类 class A 和这些 typedef typedef void A a func ptr void typedef void func ptr void 我的问题是为什么 sizeof
  • 请讨论什么是 portlet 以及为什么使用 portlet

    为什么我要在 tomcat 和 gwt 之上使用 java portlet Portlet 是否会减少或不需要我使用 jsp 和 jsf Jboss 是否已成为 Portlet 演化文化的一部分 Jboss 是否满足 portlet jsr
  • 无法解析类型“jint”,以及 JNIEnv、jclass

    尝试使用 jni c 代码构建一个简单的 helloWorld android java 应用程序 我在 Windows 7 上使用 Eclipse Indigo 在非空间路径中安装了 ndk r8 最终使用 ndk build cmd 构
  • Linux 获取开机以来的系统时间

    我需要找到系统时间 因为我的 C 代码中的 Linux 机器已通电 time 和 gettimeofday 等函数返回自纪元以来的时间 而不是开机以来的时间 如何查找自开机以来的时间或时钟滴答数 提前致谢 该信息是通过以下方式提供的 pro
  • 清单中的 Android 抽象活动

    对于我的应用程序 我将创建各种扩展 android app Activity 和 android app Service 类的抽象类 当我对抽象类进行子类化时 如何将它们添加到 Android 清单中 我是否需要将抽象类和我的子类都添加到清
  • 使用 Jsoup 获取网页元素

    我正在尝试使用Jsoup从名为 Morningstar 的网站获取股票数据 我查看了其他论坛 但无法找出问题所在 我正在尝试进行更高级的数据报废 但我似乎甚至无法获得价格 我要么返回 null 要么什么也没有返回 我知道其他语言和 API
  • Doctrine – 如何在两个实体之间建立一对一的关系

    我有两个表 用户和联系人 Users id username Contacts id user id 电子邮件 我简化了结构 那么 如何正确设置条令实体呢 ORM Entity ORM Table name users class User
  • 从sql server 2005迁移到2008对应用程序的影响

    我们正在将 ASP NET Web 应用程序的后端从 sql server 2005 升级到 sql server 2008 或 2012 您能告诉我这对整个应用程序有什么影响吗 所有这些改变我们都必须做一次彻底成功的转型 我们也在考虑将前
  • 如何正确扩展WCF返回的类?

    我在我的项目中使用 WCF 服务 该服务返回一个名为 Store 的类 我创建了一个继承自 Store 的新本地类 我的课程名为 ExtendedStore 我的 ExtendedStore 看起来像这样 class ExtendedSto
  • 仅在 Linux 上通过命令行将 xlsx 转换为文本 CSV [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 简单的问题 目前是否可以从命令行调用 LibreOffice 以打开 xlsx 并将其转换 另存为 csv 或者 如果这是不可能的 当前通过命令行执行
  • 使用 Javascript 生成 SVG 路径的库?

    我在用着Rapha l http raphaeljs com 满足我的 SVG 渲染需求 但我发现 Path 语法有点低级 那么有谁知道一个很好的 Javascript 包装器 库 它允许这样的事情 var pathStr move 10
  • 指针条件 while(*s1++=*s2++)

    int main char str1 Overflow char str2 Stack char s1 str1 s2 str2 while s1 s2 printf s str1 return 0 当这个条件被打破时 while s1 s
  • Python 并行计算 - Scoop

    我正在尝试熟悉 Scoop 库 此处的文档 https media readthedocs org pdf scoop 0 7 scoop pdf https media readthedocs org pdf scoop 0 7 scoo
  • ASP.NET 代码隐藏类中的静态方法是非线程安全的吗?

    我可以用吗static我的 ASP NET 中的方法Pages and UserControls如果类不使用任何实例成员 IE protected void gridView PageIndexChanging object sender
  • ggplot 未绘制正确的颜色[重复]

    这个问题在这里已经有答案了 gb lt read csv results gradient boosting csv p lt ggplot gb geom point aes x pred y y alpha 0 4 fill darkg
  • 本地 Laradock Nginx 项目上的 SSL 证书

    我需要你的帮助来在我的本地计算机上使用 Nginx 和 SSL 假 证书设置我的 Laradock 带有 Docker 我不知道如何设置它 请你帮助我好吗 Thanks 要使用当前版本的 laradock 截至 2019 年 11 月 使用
  • Ruby 连接字符串并添加空格

    我有 4 个字符串变量name quest favorite color speed那可能是空的 我想将它们连接在一起 在非空的之间添加空格 代码的简单性 即查看和理解的简单程度 比速度更重要 So name Tim quest destr
  • 如何在 CUDA 内核启动之间使用共享内存?

    我想在同一内核的多次启动中使用共享内存中的值 我可以这样做吗 不 你不能 共享内存具有线程块生命周期 存储在其中的变量可以被属于一组的所有线程访问 global 函数调用
  • 如何获取 Quartz 用于描边 NSBezierPath 的路径

    我使用这段代码用一条宽的黑色虚线来描画 NSBezierPath c and strForBezier在其他地方定义 NSGlyph glyph for n 0 n lt len n glyph font glyphWithName str
  • Kafka-MongoDB Debezium 连接器:分布式模式

    我正在研究 debezium mongodb 源连接器 我可以通过将 kafka bootstrap 服务器地址提供为远程计算机 部署在 Kubernetes 中 和远程 MongoDB url 来在本地计算机上以分布式模式运行连接器吗 我