Spark Scala 使用 s3a 读取 csv 文件

2024-03-14

我正在尝试使用本地运行的 Spark - Scala 从 S3 存储桶读取 csv(本机)文件。我可以使用 http 协议读取该文件,但我打算使用 s3a 协议。

以下是调用前的配置设置。

    val awsId = System.getenv("AWS_ACCESS_KEY_ID")
    val awsKey = System.getenv("AWS_SECRET_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.access.key", awsId) 
    sc.hadoopConfiguration.set("fs.s3a.secret.key", awsKey)    
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider");
    sc.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "us-east-1.amazonaws.com")
    sc.hadoopConfiguration.set("fs.s3a.impl.disable.cache", "true")
 here

读取文件并打印 rdd/dataframe 中的前 5 行

    val fileAPath = Files.s3aPath(Files.input);
    println("reading file s3", fileAPath)
    // s3a://bucket-name/dataSets/policyoutput.csv
    val df = sc.textFile(fileAPath);
    df.take(5).foreach(println);

我收到以下异常

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: FD92FDC175C64AA2, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: IuloUEASgqnY4lrSMpbyJpwgFfCFbttxuxmJ9hGHMUgZTbO/UR/YyDgjix+3rBe0Y4MQHPzNvhA=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

任何进一步调查的帮助/指导将不胜感激。

Thanks


其他任何人都在为此苦苦挣扎,我必须更新 hadoop-client 的版本

另外下面的链接非常有帮助

  • https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html

  • http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region

pom详细信息如下

<properties>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.8.0</hadoop.version>

</properties>


<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Scala 使用 s3a 读取 csv 文件 的相关文章

  • Kubernetes WatchConnectionManager:执行失败:HTTP 403

    我遇到错误Expected HTTP 101 response but was 403 Forbidden 在我使用以下命令设置新的 Kubernetes 集群之后Kubeadm当我提交下面遇到的 pyspark 示例应用程序时 只有一个主
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • Scala 中奇怪的类型不匹配

    我希望这个问题还没有在其他地方得到解答 在这里没有找到答案 在我的本地化系统中 我有一个名为 Language 的类 class Language val name String dict HashMap String String def
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • 通过过滤对 Pyspark Dataframe 进行分组

    我有一个数据框如下 cust id req req met 1 r1 1 1 r2 0 1 r2 1 2 r1 1 3 r1 1 3 r2 1 4 r1 0 5 r1 1 5 r2 0 5 r1 1 我必须观察客户 看看他们有多少要求 看看
  • Scala 中抛出异常,什么是“官方规则”

    我正在 Coursera 上学习 Scala 课程 我也开始阅读 Odersky 的 Scala 书 我经常听到的是 在函数式语言中抛出异常不是一个好主意 因为它破坏了控制流 并且我们通常返回一个失败或成功的 Either Scala 2
  • 如何在使用 Active Storage 上传之前调整图像大小(与 AWS 链接)

    我尝试将 Active Storage 与 Amazon Web Services 结合使用 而不是 Carrierwave 和 Cloudinary 使用 Carrierwave 我有一些功能可以在通过上传器控制器上传之前调整图像大小 但
  • IntelliJ IDEA 13:新的 Scala SBT 项目尚未生成 src 目录结构

    我按照 Jetbrains 网站上的入门视频设置 IntelliJ IDEA 13 1 Community Edition 以与 Scala 配合使用 Scala 插件 v0 36 431 已安装 当我使用向导创建一个新的 Scala SB
  • 使用 Python 将列名称与 CSV 文件中的数据对齐

    这是我用来将数据写入 csv 文件的代码 with open temp csv a as fp a csv writer fp delimiter t data faceXpos faceYpos faceHeight faceWidth
  • Scala(或 Java)中泛型函数的特化

    是否可以在 Scala 中专门化泛型函数 或类 例如 我想编写一个将数据写入 ByteBuffer 的通用函数 def writeData T buffer ByteBuffer data T buffer put data 但由于 put
  • 在没有签名 URL 的情况下使用 CloudFront/S3 设置内容处置

    我有一些具有公共读取访问权限的对象 这些对象仅限于通过 CloudFront 提供服务 当我尝试传递一个response content disposition参数到我的 CloudFront URL 我收到 S3 错误 Request s
  • 如何使用 zend 导入 CSV

    如何使用 zend 框架导入 CSV 文件 我应该使用 zend file transfer 还是有任何我必须研究的特殊类 另外 如果我使用 zend file transfer 是否有任何特殊的 CSV 验证器 你不必使用任何 zend
  • 对 CSV 行使用小写函数

    我正在尝试以小写形式打印 csv 中的所有数据 但我没有任何运气 这是我到目前为止所拥有的 import csv books csv reader open books csv rb for row in books print row 这
  • 更改 Spark SQL 中的 Null 顺序

    我需要能够按升序和降序对列进行排序 并且还允许空值位于第一个或空值位于最后一个 使用 RDD 我可以将 sortByKey 方法与自定义比较器结合使用 我想知道是否有使用 Dataset API 的相应方法 我了解如何将 desc asc
  • Scala 宏的位置怎么了?

    我试图获取宏参数的原始输入字符串 但返回的位置似乎有点偏离 考虑这个宏 例如 object M import scala reflect macros Context import language experimental macros
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • Spark问题中读取大文件 - python

    我已经使用 python 在本地安装了 Spark 并在运行以下代码时 data sc textFile C Users xxxx Desktop train csv data first 我收到以下错误 Py4JJavaError Tra
  • 规范化且不可变的数据模型

    Haskell如何解决 规范化不可变数据结构 问题 例如 让我们考虑一个表示前女友 男友的数据结构 data Man Man name String exes Woman data Woman Woman name String exes
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger

随机推荐