原生安装 PySpark 也支持 S3 访问

2024-01-03

我想从 PySpark 读取存储在 S3 上的 Parquet 数据。

我从这里下载了 Spark:

http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz

并天真地安装到Python

cd python
python setup.py install

这似乎运行良好,我可以导入 pyspark,创建 SparkContext 等。但是,当我去阅读一些可公开访问的镶木地板数据时,我得到以下信息:

import pyspark
sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3://bucket-name/mydata.parquet')

我收到以下异常

Py4JJavaError: An error occurred while calling o55.parquet.
: java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

这个错误是从谷歌搜索中弹出的。到目前为止,所提供的解决方案都没有帮助。

我在一台个人计算机上使用Linux(Ubuntu 16.04),没有安装太多其他东西(一切都很简单)。

Update

我降级到http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz默认情况下包含 AWS。

不幸的是,现在我的 AWS 凭证没有被获取。我尝试过一些事情:

  1. 将它们作为 SparkConf 参数包含在内

    conf = (pyspark.SparkConf()
                   .set('fs.s3.awsAccessKeyId', ...')
                   .set('fs.s3.awsSecretAccessKey', '...'))
    sc = pyspark.SparkContext('local[4]', conf=conf)
    
  2. 将它们包含在我的本地 .aws/credentials 文件中
  3. 将它们包含在 URL 中(不起作用,因为我的访问密钥有一个正斜杠)

不幸的是,在所有情况下我都会收到如下回溯

IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).'

使用预构建的 Spark 2.X 二进制文件的 Hadoop-2.4 版本(我相信它附带 s3 功能),您可以通过编程方式配置 Spark 以通过以下方式提取 s3 数据:

import pyspark
conf = pyspark.SparkConf()

sc = pyspark.SparkContext('local[4]', conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "")

sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')

需要注意的关键一点是前缀s3n在存储桶的 URI 和配置名称中

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

