在文件处理的时候,经常会遇见大文件数据,单进程处理速度太慢,可以通过多进程来提升效率
应用场景一:同时并行处理多个小文件,处理完成后 写回多个文件
def read_wiki_data(infile,outfile,param1):
"""
单个文件的处理逻辑
"""
ngram_dict = defaultdict(int)
localtime = time.asctime(time.localtime(time.time()))
with open(infile,'r',encoding="utf-8") as fr,open(outfile,'w',encoding="utf-8") as fw:
for i,line in enumerate(fr):
fw.write('{}\t{}\n'.format(line,param1))
print("end")
def pipeline_mult_processing(input_dir,output_dir):
"""
:param input_dir:
:param output_dir:
:return:
"""
# 假定参数固定为1
param1 =1
params = []
for f in Path(input_dir).iterdir():
if not Path(output_dir).joinpath(f.parent.name).is_dir():
Path(output_dir).joinpath(f.parent.name).mkdir(parents=True)
output_file = Path(output_dir).joinpath(f.parent.name, f.name)
params.append((f,output_file,param1))
# 开启20个线程
with Pool(processes=20) as pool:
pool.starmap(read_wiki_data,params)
# 调用方式
freeze_support()
pipeline_mult_processing(input_dir,output_dir)
主要使用了 pool.starmap 函数,特别强调,自定义传递参数 并行写入并放到list里面即可以,params = [(inf1,outf1,param1),(inf1,outf1,param2)]