Python multiprocessing.pool 与类目标函数和神经进化的交互

2023-11-29

警告,这将是很长一段时间,因为我想尽可能具体。


确切的问题:这是一个多重处理问题。我确保我的类都按照之前实验中构建/预期的方式运行。

编辑:事先说过线程。


当我在线程环境中运行问题的玩具示例时,一切正常;然而,当我转向真正的问题时,代码就崩溃了。具体来说,我得到一个TypeError: can't pickle _thread.lock objects错误。完整堆栈位于底部。

我这里的线程需求与我改编代码的示例有点不同——https://github.com/CMA-ES/pycma/issues/31。在此示例中,我们有一个适应度函数,每次评估都可以独立调用该函数,并且所有函数调用都不能相互交互。然而,在我的实际问题中,我们正在尝试使用遗传算法来优化神经网络权重。遗传算法将建议潜在的权重,我们需要在我们的环境中评估这些神经网络控制器权重。在单线程情况下,我们只能有一个环境,在其中使用简单的 for 循环评估权重:[nn.evaluate(weights) for weights in potential_candidates],找到表现最好的个体,并在下一轮突变中使用这些权重。然而,我们不能简单地在线程环境中进行一次模拟。

因此,我不是传递单个函数来评估,而是传递一个函数列表(每个人一个,其中环境相同,但我们已经分叉了进程,以便通信流不会在个体之间交互。 )

还有一件事需要立即注意: 我正在使用整洁的并行评估数据结构

from clean.parallel import ParallelEvaluator # 使用 multiprocessing.Pool

玩具示例代码:

NPARAMS = nn.flat_init_weights.shape[0]    # make this a 1000-dimensional problem.
NPOPULATION = 5                            # use population size of 5.
MAX_ITERATION = 100                        # run each solver for 100 function calls.

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(x):
    time.sleep(0.1)
    return sum(x**2)

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, weights, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, w in enumerate(weights):
        jobs.append(self.pool.apply_async(self.eval_function[i], (w, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [fitness]*NPOPULATION)

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X))
    es.disp()

必要背景:

当我从玩具示例切换到真实代码时,上述操作失败了。

我的课程是:

LevelGenerator (simple GA class that implements mutate, etc)
GridGame (OpenAI wrapper; launches a Java server in which to run the simulation; 
          handles all communication between the Agent and the environment)
Agent    (neural-network class, has an evaluate fn which uses the NN to play a single rollout)
Objective (handles serializing/de-serializing weights: numpy <--> torch; launching the evaluate function)

# The classes get composed to get the necessary behavior:
env   = GridGame(Generator)
agent = NNAgent(env)                # NNAgent is a subclass of (Random) Agent)
obj   = PyTorchObjective(agent)

# My code normally all interacts like this in the single-threaded case:

def test_solver(solver): # Solver: CMA-ES, Differential Evolution, EvolutionStrategy, etc
    history = []
    for j in range(MAX_ITERATION):
        solutions = solver.ask() #2d-numpy array. (POPSIZE x NPARAMS)
        fitness_list = np.zeros(solver.popsize)
        for i in range(solver.popsize):
            fitness_list[i] = obj.function(solutions[i], len(solutions[i]))
        solver.tell(fitness_list)
        result = solver.result() # first element is the best solution, second element is the best fitness
        history.append(result[1])

        scores[j] = fitness_list

    return history, result

所以,当我尝试运行时:

NPARAMS = nn.flat_init_weights.shape[0]        
NPOPULATION = 5                                
MAX_ITERATION = 100                            

_x = NNAgent(GridGame(Generator))

gyms = [_x.mutate(0.0) for _ in range(NPOPULATION)]
objs = [PyTorchObjective(a) for a in gyms]

def evaluate(objective, weights):
    return objective.fun(weights, len(weights))

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(agent):
    return agent.evalute()

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, x in enumerate(X):
        jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [obj.fun for obj in objs])
# obj.fun takes in the candidate weights, loads them into the NN, and then evaluates the NN in the environment.

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X, NPARAMS))
    es.disp()

我收到以下错误:

TypeError                            Traceback (most recent call last)
<ipython-input-57-3e6b7bf6f83a> in <module>
      6     while not es.stop():
      7         X = es.ask()
----> 8         es.tell(X, eval_all(X, NPARAMS))
      9     es.disp()

<ipython-input-55-2182743d6306> in _evaluate2(self, X, *args)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

<ipython-input-55-2182743d6306> in <listcomp>(.0)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: can't pickle _thread.lock objects

