python struct.error: 'i' 格式需要 -2147483648 <= number <= 2147483647

2024-01-06

Problem

我愿意使用多处理模块进行特征工程(multiprocessing.Pool.starmap()。 但是,它给出了如下错误消息。我猜这个错误消息与输入的大小有关(2147483647 = 2^31 − 1?),因为相同的代码对于一小部分来说可以顺利工作(frac=0.05)输入数据帧(train_scala、test、ts)。我将数据帧的类型转换为尽可能小的,但它并没有变得更好。

anaconda 版本是 4.3.30,Python 版本是 3.6(64 位)。 系统内存大小超过128GB,核心数量超过20个。 您想建议任何指针或解决方案来克服这个问题吗?如果这个问题是由多处理模块的大数据引起的,我应该使用多少小数据来利用Python3上的多处理模块?

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

错误信息:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

额外信息

  • HistoryCutoffs 是一个整数列表
  • train_scala 是一个 pandas DataFrame (377MB)
  • 测试是 pandas DataFrame (15MB)
  • ts 是一个 pandas DataFrame (547MB)
  • ul_parts_path 是目录列表(字符串)
  • is_train_seq 是布尔值列表

额外代码:方法 multiprocess_FE

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

进程间的通信协议使用pickling,腌制数据的前缀是腌制数据的大小。对于你的方法,所有论点在一起作为一个对象进行腌制。

您生成了一个对象,当腌制时,该对象大于适合的大小istruct formatter(一个四字节有符号整数),它打破了代码所做的假设。

您可以将数据帧的读取委托给子进程,只发送加载数据帧所需的元数据。它们的总大小接近 1GB,太多数据无法通过进程之间的管道共享。

引用自编程指南 section https://docs.python.org/3/library/multiprocessing.html#programming-guidelines:

继承比 pickle/unpickle 更好

当使用spawn or forkserver多种类型的启动方法multiprocessing需要是可腌制的,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它。

如果您不是在 Windows 上运行并使用spawn or forkserver方法,您可以将数据帧加载为全局变量before启动子进程,此时子进程将通过正常的操作系统写时复制内存页面共享机制“继承”数据。

请注意,在 Python 3.8 中,此限制针对非 Windows 系统提高到了 unsigned long long(8 字节),因此您现在可以发送和接收 4EiB https://en.wikipedia.org/wiki/Exbibyte数据的。看这次提交 https://github.com/python/cpython/commit/bccacd19fa7b56dcf2fbfab15992b6b94ab6666b和Python问题#35152 https://bugs.python.org/issue35152 and #17560 https://bugs.python.org/issue17560.

如果您无法升级并且无法使用资源继承,并且不是在 Windows 上运行,则使用此补丁:

import functools
import logging
import struct
import sys

logger = logging.getLogger()


def patch_mp_connection_bpo_17560():
    """Apply PR-10305 / bpo-17560 connection send/receive max size update

    See the original issue at https://bugs.python.org/issue17560 and 
    https://github.com/python/cpython/pull/10305 for the pull request.

    This only supports Python versions 3.3 - 3.7, this function
    does nothing for Python versions outside of that range.

    """
    patchname = "Multiprocessing connection patch for bpo-17560"
    if not (3, 3) < sys.version_info < (3, 8):
        logger.info(
            patchname + " not applied, not an applicable Python version: %s",
            sys.version
        )
        return

    from multiprocessing.connection import Connection

    orig_send_bytes = Connection._send_bytes
    orig_recv_bytes = Connection._recv_bytes
    if (
        orig_send_bytes.__code__.co_filename == __file__
        and orig_recv_bytes.__code__.co_filename == __file__
    ):
        logger.info(patchname + " already applied, skipping")
        return

    @functools.wraps(orig_send_bytes)
    def send_bytes(self, buf):
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            orig_send_bytes(self, buf)

    @functools.wraps(orig_recv_bytes)
    def recv_bytes(self, maxsize=None):
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    Connection._send_bytes = send_bytes
    Connection._recv_bytes = recv_bytes

    logger.info(patchname + " applied")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python struct.error: 'i' 格式需要 -2147483648 <= number <= 2147483647 的相关文章

随机推荐