仅当有免费工人可用时如何生成未来

2023-12-02

我正在尝试将从大文件行中提取的信息发送到某个服务器上运行的进程。

为了加快速度,我想并行地使用一些线程来执行此操作。

使用 Python 2.7 向后移植并发期货我试过这个:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

然而,这是有问题的,因为所有 future 都会立即提交,因此我的机器会耗尽内存,因为完整的文件会加载到内存中。

我的问题是,是否有一种简单的方法可以仅在有免费工人可用时提交新的 future。


您可以使用迭代文件的块

for chunk in zip(*[f]*chunksize):

(这是一个应用程序石斑鱼食谱,它从迭代器收集项目f分成不同大小的组chunksize。注意:这不会立即消耗整个文件,因为zip返回 Python3 中的迭代器。)


import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

现在,您在评论中正确地指出这不是最佳选择。 可能有一些工人需要很长时间,并且占据了整个工作岗位。

通常,如果每次对工作人员的调用都花费大致相同的时间,那么这并不是什么大问题。然而,这里有一种按需推进文件句柄的方法。它使用一个threading.Condition通知sprinkler前进文件句柄。

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
    t.start()
for t in threads:
    t.join()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

仅当有免费工人可用时如何生成未来 的相关文章

随机推荐

  • 如何在 Fortran 中的 do 循环中跳过一些迭代

    例如 我想以 2 的增量从 1 循环到 500 但是 对于每 8 个循环 我想跳过接下来的 18 个循环 使 do 变量增加 18 我怎么做 我的代码是 event 0 do i 1 500 2 event event 1 if event
  • 为什么总是调用超类构造函数[重复]

    这个问题在这里已经有答案了 我有以下2个课程 public class classA classA System out println A class classB extends classA classB System out pri
  • 创建一个安全的 Lua 沙箱..?

    现在我正在做很多事情 local env print print setfenv 函数 环境 然后使用元方法来锁定实例上的属性 但它确实效率低下并且有很多绕过 我用谷歌搜索了它 我发现的一切都与此相同 不起作用 在 Lua 5 1 中 沙箱
  • 尝试将 $element 注入 ng-view 会导致未知提供者错误

    我想知道这是否是一个错误或记录在某处 似乎将 element 注入 ng view 指令附加的控制器失败 这是一个例子 脚本 js controller MainCtrl route routeParams location element
  • MongoDB 游标内存泄漏(OutOfMemory)?

    这就是我读取大型 MongoDB 表的方式 每个对象的属性中都有非常大的数据块 DBCursor cursor collection find my query while cursor hasNext DBObject object cu
  • Google Sheet API 值批量更新,正文中范围数量的限制

    https developers google com sheets api reference rest v4 spreadsheets values batchUpdate 这里的文档没有说明一次调用可以一次更新主体中的多少个范围 有人
  • ThreeJS - 绕对象自身轴旋转

    我试图围绕它自己的轴旋转对象 但没有任何效果 我尝试了文档中的所有函数 欧拉函数等 但它根本不想旋转 您可以使用如下所示的模式在其自身 局部 轴上旋转对象 var axis new THREE Vector3 x y z normalize
  • IOS 中的自定义字体未反映在设备上

    我跟着这个tutorial自定义字体显示在我的故事板上 但是当我执行我的应用程序 在模拟器或设备上 时 字体没有反映出来 谁能帮忙 这就是我所做的 1 downloaded a ttf file and copied it to my pr
  • 监控 JQuery 发出的所有 AJAX 请求?

    有没有办法监视页面上使用 JQuery 发出的所有 ajax 请求 并使用每个请求的结果调用回调函数 例如我发出 ajax 请求 get foo foo bar get bar bar foo 然后 每次完成这些 ajax 请求时 我都会调
  • Robolectric:“未找到 AndroidManifest.xml”和“无法找到资源 ID #0x7f09001b”

    我正在使用 Robolectric 进行一些测试 但遇到了一个无法解决的问题 当我运行测试时 AndroidManifest 出现以下错误 警告 在 AndroidManifest xml 中找不到清单文件 仅回退到 Android 操作系
  • 获取 iPhone 中的运营商详细信息

    如何获取 iPhone 当前使用的运营商详细信息 如 Airtel 或 Idea 等 是否有可能获得这些详细信息 或者有什么方法可以识别我们当前正在使用哪个运营商 我正在开发一个基于运营商的应用程序 如果用户更改他的 SIM 运营商 那么该
  • 如何将 symfony yaml 配置文件转换为 xml 格式?

    我想要将 Symfony 配置文件作为 XML 文件 我知道 symfony 书中有很多代码示例 但并未显示所有配置类型 有没有办法将 Symfony 演示项目提供的现有 YAML 文件转换为 XML 文件以使用这些文件作为基础 我发现了一
  • 无法确定何时隐藏和显示使用 JQuery 的加载动画

    我有一个加载动画 最初隐藏在我的 application js 文件中 loading field hide 我有一个自动完成字段 我希望动画在用户开始输入时出现 并在自动完成建议结果出现时消失 下面是我的 jquery ui 自动完成插件
  • 如何将 BitArray 转换为单个 int?

    我怎样才能转换BitArray到一个单一的int private int getIntFromBitArray BitArray bitArray if bitArray Length gt 32 throw new ArgumentExc
  • x86 操作码有模式吗? (方向和大小位除外)

    许多重要的 x86 指令 例如 MOV 和 ADD 的操作码部分将最后两位标准化为方向位和数据大小位 此处灰色显示的操作码部分是否有任何模式 例如 对于 ADD 某些指令使用代码 000000 而其他指令则使用代码 100000 None
  • 在 Android 中使用 SQLCipher 加密/解密现有数据库

    我使用下面的代码来加密和解密我能够加密的数据库 但是当我尝试解密时 我收到以下异常 我引用了这个文档 and 测试用例太仍然面临同样的问题 例外 sqlite returned error code 26 msg file is encry
  • 将图像保存到小程序中的文件中?

    所以事情是这样的 我正在尝试为网页游戏做一个小程序来生成 自定义 头像 这个头像是针对一个国家的军队的 所以头像取决于用户选择的图像 并且图片上的框架也代表用户所属的四边形 所以我的计划是让他们从计算机上的文件中进行选择 然后他们选择他们所
  • 在 CakePHP 3 中按关联模型的条件查找

    我有两张桌子orders and sub orders 他们的协会是 orders gt hasMany SubOrders foreignKey gt order id 两个表都有invoice no and sub invoice中的列
  • Postgresql 是否在事务中隐式包装 select 语句?

    PostgreSQL 实际上将每个 SQL 语句都视为在事务中执行 如果您不发出 BEGIN 命令 则每个单独的语句都有一个隐式的 BEGIN 和 如果成功 COMMIT 围绕它 From 教程交易 这是否意味着即使 select 语句也会
  • 仅当有免费工人可用时如何生成未来

    我正在尝试将从大文件行中提取的信息发送到某个服务器上运行的进程 为了加快速度 我想并行地使用一些线程来执行此操作 使用 Python 2 7 向后移植并发期货我试过这个 f open big file with ThreadPoolExec