原生安装 PySpark 也支持 S3 访问 的相关文章

  • 需要根据数据框中的行号应用不同的公式

    我正在努力在数据框中找到某种移动平均值 该公式将根据正在计算的行数而变化 实际场景是我需要计算Z列 Edit 2 以下是我正在使用的实际数据 Date Open High Low Close 0 01 01 2018 1763 95 176
  • 按每个元素中出现的数字对字符串列表进行排序[重复]

    这个问题在这里已经有答案了 我有一个脚本 其目的是对不断下载到服务器上的空间数据集文件进行排序和处理 我的列表目前大致如下 list file t00Z wrff02 grib2 file t00Z wrff03 grib2 file t0
  • 学习Python中的解析器

    我记得我读过有关解析器的内容 您只需提供一些示例行 它就知道如何解析某些文本 它只是确定两条线之间的差异 以了解可变部分是什么 我以为它是用 python 写的 但我不确定 有谁知道那是什么图书馆吗 可能你的意思是模板制作器 http co
  • 在 Python 中延迟转置列表

    所以 我有一个延迟生成的可迭代的三元组 我试图弄清楚如何将其转换为 3 个可迭代对象 分别由元组的第一个 第二个和第三个元素组成 然而 我希望这件事能懒惰地完成 所以 举例来说 我希望 1 2 3 4 5 6 7 8 9 将变成 1 4 7
  • 如何在Python + Selenium中获取元素的值

    我在我的 Python 3 6 3 代码中得到了这个 HTML 元素 作为 Selenium网页元素当然 span class ocenaCzastkowa masterTooltip style color 000000 alt 5 sp
  • 我可以同时打开两个 Tkinter Windows 吗?

    可以同时打开2个窗口吗 import tkinter as Tk import random import math root Tk Tk canvas Tk Canvas root background image Tk PhotoIma
  • Discord.py 斜线命令在 cogs 中不起作用

    我正在构建一个不和谐的机器人 并且想要在 cogs 内使用斜杠命令 但这些命令不显示或工作 这是代码 cog guild ids 858573429787066368 861507832934563851 class Slash comma
  • NumPy 数组与 SQLite

    我在 Python 中见过的最常见的 SQLite 接口是sqlite3 但是有什么东西可以很好地与 NumPy 数组或 rearray 配合使用吗 我的意思是 它可以识别数据类型 不需要逐行插入 并提取到 NumPy rec 数组中 有点
  • 在多核上运行 python 线程

    我知道Python 2 7不允许在不同的内核上运行多个线程 你需要使用multiprocessing模块以实现某种程度的并发性 我正在看concurrent futuresPython 3 4 中的模块 是否使用ThreadPoolExec
  • lmfit模型拟合然后预测

    我正在领养lmfit进行曲线拟合并使用拟合模型进行预测 然而下面的代码并没有达到我想要的效果 能否请你帮忙 谢谢 import numpy as np from lmfit import Model def linearModel x a0
  • cxfreeze virtualenv 中缺少 distutils 模块

    从 python3 2 项目运行 cxfreeze 二进制文件时 我收到以下运行时错误 project dist project distutils init py 13 UserWarning The virtualenv distuti
  • matplotlib:渲染到缓冲区/访问像素数据

    我想使用 matplotlib 生成的图作为 OpenGL 中的纹理 到目前为止 我遇到的 matplotlib 的 OpenGL 后端要么不成熟 要么已经停止使用 所以我想避免使用它们 我当前的方法是将图形保存到临时 png 文件中 并从
  • 右键单击 QPushButton 上的 contextMenu

    对于我的应用程序 我在 Qt Designer 中创建了一个 GUI 并将其转换为 python 2 6 代码 关于一些QPushButton 与设计器创建 我想添加右键单击上下文菜单 菜单选项取决于应用程序状态 如何实现这样的上下文菜单
  • matplotlib vlines 图中未应用 y 轴的最小值

    我正在 matplotlib 中绘制 vlines 图 数据集中的所有 y 值如下 gt 0 我希望 y 轴最底部的刻度能够读取0 但相反 我得到 500 这是代码 usr bin env python import numpy as np
  • “KMeans”对象没有属性“k”

    我使用 Yellowbrick 包绘制数据集的肘部曲线 以使用 KMeans 作为模型找到数据集的最佳簇数 我正在使用 Scikit learn KMeans 和 Yellowbrick kelbowvisualizer 函数 生成了肘部曲
  • Django 1.7:如何使用 html/css 文件作为模板发送电子邮件

    从 Django 1 7 开始 可以send email 使用新参数 html message 不幸的是 没有关于如何使用它的全面指南 新手友好 或者至少我找不到它 我需要使发送的电子邮件变得漂亮 因此 我试图弄清楚如何将我的消息包含到 h
  • 列表中的特定范围(python)

    我有一个从文本字符串中提取的整数列表 因此当我打印该列表 我称之为test I get 135 2256 1984 3985 1991 1023 1999 我想打印或制作一个仅包含特定范围内的数字的新列表 例如1000 2000之间 我尝试
  • 在Python中随机交错2个数组

    假设我有两个数组 a 1 2 3 4 b 5 6 7 8 9 我想将这两个数组交错为变量 c 注意 a 和 b 不一定具有相同的长度 但我不希望它们以确定性的方式交错 简而言之 仅仅压缩这两个数组是不够的 我不想要 c 1 5 2 6 3
  • 解析整数集的字符串并列出间隔

    I have 2 5 7 9 12 string 我想从中获取 2 5 7 8 9 12 列表 python中有没有内置的函数 Thanks UPD 我想 直接的答案是No 不管怎样 谢谢你的 片段 使用一个 建议者斯文 马尔纳克 s 2
  • Django 中使用外键的抽象基类继承

    我正在尝试在 Django 支持的网站上进行模型继承 以遵守 DRY 我的目标是使用一个名为 BasicCompany 的抽象基类来为三个子类提供通用信息 Butcher Baker CandlestickMaker 它们位于各自的应用程序

随机推荐