Python 多处理作业到 Celery 任务但 AttributeError

2024-03-23

我做了一个像这样的多处理函数,

import multiprocessing
import pandas as pd
import numpy as np

def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
            for d in np.array_split(df, workers)])
    pool.close()
    return pd.concat(list(result))

def square(x):
    return x**x

if __name__ == '__main__':
    df = pd.DataFrame({'a':range(10), 'b':range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)  
    ## run by 4 processors

上面的“apply_by_multiprocessing”可以并行执行Pandas Dataframe apply。但是当我执行 Celery 任务时,它引发了断言错误:“Worker”对象没有属性“_config”。

from celery import shared_task

@shared_task
def my_multiple_job():
    df = pd.DataFrame({'a':range(10), 'b':range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)  

它的错误跟踪是这样的,

  File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing
    pool = multiprocessing.Pool(processes=workers)
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__
    self._setup_queues()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'

看来是因为 Celery Worker 不是一个正常的进程。我怎么解决这个问题?我正在使用Python3.4、Django 1.6.2、celery 3.1.10、django-celery 3.1.9、pandas 0.12.0。


这个问题有很好的答案在另一个问题中 https://stackoverflow.com/questions/27904162/using-multiprocessing-pool-from-celery-task-raises-exception

基本上,它是一个Celery 的已知问题 https://github.com/celery/celery/issues/1709#issuecomment-127246663 and a 肮脏的黑客提供:它对我有用,我刚刚在定义任务的同一文件中添加了以下代码:

from celery.signals import worker_process_init
from multiprocessing import current_process

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理作业到 Celery 任务但 AttributeError 的相关文章

随机推荐

  • 如何使用纯 CSS 在 HTML 中获取一棵树

    我正在尝试遵循这个tutorial http odyniec net articles turning lists into trees 这是my code https github com gurjeet CSSTree so far 本
  • python 使用 for 循环将用户输入添加到空列表中

    所以我想使用此代码将用户输入添加到空列表中 no of num int input enter the number of numbers you would like to add n this will store the number
  • 在经典ASP页面调用存储过程

    我一整天都在努力从经典 ASP 页面调用存储过程 我有一些基本的菜鸟问题 首先 这是向我的命令添加参数的最佳方法吗 cmd Parameters Append cmd CreateParameter SubmissionDate adDBT
  • 如何使用 cmp 将排序从 python 2 转换为 python 3?

    我正在尝试将用 python 2 编写的代码转换为 python 3 nums 30 31 num sort cmp lambda x y cmp y x x y 不知道如何在 python 3 中做到这一点 因为 cmp 被删除了 我相信
  • 测试 Dart 值是否实际上是一个函数?

    是否可以测试一个值是否是一个可以调用的函数 我可以轻松测试 null 但之后我不知道如何确保传入的参数实际上是一个函数 void myMethod funcParam if funcParam null How to test if fun
  • 检查 VSTS 中以前的构建信息(VSTS API)

    在执行 VSTS 构建期间是否可以访问以前的构建变量 例如 我可以获得先前构建的 Build SourceVersion 或 Build QueuedBy 吗 我可以通过 Build SourceVersion 等构建变量获取当前构建信息
  • 使用 LDAP 用户名作为实体名称

    我正在使用 Hashicorp Vault 1 9 0 并且启用了 LDAP 身份验证方法 该方法按预期正常工作 但有一个问题 LDAP 是我们独特的身份验证方法 当然 除了令牌之外 并且 LDAP 用户不是提前在 Vault 中创建的 因
  • 在 PHP 中搜索并获得双重结果

    这是我的 PHP 代码 我用它从 PHPMyAdmin 的数据库 Mysql 中进行搜索 但是当我得到结果时 它显示双倍 我不明白为什么是双的 如果它是 for foreach 循环 那么我将使用什么来代替它 请帮我解决代码
  • Python (CherryPy) Web 应用程序部署在本地,但在 Intranet 上不可见

    我使用 CherryPy 创建了一个 Python Web 应用程序 并部署在我的本地计算机上 当我尝试从家里的另一台计算机上查看它时 没有任何结果 但是 如果我创建一个简单的 html 文件 并使用以下命令部署它 python m Sim
  • 如何将 SAP .txt 提取转换为 .csv 文件

    我有一个 txt 文件 如下面报告的示例所示 我想将其转换为 csv 表 但我没有取得太大成功 Mack3 Line Item Journal Time 14 22 33 Date 03 10 2015 Panteni Ledger 1L
  • Jquery随机单词不重复

    我需要在 div 中显示随机单词而不重复该单词 随机单词将每隔随机秒 3 5 秒 附加一个 div 如果数组中的所有值都显示在 div 中 则会发出警报 例子 b a c d ALERT DONE Not b a b c d d a a c
  • 无法聚焦 Web 元素来发送密钥

    我有一组测试 需要登录然后执行搜索 我的应用程序是 Angular JS 我正在起诉 Protractor 事实上 我的测试工作正常 直到我收到我的应用程序的新套件 并且此时我的所有测试都失败了 element by id mainGlob
  • C# 访问另一个用户的注册表

    我当前使用的 Windows 服务有问题 基本上 我将一些值存储在HKCU注册表 来自以管理员身份运行的 GUI 工具 并从该 GUI 中启动一项服务 该服务使用SYSTEM帐户来运行 我相信这是我的问题 我无法访问服务中使用 GUI 工具
  • 从命令行将 JAR 依赖项与可执行 JAR (Über JAR) 捆绑在一起

    我正在尝试从命令行创建可执行 jar JAR 中的主类具有我已打包到另一个普通 JAR 文件中的依赖项 我想将依赖项 JAR 与可执行 JAR 打包在一起 以便提供单个 JAR 文件 到目前为止我已经尝试过以下内容 依赖Hello clas
  • C++ 和抽象类中的继承

    我在正确处理存在抽象类的方法重写时遇到问题 在我的类层次结构中 我会尝试解释一下 class AbstractClass public virtual void anyMethod 0 class A public AbstractClas
  • 使用 WinSCP .NET 未找到方法异常 (EventWaitHandle..ctor)

    我正在尝试使用 PowerShell 和 WinSCP NET 程序集连接到 SFTP 服务器 代码无法打开会话 session Open sessionOptions 在我发现的日志中 Exception System MissingMe
  • 每个版本的 IE 在新窗口中打开链接

    我一直在设计一个网站 并使用 Safari 和 Chrome 进行大部分测试 我刚刚尝试测试 Firefox 也很顺利 让 IE 来解决吧 由于某种原因 对于从 IE 6 到 IE 10 的每个版本 每个链接都会打开一个新窗口 在 IE 1
  • 将文本包裹在圆形 div 内[重复]

    这个问题在这里已经有答案了 我想完成这样的事情 我尝试创建一个 div 并给它一个border radius of 50 问题是文本溢出了圆角 我怎样才能做到这一点 以便整个圆圈可以被填满而不溢出 那这个呢 div border 1px s
  • XML 在 python 中行走[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我是 python 新手 想了解解析
  • Python 多处理作业到 Celery 任务但 AttributeError

    我做了一个像这样的多处理函数 import multiprocessing import pandas as pd import numpy as np def apply df args df func kwargs args retur