Apache Spark + Parquet 不遵守使用“分区”暂存 S3A 提交器的配置

2024-01-12

我正在使用本地计算机上的 Apache Spark (3.0) 将分区数据(Parquet 文件)写入 AWS S3,而无需在计算机中安装 Hadoop。当我有很多文件要写入大约 50 个分区(partitionBy = date)时,我在写入 S3 时遇到 FileNotFoundException。

然后我又遇到了新的S3A提交者 https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html#Using_the_Directory_and_Partitioned_Staging_Committers,所以我尝试配置“分区”提交者。但当文件格式为“parquet”时,我仍然可以看到 Spark 使用 ParquetOutputCommitter 而不是 PartitionedStagingCommitter。当我有大量数据要写入时,我仍然收到 FileNotFoundException 。

我的配置:

        sparkSession.conf().set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.name", "partitioned");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.magic.enabled ", false);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.unique-filenames", true);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", true);
        sparkSession.conf().set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        sparkSession.conf().set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol");
        sparkSession.conf().set("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "tmp/staging");

我做错了什么?有人可以帮忙吗?

Note:我已经在 Spark 中创建了一个 JIRA,但到目前为止没有任何帮助:SPARK-31072 https://issues.apache.org/jira/browse/SPARK-31072

=================================================== ===========

我尝试了(@Rajadayalan)的答案。但它仍然使用 FileOutputFormatter。我尝试将 Spark 版本降级到 2.4.5,但没有成功。

20/04/06 12:44:52 INFO ParquetFileFormat: Using user defined output committer for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
20/04/06 12:44:52 WARN AbstractS3ACommitterFactory: **Using standard FileOutputCommitter to commit work**. This is slow and potentially unsafe.
20/04/06 12:44:52 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
20/04/06 12:44:52 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/04/06 12:44:52 INFO AbstractS3ACommitterFactory: Using Commmitter FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200406124452_0000}; taskId=attempt_20200406124452_0000_m_000000_0, status=''}; org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@61deb03f}; outputPath=s3a://******/observation, workPath=s3a://******/observation/_temporary/0/_temporary/attempt_20200406124452_0000_m_000000_0, algorithmVersion=2, skipCleanup=false, ignoreCleanupFailures=false} for s3a://********/observation
20/04/06 12:44:53 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 81.077046 ms
20/04/06 12:44:54 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 31.993775 ms
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 9.967359 ms

注意:我的本地没有安装 Spark。所以给了Spark-hadoop-cloud_2.11作为编译时依赖 我的 build.gradle 如下所示:

    compile group: 'org.apache.spark', name: 'spark-hadoop-cloud_2.11', version: '2.4.2.3.1.3.0-79'
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-column
    compile group: 'org.apache.parquet', name: 'parquet-column', version: '1.10.1'
    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop
    compile group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.10.1'
    compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.1'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sketch
    compile group: 'org.apache.spark', name: 'spark-sketch_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst
    compile group: 'org.apache.spark', name: 'spark-catalyst_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-tags
    compile group: 'org.apache.spark', name: 'spark-tags_2.11', version: '2.4.5'
    compile group: 'org.apache.spark', name: 'spark-avro_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-hive
    compile group: 'org.apache.spark', name: 'spark-hive_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.xbean/xbean-asm6-shaded
    compile group: 'org.apache.xbean', name: 'xbean-asm7-shaded', version: '4.15'
   compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.2.1'
//    compile group: 'org.apache.hadoop', name: 'hadoop-s3guard', version: '3.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '3.2.1'
    compile group: 'com.amazonaws', name: 'aws-java-sdk-bundle', version: '1.11.271'

遇到同样的问题,解决方案来自如何让 AWS 上的本地 Spark 写入 S3 https://stackoverflow.com/questions/58495909/how-to-get-local-spark-on-aws-to-write-to-s3努力加载 PartitionedStagingCommitter。您还必须从解决方案中提到的下载spark-hadoop-cloud jar。

我也使用spark 3.0,这个版本的jar可以工作https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/2.4.2.3.1.3.0-79/ https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/2.4.2.3.1.3.0-79/

我的spark-defaults.conf中的设置

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name                           partitioned
spark.hadoop.fs.s3a.committer.magic.enabled                  false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode           append
spark.hadoop.fs.s3a.committer.staging.unique-filenames       true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads  true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a    
org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass                        
org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class                     
org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark + Parquet 不遵守使用“分区”暂存 S3A 提交器的配置 的相关文章

