当有更多可用机器时,Spark 仅使用一台工作机器

2024-01-25

我正在尝试通过 Spark 并行化机器学习预测任务。我之前已经在其他任务中成功使用过 Spark 多次,并且之前没有遇到过并行化问题。

在这个特定任务中,我的集群有 4 个工作线程。我在具有 4 个分区的 RDD 上调用 mapPartitions。映射函数从磁盘加载模型(引导脚本分发执行此操作所需的所有内容;我已经验证它存在于每台从机上)并对 RDD 分区中的数据点执行预测。

代码运行,但仅使用一个执行器。其他执行程序的日志显示“已调用关闭挂钩”。在代码的不同运行中,它使用不同的机器,但一次仅使用一台机器。

如何让 Spark 同时使用多台机器?

我通过 Zeppelin 笔记本在 Amazon EMR 上使用 PySpark。代码片段如下。

%spark.pyspark

sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")

from ModelLoader import ModelLoader
from MyClassifier import MyClassifier

def load_models():
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)

    models = model_loader.load_models()
    return models

def process_file(file_contents, models):
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)

def process_partition(file_list):
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred


all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

正如预期的那样,有四个任务,但它们都在同一个执行器上运行!

我正在运行集群,并且可以提供资源管理器中可用的日志。我只是还不知道该去哪里寻找。


这里要提两点(但不确定他们是否能解决您的问题):

  1. wholeTextFiles uses WholeTextFileInputFormat这延伸了CombineFileInputFormat,并且因为CombineFileInputFormat,它会尝试将一组小文件合并到一个分区中。因此,如果您将分区数设置为 2,您“可能”会得到两个分区,但这并不能保证,这取决于您正在读取的文件的大小。
  2. 的输出wholeTextFiles是一个 RDD,其中每条记录都包含整个文件(并且每个记录/文件无法拆分,因此它将最终位于单个分区/工作线程中)。因此,如果您仅读取一个文件,则尽管您在示例中将分区设置为 4,但最终您仍会将整个文件存储在一个分区中。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

当有更多可用机器时,Spark 仅使用一台工作机器 的相关文章

