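You can iterate over chunks of the file using

    for chunk in zip(*[f]*chunksize):

(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. Note: this does not consume the whole file at once, since zip returns an iterator in Python 3. Be aware that zip stops as soon as a group cannot be filled, so trailing lines that do not make up a complete chunk are dropped; itertools.zip_longest keeps them and pads the last group with None.)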
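To make the grouping idiom concrete, here is a minimal sketch on a small list instead of a file. The names letters and n are illustrative stand-ins and not part of the original code. Because [it]*n repeats the same iterator object n times, each tuple takes n consecutive items from it.

```python
import itertools as IT

letters = list("abcdefg")   # stand-in for the lines of a file (illustrative)
n = 3                       # stand-in for chunksize

# The same iterator object appears n times, so each tuple takes
# n consecutive items from it.
it = iter(letters)
full_groups = list(zip(*[it] * n))

# zip stops when a group cannot be filled, so the leftover 'g' is dropped.
# zip_longest pads the last group with None instead.
it = iter(letters)
padded_groups = list(IT.zip_longest(*[it] * n))

print(full_groups)
print(padded_groups)
```

If the trailing items matter, the padded variant is the safer choice, as long as the None padding is filtered out before the items are used.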

    import concurrent.futures as CF
    import itertools as IT
    import logging

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    def worker(line):
        line = line.strip()
        logger.info(line)

    chunksize = 1024
    with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
        # zip_longest (rather than plain zip) keeps a final chunk shorter than
        # chunksize; its padding values are None and are skipped below
        for chunk in IT.zip_longest(*[f]*chunksize):
            futures = [executor.submit(worker, line)
                       for line in chunk if line is not None]
            # wait for these futures to complete before processing another chunk
            CF.wait(futures)

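Now, in the comments you rightly point out that this is not optimal. A single worker might take a long time and hold up the whole chunk of jobs.

Usually, if each call to worker takes roughly the same amount of time, this is not a big deal. However, here is a way to advance the filehandle on demand. It uses a threading.Condition to notify the sprinkler to advance the filehandle.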

    import logging
    import threading
    import queue  # the module is named Queue only in Python 2

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    SENTINEL = object()

    def worker(cond, q):
        # process lines until the sentinel arrives
        for line in iter(q.get, SENTINEL):
            line = line.strip()
            logger.info(line)
            with cond:
                # tell the sprinkler that this line is done
                cond.notify()
                logger.info('notify')

    def sprinkler(cond, q, num_workers):
        with open("big_file") as f:
            for line in f:
                logger.info('advancing filehandle')
                with cond:
                    q.put(line)
                    logger.info('waiting')
                    # block until a worker signals that it has finished a line
                    cond.wait()
        # one sentinel per worker, so that every worker loop terminates
        for _ in range(num_workers):
            q.put(SENTINEL)

    num_workers = 4
    cond = threading.Condition()
    q = queue.Queue()
    feeder = threading.Thread(target=sprinkler, args=[cond, q, num_workers])
    feeder.start()

    threads = [threading.Thread(target=worker, args=[cond, q])
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    feeder.join()
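
Another way to advance the filehandle on demand is to let a bounded queue do the throttling. This is only a sketch and not the approach used above: the names process_lines, handle and collect are hypothetical, and setting maxsize equal to the number of workers is an assumption. With queue.Queue(maxsize=...), put blocks while the queue is full, so the reader runs at most maxsize lines ahead of the workers while several lines can be in progress at once.

```python
import queue
import threading

SENTINEL = object()

def process_lines(lines, handle, num_workers=4):
    """Feed lines to worker threads through a bounded queue (illustrative helper)."""
    q = queue.Queue(maxsize=num_workers)  # put() blocks while the queue is full

    def worker():
        # process lines until the sentinel arrives
        for line in iter(q.get, SENTINEL):
            handle(line.strip())

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for line in lines:       # lines may be an open file object
        q.put(line)          # blocks until a worker frees a slot
    for _ in threads:
        q.put(SENTINEL)      # one sentinel per worker so each loop ends
    for t in threads:
        t.join()

results = []
results_lock = threading.Lock()

def collect(line):
    # hypothetical handler: record each processed line
    with results_lock:
        results.append(line)

process_lines(["a\n", "b\n", "c\n", "d\n", "e\n"], collect, num_workers=2)
print(sorted(results))
```

Note that the sketch does no error handling: if handle raises, that worker thread exits and the reader may block on a full queue, so real code would want a try/except around the call.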