Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

2023-12-09

我正在尝试 Flink 的新 Python 流 API 并尝试使用以下命令运行我的脚本./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py。 python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或输出方法默认发出数据的日志目录中的 *.out 文件)。

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory,'*.jar')):
                sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

我从 Maven 存储库中获取了一些 jar 文件,即flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar并将它们复制到 Flink 中lib目录。除非我误解了文档,否则这应该足以让 Flink 加载 kafka 连接器。事实上,如果我删除这些罐子中的任何一个,导入都会失败,但这似乎不足以实际调用该计划。 添加 for 循环以动态地将它们添加到sys.path也没用。以下是控制台中打印的内容:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

这是我在日志中看到的:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

有没有办法解决这个问题并使连接器可用于 Python?我怀疑这是 Jython 的类加载器问题,但我不知道如何进一步调查(也考虑到我对 Java 不了解)。非常感谢。


您在这里使用了错误的 Kafka 消费者。在你的代码中,它是FlinkKafkaConsumer09,但是您正在使用的库是flink-connector-kafka-0.11_2.11-1.6.1.jar,这是为了FlinkKafkaConsumer011。尝试更换FlinkKafkaConsumer09有了这个FlinkKafkaConsumer011,或者使用lib文件flink-connector-kafka-0.9_2.11-1.6.1.jar而不是当前的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类” 的相关文章

  • Django:将博客条目查看次数增加一。这有效率吗?

    我的索引视图中有以下代码 latest entry list Entry objects filter is published True order by date published 10 for entry in latest ent
  • 从控制台生成具有空值(墓碑)的 Kafka 消息

    有没有什么方法可以在 kafka console Producer 中生成一条具有空值的消息 即 将其标记为压缩器以使用逻辑删除来删除它 我尝试过生成 mykey 和 mykey 前者产生错误 后者使该值成为空字符串 像这样运行生产者 KA
  • 蜘蛛内的Scrapyd jobid值

    Scrapy 框架 Scrapyd 服务器 我在获取蜘蛛内部的 jobid 值时遇到一些问题 将数据发布到后http localhost 6800 schedule json http localhost 6800 schedule jso
  • 在 Python 中绘制分类数据的三个维度

    我的数据包含三个我试图可视化的分类变量 城市 五个之一 职业 四种之一 血型 四种之一 到目前为止 我已经成功地以一种我认为易于使用的方式对数据进行了分组 import numpy as np pandas as pd Make data
  • 在 vim 折叠线中语法高亮 Python

    我发现代码折叠 http en wikipedia org wiki Code folding帮助我更好地组织我的文件 因此 在我的底部 vimrc 我启用vim代码折叠 http vimdoc sourceforge net htmldo
  • 如何为 C 分配的 numpy 数组注册析构函数?

    我想在 C C 中为 numpy 数组分配数字 并将它们作为 numpy 数组传递给 python 我可以做的PyArray SimpleNewFromData http docs scipy org doc numpy reference
  • python-polars 通过分隔符将字符串列拆分为许多列

    在 pandas 中 以下代码会将 col1 中的字符串拆分为许多列 有没有办法在极地做到这一点 d col1 a b c d a b c d df pd DataFrame data d df a b c d df col1 str sp
  • 使用 python 只读取 Excel 中的可见行

    我想只读取 python 中 Excel 工作表中的可见行 输入 Excel表 所以当我过滤时 作为 python 中的输出 在本例中我将仅获得可见数据 1 行 这是我的代码 from openpyxl import load workbo
  • python:是否有用于对输入流进行分块的库函数?

    我想对输入流进行分块以进行批处理 给定一个输入列表或生成器 x in 1 2 3 4 5 6 我想要一个能够返回该输入块的函数 说 如果chunk size 4 then x chunked 1 2 3 4 5 6 这是我一遍又一遍地做的事
  • pandas to_sql sqlalchemy 与 secure_transport 的连接

    我正在尝试将数据发送到具有 require secure transport ON 的服务器上的 mysql 数据库 当我尝试使用以下代码连接到它时 import pandas as pd import pymysql from sqlal
  • 如何删除 pandas 数据框中的唯一行?

    我遇到了一个看似简单的问题 在 pandas 数据框中删除唯一的行 基本上 相反drop duplicates https pandas pydata org pandas docs stable generated pandas Data
  • Python,多线程,获取网页,下载网页

    我想在一个站点批量下载网页 我的 urls txt 文件中有 5000000 个 url 链接 大约有300M 如何让多线程链接这些网址并下载这些网页 或者如何批量下载这些网页 我的想法 with open urls txt r as f
  • 使用 Python 脚本打开特定文件类型?

    如何使 Python 脚本成为特定文件类型 例如 foo 的默认应用程序 例如 当我双击 Finder Explorer 中的文件时 我希望该文件在 Python 脚本中打开 这可以在 Win 和 或 OS X 中实现吗 如果重要的话 该应
  • 如何测试列表中多个值的成员资格

    我想测试两个或多个值是否在列表中具有成员资格 但我得到了意外的结果 gt gt gt a b in b a foo bar a True 那么 Python 可以同时测试列表中多个值的成员资格吗 这个结果意味着什么 See also How
  • 在python中安装scipy模块时出错

    我正在尝试使用 pip 在 python 中安装 scipy 模块 它显示以下错误 Command c users sony appdata local programs python python35 32 python exe u c
  • Python:帮助(numpy)在退出时导致段错误

    我遇到了一个奇怪的现象 在 python 解释器中 我执行以下操作 gt gt gt import numpy gt gt gt help numpy 帮助显示正确 但一旦我按 q 返回解释器 Segmentation fault core
  • Scrapy的redirect_urls异常.KeyError

    我是 Scrapy 和 Python 的新手 最近推出了我的第一个蜘蛛 有一个功能似乎以前有效 但现在它只适用于我试图废弃的一些网站 代码行是 item url direct response request meta redirect u
  • 如何使用 pygame.mixer 重复音乐?

    我创建了以下使用 pygame mixer 播放 mp3 音乐的代码 然而 音乐不会重复 有什么想法可以让音乐重复播放吗 这是代码 playlist list playlist append put music here mp3 playl
  • 如何保持 python 3 脚本 (Bot) 运行

    不是母语英语 抱歉 英语可能很蹩脚 我也是编程新手 您好 我正在尝试使用 QueryServer 连接到 TeamSpeak 服务器来创建机器人 经过几天的努力 它有效 只有 1 个问题 而我却被这个问题困扰了 如果您需要检查 这是我正在使
  • 在至少 7 天内连续三天登录该产品的用户

    我有一个用于用户参与的数据框 df 如下所示 time stamp user id 2013 01 01 10 05 23 1 2013 01 03 16 35 23 1 2013 01 06 11 06 35 1 2013 01 10 1

