使用 Spark (Python) 和 Dataproc 从 Google Storage 下载文件

2024-04-13

我有一个应用程序可以并行执行 Python 对象,这些对象处理要从 Google Storage(我的项目存储桶)下载的数据。该集群是使用 Google Dataproc 创建的。问题是数据从未被下载!我编写了一个测试程序来尝试理解这个问题。 我编写了以下函数来从存储桶中复制文件并查看在工作人员上创建文件是否有效:

from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder]

def execTouch(filename,localFolder):
  call(["touch",join(localFolder,"touched_"+filename)])

我已经通过从 python shell 调用这个函数来测试它并且它可以工作。但是,当我使用 Spark-submit 运行以下代码时,不会下载文件(但不会引发错误):

# ...
filesRDD = sc.parallelize(fileList)
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output')
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output')
# ...

execTouch 函数有效(我可以看到每个工作进程上的文件),但 copyDataFromBucket 函数什么也不做。

那么我做错了什么?


问题显然出在 Spark 上下文上。通过调用“hadoop fs”替换对“gsutil”的调用可以解决此问题:

from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["hadoop","fs","-copyToLocal",join(remoteFolder,filename),localFolder]

我还做了一个测试,将数据发送到存储桶。只需要将“-copyToLocal”替换为“-copyFromLocal”

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Spark (Python) 和 Dataproc 从 Google Storage 下载文件 的相关文章

随机推荐