Python 多处理池突然停止

2023-12-29

我正在尝试根据我的要求执行并行处理,并且代码似乎可以按预期并行处理 4k-5k 元素。但是,一旦要处理的元素开始增加,代码就会处理一些列表,然后在没有抛出任何错误的情况下,程序突然停止运行。

我检查过,程序没有挂起,RAM 可用(我有 16 Gb RAM),CPU 利用率甚至不到 30%。似乎无法弄清楚发生了什么。我有 100 万个元素需要处理。

def get_items_to_download():
    #iterator to fetch all items that are to be downloaded
    yield download_item

def start_download_process():
    multiproc_pool = multiprocessing.Pool(processes=10)
    for download_item in get_items_to_download():
        multiproc_pool.apply_async(start_processing, args = (download_item, ), callback = results_callback)
    
    multiproc_pool.close()
    multiproc_pool.join()

def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False

def results_callback(result):
    print(result)

if __name__ == "__main__":
    start_download_process()

UPDATE -

发现错误 - BrokenPipeError: [Errno 32] Broken pipeline

Trace -

Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 125, in worker
put((job, i, result))
File "/usr/lib/python3.6/multiprocessing/queues.py", line 347, in put
self._writer.send_bytes(obj)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

代码看起来是正确的。我唯一能想到的是你的所有进程都挂起等待完成。这里有一个建议:而不是使用提供的回调机制apply_async, 使用AsyncResult返回的对象以从进程获取返回值。您可以致电get在此对象上指定一个超时值(下面任意指定 30 秒,可能不够长)。如果任务在这段时间内没有完成,则会抛出超时异常(如果您愿意,您可以捕获它)。但这将检验进程挂起的假设。只需确保指定足够大的超时值,以便任务应在该时间段内完成。我还将任务提交分成了1000个批次,并不是因为我认为1,000,000个的大小是一个问题per se,但只是为了让您没有 1,000,000 个结果对象的列表。但是,如果您发现不再因此而挂起,请尝试增加批处理大小,看看是否确实会产生影响。

import multiprocessing

def get_items_to_download():
    #iterator to fetch all items that are to be downloaded
    yield download_item

BATCH_SIZE = 1000

def start_download_process():
    with multiprocessing.Pool(processes=10) as multiproc_pool:
        results = []
        for download_item in get_items_to_download():
            results.append(multiproc_pool.apply_async(start_processing, args = (download_item, )))
            if len(results) == BATCH_SIZE:
                process_results(results)
                results = []
        if len(results):
            process_results(results)
    

def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False

TIMEOUT_VALUE = 30 # or some suitable value

def process_results(results):
    for result in results:
        return_value = result.get(TIMEOUT_VALUE) # will cause an exception if process is hanging
        print(return_value)

if __name__ == "__main__":
    start_download_process()

Update

根据谷歌搜索多个页面来查找损坏的管道错误,您的错误似乎可能是内存耗尽的结果。看Python 多处理:增加池大小后出现损坏的管道异常 https://stackoverflow.com/questions/45230593/python-multiprocessing-broken-pipe-exception-after-increasing-pool-size, 例如。以下返工attempts使用更少的内存。如果有效,您可以尝试增加批量大小:

import multiprocessing


BATCH_SIZE = 1000
POOL_SIZE = 10


def get_items_to_download():
    #iterator to fetch all items that are to be downloaded
    yield download_item


def start_download_process():
    with multiprocessing.Pool(processes=POOL_SIZE) as multiproc_pool:
        items = []
        for download_item in get_items_to_download():
            items.append(download_item)
            if len(items) == BATCH_SIZE:
                process_items(multiproc_pool, items)
                items = []
        if len(items):
            process_items(multiproc_pool, items)


def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False


def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize


def process_items(multiproc_pool, items):
    chunksize = compute_chunksize(len(items))
    # you must iterate the iterable returned:
    for return_value in multiproc_pool.imap(start_processing, items, chunksize):
        print(return_value)


if __name__ == "__main__":
    start_download_process()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理池突然停止 的相关文章

