Scala 和 Spark:Windows 上的 Dataframe.write._

2024-05-07

有人设法使用 Spark 写入文件(尤其是 CSV)吗?数据框 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset在 Windows 上?

SO 上的许多答案都已经过时了(例如this one https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)因为 Sparks 具有编写 .CSV 的本机功能(以及统一的write()方法)自 2.0 版本以来。另外,我下载并添加了winutils.exe喜欢提议的here https://stackoverflow.com/questions/30993655/write-rdd-as-textfile-using-apache-spark.

Code:

// reading works just fine
val df = spark.read
             .option("header", true)
             .option("inferSchema", true)
             .csv("file:///C:/tmp/in.csv")
// writing fails, none of these work
df.write.csv("file:///C:/tmp/out.csv")
df.write.csv("C:/tmp/out.csv")

Error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:551)
    at prost.ebtl.load.DataSourceCSV$.loadFromFilesystem(DataSourceCSV.scala:12)
    at TestScala$$anonfun$main$2.apply(TestScala.scala:98)
    at TestScala$$anonfun$main$2.apply(TestScala.scala:80)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at TestScala$.main(TestScala.scala:80)
    at TestScala.main(TestScala.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 13, 192.168.56.1): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191)
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
    ... 27 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:305)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:294)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:326)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:393)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:191)
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Note: 名为的文件夹out.csv虽然创建了

Setup:Hadoop v.2.7.3、Spark 2.0.1 Intelli J IDEA 2016.2、Scala 2.11.8、Win7 工作站上的 Testcluster


我尝试过这个,它有效。您需要设置仓库目录配置。这是您的代码中唯一缺少的东西,您也对您尝试写入的目录具有写访问权限。

