活动内存管理器

Active Memory Manager,或称 AMM,是一个实验性的守护进程,用于优化Dask集群中工作者的内存使用。它默认启用,但可以被禁用/配置。详情请参见 启用Active Memory Manager

内存不平衡和重复

每当Dask任务返回数据时,它会被存储在执行该任务的工作节点上,只要它是对其他任务的依赖,被 Client 通过 Future 引用,或者作为 已发布数据集 的一部分。

Dask 根据 CPU 占用率、资源 和位置性将任务分配给工作节点。在任务之间没有关联、计算时间相同、返回数据大小相同且没有资源限制的简单使用情况下,人们会观察到工作节点之间的内存占用达到完美平衡。然而,在所有其他使用情况下,随着计算的进行,可能会导致内存使用的不平衡。

当一个任务在工作者上运行并需要输入来自不同工作者的任务输出时,Dask 将透明地在工作者之间传输数据,最终在不同工作者上产生相同数据的多个副本。这通常是可取的,因为它避免了如果以后再次需要数据时重新传输数据。然而,这也导致了集群中整体内存使用量的增加。

启用活动内存管理器

AMM 默认启用。可以通过 Dask 配置文件 禁用或调整它:

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic

以上是推荐的设置,并且将每两秒运行所有启用的 *AMM 策略*(见下文)。或者,您可以从 Client 手动启动/停止 AMM,或触发一次性迭代:

>>> client.amm.start()  # Start running every 2 seconds
>>> client.amm.stop()  # Stop running periodically
>>> client.amm.running()
False
>>> client.amm.run_once()

政策

AMM 本身不做任何事情。用户必须启用 策略,这些策略会建议关于 Dask 数据的操作。AMM 运行这些策略并实施它们的建议,只要它们不损害数据完整性。这些建议可以是两种类型之一:

  • 将内存中Dask任务的数据从一个工作节点复制到另一个工作节点。这不应与由任务依赖关系引起的复制混淆。

  • 删除内存中任务的一个或多个副本。AMM永远不会删除任务的最后一个副本,即使策略要求这样做。

没有“移动”操作。移动操作分两步进行:首先,一个策略创建一个副本;在下一个AMM迭代中,相同的或另一个策略删除原始文件(如果复制成功)。

除非政策对哪些工作者应受到影响施加限制,否则AMM将首先在内存使用率最低的工作者上自动创建副本,并首先从内存使用率最高的工作者上删除它们。

通过 Dask 配置可以启用、禁用和配置单个策略:

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic
      policies:
      - class: distributed.active_memory_manager.ReduceReplicas
      - class: my_package.MyPolicy
        arg1: foo
        arg2: bar

请参见下方,了解如上例所示的自定义策略。

默认的 Dask 配置文件包含了一系列内置的合理策略,这些策略通常是人们所期望的。你应该首先尝试在你的 Dask 配置中只使用 start: true,看看它是否适合你的需求,然后再调整单个策略。

内置策略

ReduceReplicas

distributed.active_memory_manager.ReduceReplicas

参数

此策略在默认的 Dask 配置中启用。每当一个 Dask 任务在多个工作节点上被复制,并且额外的副本似乎不服务于正在进行的计算时,此策略会删除所有多余的副本。

备注

此策略与 replicate() 以及 scatter()broadcast=True 参数不兼容。如果你调用 replicate() 创建额外的副本,然后运行此策略,它将删除所有副本,只保留一个(但不一定是新创建的副本)。

RetireWorker

distributed.active_memory_manager.RetireWorker

参数
地址str

即将退休的工人的地址。

这是一个特殊策略,绝不应该出现在 Dask 配置文件中。

它由 distributed.Client.retire_workers() 动态注入,并且在自适应集群缩减规模时也会注入。此策略负责将所有仅在即将退役的工作者内存中的任务移动到其他工作者。一旦工作者不再唯一持有任何任务的数据,此策略会自动从活动内存管理器中卸载自身,然后工作者被关闭。

如果同时有多名员工退休,AMM中将会安装多个此策略的实例。

如果活动内存管理器被禁用,distributed.Client.retire_workers() 和自适应扩展将启动一个临时的管理器,将其策略安装到其中,并在完成后关闭它。

自定义策略

高级用户可以通过子类化 ActiveMemoryManagerPolicy 来编写自己的策略。该类应定义两个方法:

__init__

自定义策略可以通过 __init__ 参数从 Dask 配置中加载参数。如果你不需要配置,则不需要实现此方法。

run

此方法不接受任何参数,并由AMM每2秒(或AMM间隔时间)调用一次。它必须产生零个或多个以下 Suggestion 命名元组:

yield Suggestion("复制", <TaskState>)

在内存使用最低且尚未持有副本的工作节点上创建目标任务的一个副本。要创建多个副本,你需要多次生成相同的命令。

yield Suggestion("复制", <任务状态>, {<工作状态>, <工作状态>, ...})

