我处于具有集群、紧密耦合互连和支持 Lustre 文件系统的 HPC 环境中。我们一直在探索如何利用 Dask 不仅提供计算,而且充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维且规则的,并且我们编写了一个惰性读取器以传递到 from_array/from_delayed 方法中。
我们在跨 Dask 集群加载和保存大于内存的数据集时遇到了一些问题。
hdf5 示例:
# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected
我们还从我们自己的一些文件格式加载了数据集(使用切片、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。
我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?所有数据都会被迫通过调度程序吗?工作人员是否能够利用 Lustre,或者功能和/或 I/O 是否以某种方式序列化?如果是这样的话,不对海量数据集调用 persist,而只让 Dask 在需要时处理数据和计算会更有效吗?
-
使用 Dask 作为分布式缓存是否存在固有的瓶颈?
每个系统都存在瓶颈,但听起来您还没有接近我期望 Dask 遇到的瓶颈。
我怀疑你遇到了其他事情。
-
所有数据都会被迫通过调度程序吗?
不,工作人员可以执行自己加载数据的函数。这些数据将保留在工作人员身上。
-
工作人员是否能够利用 Lustre,或者功能和/或 I/O 是否以某种方式序列化?
Workers 只是 Python 进程,因此如果集群上运行的 Python 进程可以利用 Lustre(几乎可以肯定是这种情况),那么 Dask Workers 就可以利用 Lustre。
-
如果是这样的话,不对海量数据集调用 persist,而只让 Dask 在需要时处理数据和计算会更有效吗?
这当然很常见。这里需要权衡 NFS 的分布式带宽和分布式内存的可用性。
在你的位置上,我会使用 Dask 的诊断来找出是什么占用了这么多时间。您可能想阅读有关的文档了解绩效 http://dask.pydata.org/en/latest/understanding-performance.html以及关于仪表板 http://dask.pydata.org/en/latest/diagnostics-distributed.html尤其。该部分有一个视频可能特别有帮助。我想问两个问题:
- 工人是否一直在执行任务? (状态页、任务流图)
- 在这些任务中,什么占用了时间? (个人资料页)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)