Kafka 连接集群设置或启动连接工作线程

2024-02-01

我正在通过 kafka connect,我正在尝试了解这些概念。

假设我已经设置了 kafka 集群(节点 k1、k2 和 k3)并且它正在运行,现在我想在不同节点(例如 c1 和 c2)中以分布式模式运行 kafka 连接工作程序。

几个问题。

1)要在分布式模式下运行或启动kafka connect,我需要使用命令../bin/connect-distributed.sh,它在 kakfa 集群节点中可用,所以我需要从任意一个 kafka 集群节点启动 kafka connect 吗?或者我启动 kafka connect 的任何节点都需要有 kafka 二进制文件,以便我能够使用../bin/connect-distributed.sh

2)我需要将连接器插件复制到任何 kafka 集群节点(或所有集群节点?),从哪里执行步骤 1?

3)在工作节点上启动jvm进程之前,kafka如何将这些连接器插件复制到工作节点?因为该插件包含我的任务代码,需要将其复制到工作程序才能在工作程序中启动该进程。

4)我是否需要在连接集群节点c1和c2中安装任何东西,比如需要安装java或任何kafka连接相关的东西?

5)在某些地方它说使用汇合平台,但我想首先单独使用 apache kafka connect 启动它。

请有人通过一些线索甚至指向一些资源也会有所帮助。

谢谢。


1)为了获得高可用的 kafka-connect 服务,您需要运行至少两个实例connect-distributed.sh在两台具有相同功能的不同机器上group.id。您可以找到有关每个工作人员配置的更多详细信息here https://docs.confluent.io/current/connect/userguide.html#distributed-worker-configuration。为了提高性能,Connect 应独立于代理和 Zookeeper 计算机运行。

2)是的,您需要将所有连接器放在plugin.path(通常在/usr/share/java/)在您计划运行 kafka-connect 的每台机器上。

3) kafka-connect 将在启动时加载连接器。你不需要处理这个。请注意,如果您的 kafka-connect 实例正在运行并且添加了新连接器,则需要重新启动该服务。

4) 您需要在所有机器上安装Java。特别是对于 Confluence 平台:

此版本的 Confluence Platform 支持 Java 1.7 和 1.8 (目前不支持Java 1.9)。你应该运行 垃圾优先(G1)垃圾收集器。欲了解更多信息,请参阅 支持的版本和互操作性 https://docs.confluent.io/current/installation/versions-interoperability.html#interoperability-versions.

5)这要看情况。 Confluence 由 Apache Kafka 的原始创建者创建,它是一个更完整的发行版,添加了模式管理、连接器和客户端。它还附带了 KSQL,如果您需要对某些事件采取行动,它非常有用。 Confluence 只是添加到 Apache Kafka 发行版之上,它不是修改版本。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 连接集群设置或启动连接工作线程 的相关文章

  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB

随机推荐

  • 排序不完整怎么排序?

    我有一个要排序的元素列表和一个比较函数cmp x y 这决定了是否x应该出现在之前y或之后y 问题是有些元素没有定义的顺序 这cmp函数返回 不在乎 示例 输入 A B C D and C gt D B gt D 输出 许多正确答案 例如
  • 如何在 JavaScript 中为图像和/或视频制作后退和下一步按钮?

    我正在尝试为 iframe 制作后退和下一步按钮 这是我的代码
  • 重复标准化 IEEE 浮点向量会使其发生变异吗?

    如果我采用 非零 浮点向量 x y z 向量 并将其标准化为单位长度 第二次标准化是否能保证返回相同的结果 我不知道文献中有相关结果 快速测试表明 频繁地对 3D 向量进行两次归一化会导致归一化向量和重新归一化向量之间存在微小差异 即使小心
  • 使用java api的Elasticsearch多条件查询

    有多个文档 每个文档包含大约 100 个字段 我想通过 elasticsearch Java API 5 x 执行以下搜索 我想使用 3 个字段进行此搜索 即 department job name 我想搜索与 部门 D1 部门 D2 工作
  • 如果我有 iPhone SDK 3.2 有什么办法可以使用 3.1.3

    当前的 3 2 SDK 似乎不包含 3 1 3 我如何让它工作 如果您想保留 3 2 2 beta 请下载 3 1 3 SDK 并将其安装在另一个位置 即 Developer313 请参阅下图 了解在哪里单击以进入新的安装位置 替代文本 h
  • 如何在没有 jQuery 的情况下提交表单而不重新加载页面?

    我的表单如下 它需要向我的 java Servlet 发送一个操作来更新数据库 如何在不重新加载页面的情况下提交表单 目前与action myServlet 它不断将我引导到新页面 如果我删除对 myServlet 的操作 输入不会添加到我
  • 模拟器无法升级 xcode 10?

    将 Xcode 9 4 1 更新到 Xcode 10 后 没有任何模拟器可以构建 iOS 应用程序 我的 iOS 应用程序在 Xcode 9 4 1 上完美构建并运行 我有多个这样的警告 警告 iOS Simulator 部署目标设置为 7
  • 将环境变量传递给 Angular2 应用程序?

    我需要将后端 url 传递到我的 Angular2 应用程序 因为生产服务器和开发服务器托管在不同的位置 我知道我可以将这些东西存储在外部 config json 中并在启动时加载 然而 在应用程序启动之前 这似乎是不必要的对服务器的额外调
  • 具有多个条件和列表的 COUNTIF

    我需要计算满足 CRITERIA 1 满足 CRITERIA 2 以及 CRITERIA 3 属于 E 列中包含的值集的单元格数量 我目前正在使用以下公式 SUM COUNTIFS A2 A11 TRUE B2 B11 TRUE C2 C1
  • Haskell 中的 fromInteger 重写

    所以我喜欢Haskell 但对Num类不满意 所以我想为代数类型创建我自己的类型类层次结构 问题是 即使我导入 Prelude 隐藏 Num 以及与之相关的所有内容 使文字 1 具有类型 t 的唯一方法仍然是使 t 实例 Num 我很想从
  • .Net C# String.Join 如果元素值为 null,如何输出“null”而不是空字符串?

    根据String Join 的 MSDN 文档 http msdn microsoft com en us library 57a79xd0 aspx 如果 value 中的任何元素为 null 则使用空字符串 我的代码从数据表中提取数据
  • 如何将 Unix 'Top' 命令输出捕获到 CSV 文件?

    我正在尝试获取前 5 行top通过 shell 脚本命令 我需要将输出写入csv文件 我需要每 15 秒监控一次结果 最后 我需要使用获得的数据表绘制图表 我得到了 shell 脚本来编写前 5 行top命令给一个txt file bin
  • -fomit-frame-pointer *总是*省略 fp 吗?

    是否 fomit frame pointeralways省略帧指针 是否存在pc和fp都需要设置的情况 动态堆栈增长是否会强制设置 fp 专门询问MIPS32 谢谢 正确执行并不真正需要帧指针 除非有时异常展开 动态堆栈增长通常需要某种帧指
  • 管理加载 ZipFile 时分配的内存

    我正在尝试将 69 930 个文件加载到基本文本编辑器中 一切都很顺利 全部加载完毕后 内存就达到了非常酷的 130MB 然而 在高峰加载时间期间 最大容量可达 900MB 1200MB 内存全部引用Inflater buf http gr
  • C++ 重载模式:使用可变 lambda 进行调用解析

    考虑这个众所周知的 C 模式 template
  • 如何获取远程桌面客户端的IP地址?

    我正在尝试编写一个脚本来记录用户启动远程桌面以登录 Windows Server 的 Windows 客户端的 IP 地址 如何在服务器中捕获客户端的IP地址 所以 你忽略代理 在域中使用环境变量 CLIENTNAME 您可以将其解析回IP
  • wordpress:polylang 插件的媒体库问题

    我正在使用 Wordpress 开发一个网站 其中使用 Polylang 插件为多种不同语言制作内容 我使用 Polylang 来表示两种语言 荷兰语 主要 和英语 次要 当我在荷兰语页面上传图像时 一切都很好 但是当我创建一个英文页面 并
  • 使用 PIL 的 ImageDraw 模块

    我正在尝试使用 PIL 的 ImageDraw 模块进行单独的像素操作 下面的代码应该创建 Tkinter 画布小部件 然后打开一张图像 将一个像素的颜色更改为红色 然后将图像嵌入到画布小部件中 但是 它似乎不起作用 My Code imp
  • 如何在 Visual Studio 2012 中编辑 SSRS 2008R2 报告?

    我是一名报告编写者 能够在 Visual Studio 2008 中编辑 更新报告 我们使用 SSRS2008R2 我刚刚安装了 Visual Studio 2012 当我去转换我的报告项目时 它无法工作 因为我似乎缺少一个组件 本质上 项
  • Kafka 连接集群设置或启动连接工作线程

    我正在通过 kafka connect 我正在尝试了解这些概念 假设我已经设置了 kafka 集群 节点 k1 k2 和 k3 并且它正在运行 现在我想在不同节点 例如 c1 和 c2 中以分布式模式运行 kafka 连接工作程序 几个问题