如何在Scala中使用Flink的KafkaSource?

2023-11-29

我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序。我正在使用以下内容:

  • 弗林克0.9
  • 斯卡拉2.10.4
  • 卡夫卡0.8.2.1

我按照文档测试 KafkaSource(添加依赖项,将 Kafka 连接器 flink-connector-kafka 捆绑在插件中),如下所述here and here.

下面是我的简单测试程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

然而,编译总是抱怨找不到 KafkaSource:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

我在这里想念什么?


我是 sbt 用户所以我使用了以下内容build.sbt:

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

这让我可以运行该程序:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

输出:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在Scala中使用Flink的KafkaSource? 的相关文章

随机推荐

  • 可以匹配空字符串的正则表达式正在破坏 javascript 正则表达式引擎

    我写了以下正则表达式 D D d g 我认为它应该这样工作 D D match the last non digit or match the start of the string with optional literal charac
  • 更改 Material-UI 日期选择器的标题颜色

    我在 React 单页应用程序中添加了一个 Material UI 日期选择器 但我不知道如何更改标题颜色 我尝试从 muiTheme Pallet 属性更改它 但标题是唯一保持不变的颜色 var muiTheme getMuiTheme
  • 如何获取一天的开始时间和结束时间?

    如何获取一天的开始时间和结束时间 这样的代码不准确 private Date getStartOfDay Date date Calendar calendar Calendar getInstance int year calendar
  • hibernateTemplate.save() 命令返回主键但不保存到数据库

    第 37 行中的保存命令不起作用 它可以打印主键 但不能将新记录保存到数据库中 模板保存命令 GitHub 链接 https github com DipanjanSG BankingApp None
  • 节点和错误:EMFILE,打开的文件太多

    几天来我一直在寻找错误的有效解决方案 错误 EMFILE 打开的文件太多 看来很多人都有同样的问题 通常的答案是增加文件描述符的数量 所以 我尝试过这个 sysctl w kern maxfiles 20480 默认值是 10240 在我看
  • Jackson自定义序列化和反序列化

    我无法找出使用杰克逊实现自定义序列化 反序列化的正确方法 我有许多带有原始字段的类 50 这些字段应该被序列化 反序列化 而不是作为原始字段 喜欢 class User int height this field should be ser
  • 身体滚动与鼠标位置?

    基本上 功能就在这里 它只需要一些我不知道如何调整的细化 我编写了这个小片段 它完全符合我的要求 但不会滚动整个页面 它只是滚动 窗口 的大小 有人能接受这个并让它变得令人惊奇吗 document mousemove function e
  • 我正在尝试让树莓派相机与opencv一起工作

    我尝试使此代码与树莓派摄像头一起使用 如何让 cv2 VideoCapture 0 将树莓派摄像头识别为指定摄像头 import cv2 def diffImg t0 t1 t2 d1 cv2 absdiff t2 t1 d2 cv2 ab
  • 如何使用 CSS/JavaScript 对角组合两个图像?

    是否可以使用 CSS 或 JavaScript 如果需要 来实现此目的 我希望三角形 div image1 2 jpg 内的内容成为 2 个不同的 div 因为我想让它们成为指向 2 个不同页面的链接 使用 html canvas 和kin
  • 迭代包含空格的文件列表

    我想迭代文件列表 该列表是一个结果find命令 所以我想出了 getlist for f in find iname foo do echo File found f do something useful done 除非文件名称中包含空格
  • 占位符在 IE9 中不起作用

    我是 Salesforce SFDC 开发人员 在我的输入框的 Visualforce 页面中 我使用占位符代码 div class control group div
  • 将 vbscript 字符串列表传递给 SQL“in”运算符

    在 vb 脚本中 我有一个 select 语句 我试图将一个长度不确定的字符串值传递给 SQL in 运算符 下面的代码可以工作 但允许 SQL 注入 我正在寻找一种使用 ADO createParameter 方法的方法 我相信我尝试过的
  • 编译器/链接器如何解析从 Linux 上的模块调用的内核 API(如“printk”)

    我编写了一个示例 hello ko 内核模块 include
  • 无法运行 Grails 项目 GGTS

    我有一个在命令行中完美运行的 grails 项目 当我尝试在 GGTS 中运行它时 grails run app 它第一次运行没有任何错误 然后 我停止了服务器并使用 grails run app 再次运行它 现在出现以下错误 Error
  • awk 和 md5:替换列

    从 开始awk 用其哈希值替换列 我尝试对数字列表进行散列 md5 cat n file 1 40755462755 2 40751685373 3 40730094339 4 40722740446 5 40722740446 6 407
  • 在 Aspnet Core 中加载元数据

    我正在使用 AspNet Core 在 MVC 模板中构建一个项目 我使用了实体框架 并搭建了一个现有的数据库 现在 我想向某个类添加一些数据注释 但我不想编辑脚手架自动生成的类 因此我尝试使用元数据并覆盖现有方法 即saveChanges
  • 用于 REST 服务的 Jersey 和 Spring 的 @Autowired 属性上的 NullPointerException

    我一直在开发一个 gwt 应用程序 应该有一个休息服务来访问数据库 包括它自己的数据库和其他远程数据库 我使用 Spring 来更好地处理数据库 objectdb 而不是使用 Jersey 进行练习 这是给出问题的代码 用户 java En
  • 在哪里可以找到 ECMAscript/Actionscript/Javascript 的 yacc 语法 [关闭]

    很难说出这里问的是什么 这个问题模棱两可 含糊不清 不完整 过于宽泛或言辞激烈 无法以目前的形式合理回答 如需帮助澄清此问题以便重新打开 访问帮助中心 我的编译器类要求我们找到其中一种语言的 yacc 语法 并将其与我们编写的分词器结合起来
  • 使用 jython 调用 nltk 库时出现问题

    我正在尝试调用集成在我的 python 代码中的 nltk 库 根据要求 我需要通过 java 代码调用它们 因此 我使用 Jython 进行集成 如果我的 pyhton 代码不包含任何 nltk 库 在这种情况下它可以正常工作 但是当它包
  • 如何在Scala中使用Flink的KafkaSource?

    我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序 我正在使用以下内容 弗林克0 9 斯卡拉2 10 4 卡夫卡0 8 2 1 我按照文档测试 KafkaSource 添加依赖项 将 Kafka 连接器 flin