如何在单线程中运行 dask.distributed 集群?

2024-03-10

如何在单个线程中运行完整的 Dask.distributed 集群?我想用它来调试或分析。

注意:这是一个常见问题。我将这里的问题和答案添加到 Stack Overflow 中,仅供将来重用。


本地调度程序

如果您可以使用单机调度程序的 API(仅计算),那么您可以使用单线程调度程序

x.compute(scheduler='single-threaded')

分布式调度器-单机

如果你想在单台机器上运行 dask.distributed 集群,你可以不带参数启动客户端

from dask.distributed import Client
client = Client()  # Starts local cluster
x.compute()

这使用许多线程但在一台机器上运行

分布式调度程序 - 单进程

或者,如果您想在单个进程中运行所有内容,那么您可以使用processes=False keyword

from dask.distributed import Client
client = Client(processes=False)  # Starts local cluster
x.compute()

尽管计算发生在单独的线程池中,但所有通信和控制都发生在单个线程中。

分布式调度程序 - 单线程

要在单个线程中运行控制、通信和计算,您需要创建一个 Tornado concurrent.futures Executor。请注意,此 Tornado API 可能不是公开的。

from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading

loop = IOLoop()
e = DummyExecutor()
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)

async def f():
    async with Client(s.address, start=False) as c:
        future = c.submit(threading.get_ident)
        result = await future
        return result

>>> threading.get_ident() == loop.run_sync(f)
True
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在单线程中运行 dask.distributed 集群? 的相关文章

随机推荐