如何使用scala从apache Spark中的kafka主题读取json数据

2024-04-06

我是新的 Spark,您能否让我知道如何使用 scala 从 apache Spark 中的 kafka 主题读取 json 数据。

Thanks.


最简单的方法是使用 Spark 附带的 DataFrame 抽象。

val sqlContext = new SQLContext(sc)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                  ssc, kafkaParams, Set("myTopicName"))

stream.foreachRDD(
  rdd => {
     val dataFrame = sqlContext.read.json(rdd.map(_._2)) //converts json to DF
     //do your operations on this DF. You won't even require a model class.
        })
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用scala从apache Spark中的kafka主题读取json数据 的相关文章

  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • Scala 条件列表构造

    我正在使用 Scala 2 9 2 并且想根据某些条件构建一个列表 考虑以下情况 其中 cond 是采用谓词 p 和类型 T 的值 在本例中为 t3 的某个函数 t1 t2 cond p t3 t4 我想要的行为如下 如果 p 为真 则应给
  • Spark 与 Webhdfs/httpfs

    我想通过 httpfs 或 Webhdfs 将文件从 HDFS 读入 Spark 类似的东西 sc textFile webhdfs myhost 14000 webhdfs v1 path to file txt 或者 理想情况下 sc
  • 尝试创建 jar 时出现 UNRESOLVED DEPENDENCIES 错误

    我正在尝试构建一个 Scala jar 文件以在 Spark 中运行它 我正在关注这个tutorial http spark apache org docs latest quick start html 当尝试使用 sbt 作为构建 ja
  • Scala 功能设计模式目录

    一周以来我一直在阅读 Scala 编程 作者一步一步地介绍了该语言的元素 但我仍然很困惑何时使用演员 闭包 柯里化等功能性的东西 我正在寻找功能结构的典型用例或最佳实践的目录 我并不是说在 Scala 中重新实现像 GoF 这样的众所周知的
  • 如何在 Pyspark 中使用滑动窗口对时间序列数据进行数据转换

    我正在尝试根据时间序列数据的滑动窗口提取特征 在Scala中 似乎有一个sliding函数基于这个帖子 https stackoverflow com a 28863132 3089523 and 文档 http spark apache
  • 无法在 Windows 10 中启动 Spark Master

    我是 Spark 新手 我正在尝试手动启动 master 在 Windows 10 中使用 MINGW64 当我这样做时 Downloads spark 1 5 1 bin hadoop2 4 spark 1 5 1 bin hadoop2
  • 错误:无法在 scala 中找到或加载主类

    安装 eclipse scala 插件和 eclipse maven scala 插件后 我是 scala 新手 所以我尝试确保在测试 scala hello world 项目后环境正常工作 它按预期工作 但我在尝试执行我从公司存储库中签出
  • scala.math.BigDecimal :1.2 和 1.20 相等

    将 Double 或 String 转换为 scala math BigDecimal 时如何保持精度和尾随零 用例 在 JSON 消息中 属性的类型为 String 值为 1 20 但是在 Scala 中读取这个属性并将其转换为 BigD
  • Build.scala中%和%%符号含义

    我是新来玩的 Framework 2 1 java版本 并且没有scala经验 我不明白什么是以及什么是 and 在 Build scala 中表示 我用谷歌搜索了它们但找不到它们的含义 在我的 Build scala 文件中 我有 org
  • 为什么我不需要在 Databricks 中创建 SparkSession?

    为什么我不需要在 Databricks 中创建 SparkSession 集群设置的时候会自动创建一个SparkSession吗 还是其他人帮我做的 这仅在笔记本中完成 以简化用户的工作并避免他们指定不同的参数 其中许多参数不会产生任何效果
  • 将当前类作为 scala 中的参数传递

    如何传递当前类作为参数 在java中我们这样做 mymethod this class or mymethod MyClass class 如何将 scala 当前类传递给此方法 this getClass or classOf MyCla
  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • WSClient - 打开的文件太多

    我正在 CentOS 6 上使用 Play Framework 2 4 我的应用程序抛出此异常 java net SocketException Too many open files 我在 Stack Overflow 上搜索了很多主题并
  • Jack(Java Android 编译器套件)将如何影响 Scala 开发人员

    现在随着公告Jack https source android com source jack html谷歌阐明了 Java 与 Android 相关的可预见的未来 但这对 Scala 和其他基于 JVM 的语言开发人员有何影响 尤其 Sc
  • 承诺的反面是什么?

    承诺代表将来可能可用 或无法实现 的值 我正在寻找的是一种数据类型 它表示将来可能变得不可用的可用值 可能是由于错误 Promise a b TransitionFromTo
  • Akka中有轻量级的actor吗?

    我的用例非常简单 在两个对象之间交换少量 现在我正在从 Scala Actors 迁移到 Akka 但是我再也找不到那些轻量级 Actors 使用Akka 我不仅需要为Actor创建创建ActorSystem Props 还需要照顾Acto
  • Spark SQL/Hive 查询通过 Join 永远持续下去

    所以我正在做一些应该很简单的事情 但显然它不在 Spark SQL 中 如果我在 MySQL 中运行以下查询 查询将在不到一秒的时间内完成 SELECT ua address id FROM user u inner join user a
  • 需要澄清令人困惑的 Http4s 消息类型 `Response[F]` / `Request[F]`

    我很难理解为什么Request and Response参数化为F 类似的东西是猫效应数据类型资源 从文档中 https typelevel org cats effect docs std resource https typelevel
  • 来自 Janino 和 Commons-Compiler 的 Spark java.lang.NoSuchMethodError

    我正在构建一个使用 Spark 进行基于随机森林分类的 应用程序 当尝试运行该程序时 我从该行收到异常 StringIndexerModel labelIndexer new StringIndexer setInputCol label

