使用线程
如果 dask 工作人员可以共享线程,那么您的代码应该可以正常工作。如果您没有显式初始化 dask 集群,dask.Array 将使用默认参数创建一个使用进程的集群。这会导致您所看到的行为。要解决此问题,请使用线程显式创建集群:
# use threads, not processes
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)
arr = np.load('myarr.npy', mmap_mode='r')
da = dda.from_array(arr).rechunk(chunks=(100, 310, 310))
da.to_zarr('myarr.zarr', mode='w')
使用进程或分布式工作人员
如果您使用的集群无法共享线程,例如 JobQueue、KubernetesCluster 等,则可以使用以下命令来读取 npy 文件,假设它位于网络文件系统上或以某种方式可供所有工作人员使用。
这是一个工作流程,从内存映射创建一个空数组,然后使用映射读取作业dask.array.map_blocks。关键是使用block_info
可选关键字,它提供有关数组中块的位置的信息,我们可以使用 dask 工作程序来切片新的 mmap 数组对象:
def load_npy_chunk(da, fp, block_info=None, mmap_mode='r'):
"""Load a slice of the .npy array, making use of the block_info kwarg"""
np_mmap = np.load(fp, mmap_mode=mmap_mode)
array_location = block_info[0]['array-location']
dim_slicer = tuple(list(map(lambda x: slice(*x), array_location)))
return np_mmap[dim_slicer]
def dask_read_npy(fp, chunks=None, mmap_mode='r'):
"""Read metadata by opening the mmap, then send the read job to workers"""
np_mmap = np.load(fp, mmap_mode=mmap_mode)
da = dda.empty_like(np_mmap, chunks=chunks)
return da.map_blocks(load_npy_chunk, fp=fp, mmap_mode=mmap_mode, meta=da)
这对我来说适用于相同大小的演示(您可以在最后添加 xarray.DataArray 创建/格式化步骤,但 dask 操作工作正常,并且工作内存对我来说保持在 1GB 以下):
import numpy as np, dask.array as dda, xarray as xr, pandas as pd, dask.distributed
### insert/import above functions here
# save a large numpy array
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))
cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(cluster)
da = dask_read_npy('myarr.npy', chunks=(300, -1, -1), mmap_mode='r')
da.to_zarr('myarr.zarr', mode='w')