Manager shutdown error "AttributeError: 'ForkAwareLocal' object has no attribute 'connection'" when using a namespace and a shared-memory dict

2023-11-30

I am trying to:

  1. Share a dataframe between processes
  2. Update a shared dict based on calculations performed on (but not changing) that dataframe

I am using a multiprocessing.Manager() to create a dict in shared memory (to store results) and a Namespace to store/share the dataframe I want to read.

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    # seed the shared dict with an empty dataframe for each key
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    # read one row from the shared dataframe and store it in the
    # shared dict under its index
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

    print(shared_dict[1])

When I run the code above, it does write to shared_dict correctly and my print statement executes with some data. But I also get this error about the manager:

Process Process-88:
Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
    row_to_insert = namespace.df.loc[ind]
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
    self._connect()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

I understand this comes from the manager, and it seems to be because the manager isn't shutting down properly. The only similar issue I can find online:

Sharing a list between processes in a Python server

suggests joining all the child processes, which I am already doing.
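As a side note on the shutdown itself: multiprocessing.Manager() can also be used as a context manager, which shuts its server process down cleanly when the block exits. A minimal sketch, with a made-up worker `fill` that is not from the original post:

```python
import multiprocessing


def fill(shared_dict, key):
    # trivial worker: write a value into the manager-backed dict
    shared_dict[key] = key * 2


if __name__ == '__main__':
    # the with-block shuts the manager's server process down for us
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        jobs = [
            multiprocessing.Process(target=fill, args=(shared_dict, i))
            for i in range(4)
        ]
        for p in jobs:
            p.start()
        for p in jobs:
            p.join()
        # copy the data out before the manager's server goes away
        result = dict(shared_dict)

    print(sorted(result.items()))  # [(0, 0), (1, 2), (2, 4), (3, 6)]
```

Any proxy call made after the with-block exits would fail, so the results are copied into a plain dict first.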


So, after a full night's sleep, I realized that it was actually the reading of the dataframe in shared memory that was causing the problem: around the 20th child process, some of them were failing that read. I added a cap on the maximum number of processes running at once, and that solved it.

For anyone wondering, the code I used:

import multiprocessing

import pandas as pd
import numpy as np

def add_empty_dfs_to_shared_dict(shared_dict, key):
    # seed the shared dict with an empty dataframe for each key
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    # read one row from the shared dataframe and store it in the
    # shared dict under its index
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    # region define inputs

    max_jobs_running = 4
    n = 100

    # endregion

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    jobs_running = 0
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

        jobs_running += 1

        # Throttle: busy-wait until fewer than max_jobs_running
        # children are still alive.
        while jobs_running >= max_jobs_running:
            jobs_running = sum(p.is_alive() for p in jobs)

    for p in jobs:
        p.join()

    for key, value in shared_dict.items():
        print(f"key: {key}")
        print(f"value: {value}")
        print("-" * 50)

This would probably be better handled by a Queue and Pool setup than by my hacky fix.
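For illustration, here is a minimal sketch of what that Pool-based version could look like, reusing the same worker as above. The choice of 4 pool workers (mirroring `max_jobs_running`) is an assumption, not something from the original post:

```python
import multiprocessing

import numpy as np
import pandas as pd


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    # same worker as above: read one row from the shared dataframe
    # and store it in the shared dict under its index
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    n = 100
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        namespace = manager.Namespace()
        namespace.df = pd.DataFrame({
            'player_id': list(range(n)),
            'data': np.random.random(n),
        }).set_index('player_id')
        for i in range(n):
            shared_dict[i] = pd.DataFrame()

        # The pool caps concurrency at 4 workers, replacing the
        # manual is_alive() polling loop; manager proxies are
        # picklable, so they can be passed straight to the workers.
        with multiprocessing.Pool(processes=4) as pool:
            pool.starmap(
                edit_df_in_shared_dict,
                [(shared_dict, namespace, i) for i in range(n)],
            )

        assert len(shared_dict) == n
```

The pool never runs more than 4 workers at a time, so the throttling falls out of the design instead of being bolted on with a busy-wait.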
