自适应部署

动机

大多数 Dask 部署是静态的,只有一个调度器和固定数量的工作节点。这导致了可预测的行为,但在两种情况下会造成资源的浪费:

  1. 用户可能没有在使用集群,或者他们可能正在忙于解释最近的结果或图表,因此工作节点处于空闲状态,占用了其他潜在用户宝贵的共享资源。

  2. 用户可能非常活跃,并受限于其原始分配。

特别高效的用户可能会学会在会话期间手动添加和删除工作线程,但这很少见。相反,我们希望Dask集群的大小能够匹配任何给定时间的计算需求。这就是本文件中讨论的*自适应部署*的目标。


Dask 自适应扩展

这些对于交互式工作负载特别有帮助,这些工作负载的特点是长时间的不活动状态被短时间的重度活动打断。自适应部署可以导致更快的分析,为用户提供更多的能力,但对计算资源的压力却大大减少。

自适应

为了使设置自适应部署变得简单,一些 Dask 部署解决方案提供了 .adapt() 方法。以下是使用 dask_kubernetes.KubeCluster 的示例。

from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100)  # scale between 0 and 100 workers

更多关键词选项,请参见下面的 Adaptive 类:

Adaptive(cluster[, interval, minimum, ...])

根据调度器负载自适应分配工作者。

依赖于资源管理器

Dask 调度器本身不知道如何启动工作节点。相反,它依赖于外部资源调度器,如 Kubernetes、Yarn、SGE、SLURM、Mesos 或其他内部系统(参见 如何部署 Dask 集群 以获取选项)。为了使用自适应部署,您必须提供某种机制让调度器启动新的工作节点。通常,这是通过使用 如何部署 Dask 集群 中列出的解决方案之一,或者通过从 Cluster 超类派生并实现该 API 来完成的。

Cluster([asynchronous, loop, quiet, name, ...])

集群对象的超级类

缩放启发式

Dask 调度器跟踪各种信息,这些信息对于正确分配工作者的数量非常有用:

  1. 每个函数和任务的历史运行时间,以及它目前能够为用户运行的所有函数

  2. 每个工作节点使用的内存量和可用内存量

  3. 哪些工人因各种原因(如专业硬件的存在)而空闲或饱和

通过这些,它能够通过将所有待处理任务的累积预期运行时间除以 target_duration 参数(默认为五秒)来确定目标工作线程数。这个工作线程数作为资源管理器的基准请求。这个数量可以根据多种原因进行调整:

  1. 如果集群需要更多内存,那么它将选择目标工作节点数量或当前工作节点数量的两倍(取两者中较大的一个)

  2. 如果目标值超出最小值和最大值的范围,则会被裁剪以适应该范围。

此外,在缩减规模时,Dask 优先选择那些空闲且内存中数据最少的 worker。它在停用 worker 之前将数据移动到其他机器上。为了避免集群大小快速上下波动,我们只在经过几个周期后,确认停用该 worker 是一个好主意时(由 wait_countinterval 参数控制),才会停用 worker。

API

class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None = None, maximum: int | float | None = None, wait_count: int | None = None, target_duration: str | float | timedelta | None = None, worker_key: Callable[[WorkerState], Hashable] | None = None, **kwargs: Any)[源代码]

根据调度器负载自适应分配工作者。这是一个超类。

包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与能够使用集群资源管理器创建和销毁 Dask 工作者的系统配对。通常它被内置于已有的解决方案中,而不是直接由用户使用。它最常从各种 Dask 集群类的 .adapt(...) 方法中使用。

参数
集群: 对象

必须有 scale 和 scale_down 方法/协程

间隔timedelta 或 str, 默认值为 “1000 ms”

检查之间的毫秒数

wait_count: int, default 3

在移除一个worker之前,应该建议移除它的连续次数。

target_duration: timedelta 或 str, 默认 “5s”

我们希望计算所需的时间。这会影响我们扩展的积极性。

worker_key: Callable[WorkerState]

在缩减规模时将工作者分组的功能 更多信息请参见 Scheduler.workers_to_close

最小值: int

保留的最少工作线程数

最大值: int

最大工作线程数

**kwargs:

传递给 Scheduler 的额外参数。workers_to_close

注释

子类可以重写 Adaptive.target()Adaptive.workers_to_close() 来控制集群何时应该调整大小。默认实现检查每个工作者的任务是否过多或可用内存是否过少(参见 distributed.Scheduler.adaptive_target())。间隔、最小值、最大值、等待计数和目标持续时间的值可以在 dask 配置中的 distributed.adaptive 键下指定。

示例

这通常从现有的 Dask 类中使用,例如 KubeCluster

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)

或者,您可以通过从 Dask 的 Cluster 超类继承来在自己的 Cluster 类中使用它。

>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...        """ Remove worker addresses from cluster """
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
class distributed.deploy.Cluster(asynchronous=False, loop=None, quiet=False, name=None, scheduler_sync_interval=1)[源代码]

集群对象的超级类

此类包含 Dask 集群管理器类的通用功能。

要实现这个类,你必须提供

  1. 一个 scheduler_comm 属性,它是遵循 distributed.core.rpc API 的调度器连接。

  2. 实现 scale,该函数接受一个整数并按此数量扩展集群的工作节点,或者将 _supports_scaling 设置为 False

为此,您应该得到以下内容:

  1. 一个标准的 __repr__

  2. 一个实时的 IPython 小部件

  3. 自适应缩放

  4. 与 dask-labextension 集成

  5. 一个 scheduler_info 属性,包含 Scheduler.identity() 的最新副本,上述许多内容都使用它。

  6. 收集日志的方法