无法将函数并行映射到 tarfile 成员

2023-12-29

我有一个包含 bz2 压缩文件的 tar 文件。我想应用该功能clean_file到每个 bz2 文件,并整理结果。在系列中,使用循环很容易:

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool

def clean_file(member):
    if '.bz2' in str(member):

        f = tr.extractfile(member)

        with bz2.open(f, "rt") as bzinput:
            dicts = []
            for i, line in enumerate(bzinput):
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                dicts.append(dat)

        bzinput.close()
        f.close()
        del f, bzinput

        processed = dicts[0]
        return processed

    else:
        pass


# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)


# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
    processed_files.append(clean_file(m))
    i+=1
    print('done '+str(i)+'/'+str(num_files))
    

但是,我需要能够并行执行此操作。我正在尝试使用的方法Pool像这样:

# Apply the clean_file function in parallel
if __name__ == '__main__':
   with Pool(2) as p:
      processed_files = list(p.map(clean_file, members))

但这会返回一个 OSError:

Traceback (most recent call last):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "parse_data.py", line 19, in clean_file
    for i, line in enumerate(bzinput):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
    return self._buffer.read1(size)
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
    data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
    for obj in iterable:
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
OSError: Invalid data stream

所以我猜这种方式无法正确访问 data.tar 或其他内容中的文件。如何并行应用该功能?

我猜这适用于任何包含 bz2 文件的 tar 存档,但这是我重现错误的数据:https://github.com/johnf1004/reproduct_tar_error https://github.com/johnf1004/reproduce_tar_error


您没有指定您正在运行的平台,但我怀疑它是 Windows,因为您有...

if __name__ == '__main__':
    main()

...这对于在使用操作系统功能的平台上创建进程的代码是必需的spawn用于创建新流程。但这也意味着当创建一个新进程(例如您正在创建的进程池中的所有进程)时,每个进程都会从程序的最顶部重新执行源程序。这意味着每个池进程正在执行以下代码:

tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)

但是,我不明白为什么这本身会导致错误,但我不能确定。然而,问题可能是,这是在调用工作函数之后执行的,clean_file正在被调用,所以tr尚未设置。如果这段代码前面clean_file它可能有效,但这只是一个猜测。当然提取成员members = tr.getmembers()在每个池进程中都是浪费的。每个进程都需要打开 tar 文件,最好只打开一次。

但很明显,您发布的堆栈跟踪与您的代码不匹配。你展示:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))

然而你的代码没有任何参考tqdm或使用方法imap。现在,当您发布的代码与产生异常的代码不太匹配时,分析您的实际问题变得更加困难。

如果您在 Mac 上运行,它可能正在使用fork要创建新进程,当主进程创建了多个线程(您不一定会看到,也许是通过tarfile模块),然后创建一个新进程,我已指定代码以确保spawn用于创建新流程。无论如何,下面的代码should工作。它还引入了一些优化。如果没有,请发布新的堆栈跟踪。

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import get_context

def open_tar():
    # open once for each process in the pool
    global tr
    tr = tarfile.open('data.tar')

def clean_file(member):
    f = tr.extractfile(member)

    with bz2.open(f, "rt") as bzinput:
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            dat = json.loads(line)
            # since you are returning just the first occurrence:
            return dat

def main():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    # just pick members where '.bz2' is in member:
    filtered_members = filter(lambda member: '.bz2' in str(member), members)
    ctx = get_context('spawn')
    # open tar file just once for each process in the pool:
    with ctx.Pool(initializer=open_tar) as pool:
        processed_files = pool.map(clean_file, filtered_members)
        print(processed_files)

# required for when processes are created using spawn:
if __name__ == '__main__':
    main()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法将函数并行映射到 tarfile 成员 的相关文章

