我正在尝试通过 Spark 并行化机器学习预测任务。我之前已经在其他任务中成功使用过 Spark 多次,并且之前没有遇到过并行化问题。
在这个特定任务中,我的集群有 4 个工作线程。我在具有 4 个分区的 RDD 上调用 mapPartitions。映射函数从磁盘加载模型(引导脚本分发执行此操作所需的所有内容;我已经验证它存在于每台从机上)并对 RDD 分区中的数据点执行预测。
代码运行,但仅使用一个执行器。其他执行程序的日志显示“已调用关闭挂钩”。在代码的不同运行中,它使用不同的机器,但一次仅使用一台机器。
如何让 Spark 同时使用多台机器?
我通过 Zeppelin 笔记本在 Amazon EMR 上使用 PySpark。代码片段如下。
%spark.pyspark
sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")
from ModelLoader import ModelLoader
from MyClassifier import MyClassifier
def load_models():
models_path = '/home/hadoop/models'
model_loader = ModelLoader(models_path)
models = model_loader.load_models()
return models
def process_file(file_contents, models):
filename = file_contents[0]
filetext = file_contents[1]
pred = MyClassifier.predict(filetext, models)
return (filename, pred)
def process_partition(file_list):
models = load_models()
for file_contents in file_list:
pred = process_file(file_contents, models)
yield pred
all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')
正如预期的那样,有四个任务,但它们都在同一个执行器上运行!
我正在运行集群,并且可以提供资源管理器中可用的日志。我只是还不知道该去哪里寻找。