随机推荐

  • 如何在不引入竞争条件的情况下等待 RX 主体的响应?

    我有一项服务允许调用者异步发送命令和接收响应 在真实的应用程序中 这些操作是相当断开的 某些操作将发送命令 并且响应将被独立处理 但是 在我的测试中 我需要能够发送命令 然后等待 第一个 响应 然后再继续测试 响应是使用 RX 发布的 我对
  • Android 拖放问题不显示

    我正在开发一个拖放应用程序 一切正常 但我看到了一个问题 我有 3 个 ImageView 其中两个是可拖动对象 另一个是放置目标 每次我将物体扔到除放置目标之外的任何位置时 它都会完全消失 下面是我使用的代码 ImageView iv1
  • s3 临时签名 URL 中缺少对象的自定义 404 页面

    我正在为 Amazon S3 中的一些私人信息生成一些签名 URL 如果签名链接已过期或对象不存在 则会报告 XML 错误以及 404 错误 或 403 禁止 是否可以将签名链接 404 重定向到自定义错误页面 这似乎与 S3 静态网站设置
  • VS2010没有断点时调试器停止

    我最近更改了调试器中的一个选项 我认为这就是导致此问题的原因 但我似乎无法 撤消 它 我谷歌 所有命中都返回相反的 为什么调试器 not在断点处停止 无论如何 有人可以透露一些信息吗 编辑 当我在调试模式下按 f5 时 每次 它进入 Pro
  • Logstash创建管道但未创建索引

    我正在尝试使用 json 文件在 elasticsearch 云上创建索引 我已经创建了如下所示的配置 input file path gt root leads json start position gt beginning ignor
  • 在WPF中画一个十字

    我有一个 WPF 控件 I need to have in background a cross like this After that I d be able to add other controls over my crossed
  • 对数组元素(带有数字的字符串)进行排序,自然排序

    我有一个像这样的数组 IL0 Foo PI0 Bar IL10 Baz IL3 Bob says hello 并且需要对其进行排序 使其看起来像 IL0 Foo IL3 Bob says hello IL10 Baz PI0 Bar 我尝试
  • PHP 循环动态变量

    我正在尝试创建一个动态变量 我有一个循环 我希望它循环记录并为每个记录创建一个变量 我的代码 ct 1 foreach record as rec var ct rec Name ct ct 1 echo var1 当我尝试使用上面的代码时
  • 将 ToolBar 添加到 UITableView 的正确方法是什么?

    我正在编写一个基于导航的 iPhone 应用程序 我希望将 UIToolBar 停靠在屏幕底部 并在工具栏和导航栏之间滚动 UITableView 我见过几个论坛 其中有人建议处理此视图的视图控制器应该是标准 UIViewControlle
  • 车道检测器分隔线 C ++ 与 OpenCV

    现在我一直在用OpenCV进行图像分析 我想做的是识别车道分割线 我所做的如下 1 I receive a image 2 Then transform it to grayscale 3 I apply the GaussianBlur
  • 将 CodeMirror 与 Angular2 集成(打字稿)

    我目前正在致力于将 CodeMirror 编辑器添加到项目中 更准确地说是 Angular2 项目 但我做起来有困难 我的编辑器的实例化似乎无法正常工作 我的代码如下 编辑器 组件 ts import Component from angu
  • C++ 交换值的最有效方法

    我想知道在 C 中交换整数的最有效的操作方式是什么 为什么 是这样的 int a b a a b b a b a a b 比使用临时的更有效 还有其他更有效的方法吗 不只是要求其他方法来交换整数 以及为什么它们会更有效 赋值总是比进行算术运
  • 调整 UILabel 大小以适应自动换行

    这是 iPhone 应用程序的一部分 但通常应该适用于用 objC 编写的 Cocoa 我有一个 UILabel 保存各种数量的文本 从单个字符到几个句子 文本应始终以适合 UILabel 中所有文本的尽可能大的字体显示 最大行数设置为 4
  • List 复杂排序

    我有一个List
  • 使用 gradle 创建一个包含 web 应用程序中的类的 JAR

    IE 有没有等价的archiveClasses maven war plugin 的 true 设置 https maven apache org plugins maven war plugin faq html attached对于 W
  • 如何为谷歌云存储服务帐户进行密钥轮换?

    我已经编写了用于访问 GCS 存储桶的代码 以通过 java 中的 API 存储文件 该 API 需要 JSON 凭证文件 我已经从 google 控制台创建了该 JSON 文件 我需要每 90 天自动执行一次 JSON 文件或密钥轮换 如
  • ModalBottomSheetLayout 在返回导航上闪烁 (Jetpack Compose)

    我在用着ModalBottomSheetLayout import androidx compose material ModalBottomSheetLayout 并在模式表布局中有一些项目 当我按下一个项目时 它会导航到下一个屏幕 但是
  • 使用循环使用 Dataframe Pandas 创建 Excel 工作表

    我正在开发这个函数 它可以抓取网站上的 Fantasy Football 信息并将其写入 Excel 文件 最终 我希望在 Excel 工作簿的单独工作表中包含每周的信息 下面发布的代码可以完美运行 直到我想将其写入 Excel 工作簿 该
  • 使用 Nginx 为 Angular2 CLI 构建的应用程序提供服务会抛出 404、403

    我使用 Angular CLI 创建一个测试应用程序并使用 Nginx 为其提供服务 得到 404 或 403 我猜这是我的 Nginx 配置的问题 但为了额外确定 我已经提供了达到这一点所执行的所有步骤 这些是我遵循的步骤 安装的角度 c
  • 如何使用scala从apache Spark中的kafka主题读取json数据

    我是新的 Spark 您能否让我知道如何使用 scala 从 apache Spark 中的 kafka 主题读取 json 数据 Thanks 最简单的方法是使用 Spark 附带的 DataFrame 抽象 val sqlContext