随机推荐

  • PHP脚本自动创建文件结构表示[重复]

    这个问题在这里已经有答案了 可能的重复 PHP 遍历文件夹并显示 HTML 内容 https stackoverflow com questions 2769175 php iterate through folders and displ
  • getActivity() 找不到符号 symbol :方法 getActivity()

    我需要有关我的彩信应用程序的此文件的帮助 getActivity 导致构建错误 Error cannot find symbol method getActivity 到目前为止 我已经尝试了很多方法来使这项工作正常进行 例如extends
  • 无法使用装饰器覆盖类对象的 str 结果

    请参阅下面我的回答 发布问题后我意识到发生了什么事 我尝试使用装饰器覆盖类的字符串表示形式不起作用 我一定错过了一些东西 但不知道它是什么 from functools import wraps def str dec obj wraps
  • 水晶报告 .net 4 的问题

    我之前在 net 3 5 中使用过 Crystal Reports 但自从升级到 Net 4 0 后 我遇到了以下错误 错误 5 找不到类型或命名空间名称 CrystalDecisions 是否缺少 using 指令或程序集引用 C Use
  • Robocopy 命令行 - 带空格的文件

    我正在尝试运行以下命令 robocopy exe eisdevl nas gatech edu coldfusion devl cfapps cfeis mybeap eisdevl nas gatech edu coldfusion te
  • 在Java中,如何将十六进制字符串转换为byte[]? [复制]

    这个问题在这里已经有答案了 我在 Java 中使用以下函数将加密字符串转换为十六进制格式 public static String toHex byte buf StringBuffer strbuf new StringBuffer bu
  • 在 ggplot 和 stat_function() 中叠加对数正态密度

    我尝试通过叠加一个函数stat function in ggplot但无法弄清楚我的错误 这个例子产生了一个漂亮的图 data lt data frame x rt 10000 df 7 ggplot data data aes x x g
  • 当窗口移动到屏幕左上角时如何禁用窗口最大化?

    我有一个设置了ResizeMode CanResizeWithGrip 和AllowTransparency true 的窗口 它工作正常 直到它移动到屏幕顶部 然后自动最大化 如何阻止它最大化 以便我可以将屏幕显示为位于屏幕顶部的窗口 T
  • 如何在测试中手动模拟 Svg?

    我在我的应用程序中使用存根文件来模拟图像 这对我来说 99 的时间都有效 但是 我有一个组件可以根据输入渲染不同的图像 因此我希望能够在单元测试中检查输入是否创建了正确的输出 基本上我想做的是 如果用户输入 狮子 我的组件将显示狮子的图片
  • 使用外部 jar“不是托管类型”的 Spring 启动

    我有一个正在拉入公共罐子的弹簧应用程序 该 jar 包含带注释的 DTO 类 运行 mvn clean build 命令成功运行并构建 jar 一旦我运行 java jar target MyApp 1 0 0 BUILD SNAPSHOT
  • 将 mime 多部分主体部分写入输出流时出错

    我有执行异步文件上传的代码 该代码在我的开发虚拟机上运行良好 但在将其部署到客户端系统后 我不断收到此错误 将 mime 多部分主体部分写入输出流时出错 我知道这是抛出错误的行 但我似乎无法弄清楚为什么 Read the form data
  • 可用的viewcell按钮

    我有 tableview 我在其中对 tableviewcell 进行了子类化 单元格中有一个水平滚动视图 我向滚动视图添加动态按钮 我的要求 1 当我第一次点击 row0 上的按钮时 我需要为点击的按钮设置不同的 BG 颜色 并在数组中添
  • 运行“app”时出错:Android studio 3.1 中出现未知错误

    我已经将我的 android studio 更新到了新的稳定版3 1版 构建项目后无法运行 如果有人遇到同样的问题或找到任何解决方案 请告诉我 只需前往 运行 编辑配置 并向下滚动到窗口底部 在这里您会看到一个选项 发射前 首先 删除小窗口
  • 如何使用 GNU Parallel 编写多核排序

    GNU 并行 http www gnu org software parallel GNU并行是一个shell工具 用于使用一台或多台计算机并行执行作业 例如 如果我想编写一个多核版本wc我可以做 cat XXX parallel bloc
  • 如何使用 awk 每 n 行插入一个空行?

    我有一个像这样的输入文件 line 1 line 2 line 3 line 4 line 5 line 6 我想使用 awk 每隔几行插入一个空行 例如 每两个 line 1 line 2 line 3 line 4 line 5 lin
  • Mac 上的 Mercurial“未提供用户名”错误

    我刚刚在 OSX Mountain Lion Max 10 8 上安装了 Mercurial 在第一次提交时出现错误 abort no username supplied see hg help config 我看到了很多答案 这些答案表明
  • make找不到tools.jar

    运行Ubuntu 12 04 我已经添加到路径 home jeffrey jdk1 6 0 43 lib 我正在尝试使用 Make 从源代码构建 make j16 但遇到错误 build core config mk 268 Error c
  • 来自 pandas Dataframe 的具有不确定性的 LaTeX 表

    我目前正在编写一份报告 其中包含用 python 计算并存储在 pandas DataFrame 中的许多值和不确定性 这些值必须放入报告中 包括错误 目前我唯一的方法是手动将值与错误合并 其中一个示例如下所示 begin tabular
  • 如何MVC 5下拉(多选)框

    我在使用这个下拉框时遇到了问题 似乎无法正确处理 代码如下 查看 Index cshtml using EvaSimulator Models Model EvaSimulator Models ModelVariables ViewBag
  • 当有更多可用机器时,Spark 仅使用一台工作机器

    我正在尝试通过 Spark 并行化机器学习预测任务 我之前已经在其他任务中成功使用过 Spark 多次 并且之前没有遇到过并行化问题 在这个特定任务中 我的集群有 4 个工作线程 我在具有 4 个分区的 RDD 上调用 mapPartiti