PySpark 使用临时 AWS 令牌进行 s3 身份验证的问题

2024-02-24

我已经设置了本地 PySpark,但是每次我尝试使用 s3a 协议读取文件 s3 时,它都会返回 403 AccessDenied 错误。我尝试连接的账户仅支持 AWS ShouldRole,它为我提供了临时 Access_key、Secret_key 和 session_token

我使用的是 Spark 2.4.4、Hadoop 2.7.3 和 aws-java-sdk-1.7.4 jar 文件。我知道问题不在于我的安全令牌,因为我可以在 boto3 中使用相同的凭据来查询相同的存储桶。我正在设置 Spark 会话,如下所示:

spark.sparkContext._conf.setAll([
[('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'), 
('fs.s3a.aws.credentials.provider','org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider'),
("fs.s3a.endpoint", "s3-ap-southeast-2.amazonaws.com"),
('fs.s3a.access.key', "..."),
('fs.s3a.secret.key', "..."),
('fs.s3a.session.token', "...")])
])

spark_01 = spark.builder.config(conf=conf).appName('s3connection').getOrCreate()

df = spark_01.read.load('s3a://<some bucket>')

我得到的错误是这样的:

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: ... , AWS Error Code

更新: 完整的错误堆栈:

19/10/08 16:37:17 WARN FileStreamSink: Error while looking for metadata directory.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/usr/local/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o47.load.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: DFF18E66D647F534, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: ye5NgB5wRhmHpn37tghQ0EuO9K6vPDE/1+Y6m3Y5sGqxD9iFOktFUjdqzn6hd/aHoakEXmafA9o=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:557)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)```

为了解决这个问题,我们需要做以下两件事。 (我发现你已经在代码中做了第二件事,所以只需要第一件事。)

  1. 仅使用 hadoop-aws-2.8.5.jar,而不是使用 aws-java-sdk-1.7.4.jar 和 hadoop-aws-2.7.7.jar。 (请参阅《开始》部分https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html)
  2. 设置 fs.s3a.aws.credentials.provider 如下。 对于你的代码, ('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider') 这使您能够使用令牌密钥。通过此设置,您可以在显示或使用系统环境变量(例如 AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY 和 AWS_SESSION_TOKEN)时在代码中提供密钥。

作为参考,此设置 ('fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain') 也可用于从 ~/.aws/credentials 加载凭证密钥,而无需在源代码中进行设置。 (看,http://wrschneider.github.io/2019/02/02/spark-credentials-file.html http://wrschneider.github.io/2019/02/02/spark-credentials-file.html)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark 使用临时 AWS 令牌进行 s3 身份验证的问题 的相关文章

随机推荐

  • 加速 Python

    这确实是两个问题 但它们非常相似 为了简单起见 我想我应该把它们放在一起 Firstly 给定一个已建立的 python 项目 除了简单的代码内优化之外 还有哪些不错的方法可以加速它 Secondly 用python从头开始编写程序时 有哪
  • 将服务从一项活动传递到另一项活动

    如何将服务从一项活动传递到另一项活动 我有一个音乐播放器 Activity1 它显示歌曲列表 当您单击它时 它会启动 Activity1 中的服务 我有一个按钮 用户可以单击该按钮来打开 Activity2 那么我将服务从 Activity
  • 是否可以将 stickylistviewheader 与 crisbanes pulltorefresh 合并?

    我构建了一个应用程序 其中需要 pulltorefresh 和 StickylistHeaders 我已经在应用程序中实现了 pulltorefresh 但无法使其与 StickyListHeaders 一起使用 是否可以合并这两个库 或者
  • 如何避免在回发时从 asp.net 重复输入?

    我有一个从模板表中提取数据的下拉列表 我有一个添加按钮来插入新模板 添加按钮将弹出 jQuery 弹出窗口以插入新值 将有一个保存按钮来保存新数据 On Save Click 我输入新数据并关闭弹出窗口 问题是这样的 当我刷新页面时 页面再
  • 将 C++ lambda 转换为 C 函数

    我正在编写一些包装代码 其中外部库调用 C 函数 使用可变参数模板等 关键点是外部库需要一个 c 函数 这通常没问题 因为这是合法的 LibraryFuncType fn params 虽然我可以轻松地手动完成此操作 但我想使用以下方法自动
  • 使用 DeviceWatcher 监视 USB 驱动器并检索设备信息?

    I m a WinForms开发人员和我已经知道如何使用 WMI 监视连接或断开连接的 USB 但不久前我发现了设备观察者 http msdn microsoft com en us library windows devices enum
  • 将字符串转换为八进制数的最Pythonic方法

    我希望使用存储在配置文件中的文件掩码来更改文件的权限 由于 os chmod 需要八进制数 因此我需要将字符串转换为八进制数 例如 000 gt 0000 or 0o000 for you python 3 folks 644 gt 064
  • Docker 组成和主机名

    我有一个包含 2 个名为 web 和 db 的服务 容器 的撰写文件 version 2 services web image nodejs latest ports 80 db image mysql latest ports 3306
  • is_null($var) 和 ($var === null) 有什么区别?

    这之间有什么区别吗 if is null var do something 和这个 if var null do something 检查变量是否包含 null 时哪种形式更好 有什么我应该注意的边缘情况吗 我初始化了所有变量 因此不存在的
  • 将模块导入 Pyscript

    当我们编写 Python 代码时 我们通常使用导入的包和模块 例如 我们在编码时可能会这样写 import numpy import requests from bs4 import BeautifulSoup 当我们尝试将 python
  • 在 git 中,如何仅从更改的行中删除 Windows 行结尾?

    有时 当我尝试向开源项目贡献代码时 该项目尚未格式化并且包含 UNIX 和 Window 行结尾 我的 智能 IDE 会以某种方式检测每个文件使用哪种类型的结尾 如果它检测到 Windows 行结尾 那么我的所有更改都将具有 Windows
  • 更改 Sysem.Variants.VarToWideStr 的区域设置格式

    我的应用程序上的第三方组件 FastReports 广泛使用 System Variants VarToWideStr 函数 这很好 只是它忽略了我需要该应用程序使用的区域设置 Example FormatSettings ShortDat
  • Spark 跨接收器的结构化流一致性

    我想在以下情况下更好地理解 Spark 2 2 结构化流的一致性模型 一个来源 Kinesis 从此源向 2 个不同接收器进行 2 次查询 一个用于存档目的的文件接收器 S3 另一个用于处理数据的接收器 数据库或文件 尚未决定 我想了解跨接
  • 覆盖 django 的模型相关管理器

    我如何才能超越关系经理 例如 user entry set django db models fields related RelatedManager 但我需要自己的经理 我尝试这段代码 但这不起作用 class EntryManager
  • 为什么 QObject ::findChildren 返回具有公共基类的子级?

    我使用 QObject 作为复合模式的基类 假设我有一个父类 File 在一个人为的示例中 我向其中添加不同类型的子类 HeaderSection 和 PageSection File HeaderSection 和 PageSection
  • 简单表达式缺少参数类型

    遵循播放 websocket 示例 http www playframework com documentation 2 3 x ScalaWebSockets我遇到了一个奇怪的问题 文档中的以下示例正在运行 Future successf
  • 弹出并刷新视图控制器

    我有三个视图控制器 当我到达第三个视图控制器时 我使用 poptorootviewcontroller 弹出到我的第一个视图控制器 但是当我在第三个视图控制器中使用 popviewcontroller 我想返回到我的第二个视图控制器 时 它
  • 为什么 PowerShell 无法识别带引号的参数?

    当您直接调用脚本 在 PowerShell 控制台或 ISE 中 或通过另一个 PowerShell 实例调用脚本时 为什么 PowerShell 对带引号的参数的处理方式有所不同 这是脚本 TestQuotes ps1 param str
  • scala 中的非最终单例对象有什么意义?

    我一直以为objectScala 中的声明将被编译为final类 因为它们是由有效的匿名类实现的 自从final与非最终类相比 类更容易被 JVM 优化 我认为最终性有好处并且没有成本 所以所有object实施将是最终的 但我一定错过了一些
  • PySpark 使用临时 AWS 令牌进行 s3 身份验证的问题

    我已经设置了本地 PySpark 但是每次我尝试使用 s3a 协议读取文件 s3 时 它都会返回 403 AccessDenied 错误 我尝试连接的账户仅支持 AWS ShouldRole 它为我提供了临时 Access key Secr