Spark 结构化流 - 从嵌套目录读取文件

2024-04-14

我有一个客户端将 CSV 文件放置在嵌套目录中,如下所示,我需要实时读取这些文件。我正在尝试使用 Spark 结构化流来做到这一点。

Data:
/user/data/1.csv
/user/data/2.csv
/user/data/3.csv
/user/data/sub1/1_1.csv
/user/data/sub1/1_2.csv
/user/data/sub1/sub2/2_1.csv
/user/data/sub1/sub2/2_2.csv

Code:

val csvDF = spark
  .readStream
  .option("sep", ",")
  .schema(userSchema)      // Schema of the csv files
  .csv("/user/data/")

添加任何配置以允许 Spark 从结构化流中的嵌套目录读取。


我可以使用 glob 路径流式传输子目录中的文件。

在这里发帖是为了其他人。

inputPath = "/spark_structured_input/*?*"
inputDF = spark.readStream.option("header", "true").schema(userSchema).csv(inputPath)
query = inputDF.writeStream.format("console").start()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 结构化流 - 从嵌套目录读取文件 的相关文章

  • scala.collection.Seq 不适用于 Java

    Using 阿帕奇火花2 0 1 Java 7 在 Apache Spark Java API 文档中 DataSet 类出现了一个example http spark apache org docs latest api java org
  • 使用列值作为 Spark DataFrame 函数的参数

    考虑以下数据框 letter rpt X 3 Y 1 Z 2 可以使用以下代码创建 df spark createDataFrame X 3 Y 1 Z 2 letter rpt 假设我想重复每行列中指定的次数rpt 就像这样questio
  • Spark:如何从spark shell运行spark文件

    我正在使用CDH 5 2 我能够使用火花外壳运行命令 如何运行包含 Spark 命令的文件 file spark 有没有办法在没有 sbt 的情况下在 CDH 5 2 中运行 编译 scala 程序 在命令行中 您可以使用 spark sh
  • 如何使用 Apache Livy 设置 Spark 配置属性?

    我不知道在向 Apache Livy 提交 Spark 作业时如何以编程方式传递 SparkSession 参数 这是测试 Spark 作业 class Test extends Job Int override def call jc J
  • 计算行的排名

    我想根据一个字段对用户 ID 进行排名 对于相同的字段值 排名应该相同 该数据位于 Hive 表中 e g user value a 5 b 10 c 5 d 6 Rank a 1 c 1 d 3 b 4 我怎样才能做到这一点 可以使用ra
  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • 如何将多行标签 xml 文件转换为 dataframe

    我有一个包含多个行标签的 xml 文件 我需要将此 xml 转换为正确的数据帧 我使用了spark xml 它只处理单行标签 xml数据如下
  • 如何从 Databricks Delta 表中删除列?

    我最近开始发现 Databricks 并遇到了需要删除增量表的特定列的情况 当我使用 PostgreSQL 时 它就像 ALTER TABLE main metrics table DROP COLUMN metric 1 我正在浏览 Da
  • Spark:并行转换多个数据帧

    了解如何在并行转换多个数据帧时实现最佳并行性 我有一系列路径 val paths Array path1 path2 我从每个路径加载数据帧 然后转换并写入目标路径 paths foreach path gt val df spark re
  • Kubernetes WatchConnectionManager:执行失败:HTTP 403

    我遇到错误Expected HTTP 101 response but was 403 Forbidden 在我使用以下命令设置新的 Kubernetes 集群之后Kubeadm当我提交下面遇到的 pyspark 示例应用程序时 只有一个主
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • Spark问题中读取大文件 - python

    我已经使用 python 在本地安装了 Spark 并在运行以下代码时 data sc textFile C Users xxxx Desktop train csv data first 我收到以下错误 Py4JJavaError Tra
  • pyspark flatmat 错误:TypeError:“int”对象不可迭代

    这是我书中的示例代码 from pyspark import SparkConf SparkContext conf SparkConf setMaster spark chetan ThinkPad E470 7077 setAppNam
  • Spark:如何使用crossJoin

    我有两个数据框 df1有 100000 行并且df2有 10000 行 我想创建一个df3这是两者的交叉连接 val df3 df1 crossJoin df2 这将产生 10 亿行 尝试在本地运行它 但似乎需要很长时间 您认为本地可以实现
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • 根据 pyspark 中的条件从数据框中删除行

    我有一个包含两列的数据框 col1 col2 22 12 2 1 2 1 5 52 1 2 62 9 77 33 3 我想创建一个新的数据框 它只需要行 col1 的值 gt col2 的值 就像注释一样col1 很长类型和col2 有双
  • Spark 2.2 无法将 df 写入 parquet

    我正在构建一个聚类算法 我需要存储模型以供将来加载 我有一个具有以下架构的数据框 val schema new StructType add StructField uniqueId LongType add StructField tim
  • Spark.sql.shuffle.partitions 的最佳值应该是多少,或者在使用 Spark SQL 时如何增加分区?

    我实际上正在使用 Spark SQLhiveContext sql 它使用 group by 查询 我遇到了 OOM 问题 所以考虑增加价值spark sql shuffle partitions从默认的 200 到 1000 但这没有帮助