随机推荐

  • 使用 async 时,bcrypt.hash 函数返回 undefined,但与 .then 一起工作正常

    这是异步代码 返回undefined userService register username password gt return bcrypt hash password saltRounds async err hash gt co
  • 在 Windows 窗体中以编程方式添加新的用户控件

    嘿 首先我想指出 我知道这里还有关于这个主题的其他几个问题 我什至以前自己也做过这件事 我在这里问是因为我不知道我的问题是什么 这是我尝试显示新用户控件的代码 private void ValidationLabel Click objec
  • 使用 jQuery 更改下拉列表的选定值

    我有一个包含已知值的下拉列表 我想做的是将下拉列表设置为我知道存在的特定值jQuery 使用常规JavaScript 我会做类似的事情 ddl document getElementById ID of element goes here
  • 我们可以将 Laravel 项目作为库集成到 CodeIgniter 中吗?

    我想通过集成一些用 laravel 编写的代码来增加 CodeIgniter 项目的功能 我该如何接近 我可以通过 CodeIgniter 库包含代码吗 如果是的话怎么办 我只想将控制器和 ORM 包含到 CI 中 Laravel 代码是一
  • R 中 apply 中的 equal() 行为

    这很奇怪 apply matrix c 1 NA 2 3 NA NA 2 4 ncol 2 1 function x identical x 1 x 2 1 FALSE TRUE TRUE FALSE apply data frame a
  • 在这种情况下,iPhone 上的“发布”是什么意思?

    我想问一个关于 iPhone 应用程序的愚蠢问题 我是iPhone应用程序的绿色 我在Apple网站上阅读了以下代码 MyViewController aViewController MyViewController alloc initW
  • 设计模式 - 理解外观模式

    我是设计模式的新手 正在尝试了解它们通常的样子 现在我正在尝试理解外观模式 我觉得外观模式是一个相当广泛的概念 所以我想知道我的第二个图是否会被视为外观模板的一部分 我知道一个典型的外观模式基本上是这样的 A 级是外观 但是如果我们有一个更
  • 密钥不得包含 . pymongo 中的错误

    我试图通过 pymongo 获取 serverStatus 命令的输出 然后将其插入到 mongodb 集合中 这是字典 u metrics u getLastError u wtime u num 0 u totalMillis 0 u
  • PHP Constant() 不适用于名称空间?

    以下不起作用 use application components auditor AuditLevel public function actionAudit data unserialize POST data message data
  • 如何更新由另一个组合框触发的组合框中的值?

    我的表单中有 2 个组合框 我希望在组合框 2 中的列表更新时更改组合框 1 中的选定值 例如 ComboBox1 包含移动公司的名称 ComboBox2 包含该公司的所有手机列表 假设您有一个将手机型号与其制造商关联起来的字典 Dicti
  • 流星:云中

    我正在尝试上传 Lepozepo cloudinary 的照片 这是我的服务器和客户端配置 server Cloudinary config cloud name api key api secret client cloudinary c
  • UIViewController 的背景到分组表视图颜色

    在 UITableView 分组样式中 表格视图的背景有点像浅灰色的纹理颜色 如何获取该值以便将 UIViewController 的整个背景设置为该颜色 如果您正在为 iPhone 和 iPod touch 进 行开发 UIColor定义
  • 搜索文本文件并插入行

    我想要做的是 以下面的文本为例 在文本文件中搜索字符串 Text2 然后在 Text 2 后两行插入一行 插入文本 文本 2 可以位于文本文件中的任何行 但我知道它会在文本文件中出现一次 所以这是原始文件 Text1 Text2 Text3
  • 从存档导出 ipa 时 Xcode 9 崩溃

    我在 Xcode 9 中为任何项目创建了一个存档 然后我尝试创建一个 ipa 文件 开发文件或临时文件 我首先尝试导出存档 然后我选择 开发 或 临时分发 Xcode 9 崩溃 我什至在应用程序的 info plist 中添加了 编译位码
  • 向函数发送 stderr/stdout 消息并捕获退出信号

    我正在处理错误处理并登录我的 bash 脚本 下面我提供了一个简化的代码片段来举例说明用例 我想在我的脚本中实现以下目标 陷阱退出信号应触发下面代码中的 onexit 函数 stderr 和 stdout 应发送到 log 函数 该函数将确
  • R 中的嵌套 if-else 循环

    我有一个名为 crimes 的数据框 其中包含一个 pre rate 列 表示实施特定法律之前的犯罪率 我想使用嵌套的 if else 循环将每个费率放入 rate category 列中 我有以下代码 crimes rate catego
  • 为什么c++使用memset(addr,0,sizeof(T))来构造一个对象?标准或编译器错误?

    这个问题和我的另一篇文章有 关 为什么 allocate shared 和 make shared 这么慢 在这里我可以更清楚地描述这个问题 考虑下面的代码 struct A char data 0x10000 class C public
  • 在多个 UIView 上添加 Facebook Shimmer

    我正在尝试在具有多个 UIView 的 UICollectionViewCell 上添加 Facebook Shimmer For 一个 UIView 使用下面的代码可以正常工作 let shimmeringView FBShimmerin
  • 执行存储在数据库中的Java代码

    我有定期推送到数据库的 Java 代码 解释它为什么在数据库中太复杂 这只会将焦点从主要问题上移开 在运行时我查询数据库 我可以执行从数据库获取的代码吗 我只将 main 方法的内容存储在代码中 运行数据库的服务器是HTTP 服务器 数据库
  • Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

    我正在尝试 Flink 的新 Python 流 API 并尝试使用以下命令运行我的脚本 flink 1 6 1 bin pyflink stream sh examples read from kafka py python 脚本相当简单