我还在这里读到,这可能是由于这是一个类函数这一事实引起的——类型错误:无法 pickle _thread.lock 对象——所以我创建了全局范围的适应度函数def fitness(agent): return agent.evalute(),但这也不起作用。

我认为这个错误可能是因为最初我在 PyTorchObjective 类中将评估函数作为 lambda 函数,但当我更改它时它仍然崩溃。

任何见解都将不胜感激,并感谢您阅读这面巨大的文字墙。


您没有使用多线程。您正在使用多个进程。

您传递给的所有参数apply_async,包括函数本身,在底层被序列化(pickled)并通过 IPC 通道传递给工作进程(读取multiprocessing文档了解详情)。因此,您不能传递任何与本质上是进程本地的事物相关联的实体。这包括大多数同步原语,因为它们必须使用锁来执行原子操作。

每当这种情况发生时(正如此错误消息中显示的许多其他问题一样), 您可能太聪明了,将一个已经内置并行化逻辑的对象传递给并行化框架。


如果你想用这样的“并行化对象”创建“多级并行化”,你会更好:

  • 正确使用该对象的并行化机制,而不用担心多个级别:无论如何,您一次不能做比您拥有的核心更多的事情;或者
  • create and use these "parallelized objects" inside worker processes
    • but you are likely to hit multiprocessing limitations here since its worker processes are deliberately prohibited from spawning their own pools.
      • 您可以让工作人员向工作队列添加额外的项目,但可能会命中Queue也有局限性。
    • 所以对于这样的场景,更先进的第三方分布式工作队列解决方案可能更可取。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python multiprocessing.pool 与类目标函数和神经进化的交互 的相关文章

  • 努力理解 Python 需要虚拟环境的原因

    我来自 JavaScript 所以熟悉 NPM 在那里 您可以全局安装包 通过使用 g 标志 或在项目中本地安装包 在 Python 中 他们有这些虚拟环境 我仍然有点不确定为什么需要它们 我知道这是为了在一台机器上拥有不同版本的相同包 是
  • 分页后重新显示当前标题

    我正在使用 Wea syPrint 创建文档 我有一些带有名称的部分 其中一些可能跨越多个页面 当节太长时 就会出现分页符 我想做的是重新显示当前部分的名称 最好使用相同的格式 以下 MWE 显示了分页符后如何不显示节标题 h1 First
  • 如何在 Windows 和 Python 2.7 上模拟 os.path.samefile 行为?

    给定两个路径 我必须比较它们是否指向同一个文件 在 Unix 中 这可以通过以下方式完成os path samefile 但正如文档所述 它在 Windows 中不可用 模拟此功能的最佳方法是什么 它不需要模拟常见情况 就我而言 有以下简化
  • Anaconda / 求解环境:初始冻结求解失败。使用灵活的求解重试

    我尝试安装 anaconda 软件包 出现以下消息 求解环境 初始冻结求解失败 使用灵活的解决方案重试 解决环境 current repodata json 中的 repodata 失败 将使用下一个 repodata 源重试 收集包元数据
  • 求 Petersen 子图中的哈密顿路径

    我开始使用 IDE Jupyter Python 3 6 并出现了一个问题 我必须通过IDE绘制Petersen子图中的哈密顿路径 但我不知道该怎么做 我显示有关该图的信息 彼得森图 https en wikipedia org wiki
  • 使用Python下载YouTube视频到某个目录

    我已尝试使用以下代码在 YouTube 中下载视频并且它可以正常工作 但我想将视频保存在特定位置 现在它正在将视频保存在C Users Download 如果我想将视频保存在桌面上 我需要对代码进行哪些更改 from future impo
  • 使用 Python 自动化旧的 DOS 应用程序

    有没有办法从Python 在Windows上 自动化旧的DOS应用程序 16位 可能需要模拟器 例如DOSBox 我想将密钥和字符串发送到应用程序 检测 DOS 屏幕 的更新并获取应用程序输出 如果 DOS 应用程序能够 隐藏 运行 即不显
  • 多输出回归问题的多重损失

    所以我试图训练一个 CNN 模型来预测 4 个实值输出 回归问题 我尝试使用均方误差作为损失函数 我的问题是我是否将输出层分支为 4 个不同的输出层 其中有 4 个不同的输出层由于最后一层的权重是单独更新的 loss 4 MSE 确实可以使
  • 您必须使用 dtype float(Tensorflow) 为占位符张量“Placeholder”提供值

    import tensorflow as tf import os import sklearn preprocessing import pandas as pd import numpy as np print os getcwd os
  • 如何在 difflibs html 输出中突出显示每行超过两个字符

    我在用difflib HtmlDiff比较两个文件 我希望在输出的 html 中突出显示差异 当一行中最多有两个不同的字符时 这已经有效 a 2 000 b 2 120 但是 当一行上有更多不同的字符时 在输出中整行将被标记为红色 在左侧
  • 使用 Python 从基于 AJAX 的网站提取信息

    我正在尝试使用 Python 检索基于 ajax 的网站 例如 www snapbird org 上的查询结果 由于它没有显示在页面源中 我不确定如何继续 我是一个Python新手 因此如果我能得到一个指向正确方向的指针那就太好了 如果更容
  • 如何对 glob.glob 进行数字排序?

    我在一个文件夹中有一堆按数字排序的文件 当我尝试对 glob glob 进行排序时 我从来没有以正确的顺序获得文件 文件示例和预期输出排序 folder C Users user Desktop folder 1 sample mp3 C
  • 如何使用 python 子进程杀死性能记录?

    我正在尝试使用性能实用程序 https www brendangregg com perf html监视我的系统 它将在 python 脚本中启动和终止 我创建了一个沙箱 如下所示 extra params F 99 g a record
  • 在没有 paramiko 的情况下通过 python 运行 ssh 时,“伪终端不会被分配,因为 stdin 不是终端”

    我在 Python 中运行 ssh 而不使用像 Paramiko 这样的外部库 我这样做有我的理由 而不是通过外部库 基本上我正在做subprocess Popen ssh t bla command 执行此操作时我收到以下消息 Pseud
  • Django 查询集和生成器

    出乎意料的是 我想知道以下使用生成器迭代结果集的方式是否会对正常迭代产生任何积极或消极的影响 eg def all items generator for item in Item objects all yield item for it
  • 如何将焦点集中到 python Tkinter 文本小部件?

    我希望能够打开应用程序 GUI 并让它自动将光标放置到特定的文本小部件中 最好的情况是 应用程序启动后 有人就可以开始输入 而无需单击文本小部件 这只是显示问题的一个小示例 from Tkinter import root Tk Windo
  • Tensorflow:为什么 tf.case 给我错误的结果?

    我正在尝试使用tf case https www tensorflow org api docs python tf case https www tensorflow org api docs python tf case 有条件地更新张
  • 就地改变 numpy 函数输出数组

    我正在尝试编写一个对数组执行数学运算并返回结果的函数 一个简化的例子可以是 def original func A return A 1 A 1 为了加速并避免为每个函数调用分配新的输出数组 我希望将输出数组作为参数 并就地更改它 def
  • 使用 pytz 获取时区的国家/地区代码?

    我在用着pytz http pytz sourceforge net country information 我已经阅读了整个文档表 但没有看到如何做到这一点 我有一个时区 美国 芝加哥 我想要的只是获取该时区的相应国家 地区代码 美国 它
  • 访问 django for 循环中的元素

    我有一个 Django 模板 其中包含以下代码 该模板创建多个按钮并尝试通过单击 在同一按钮上 删除 隐藏其中一个按钮 for h in helicopters div class btn group div

