调度器概述
内容
调度器概述¶
在我们创建了一个 dask 图之后,我们使用调度器来运行它。Dask 目前实现了几个不同的调度器:
dask.threaded.get: 一个由线程池支持的调度器dask.multiprocessing.get: 一个由进程池支持的调度器dask.get: 一个同步调度器,适合调试distributed.Client.get: 一个用于在多台机器上执行图的分布式调度器。它存在于外部的 distributed 项目中。
get 函数¶
所有调度器的入口点是一个 get 函数。它接受一个 dask 图,以及一个要计算的键或键列表:
>>> from operator import add
>>> dsk = {'a': 1,
... 'b': 2,
... 'c': (add, 'a', 'b'),
... 'd': (sum, ['a', 'b', 'c'])}
>>> get(dsk, 'c')
3
>>> get(dsk, 'd')
6
>>> get(dsk, ['a', 'b', 'c'])
[1, 2, 3]
使用 compute 方法¶
在使用 dask 集合时,您很少需要直接与调度器 get 函数交互。每个集合都有一个默认调度器,以及一个内置的 compute 方法,用于计算集合的输出:
>>> import dask.array as da
>>> x = da.arange(100, chunks=10)
>>> x.sum().compute()
4950
compute 方法接受多个关键字参数:
scheduler: 所需调度器的名称,作为字符串("threads","processes","single-threaded"等),一个get函数,或一个dask.distributed.Client对象。覆盖集合的默认设置。**kwargs: 传递给调度器get函数的额外关键字参数。
另请参阅:配置调度器。
compute 函数¶
您可能希望一次性从多个 dask 集合中计算结果。类似于每个集合上的 compute 方法,有一个通用的 compute 函数,它接受多个集合并返回多个结果。这会合并每个集合的图,因此中间结果是共享的:
>>> y = (x + 1).sum()
>>> z = (x + 1).mean()
>>> da.compute(y, z) # Compute y and z, sharing intermediate results
(5050, 50.5)
这里 x + 1 中间结果只计算了一次,而调用 y.compute() 和 z.compute() 会计算两次。对于共享许多中间结果的大型图表,这可以带来显著的性能提升。
compute 函数适用于任何 dask 集合,并且可以在 dask.base 中找到。为了方便起见,它也被导入到每个集合的顶层命名空间中。
>>> from dask.base import compute
>>> compute is da.compute
True
配置调度器¶
dask 集合各自有一个默认调度器:
dask.array和dask.dataframe默认使用线程调度器dask.bag默认使用多进程调度器。
在大多数情况下,默认设置是不错的选择。然而,有时你可能想使用不同的调度器。有两种方法可以做到这一点。
在
compute方法中使用scheduler关键字:>>> x.sum().compute(scheduler='processes')
使用
dask.config.set。这可以作为上下文管理器使用,或者用于全局设置调度器:# As a context manager >>> with dask.config.set(scheduler='processes'): ... x.sum().compute() # Set globally >>> dask.config.set(scheduler='processes') >>> x.sum().compute()
此外,每个调度器可能会接受一些特定于该调度器的额外关键字。例如,多进程和线程调度器各自接受一个 num_workers 关键字,该关键字设置要使用的进程或线程数量(默认为核心数)。这可以通过在调用 compute 时传递关键字来设置:
# Compute with 4 threads
>>> x.compute(num_workers=4)
或者,多进程和线程调度器将检查是否使用 dask.config.set 设置了全局池:
>>> from concurrent.futures import ThreadPoolExecutor
>>> with dask.config.set(pool=ThreadPoolExecutor(4)):
... x.compute()
多进程调度器还支持 不同的上下文`_(“spawn”、“forkserver”、“fork”),你可以通过 ``dask.config.set` 来设置。默认上下文是“spawn”,但你可以设置一个不同的上下文:
>>> with dask.config.set({"multiprocessing.context": "forkserver"}):
... x.compute()
有关每个调度器的各个选项的更多信息,请参阅每个调度器 get 函数的文档字符串。
调试调度器¶
调试并行代码可能会很困难,因为传统的工具如 pdb 在处理多线程或多进程时效果不佳。为了在调试时解决这个问题,我们推荐使用在 dask.get 中找到的同步调度器。这会以串行方式运行所有内容,使其能够很好地与 pdb 配合使用:
>>> dask.config.set(scheduler='single-threaded')
>>> x.sum().compute() # This computation runs serially instead of in parallel
共享内存调度器还提供了一组回调,可用于诊断和分析。您可以在此处了解更多关于调度器回调和诊断的信息 这里。
更多信息¶
有关共享内存(线程或多进程)调度器设计的详细信息,请参阅 共享内存。
有关分布式内存调度器的信息,请参见 distributed。