在列出的候选人中,在内存最低的工作者上创建目标任务的一个副本。

yield Suggestion("删除", <TaskState>)

在整个集群中,删除内存使用率最高的worker上的目标任务的一个副本。

yield Suggestion("drop", <TaskState>, {<WorkerState>, <WorkerState>, ...})

在列出的候选人中,删除目标任务在内存最高的worker上的一个副本。

AMM 会静默拒绝不可接受的建议,例如:

  • 删除任务的最后一个副本

  • 从没有持有任何副本的工作者子集中删除一个副本

  • 从当前需要它进行计算的工作节点中删除一个副本

  • 复制一个尚未在内存中的任务

  • 创建比工作节点更多的任务副本

  • 在已经持有任务的工作者上创建任务的副本

  • 在暂停或退休的工作者上创建副本

通常,设计尽可能简单的策略是一个好主意,并让 AMM 通过忽略一些建议来处理上述边缘情况。

可选地,run 方法可以检索 AMM 刚刚选择的哪个工作者,如下所示:

ws = (yield Suggestion("drop", ts))

run 方法可以访问以下属性:

self.manager

该策略所附加的 ActiveMemoryManagerExtension

self.manager.scheduler

Scheduler 将应用这些建议。从那里你可以访问各种属性,如 tasksworkers

self.manager.workers_memory

只读映射 {WorkerState: bytes}。bytes 是当前 AMM 迭代中,所有策略接受的所有建议实施后,工作者预期的 RAM 使用量。请注意,如果您愿意始终在内存使用量最低和最高的工作者上创建/删除副本,则无需访问此映射 - AMM 将为您处理。

self.manager.pending

只读映射 {TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})}。第一个集合包含将根据到目前为止接受的建议接收任务新副本的工作者;第二个集合包含将失去副本的工作者。

self.manager.policies

在AMM中注册的一组策略。策略可以如下注销自身:

def run(self):
    self.manager.policies.drop(self)

示例

以下自定义策略确保键 “foo” 和 “bar” 始终在所有工作节点上复制。新工作节点在连接到调度器后不久将收到副本。如果目标键不在任何地方的内存中,或者所有工作节点已经持有副本,则该策略将不执行任何操作。请注意,此示例与内置策略 ReduceReplicas 不兼容。

在mymodule.py中(调度器必须能够访问它):

from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion


class EnsureBroadcast(ActiveMemoryManagerPolicy):
    def __init__(self, key):
        self.key = key

    def run(self):
        ts = self.manager.scheduler.tasks.get(self.key)
        if not ts:
            return
        for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
            yield Suggestion("replicate", ts)

请注意,该策略不会费心测试边缘情况,例如暂停的工作者或其他策略也在请求副本;AMM会处理这些情况。理论上,你可以将最后两行重写如下(以牺牲一些CPU周期为代价):

for _ in range(1000):
    yield Suggestion("replicate", ts)

在 distributed.yaml 中:

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      policies:
      - class: mymodule.EnsureBroadcast
        key: foo
      - class: mymodule.EnsureBroadcast
        key: bar

我们也可以使用一个包含多个键的单一策略实例——上述设计仅仅是为了说明你可以同时运行多个相同策略的实例。

API 参考

class distributed.active_memory_manager.ActiveMemoryManagerExtension(scheduler: distributed.scheduler.Scheduler, policies: set[distributed.active_memory_manager.ActiveMemoryManagerPolicy] | None = None, *, measure: str | None = None, register: bool = True, start: bool | None = None, interval: float | None = None)[源代码]

调度器扩展,优化整个集群的内存使用。它可以手动触发或每隔几秒钟自动触发;每次迭代时,它会执行以下一项或两项操作:

  • 创建内存中任务的新副本

  • 销毁内存中任务的副本;这永远不会销毁最后一个可用副本。

没有 ‘移动’ 操作。移动操作分两步进行:首先创建一个副本,然后在下一次迭代中删除原始文件(如果复制成功)。

此扩展通过 dask 配置部分的 distributed.scheduler.active-memory-manager 进行配置。

amm_handler(method: str) Any[源代码]

调度器处理程序,由客户端通过 AMMClientProxy 调用

interval: float

每这么多秒自动运行一次

measure: str

要使用的内存测量。必须是 distributed.scheduler.MemoryState 的属性或属性之一。

pending: dict[distributed.scheduler.TaskState, tuple[set[distributed.scheduler.WorkerState], set[distributed.scheduler.WorkerState]]]

每个任务的待处理复制和删除 此属性仅存在于 self.run() 的作用域内。

policies: set[distributed.active_memory_manager.ActiveMemoryManagerPolicy]

所有有效政策

run_once() None[源代码]

一次性运行所有策略并异步(触发即遗忘)执行其建议以复制/丢弃任务

property running: bool

如果AMM被定期触发,则返回True;否则返回False。

