我有一个应用程序可以并行执行 Python 对象,这些对象处理要从 Google Storage(我的项目存储桶)下载的数据。该集群是使用 Google Dataproc 创建的。问题是数据从未被下载!我编写了一个测试程序来尝试理解这个问题。
我编写了以下函数来从存储桶中复制文件并查看在工作人员上创建文件是否有效:
from subprocess import call
from os.path import join
def copyDataFromBucket(filename,remoteFolder,localFolder):
call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder]
def execTouch(filename,localFolder):
call(["touch",join(localFolder,"touched_"+filename)])
我已经通过从 python shell 调用这个函数来测试它并且它可以工作。但是,当我使用 Spark-submit 运行以下代码时,不会下载文件(但不会引发错误):
# ...
filesRDD = sc.parallelize(fileList)
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output')
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output')
# ...
execTouch 函数有效(我可以看到每个工作进程上的文件),但 copyDataFromBucket 函数什么也不做。
那么我做错了什么?