如何使用 kafka 使用 python 连接 JDBC 接收器和源

2023-12-06

我想从一个系统直播到另一个系统。

我正在使用 kafka-python 并且能够在本地进行直播。

发现连接器可以处理多个设备。有人可以建议我一种使用连接器在 python 中实现它的方法吗?


Kafka Connect 是一个 Java 框架,而不是 Python。

Kafka Connect 运行一个您可以使用的 REST APIurllib3 or requests与其互动,而不是kafka-python

https://kafka.apache.org/documentation/#connect

创建连接器后,欢迎您使用kafka-python产生数据,由 JDBCsink例如,会消耗,或者您可以使用pandas例如写入数据库,其中 JDBCsource(或Debezium)会消耗

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 kafka 使用 python 连接 JDBC 接收器和源 的相关文章

  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

    当我使用 kafka 运行以下命令时0 9 0 1 我收到这些警告 1 你能告诉我我的主题有什么问题吗 我正在与在 ec2 中运行的 kafka 经纪人交谈 kafka console consumer sh new consumer bo
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试

随机推荐

  • WindowsFormsHost 控件上的 ContextMenu

    因此 我的 WPF 应用程序中有一个 WindowsFormsHost 控件 托管 Dundas Chart 并且我想在其上放置一个 ContextMenu 我可以成功地将 ContextMenu 附加到任何普通的 WPF 控件 但它不适用
  • python中的Errno 10061,我不知道该怎么办

    我在Python中学习了套接字 当我尝试在一台计算机上编写套接字脚本时 它起作用了 但是当我尝试使用两台不同的计算机编写套接字脚本并打开连接的套接字时 它不起作用 一台电脑 服务器 import socket s socket socket
  • 如何使用 bash 脚本中的文档转换服务转换多个文档?

    如何使用以下命令转换多个文档文件转换服务 我有 50 100 个 MS Word 和 PDF 文档 我想使用convert documentAPI方法 例如 您可以提供多个 pdf 或 doc 文件吗 curl u username pas
  • “JSON 标准只允许一个顶级值”是什么意思?

    在我的 IntelliJ 编辑器中 我有一个 test json 文件 如下所示 第二条 json 记录显示一条错误消息 Json 标准仅允许一个顶级值 但这是一个有效的 JSON 文件 对吗 如何摆脱这个烦人的消息 reviewerID
  • ComBox 上的事件在 TableView 单元 JavaFx 内?

    我有 TableView 当我的程序启动 TableView 只有一行时 我还有 ComboBoxes 的 ArrayList 并为 TableView 中的每一行创建一个 ComboBox 当用户编辑 产品名称 单元格 其中应包含 Com
  • 从池中获取连接之前超时时间已过

    我每隔几天就会收到此错误 几天之内我不会看到错误 然后我会在一分钟左右的时间内得到 20 个左右的错误 我已经非常彻底地放弃了我的代码 以便我使用这个基本设置来访问数据库 try myConnection Open mySqlDataAda
  • 让寻呼机显示在 magento 列表上

    我们目前有一个报价页面 其中列出了我们希望向寻呼机显示的所有现有报价 我们在列表页面中有以下代码 我们已添加对寻呼机的调用 但它似乎根本没有显示前端 我还添加了历史记录页面 该页面显示带有寻呼机的列表 作为我们尝试模拟的工作页面的示例 由于
  • 在 Laravel 5.4 中获取上传失败的文件的文件名

    如何获取上传失败的文件的文件名并将其传递给验证错误消息 例如 file 0必须是 1024 KB 就像 失败文件样本 jpg必须是 1024 KB 下面是示例代码安排 messages mimes gt File s must be of
  • 共享对象位置

    我正在使用此方法将共享对象保存在特定位置 但我无法给出路径 SharedObject getLocal save how to give path here for window 这给出了以下错误 Error Error 2130 Unab
  • 点击后退按钮但不终止活动并让它进入后退堆栈(Android)

    我有 3 项活动 A B 和 C 我通过单击按钮动态地扩展 Activity B 并且用户可以添加任意数量的视图 操作是这样的 用户看到 活动 A 首先输入他的详细信息并单击保存按钮 我将他带到 活动 B 在那里他可以多次添加某些字段 当他
  • 更新的数据(在数据库中)通过 JPA/Eclipselink 不可见[重复]

    这个问题在这里已经有答案了 我有 Oracle DB 和 Java 项目 它使用 JPA Eclipselink 连接到数据库 问题是 当数据库中的某些数据更新时 手动使用 Oracle SQL Developer 这些数据通过 JPA 不
  • 无法获取日本的交通路线

    搜索路线35 443708 139 638026 to 35 689487 139 691706产生结果谷歌地图 但是对路线 API 的此查询不会 有人有主意吗 这似乎是日本特有的 如果您实际对此进行测试 则需要将上面的出发时间更新为现在
  • Rails、Heroku 未加载我的库

    我正在一个新的 Rails 应用程序中工作 并在中创建了一个小实用程序类RAILS ROOT lib 我在其中一个控制器中使用此类require 在本地 此类已正确加载 但当我在 heroku 上部署应用程序时 它崩溃并显示 LoadErr
  • 使用代码将图像对象添加到wpf

    我是 C 和 WPF 的新手 我正在尝试创建一个简单的汽车模拟器 模拟器的主要想法是 我有 C 类来创建汽车对象 这些对象具有可以更改的速度变量和从左向右移动的计时器 我想用计时器进行运动 而不是例如doubleanimation 在 WP
  • 使用 spring RestTemplate 对 REST API 进行基本身份验证

    我对 RestTemplate 和 REST API 都是全新的 我想通过 Jira REST API 检索应用程序中的一些数据 但返回 401 Unauthorized 找到并发表文章jira Rest API 文档但真的不知道如何将其重
  • firebase iOS 无法接收推送通知

    我在我的应用程序中包含 google firebase 创建谷歌帐户 创建谷歌应用程序 上传 APNS 认证 pem 并在另一个服务中工作 并从控制台发送推送通知 但我的应用程序没有收到它 在 Firebase 控制台中 我看到状态已完成
  • PHP in_array 意外结果

    当我执行这一小段 PHP 代码时 php r echo in array 0 array aaa bbb 那回声true 当然 如果我添加严格标志 我会得到很好的返回值in array 但我就是不明白为什么它会返回true 我不能 如果有人
  • 查找 javascript 中所有没有 data- 属性的元素

    我需要找到其中的所有元素 post content 没有数据属性 我试着说类似的话if post content p attr data example 但显然这行不通 那么我将如何执行以下操作 查找所有属于 post content 且没
  • 为什么在使用函数时总是执行 Angular 2 中的 *ngIf ?

    我正在尝试使用 Angular 2 创建一个应用程序 并在我的应用程序中有一个身份验证服务 我的 html 模板是这样的
  • 如何使用 kafka 使用 python 连接 JDBC 接收器和源

    我想从一个系统直播到另一个系统 我正在使用 kafka python 并且能够在本地进行直播 发现连接器可以处理多个设备 有人可以建议我一种使用连接器在 python 中实现它的方法吗 Kafka Connect 是一个 Java 框架 而