scheduler: distributed.scheduler.Scheduler

对持有此扩展的调度器的反向引用

start() None[源代码]

开始每 self.interval 秒执行一次,直到调度器关闭

stop() None[源代码]

停止周期性执行

workers_memory: dict[distributed.scheduler.WorkerState, int]

当前在每个工作节点上分配的内存(以字节为单位),加上/减去待处理的操作。此属性仅存在于 self.run() 的作用域内。

class distributed.active_memory_manager.ActiveMemoryManagerPolicy[源代码]

抽象父类

abstract run() collections.abc.Generator[distributed.active_memory_manager.Suggestion, Optional[distributed.scheduler.WorkerState], None][源代码]

此方法由 ActiveMemoryManager 每隔几秒钟调用一次,或者每当用户调用 client.amm.run_once 时调用。

它是一个迭代器,必须发出 Suggestion 对象:

  • 建议("复制", <任务状态>)

  • 建议("复制", <任务状态>, {要复制到的潜在工作者的子集})

  • Suggeston("drop", <TaskState>)

  • 建议("删除", <任务状态>, {要从其中删除的潜在工作者的子集})

每个元素的产生表示希望创建或销毁一个键的单个副本。如果没有提供工人的子集,它默认使用集群中的所有工人。无论是ActiveMemoryManager还是Worker都可能决定忽略该请求,例如因为这将删除键的最后一个副本,或者因为该键当前在该工人上需要。

你可以选择性地获取决定将键复制到或从中删除的工作节点,如下所示:

choice = (yield Suggestion("replicate", ts))

choice 要么是 WorkerState,要么是 None;如果 ActiveMemoryManager 选择忽略请求,则返回后者。

当前待处理(已接受)的建议可以在 self.manager.pending 中检查;这包括之前由同一方法产生的建议。

每个工作者的当前内存使用情况,在所有待处理的建议之后,可以在 self.manager.workers_memory 上进行检查。

class distributed.active_memory_manager.Suggestion(op, ts, candidates)[源代码]
candidates: set[distributed.scheduler.WorkerState] | None

字段编号2的别名

op: Literal['replicate', 'drop']

字段编号 0 的别名

ts: distributed.scheduler.TaskState

字段编号1的别名

class distributed.active_memory_manager.AMMClientProxy(client: distributed.client.Client)[源代码]

从dask客户端操作AMM的便捷访问器

用法: client.amm.start() 等。

如果客户端是异步的,所有方法都是异步的;如果客户端是同步的,所有方法都是同步的。

run_once() Any[源代码]
running() Any[源代码]
start() Any[源代码]
stop() Any[源代码]
class distributed.active_memory_manager.ReduceReplicas[源代码]

确保内存中的任务不会在比预期更多的worker上复制;丢弃多余的副本。

class distributed.active_memory_manager.RetireWorker(address: str)[源代码]

在其他地方复制工作节点上所有唯一的内存中任务,为关闭该节点做准备。

在任何给定时间,AMM 可能已经注册了该策略的多个实例,每个实例对应一个当前正在退役的工作者 - 这意味着大多数时候根本不会有任何实例被注册。因此,该策略不会出现在 dask 配置 (distributed.yaml) 中。实例由 distributed.Scheduler.retire_workers() 添加,并在工作者退役后自动移除。如果 dask 配置中禁用了 AMM,retire_workers() 将启动一个临时的即席实例。

失败条件

可能没有合适的工人来接收即将退休工人的任务。这种情况发生在两种使用场景中:

  1. 这是集群中唯一的工人,或者

  2. 所有工作人员要么暂停工作,要么正在退休

在任何情况下,此策略都无法移出所有键并将 no_recipients 布尔值设置为 True。retire_workers() 将中止退休。

存在第三种使用场景,即由于某种原因任务未能被复制,例如因为其接收者无响应但调度器尚未知晓。在这种情况下,我们将等待下一次AMM迭代并再次尝试(可能会使用不同的接收工作者,例如如果接收工作者挂起但尚未被宣告死亡)。

使用溢出任务停用工作者

在第一次迭代中,该策略建议其他工作者应获取即将退休工作者所有唯一的内存中任务。通常,这意味着在接下来的几刻,退休工作者将受到来自集群其余部分的 distributed.worker.Worker.get_data() 调用的轰炸。如果工作者的大部分管理内存已被溢出,这可能成为一个问题,因为它可能使工作者超过其终止阈值。为了防止这种情况,采取了以下两项措施:

  • 在每次迭代中,此策略会丢弃所有已在其他地方复制的即将退休工作线程上的任务。这为更多任务从溢出文件中移出并复制到另一个工作线程上腾出了空间。

  • 一旦工作线程通过 pause 阈值,get_data() 将出站连接的数量限制为 1。

参数
地址: 字符串

要退役的工作者的URI

done() bool[源代码]

如果可以安全关闭工作进程,则返回 True;否则返回 False。