随机推荐

  • 巨大负载响应时发生 Java 堆空间错误 |空手道1.0.1

    我们目前在测试框架中使用空手道 0 9 6 要求以某种方式将整个响应正文与存储为 json 的文件相匹配 这是一个基于财务的应用程序 我们需要验证整个响应正文 我正在尝试按照以下方式迁移到 1 0 1升级指南 升级到 1 0 1 后 我们对
  • 最好的 jQuery AJAX 多重上传器 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 有什么建议么 可能是一些非
  • PHP 语言环境 de_DE 返回英语语言环境

    我在 Ubuntu 系统上使用 Apache 和 php 7 安装德语语言环境并运行后locale a要检查已安装的语言环境 我得到C C UTF 8 de DE de DE euro de DE iso88591 de DE iso885
  • 带有 Paramiko 和 RSA 密钥文件的嵌套 SSH

    我正在尝试使用 Paramiko 嵌套 SSH 我将从本地计算机连接到服务器 X 然后从那里连接到服务器 Y 在这里 为了连接到服务器 X 我使用用户名 密码身份验证 并使用用户名和密码连接到服务器 Y RSA 密钥 问题是 RSA 密钥托
  • Java:检测客户端与服务器端断开连接[重复]

    这个问题在这里已经有答案了 我正在使用 Socket 和 ServerSocket 编写 Java 客户端 服务器程序 多个客户端同时连接到服务器 检查客户端连接是否已与服务器端断开的最佳方法是什么 现在 当我尝试写入断开连接的客户端时 我
  • 为什么使用HttpClient进行同步连接

    我正在构建一个类库来与 API 交互 我需要调用 API 并处理 XML 响应 我可以看到使用的好处HttpClient对于异步连接 但我所做的是纯粹同步的 所以我看不到比使用有任何显着的好处HttpWebRequest 如果有人能提供任何
  • 如何设置非标准 gstreamer 属性的类型?

    我正在尝试设置pattern财产为videotestsrc 按照正常逻辑 我尝试将变量设置为i32和一根绳子 两者都会失败并出现要求特定类型的错误 查看 gstreamer API 我找不到设置该属性的方法 如何强制变量的类型与预期匹配 l
  • 为什么需要 Swing Utilities 以及如何使用它?

    这主要是针对我的问题here 但我不明白为什么摇摆实用程序是否需要以及它的用途 我正在设计一个 Swing GUI 我不想错过 Swing Utilities 可能提供的任何功能 也有人可以解释一下是什么invokeLater方法的作用及其
  • 如何判断图像何时已在 IE9 的浏览器缓存中?

    IE9显示错误complete具有以下属性 img src http farm2 static flickr com 1104 1434841504 edc671e65c jpg each function console log this
  • 将整数转换为工作日列表

    我在数据库中存储了一个整数 SQLAgent 频率间隔 该整数实际上是计划运行的一周中选定天数的总和 可能的值是这些值的任意组合 周日 1 星期一 2 星期二 4 星期三 8 星期四 16 星期五 32 星期六 64 ex 65 表示该计划
  • 计算字典中的值

    我有一本字典如下 dictA a duck duck goose b goose goose c duck duck duck d goose e duck duck 我希望循环遍历 dictA 并输出一个列表 该列表将向我显示 dictA
  • 从 SceneDelegate 更新屏幕结构的状态

    我来自 React Native 是 Swift 和 SwiftUI 的初学者 我很好奇当应用程序返回前台时如何在特定屏幕上执行操作并更新状态 我想检查通知的状态 允许 拒绝 等 并更新用户界面 这是一些示例代码 这是我要更新的视图 str
  • 在编译时展开循环

    我想将大量行写入以下形式的 C 文件中foo i for i 0 1 n 有没有办法在编译时执行此操作 我想这样做是因为我有一个模板类 template
  • 使用 AWS 弹性负载均衡器和 Nginx 将非 www 转换为 www

    我有一个在 example com 上运行的应用程序 现在我想将所有流量重定向到 www example com 因为我们正在与 Akamai 的 CDN 合作开发您的网站 我的域名停放在 Route53 中 添加了 Elastic Loa
  • 设置 ZF2 中的 (404) 错误页面使用的布局变量

    目前 我使用 BaseController 的 onDispatch 方法 我的所有其他控制器都扩展了该方法 设置了应用程序的整体布局 phtml 使用的几个变量 public function onDispatch MvcEvent e
  • Android NDK 中的文件操作

    出于性能原因 我使用 Android NDK 主要用 C 语言制作应用程序 但似乎 fopen 等文件操作在 Android 中无法正常工作 每当我尝试使用这些功能时 应用程序就会崩溃 如何使用 Android NDK 创建 写入文件 其他
  • 如何将 pandas 中的制表符分隔更改为逗号分隔

    我不知道这是否可能 我正在尝试将 12 个文件附加到一个文件中 其中一个文件以制表符分隔 其余文件以逗号分隔 我将所有 12 个文件加载到 dataframe 中 并循环将其一一附加到一个空 dataframe 中 list of file
  • 如何在R中对两个变量进行交叉制表?

    这似乎是基本的 但我不会明白 我正在尝试计算 R 中数据的频率表 如下所示 1 2 2 1 3 1 我想传输 csv 输出中的双向频率 其行将是数据 A 列中的所有唯一条目 其列将是数据 B 列中的所有唯一条目 单元格值将是这些值出现的次数
  • 托管在 Windows 服务中的 WCF 出现安全异常,即使它应该在完全信任下运行

    我们在 Windows 服务中托管 WCF 服务 NET 4 0 它在大多数机器上运行得很好 但在某些机器上它会抛出以下异常 错误 为 system serviceModel bindings 创建配置节处理程序时发生错误 该程序集不允许部
  • Python multiprocessing.pool 与类目标函数和神经进化的交互

    警告 这将是很长一段时间 因为我想尽可能具体 确切的问题 这是一个多重处理问题 我确保我的类都按照之前实验中构建 预期的方式运行 编辑 事先说过线程 当我在线程环境中运行问题的玩具示例时 一切正常 然而 当我转向真正的问题时 代码就崩溃了