PySpark UDF 优化挑战

2024-01-19

我正在尝试优化下面的代码。当运行 1000 行数据时,大约需要 12 分钟才能完成。我们的用例需要数据大小约为 25K - 50K 行,这将使此实现完全不可行。

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = {}
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except:
                print("Exception in load_glove")
                pass
    return vector_dict

# Returning an Array of doubles from the udf
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

我想知道你们是否可以帮忙回答以下问题

每次迭代都会调用 spacy.load() 和 load_glove() 方法吗? 有没有办法为每个工作节点准备一次 load_glove() 数据,而不是为每行数据准备一次? load_glove 方法返回一个字典对象,最大可达 5GB。有没有办法在主节点上准备好,然后作为参数传递给 UDF?

感谢您的建议。提前致谢!


是的,在当前的实现中,每次运行函数时都会执行所有模型加载代码,这远非最佳。无法将其从驱动程序直接传递到工作节点,但有一种类似的方法 - 在每个工作节点上初始化模型,但只能初始化一次。为此,您必须使用惰性函数,该函数仅在需要实际结果时才会执行 - 因此,对于工作人员而言。

尝试做这样的事情:

# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load('en', max_length=6000000)
    SPACY_MODEL = _model
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = get_spacy_model()
  # your further processing

我认为您可以尝试将手套加载代码添加到类似的函数中。

您可以尝试在这里阅读更多相关内容:https://haridas.in/run-spacy-jobs-on-apache-spark.html https://haridas.in/run-spacy-jobs-on-apache-spark.html(这不是我的博客,只是在尝试使用 Spacy 模型做同样的事情时发现了此信息)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark UDF 优化挑战 的相关文章

随机推荐

  • VS2008如何从Web Developer更改为C# Developer设置

    我刚刚重新安装了 vs2008 第一次运行时不小心选择了 Web Developer 而不是 C Developer 现在我习惯的所有键绑定都是错误的 如何将其更改为 C Developer 我尝试了 devenv resetsetting
  • URL Selenium 超出最大重试次数 [重复]

    这个问题在这里已经有答案了 因此 我希望遍历 URL 数组并打开不同的 URL 以使用 Selenium 进行网页抓取 问题是 一旦我点击第二个 browser get url 我就会收到 URL 超出最大重试次数 和 无法建立连接 因为目
  • Symfony2,原则 2:getResult 对象

    posts em gt find Application BlogBundle Entity Post 1 print r posts 为什么我得到了 Barii BlogBundle Entity Post Object id Barii
  • 欧拉计划#29

    嗯 解决了这个问题之后通过天真 的STL集 我正在阅读论坛条目 在那里我找到了这个条目 include
  • Java Swing JToolBar

    我创造了JToolBar Java 摇摆 我在框架上设置了一个背景图像 其中包含JToolBar 我想要我的JToolBar是透明的 以便保持在框架上的图像应该是可见的 我在用setOpaque false 但它对我的工具栏没有任何影响 以
  • 重置子元素的不透明度 - Maple 浏览器(三星电视应用程序)

    我在创建具有子元素的透明元素时遇到问题 使用此代码 子元素从父元素获取不透明度值 我需要将子元素的不透明度重置 设置为任意值 参考浏览器是Maple Browser for a Samsung TV Application video ca
  • 如何在material-ui中将焦点设置在MenuItem上

    我正在尝试以编程方式将焦点设置在 激活 material ui 中菜单组件内的菜单项之一上 我可以通过按 Tab 键手动执行此操作 但我需要以编程方式执行此操作以响应按键事件 menu menu
  • 如何在 Telerik ASP .NET MVC 网格上将布尔值从 true/false 转换为 yes/no

    我希望能够更改 ASP NET MVC 中不可编辑的 Telerik AJAX 网格上不可编辑列的显示值 有问题的列是一个布尔值 因此显示转换将为 Yes true 和 No False 我做了一些实验 发现这有效 不确定它是否会保留在可编
  • 从 Schittkowski DAE 测试套件中求解 PENDULUM2?

    我只是试图解决 Schittkowski DAE 测试套件中的 DAE 问题之一 http klaus schittkowski de mc dae htm http klaus schittkowski de mc dae htm 但没有
  • 如何以相同的方式修改或替换字典中的每个值?

    给定一个像这样的字典myDict ten 10 fourteen 14 six 6 如何修改每个值 例如 我想将每个值除以二 这样myDict变成 ten 5 fourteen 7 six 3 就位 而不是创建新字典 迭代键和值 for k
  • 检查 URL 的内容:是文件还是网页?

    我有一个应用程序 需要根据内容采取不同的操作URL 如果内容是文件 我需要下载它 但是 如果内容是网页 我需要打开它 据我所知 有两种 URL 类型 直接链接 例如 https dl ssl google com android repos
  • WPF 应用程序的多线程策略需要建议

    我正在构建一个单窗口 WPF 应用程序 窗口中是一个列表项 当然 它们保存在数据库中 我需要定期启动一个后台任务 从 Atom feed 更新数据库 当每个新项目添加到数据库中时 UI 中的列表也必须更新以反映这一点 我不希望这个后台任务减
  • 滑块输入延迟

    有没有办法使sliderInput http shiny rstudio com reference shiny latest sliderInput html等待几秒钟 然后它会更改其对应的input 多变的 我有一个栏正在控制需要根据值
  • 使用 Mac 应用程序部署 Qt 框架以及 otool 的使用

    我在使用我的 Mac 应用程序部署 Qt 框架时遇到问题 我希望有人能知道为什么当我在干净的 Mac 即不是开发人员 Mac 上运行该应用程序时会出现此错误 操作系统 10 7 2 并使用 XCode 错误消息 Library not lo
  • 如何使用列表(或元组)作为字符串格式化值

    假设这个变量 s Python rocks x s s s 0 s 1 现在我想替换更长的列表 并分别添加所有列表值 例如 s 0 s 1 s n 似乎不正确 引用自文档 给定格式 值 如果格式 需要一个参数 值可以 是单个非元组对象 4
  • 如何在 F# 中的集群配置中创建参与者

    我正在创建一个示例Akka Cluster具有三个节点 A B 和 C 其中 A 是灯塔 到目前为止 从日志来看 当没有参与者或参与者是本地的 使用创建的 时 集群工作正常spawn and spawnOpt 我想从 B 创建一个 acto
  • TextMate:注释行快捷方式不再起作用(我的瑞士布局上的 Cmd-/ 或 Cmd-Shift-7)

    一段时间以来 我在 TextMate 中遇到了一种非常奇怪的行为 I had troubles to use the keyboard shortcut for commenting a line which is Cmd or on my
  • 如何在 Swift 中为 IOS 生成 RSA 非对称密钥对?

    我需要一种在 Swift 中生成 RSA 非对称密钥对的方法 我不需要将它存储在钥匙串或任何东西中 我只需要生成一个密钥对并将两个密钥放入字符串变量中 这些密钥确实需要与另一端的 PHP 兼容 我将使用对称加密来保护私钥并将其存储在手机上
  • isset 具有可变数组键数

    该数组是多维的 并且具有可变数量的子键 例如 arr a b c X 在这种情况下为3 我想创建一个函数 它接受一个像这样的字符串a b c作为参数并检查数组中是否包含该键 然后取消设置它 unset arr a b c 如果我给它a b那
  • PySpark UDF 优化挑战

    我正在尝试优化下面的代码 当运行 1000 行数据时 大约需要 12 分钟才能完成 我们的用例需要数据大小约为 25K 50K 行 这将使此实现完全不可行 import pyspark sql types as Types import n