随机推荐

  • 使用数字索引重新采样 pandas 系列

    假设我有一个 pandas Series 其索引具有数值类型 例如 pd Series 10 20 1 1 2 3 我们如何以 0 1 间隔对上述序列进行重新采样 看起来 resample 函数只适用于日期时间间隔 这就是插值的名称 您可以
  • 带有 Docker 执行器 /usr/bin/bash 的 Gitlab-CI:第 90 行:git:找不到命令

    我有一个本地 GitLab 服务器和带有 Docker 执行器的 gitlab ci 运行程序 我想使用 gitlab ci 构建 第一阶段 我的 Maven 项目 由于我使用 buildnumber maven plugin 我向 git
  • 为什么看似空的文件和字符串会产生 md5sum?

    考虑以下 md5sum dev null d41d8cd98f00b204e9800998ecf8427e dev null touch empty md5sum empty d41d8cd98f00b204e9800998ecf8427e
  • 为什么在新的 virtualenv 中导入 numpy 需要 5 秒?

    背景 你好 我们编写的 Python 代码在我们无法控制的服务器上运行 我们不太了解代码运行的环境 如果我们的代码运行时间超过 3 秒 就会被拒绝 因此 我决定开始使用虚拟环境对我们的代码进行计时 以给出最坏情况下的运行时间估计 Quest
  • Java swing:选择/取消选择 JButton 以模仿脉冲

    FE我有一个电子邮件客户端 它接收新消息 带有传入消息的按钮开始执行某些操作 直到用户单击它以查看发生了什么 我试图通过选择 等待然后取消选择按钮来吸引注意力 但这没有任何作用 do button setSelected true Thre
  • 乳胶输出

    当我编译乳胶文件时 它还会生成 txt bbl aux 文件 它们没有用 因为我可以删除它们而不会造成任何损害 我的问题是这些文件的用途是什么以及如何在编译 tex 文件时选择不生成它们 这些文件很有用 代表多遍排版过程的输出 如果删除它们
  • Python numpy 数组元素不改变值

    所以我的 python 代码中遇到了一个问题 我将其归结为 假设我们有一个函数u def u y t h float 10 U0 float 1 return U0 h y 和一个数组 a np array 0 2 2 然后执行以下操作 a
  • 使用 Laravel Mix 时如何包含 webpack 插件?

    如果我使用 WebPack 和 Laravel Mix 我应该如何包含 webpack 插件 我很困惑将插件代码添加到哪个文件中 我的以下尝试似乎没有运行我的插件 该插件应该压缩 js css 文件 但事实并非如此 webpack conf
  • 使用 Sympy 集成到 Python 中

    我目前正在使用Sympy帮助我进行数学计算 现在 我正在尝试执行数值积分 但每次运行脚本时都会出现错误 这是脚本 from sympy import cst qe 1 60217646 10 19 m0 N 1 25663706 10 6
  • 无论如何,我可以在谷歌合作实验室下载该文件吗?

    我正在这个 Codelab 的 Google Colaboratory 中尝试张量流 我需要下载 http download tensorflow org example images flower photos tgz http down
  • PHP 复选框多重删除

    我的实现似乎不起作用 您能指出可能出现的问题或指出更好的解决方案吗 当我选中复选框并单击删除按钮时 它似乎没有执行任何操作 请帮助我 div class page img class page src images DISCLAIMER p
  • 获取当月数据记录条数

    我正在尝试查找数据库中当月结束的车辆记录总数 我不知道我应该在里面写什么InvoiceDate本例中的部分 public void MonthlyStatus NetContext context var monthlyStatus fro
  • Zend Framework,将 URL 的扩展名映射到格式参数?

    是否可以将 URL 的扩展名映射到 ZF 中的格式参数 我希望默认路由仍然有效 包括从 URI 映射参数 因此您可以说 http example com controller action param1 value1 param2 valu
  • 何时返回 IOrderedEnumerable?

    Should IOrderedEnumerable纯粹用作语义值的返回类型 例如 当在表示层中消费模型时 我们如何知道集合是否需要排序或已经排序 如果存储库用一个存储过程包装了一个存储过程 该怎么办 ORDER BY条款 存储库是否应该返回
  • 不存在类型变量 U 的实例,因此 void 符合 U

    我正在努力避免isPresent检查下面的代码 但编译器发出错误消息 没有类型变量的实例U存在使得void符合U 打电话给printAndThrowException 这是我的代码 values stream filter value gt
  • 您在 ASP.NET MVC 中使用什么视图引擎? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我知道您可以在 ASP NET MVC 中使用几种不同的视图引擎 ASPX 显然 NV速度 Brail NHaml et al 默认的 ASPX
  • 更改“查看购物车”按钮的文本

    我正在使用 woocommerce 插件 但我遇到了如何更改查看购物车按钮文本的问题 希望有人可以帮助解决我的问题 这是my site http unlieusurterre fix it buddy clients com the tru
  • 无服务器 python 请求具有长时间超时?

    我有几个遵循类似格式的 python 脚本 您传入一个日期 它要么 检查我的 S3 存储桶中文件名中包含该日期的文件 并解析它 或者 运行一个 python 脚本 对文件进行一些分析该日期的文件 运行时间超过 1 小时 我正在寻找一种无服务
  • PHP MySQL 数据库连接

    执行查询 和其他数据库操作 后是否有必要显式关闭数据库连接 不 php 自动执行此操作 不过 您可以将其称为 良好的编程实践 来清理 也称为关闭连接
  • Apache Spark + Parquet 不遵守使用“分区”暂存 S3A 提交器的配置

    我正在使用本地计算机上的 Apache Spark 3 0 将分区数据 Parquet 文件 写入 AWS S3 而无需在计算机中安装 Hadoop 当我有很多文件要写入大约 50 个分区 partitionBy date 时 我在写入 S