multiprocessing.Pool:使用 apply_async 的回调选项时调用辅助函数

2023-12-25

流量如何apply_async调用可迭代(?)函数和回调函数之间的工作?

设置:我正在读取 2000 个文件目录中所有文件的一些行,有些有数百万行,有些只有几行。提取一些标题/格式/日期数据来表征每个文件。这是在 16 个 CPU 的机器上完成的,因此对其进行多处理是有意义的。

目前,预期结果正在发送到列表(ahlala)这样我就可以打印出来;稍后,这将写入 *.csv。这是我的代码的简化版本,最初基于this https://stackoverflow.com/questions/12483512/python-multiprocessing-apply-async-only-uses-one-process非常有帮助的帖子。

import multiprocessing as mp

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count()
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

注意,如果我把所有的都放在代码中,代码就可以工作Z(),辅助函数,进入X(), Y(), or results(),但这是否是重复的或者可能比可能的速度慢?我知道每次函数调用都会调用回调函数,但是什么时候调用回调函数呢?是之后吗pool.apply_async()...完成流程的所有工作?如果在第一个函数的作用域(?)内调用这些辅助函数,不是应该更快吗?pool.apply_async()需要(在这种情况下,X())?如果没有,我应该将辅助函数放入results()?

其他相关想法:守护进程为什么没有显示?我也很困惑如何对事物进行排队,以及这是否是问题所在。这似乎是一个开始学习的地方 http://hairycode.org/2013/07/23/first-steps-with-celery-how-to-not-trip/,但是在使用时可以安全地忽略队列apply_async,或者仅在明显的时间效率低下?


您在这里询问了一大堆不同的事情,所以我会尽力涵盖所有内容:

您传递给的函数callback一旦工作进程返回结果,就会在主进程(而不是工作进程)中执行。它是在一个线程中执行的Pool对象在内部创建。该线程消耗来自 a 的对象result_queue,用于获取所有工作进程的结果。线程从队列中取出结果后,执行callback。当回调正在执行时,无法从队列中提取其他结果,因此回调快速完成非常重要。以你的例子为例,一旦有一个电话X or Y你通过apply_async完成后,结果将被放入result_queue由工作进程执行,然后结果处理线程将从result_queue, 和你的callback将被执行。

其次,我怀疑您没有看到示例代码发生任何事情的原因是因为您的所有工作函数调用都失败了。如果工作函数失败,callback永远不会被执行。除非您尝试从AsyncResult https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult调用返回的对象apply_async。但是,由于您没有保存任何这些对象,因此您永远不会知道发生了故障。如果我是你,我会尝试使用pool.apply当您进行测试时,以便您在错误发生时立即看到它们。

工作人员可能失败的原因(至少在您提供的示例代码中)是因为X and Y被定义为另一个函数内的函数。multiprocessing通过在主进程中对它们进行 pickle 并在工作进程中取消它们来将函数和对象传递给工作进程。在其他函数内部定义的函数是不可picklable的,这意味着multiprocessing将无法在工作进程中成功取消它们。要解决此问题,请在模块的顶层定义这两个函数,而不是嵌入到模块中dirwalker功能。

你绝对应该继续打电话Z from X and Y,不在results。那样,Z可以在所有工作进程中同时运行,而不必在主进程中一次运行一个调用。请记住,你的callback函数应该尽可能快,这样就不会阻碍处理结果。执行中Z在那里会减慢速度。

下面是一些与您正在执行的操作类似的简单示例代码,希望能让您了解代码应该是什么样子:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .append, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                pool.apply_async(X, args=(f,), callback=results)  # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

Output:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]

Edit:

您可以使用以下命令创建在父进程和子进程之间共享的 dict 对象multiprocessing.Manager class:

pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

然后使X and Y采用第二个参数称为helper_dict(或者任何你想要的名字),一切就都准备好了。

需要注意的是,这是通过创建一个包含普通字典的服务器进程来实现的,并且所有其他进程都通过代理对象与该字典进行通信。因此,每次读取或写入字典时,您都在进行 IPC。这使得它比真正的听写慢很多。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

multiprocessing.Pool:使用 apply_async 的回调选项时调用辅助函数 的相关文章

随机推荐