随机推荐

  • Openshift 上的 WordPress 响应速度非常慢

    我刚刚将我的 Wordpress 网站移至 OpenShift PAAS 生态系统上的可扩展 PHP 盒上 但我立即注意到该网站的响应速度非常慢 大约 3000 4000 毫秒 但是 当它开始响应时 页面加载 渲染速度绝对很快 这是网址 h
  • onclick 事件不适用于选项

    以下是我的代码 在 Firefox 中运行良好 但在 chrome 中则不行 请让我知道如何解决这个问题 主要思想是根据选择框的选定值调用js函数
  • 如何以编程方式确定我的应用程序正在 iPhone、iPad 或 iPhone 4 上运行?

    我刚刚使用 cocos2d 完成了我的 iPhone 游戏 但在将其发布到 AppStore 之前 我想让它在 iPad 屏幕更大 和 iPhone 4 分辨率更大 上运行 那么 我如何以编程方式确定我的应用程序正在 iPhone iPad
  • Rabbitmq:在无限循环中重新处理失败的消息

    这是我的rabbitmq配置
  • 如何在 ObservableCollection 上执行 foreach lambda 表达式?

    我如何执行foreachObservableCollection 上的 lambda 表达式 没有方法foreach与 ObservableCollection 一起使用 尽管此方法与 List 一起存在 有没有可用的扩展方法 BCL 中默
  • 惰性初始状态 - 它是什么以及如何使用它?

    我是新来反应 Hooks 的 我正在尝试利用useState在我的代码中 当我使用它时 我发现了一个术语 惰性初始状态 https reactjs org docs hooks reference html lazy initial sta
  • 质数 JavaScript

    有人可以指导我在这里获取素数吗 这是家庭作业 所以我不想要答案 但如果有一些指示 我将不胜感激 这真的让我很烦 我想我已经很接近了 但我遇到的问题是 25 和 35 这些不是素数 但这个函数正在返回它们 var getPrimeNumber
  • 在本机应用程序中使用 Webrtc

    我的问题是 1 是否可以在我的 Native App 中拥有一个 WebView 由 Native 框架提供 实例 并扩展它以支持 Webrtc 如果 1 为 是 则可能执行以下操作 1 在 Android 上构建 webrtc 2 在 A
  • Angular 2 和 Angularfire2 中的三向绑定

    我正在尝试使用 AngularFire 2 2 0 0 beta 2 将输入元素三路绑定到 Angular js 2 2 0 0 rc 4 中的 firebase 数据库 我有一个非常简单的 html 例如
  • 安装 ruby​​ 1.9.3 时遇到问题

    我通过命令安装了 ruby 1 9 3rvm install 1 9 3在 mac 的终端上 安装后我收到这些错误 见下文 有人有任何想法来解决这个问题吗 运行 configure prefix Users Keta rvm rubies
  • Flutter pdf 生成图像速度太慢

    我正在 flutter 中开发 pdf 生成器应用程序 但是当我想向 pdf 添加图像时 需要很长时间 我也想知道如何添加多个图像 我使用 3 个库 图像选择器 pdf 打印 这是我的代码 Future getImage async var
  • Caliburn Micro WPF 窗口管理

    我想使用 caliburn micro 启动一个 WPF 应用程序 这样我就可以尽可能地使用 TDD 我之前在 WP7 中使用过 caliburn micro 但 WPF 似乎是另一艘船 并且文档不完整与 WP7 一样 我已经用我的 Boo
  • 是否可以在 Scala 解释器中定义伴随类/模块?

    在 Scala 解释器中进行测试通常很方便 然而 我遇到的一个问题是 我必须重构使用隐式转换的代码 因为定义一个与现有类同名的对象does not使其成为 REPL 中的配套模块 因此 当我翻译回 真实源代码 时 我不能确信我的代码仍然可以
  • 临时和表达行为

    这是明确定义的行为吗 const char p std string Hello std string World c str std cout lt lt p 我不知道 原因 不 这是未定义的行为 两个都std string临时对象和返回
  • Javascript 数组查找效率:关联与存储关联?

    我一直在阅读 他们说关联数组不会给你提供与数组相同的效率 关联数组可以在 O N 时间内查找内容 而数组可以在 O 1 时间内查找内容 这是我的问题 在快速查找值并且不占用太多内存方面 哪一个更有效 联想 var myVars new Ar
  • 在 C# 中创建自定义 ODBC / OLE 驱动程序

    有谁知道如何最好用 C 创建 ODBC 或 OLE 驱动程序 我想要做的是创建一个可以在 Excel 和 Access 中使用的自定义数据源 或者想出另一种方法来做到这一点吗 预先感谢您的回复 C 中有一个 OLE DB 驱动程序的简化版本
  • 根据行数调整 jqGrid 的大小? - 网格高度?

    我遇到了与帖子中详细说明的相同问题根据行数调整 jqGrid 的大小 https stackoverflow com questions 1972806 一些建议 看起来完全合乎逻辑 不起作用 因为当我尝试使用以下命令获取网格的高度时var
  • Thymeleaf 注册页面 - 执行处理器“org.thymeleaf.spring4.processor.attr.SpringInputGeneralFieldAttrProcessor”期间出错

    我正在为一个网站制作一个注册页面 我知道为了创建新用户 需要一个 id 所以我们有这个字段
  • mongodb 性能不佳

    我目前正在使用 mongodb 并且我发现查询性能非常差 可能需要几秒钟 场景如下 我有一个结构文件 id xxx userId yyy a 1 b 2 counter 1 在测试中 userId value could be 1 200
  • Python 多处理池突然停止

    我正在尝试根据我的要求执行并行处理 并且代码似乎可以按预期并行处理 4k 5k 元素 但是 一旦要处理的元素开始增加 代码就会处理一些列表 然后在没有抛出任何错误的情况下 程序突然停止运行 我检查过 程序没有挂起 RAM 可用 我有 16