随机推荐

  • 添加自定义 DLL 搜索路径@应用程序启动

    我正在绞尽脑汁试图想出一个优雅的解决方案来解决 DLL 加载问题 我有一个应用程序静态链接到加载 DLL 的其他 lib 文件 我没有直接加载 DLL 我希望在可执行文件所在的文件夹之外的另一个文件夹中拥有一些 DLL 例如 working
  • RabbitMQ:快速生产者和慢速消费者

    我有一个应用程序 它使用 RabbitMQ 作为消息队列在两个组件 发送者和接收者 之间发送 接收消息 发送者以非常快的方式发送消息 接收方收到消息后会做一些非常耗时的工作 主要是数据量非常大的数据库写入 由于接收方需要很长时间才能完成任务
  • 方法 JPQL 的查询验证失败

    我正在实现一个查询 该查询返回自定义对象中的输出 通过我的实现 我收到一个错误 方法公共抽象java util List org degs repository ConsolidateresponseRepository transacti
  • 可与类型索引中的和和积合并

    Haskell 中是否有类似于以下类型类的内容 class Mergeable f Type gt Type gt Type where merge f a b gt f c d gt f a c Either b d 特别是 想象有一个S
  • 在使用 VBA 填充列表的 Excel 中输入下拉列表时自动完成

    我正在使用下面的代码将数据从另一张表插入到下拉列表中 当用户从另一个下拉列表中选择某个选项时 即可实现这一点 lstRow Sheets Data Sheet Range D Rows Count End xlUp Row Sheets D
  • 如何使用 grep 查找单词列表

    我有一个文件 A 其中有 100 个单词 并用换行符分隔 我想搜索文件 B 以查看文件 A 中的任何单词是否出现在其中 我尝试了以下方法 但对我不起作用 grep F A B 您需要使用该选项 f grep f A B 选项 F进行固定字符
  • Protractor - 当 DOM 元素更改时,页面对象不会更新

    我正在测试使用 angular js 构建的 SPA 并使用页面对象模式来编写我的测试 在应用程序中 我们有许多将要更新的列表 例如 有一个附件列表 当添加 删除附件时 该列表将会更新 要添加附件 我们有一个模式窗口 当我们上传文件并单击
  • Python/Matplotlib - 调整绘图边缘与 x 轴之间的间距

    如何调整 x 轴和绘图窗口边缘之间的间距 我的 x 轴标签是垂直方向的 它们超出了 Matplotlib 绘制的窗口的边缘 这是一些示例代码 import matplotlib pyplot as plt x 1 2 3 4 5 y 1 2
  • GStreamer 插件搜索路径?

    我可以以某种方式告诉 GStreamer 在指定目录中查找插件吗 Use the GST PLUGIN PATH环境变量指向您想要的目录 或者以编程方式调用 GstRegistry registry registry gst registr
  • Oracle 使用代理模式创建数据库链接

    所以我想在 oracle 中创建一个数据库链接 我的用户名是 jefferson 我想通过 opms 连接 所以我被告知这样做 create database link tmpp connect to jefferson opms iden
  • 使用 PHP 在菜单项上设置活动类

    我有一个简单的菜单 ul li 元素和一个class active 来标记当前页面 一个变量被传递 get 通过 url 选择特定页面 pg PAGE 我对 php 相当陌生 仍在学习中 这工作得很好 但我觉得应该有一个更简单 更短的方法
  • 在 Javascript 中反转数字而不使其成为字符串[重复]

    这个问题在这里已经有答案了 谁能告诉我我的代码哪里出错了 我正在尝试反转数字而不将其更改为字符串 我一直在搜索谷歌并浏览了之前提出的有关该主题的问题 从我可以看到我的代码反映了其他答案 我只能找到不使用 to string 方法的 Java
  • Android中如何声明全局变量?

    我正在创建一个需要登录的应用程序 我创建了主要活动和登录活动 在主要活动中onCreate方法我添加了以下条件 public void onCreate Bundle savedInstanceState super onCreate sa
  • 使用反射获取属性的字符串名称

    有大量的反射示例可以让您获得 一个类中的所有属性 单个属性 前提是您知道字符串名称 有没有一种方法 使用反射 TypeDescriptor 或其他方式 在运行时获取类中属性的字符串名称 前提是我拥有 的只是类和属性的实例 我有一个类的实例
  • 如何将数据推送到 iPhone 应用程序?

    我是 iPhone 应用程序开发新手 我无法弄清楚如何将数据推送到应用程序 具体来说 我试图找到一种方法将新数据 用户帖子 从服务器推送到应用程序 而无需用户刷新 下拉刷新 有可能吗 有一个接近的解决方案 使用Apple推送通知服务 它允许
  • 连接字符串无法按预期工作[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我知道这是一个常见问题 但在寻找参考
  • aws_iam_policy 和 aws_iam_role_policy 之间的区别

    我有一个aws iam role我想添加一个策略 通常 我会创建一个策略aws iam role并将其附加到角色上aws iam role policy attachment 但是 我看过一些使用的文档aws iam role policy
  • 如何从另一个分支获取更改

    我目前正在研究featurex分支 我们的主分支被命名为our team 自从我开始工作以来featurex 对分支进行了更多更改our team 我在本地完成此操作是为了获取所有最新更改our team git checkout our
  • 将 PEM 证书解析为 JSON

    我有 PEM 证书并且正在使用openssl查看其内容 是否可以将输出解析为 JSON 格式 也许有一个 Java 库或 Bash 脚本可以做到这一点 命令 openssl x509 in sample cer noout text out
  • 无法将函数并行映射到 tarfile 成员

    我有一个包含 bz2 压缩文件的 tar 文件 我想应用该功能clean file到每个 bz2 文件 并整理结果 在系列中 使用循环很容易 import pandas as pd import json import os import