val spark = SparkSession
    .builder()
    .appName("Spark SQL CSV example")
    .master("local")
    .config("spark.sql.warehouse.dir", "file:///C:/IJava/")
    .getOrCreate()

  val df = spark.read
    .option("header", true)
    .option("inferSchema", true)
    .csv("file:///C:/Users/sankar/Downloads/FLinsurancesample.csv")

  df.write.csv("file:///C:/Users/sankar/Downloads/out.csv")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Scala 和 Spark:Windows 上的 Dataframe.write._ 的相关文章

  • 如何使用 R 将每个文件的数据添加为附加行,从而将不同的 .csv 文件合并为一个完整的文件?

    我有几个不同的文件夹 它们都包含一个 csv 文件 所有这些 csv 文件都有一个单独的列 其中包含实验的一种条件的数据 我想以将每个文件的数据添加为新列的方式合并这些 csv 文件 目前 它看起来像这样 C1 csv 102 106 15
  • 仅将 pandas df 的前 N ​​行写入 csv

    如何仅将前 N 行或从 P 到 Q 行从 pandas 数据帧写入 csv 而不首先对 df 进行子集化 由于内存问题 我无法对要导出的数据进行子集化 我正在考虑一个逐行写入 csv 的函数 谢谢 Use head https pandas
  • 可以读取目标文件吗?

    我很好奇 obj文件 我几乎不知道它们是什么 或者它们包含什么 所以我用 Vim 文本编辑器打开它们 我在里面发现了一种类似外星人的语言 有什么办法可以理解它们代表什么以及它们的内容是什么 另外 它们的用途是什么 Thanks Sure 但
  • 加载数据infile,Windows和Linux的区别

    我有一个需要导入到 MySQL 表的文件 这是我的命令 LOAD DATA LOCAL INFILE C test csv INTO TABLE logs fields terminated by LINES terminated BY n
  • 如何有效截断文件头?

    大家都知道truncate file size 函数 通过截断文件尾部将文件大小更改为给定大小 但是如何做同样的事情 只截断文件的尾部和头部呢 通常 您必须重写整个文件 最简单的方法是跳过前几个字节 将其他所有内容复制到临时文件中 并在完成
  • Scala 的“神奇”函数列表

    在哪里可以找到 Scala 的 神奇 函数列表 例如apply unapply update etc 魔法函数是指编译器的某些语法糖使用的函数 例如 o update x y lt gt o x y 我用谷歌搜索了一些组合scala mag
  • 在高 dpi Windows 平台上自动重新缩放应用程序?

    我正在编写一个需要在高 dpi Windows 192dpi 而不是 96dpi 上运行的 Qt 应用程序 不幸的是 Qt 框架尚不支持高 dpi 至少在 Windows 上 因此我的应用程序及其所有元素看起来只有应有尺寸的一半 有没有办法
  • Python3 在 DirectX 游戏中移动鼠标

    我正在尝试构建一个在 DirectX 游戏中执行一些操作的脚本 除了移动鼠标之外 我一切都正常 是否有任何可用的模块可以移动鼠标 适用于 Windows python 3 Thanks I used pynput https pypi or
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • CPU 周期与总 CPU 时间

    在 Windows 上 GetProcessTimes 和 QueryProcessCycleTime 可用于获取应用程序所有线程的总计 我期望 显然是天真地 找到总周期数和总处理器时间 用户 内核 之间的比例关系 当转换为相同的单位 秒
  • Visual C++ 找不到“Windows 类型”,如 PVOID、DWORD、ULONG 等

    Windows 似乎无法找到任何这些类型 我完全不知道该怎么办 我在 MSDN 上找到的东西似乎表明它们是默认包含的 但它们在 Native 程序或 CLR 程序中不起作用 我收到的具体错误是
  • 如何使用 Scala 从 Spark 更新 ORC Hive 表

    我想更新 orc 格式的 hive 表 我可以从 ambari hive 视图进行更新 但无法从 sacla spark shell 运行相同的更新语句 objHiveContext sql select from table name 能
  • 过滤器的 Scala 集合类型

    假设您有一个 List 1 1 其类型为 List Any 这当然是正确的且符合预期 现在如果我像这样映射列表 scala gt List 1 1 map case x Int gt x case y String gt y toInt 结
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统
  • 将下划线分配给变量。下划线是做什么的?

    最近我遇到了这样的代码 var myVariable variableKind 这似乎是一种分配方式null to myVariable 谁能解释一下背后的理由 在这种情况下 分配之间有什么区别 and null到一个变量 它使用默认值初始
  • Windows 上的 Apache Pig 在运行“pig -x local”时出现“hadoop-config.cmd”未被识别为内部或外部命令”错误

    如果您由于以下错误而无法在 Windows 上运行 Apache Pig hadoop 2 4 0 bin hadoop config cmd is not recognized as an internal or external com
  • 如何在Windows中的Python 3.9下pip安装pickle?

    我需要pickle https docs python org 3 9 library pickle html module pickle包安装在我的下面Python 3 9在 Windows 10 下 我尝试过的 当尝试与pip inst
  • 代码 GetAsyncKeyState(VK_SHIFT) & 0x8000 中的这些数字是什么?它们是必不可少的吗?

    我试图在按下按键的简单动作中找到这些数字及其含义的任何逻辑解释 GetAsyncKeyState VK SHIFT 0x8000 可以使用哪些其他值来代替0x8000它们与按键有什么关系 GetAsyncKeyState 根据文档返回 如果
  • 通过 PowerShell 运行 .cmd 文件

    我正在尝试使用 PowerShell 在远程服务器上运行 cmd 文件 在我的 ps1 脚本中我尝试过 C MyDirectory MyCommand cmd 它会导致此错误 C MyDirectory MyCommand cmd is n
  • 在 Windows 上查找父进程 ID

    Problem 给定远程 Windows 主机上的进程 ID 和命令行访问权限 如何找到其父进程的 PID Solution 鉴于 Marc B 的回答 我们可以使用 WMIC 命令示例here https learn microsoft

随机推荐