在 Windows 上使用多重处理时出现“无法 pickle ”错误

2023-11-22

我正在编写一个多处理程序来使用 Windows 并行处理大型 .CSV 文件。

I found 这个很好的例子对于类似的问题。 在 Windows 下运行它时,我收到一条错误,指出 csv.reader 不可 Picklable。

我想我可以在阅读器子进程中打开 CSV 文件,然后将文件名从父进程发送给它。 但是,我想传递一个已经打开的 CSV 文件(就像代码应该做的那样),具有特定的状态,即真正使用共享对象。

知道如何在 Windows 下做到这一点或者那里缺少什么吗?

这是代码(为了便于阅读,我重新发布):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input thread sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across threads so open/close
        # and use it all in the same thread or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

在Windows下运行时,这是我收到的错误:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

您遇到的问题是由于使用 CSVWorker 类的方法作为流程目标引起的;并且该类有无法 pickle 的成员;那些打开的文件永远不会起作用;

你想要做的就是将该类分成两个类;一个协调所有工作子进程,另一个实际执行计算工作。工作进程将文件名作为参数并根据需要打开各个文件,或者至少等到它们调用其工作方法并打开文件。他们也可以采取multiprocessing.Queues 作为参数或实例成员;可以安全地传递。

在某种程度上,你已经这样做了;你的write_output_csv方法正在子进程中打开文件,但是您的parse_input_csv方法期望找到一个已经打开并准备好的文件作为属性self。坚持以其他方式做,你就会保持良好的状态。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Windows 上使用多重处理时出现“无法 pickle ”错误 的相关文章

  • 如何忽略传递给函数的意外关键字参数?

    假设我有一些功能 f def f a None print a 现在 如果我有一本字典 比如dct a Foo 我可以打电话f dct 并得到结果Foo打印 但是 假设我有一本字典dct2 a Foo b Bar 如果我打电话f dct2
  • 重新索引错误没有意义

    I have DataFrames大小在 100k 到 2m 之间 我正在处理这个问题的框架是如此之大 但请注意 我必须对其他框架执行相同的操作 gt gt gt len data 357451 现在这个文件是通过编译许多文件创建的 所以它
  • Python grpc protobuf 存根生成问题:--grpc_out: protoc-gen-grpc: 插件失败,状态代码 1

    正如问题所说 我从源代码编译了 grpc 并且也做了sudo pip install grpcio 但是 那which grpc python plugin不返回任何内容 这是一个问题 因为route guide的grpc python示例
  • 我应该为 MySQL 使用什么 python 3 库? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 据我所知 MySQLdb 仍然没有移植到 Python 3 pypy 上似乎有另一个名为 PyMySQL
  • 在 vim 折叠线中语法高亮 Python

    我发现代码折叠 http en wikipedia org wiki Code folding帮助我更好地组织我的文件 因此 在我的底部 vimrc 我启用vim代码折叠 http vimdoc sourceforge net htmldo
  • 从主机名中提取域名

    是否有一种编程方式可以从给定的主机名查找域名 给出 gt www yahoo co jp 返回 gt yahoo co jp 有效但非常慢的方法是 拆分为 并从左侧删除 1 个组 使用 dnspython 加入并查询 SOA 记录 当返回有
  • 使 np.loadtxt 使用多个可能的分隔符

    我有一个程序可以读取数据文件 用户可以选择他们想要使用的列 我希望它对于输入文件更加通用 有时 列可能如下所示 10 34 24 58 8 284 6 121 有时它们可 能看起来像这样 10 34 24 58 8 284 6 121 我希
  • 如何在 Pytorch 中将一维 IntTensor 转换为 int

    如何将一维 IntTensor 转换为整数 这 IntTensor int 给出错误 KeyError Variable containing 423 torch IntTensor of size 1 我所知道的最简单 最干净的方法 In
  • 如何在 Sublime 2 REPL Mac 中运行 Python 3

    我的问题如下 我安装了 sublime 2 和 sublime repl 插件 一切正常 我唯一需要的是更改在控制台内置的 sublimerepl 上运行的 python 版本 我的意思是 我有 python 2 7 5 预先安装了 mav
  • Python:帮助(numpy)在退出时导致段错误

    我遇到了一个奇怪的现象 在 python 解释器中 我执行以下操作 gt gt gt import numpy gt gt gt help numpy 帮助显示正确 但一旦我按 q 返回解释器 Segmentation fault core
  • Scrapy的redirect_urls异常.KeyError

    我是 Scrapy 和 Python 的新手 最近推出了我的第一个蜘蛛 有一个功能似乎以前有效 但现在它只适用于我试图废弃的一些网站 代码行是 item url direct response request meta redirect u
  • 使用 statsmodels.formula.api 中的 ols - 如何删除常数项?

    我正在遵循第一个例子statsmodels教程 http statsmodels sourceforge net devel http statsmodels sourceforge net devel 如何指定在 ols 中不使用常数项进
  • Python 中的十进制到二进制半精度 IEEE 754

    我只能使用以下命令将十进制转换为二进制单精度 IEEE754struct pack模块 或者使用相反的方法 float16 或 float32 numpy frombuffer 是否可以使用 Numpy 将十进制转换为二进制半精度浮点数 我
  • 如何限制scrapy请求对象?

    所以我有一个蜘蛛 我认为它正在泄漏内存 结果当我检查 telnet 控制台 gt gt gt prefs 时 它只是从链接丰富的页面中抓取了太多链接 有时它会超过 100 000 个 现在我已经一遍又一遍地浏览文档和谷歌 但我找不到一种方法
  • Scrapy 抓取并跟踪 href 中的链接

    我对 scrapy 很陌生 我需要从 url 的主页跟踪 href 到多个深度 再次在 href 链接内我有多个 href 我需要遵循这些href 直到到达我想要抓取的页面 我的页面的示例 html 是 初始页 div class page
  • 如何保持 python 3 脚本 (Bot) 运行

    不是母语英语 抱歉 英语可能很蹩脚 我也是编程新手 您好 我正在尝试使用 QueryServer 连接到 TeamSpeak 服务器来创建机器人 经过几天的努力 它有效 只有 1 个问题 而我却被这个问题困扰了 如果您需要检查 这是我正在使
  • Pip 突然使用了错误版本的 Python

    在 os x 上使用 pip 时遇到一个奇怪的问题 据我所知 快速查看我的 bash history 似乎可以确认 我最近没有对我的配置进行任何更改 唉 pip 命令似乎突然使用了与以前不同的 python 版本 到目前为止 我使用命令 p
  • Flask 扩展未在 app.extensions 中注册

    我想访问在我的 Flask 应用程序上注册的一些扩展 我尝试使用app extensions 但我初始化的一些扩展不在字典中 from flask import current app current app extensions get
  • 在多个图表上绘制一条线

    I don t know how this thing is called or even how to describe it so the title may be a little bit misleading The first a
  • 在至少 7 天内连续三天登录该产品的用户

    我有一个用于用户参与的数据框 df 如下所示 time stamp user id 2013 01 01 10 05 23 1 2013 01 03 16 35 23 1 2013 01 06 11 06 35 1 2013 01 10 1

随机推荐