随机推荐

  • nifi invokehttp post复杂的json

    我尝试在 Apache NiFi 中使用 InvokeHttpProcessor 来执行具有复杂 JSON 正文的 POST 请求 因此本教程 http www tomaszezula com 2016 10 30 nifi and htt
  • PostgreSQL 中是否有可用的多值字段类型?

    我想知道是否可以在 PostgreSQL 的一个字段中存储多个值 我有一张桌子叫Token与列id text and category category是一个多值字段 是否有必要为其创建一个单独的表 或者有没有办法将其存储在Token ta
  • java.lang.NoSuchMethodError:没有静态方法 getDrawable(Landroid/content/Context;I)

    正在将 Socialize SDK 集成到我的应用程序中 我还将 Android Studio 更新到了 2 3 3 我的 Activity 的 super onCreate savedInstanceState 给了我一个错误 这里也提出
  • malloc(0) 的行为

    int main char p p char malloc sizeof char 0 printf Hello Enter the data without spaces n scanf s p printf The entered st
  • 如何在 Java 中定义重复的枚举常量?

    我想定义一个具有两个 值 相同的常量的枚举类型 我将这两个常量称为重复项 考虑以下示例 我想要定义一个浏览器类型列表 并且想要同时拥有文字 IE 和 InternetExplorer 如下所示 enum Browser CHROME chr
  • 关闭SKScene后,内存仍然居高不下

    我使用dispatch onceNSObject创建数据指针 因此 当主视图控制器出现时 所有游戏资源指针都会被创建 为了玩游戏 用户点击UIButton对应于某个特定级别UI视图控制器 让我称之为 LevelSelectionContro
  • AXML 和 XAML 之间的区别?

    我是 Visual Studio Xamarin 跨平台移动开发的新手 我一直在搜索 AXML 但我找不到任何设计和应用 MVC 方法的教程 实际上我对此有很多疑问 但我先把这 3 留在这里 他们有什么区别 xaml设计可以应用在axml中
  • 使用 psycopg cur.execute 创建 postgres 模式

    我的 python 应用程序允许用户创建其命名模式 我需要一种方法来保护应用程序免受 SQL 注入 要执行的SQL读取 CREATE SCHEMA schema name AUTHORIZATION user name psycopg 文档
  • PHP 正则表达式生成器

    我现在已经获得了满足以下所需条件的有效正则表达式字符串 一行 php 就绪正则表达式 包含许多关键字和关键术语 并且至少匹配其中一个 例如 关键术语 apple banana strawberry pear cake 现在 如果找到任何这些
  • git、mercurial、bazaar 源代码库的可理解性

    我想阅读一种流行的版本控制工具的源代码 以了解版本控制的工作原理 我想读一本最具可读性的书 我不知道对此有什么客观 定量的衡量标准 所以本着WTF 分钟漫画 http www osnews com story 19266 WTFs m 想请
  • 内容提供商中 Android 投影图的用途是什么?

    我正在查看 Android 记事本应用程序示例代码
  • Firefox Web 扩展“无法访问死对象”错误

    我很难找到这个问题的最新答案 并且经常没有时间在这里回答问题 所以我想我会发布这个 这样我就可以回答我自己的问题 因为我找到了解决方案 我正在为 Chrome 和 Firefox 制作一个 Web 扩展 Firefox 有一个问题 当我从选
  • 跨浏览器选项卡共享 websocket?

    我们希望每个浏览器都有一个套接字 而不是浏览器中的每个选项卡都有一个套接字 我们怎样才能实现它呢 我读到了有关共享网络工作者的文章 这很有前途 对此的参考也值得赞赏 不幸的是 据我所知 共享网络工作者尚未被 Mozilla 或 Intern
  • 在非托管 CDI Bean 中引用 CDI Bean

    是否可以在使用创建的类中获取 CDI bean 的实例new关键词 我们目前正在对旧应用程序进行一些增强 并且我们总是会得到上下文不活跃异常 http docs jboss org cdi api 1 0 javax enterprise
  • 如何在 Xamarin iOS 中绘制文本?

    我想在给定点 x y 处绘制文本Draw自定义的方法View 我已关注这个样本 https developer xamarin com recipes ios graphics and drawing core text draw unic
  • 如何在CKEditor 4中设置默认字体和字体大小

    我使用以下代码在 CKEditor 4 中设置默认字体和字体大小 config font defaultLabel Tahoma config fontSize defaultLabel 24px 但上面的代码在 Mozilla Firef
  • php 中转义引号的意义是什么

    这是我正在学习的一本书中的验证脚本 为什么需要转义引号 例如
  • 在 PHP 中对逗号分隔值列表运行选择

    我在数据库上运行选择查询时遇到一些问题 一些数据以逗号分隔值的列表形式保存 例如 Table example tbl Id People Children 1 1 2 3 8 10 3 2 7 6 12 18 19 2 我正在尝试运行的示例
  • SQL Server 2008 - 按带有数字的字符串排序

    我的表中有以下值 ABC ABC1 ABC2 ABC3 and so on ABC11 ABC12 ABC13 and so on ABC20 ABC21 ABC22 and so on 所以基本上我拥有的是任何字符串值 并不总是 ABC
  • Spark 结构化流 - 从嵌套目录读取文件

    我有一个客户端将 CSV 文件放置在嵌套目录中 如下所示 我需要实时读取这些文件 我正在尝试使用 Spark 结构化流来做到这一点 Data user data 1 csv user data 2 csv user data 3 csv u