工人

概述

工作者提供两个功能:

  1. 根据调度器的指示执行计算任务

  2. 存储并提供计算结果给其他工作者或客户端

每个工作节点包含一个线程池,用于根据调度器的请求执行任务。它将这些任务的结果本地存储,并根据需求向其他工作节点或客户端提供这些结果。如果工作节点被要求执行一个它没有所有必要数据的任务,那么它将联系其对等工作节点以收集必要的依赖项。

调度器和两个工人Alice和Bob之间的一次典型对话可能如下所示:

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!

存储数据

数据本地存储在一个字典中,位于 .data 属性中,该字典将键映射到函数调用的结果。

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

这个 .data 属性是一个 MutableMapping,通常是内存和磁盘存储的组合,并采用LRU策略在它们之间移动数据。

了解更多:Worker 内存管理

线程池

每个工作线程将计算任务发送给 concurrent.futures.ThreadPoolExecutor 中的一个线程进行计算。这些计算任务与工作线程通信服务器在同一进程中进行,以便它们能够高效地相互访问和共享数据。为了数据局部性的目的,工作线程内的所有线程都被视为同一个工作线程。

如果你的计算主要是数值计算(例如 NumPy 和 Pandas 计算)并且完全释放了 GIL,那么建议使用多个线程和一个进程来运行 dask worker 进程。这样可以减少通信成本并简化部署。

如果你的计算主要是 Python 代码并且不释放 GIL,那么建议使用多个进程和每个进程一个线程来运行 dask worker 进程:

$ dask worker scheduler:8786 --nworkers 8 --nthreads 1

这将启动8个工作进程,每个进程都有自己的大小为1的ThreadPoolExecutor。

如果你的计算是外部于Python且运行时间长且不释放GIL,那么要注意在计算运行时,工作进程将无法与其他工作进程或调度器通信。这种情况应避免。如果你没有链接自己的自定义C/Fortran代码,那么这个话题可能不适用。

命令行工具

使用 dask worker 命令行工具来启动一个单独的工作者。关于命令行选项的更多细节,请查看 命令行工具文档

内部调度

查看专用页面:工作状态机

API 文档

class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, **kwargs)[源代码]

Dask 分布式集群中的工作节点

工人执行两项功能:

  1. 从本地字典提供数据

  2. 对这些数据以及来自同行的数据进行计算

工作者会向调度器报告他们的数据,并在必要时使用该调度器从其他工作者那里收集数据以执行计算。

你可以使用 dask worker 命令行应用程序启动一个工作进程:

$ dask worker scheduler-ip:port

使用 --help 标志查看更多选项:

$ dask worker --help

此文档字符串的其余部分是关于工作线程用于管理和跟踪内部计算的内部状态。

状态

信息状态

这些属性在执行过程中不会显著变化。

  • nthreads: int:

    此工作进程使用的 nthreads 数量

  • 执行器: dict[str, concurrent.futures.Executor]:

    执行器用于执行计算。始终包含默认执行器。

  • 本地目录: 路径:

    本地机器上存储临时文件的路径

  • 调度器: PooledRPCCall:

    调度器的位置。参见 .ip/.port 属性。

  • 名称: 字符串:

    别名

  • 服务: {str: Server}:

    在此工作节点上运行的辅助Web服务器

  • service_ports: {str: port}:

  • transfer_outgoing_count_limit: int

    并发传出数据传输的最大数量。另请参见 distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit

  • batched_stream: BatchedSend

    我们与调度器通信的批处理流

  • 日志: [(消息)]

    一个结构化且可查询的日志。请参见 Worker.story

易失状态

这些属性跟踪此工作程序尝试完成的任务的进度。在下面的描述中,key 是我们想要计算的任务的名称,而 dep 是我们想要从他人那里收集的依赖数据的名称。

  • 线程{key: int}

    任务运行的线程ID

  • active_threads: {int: key}

    当前在活动线程上运行的键

  • 状态: WorkerState

    封装的状态机。参见 BaseWorkerWorkerState

参数
scheduler_ip: str, 可选
scheduler_port: int, 可选
scheduler_file: str, 可选
host: str, 可选
数据: MutableMapping, 类型, 无

用于存储的对象,默认构建一个基于磁盘的LRU字典。

如果提供了一个可调用对象来构造存储对象,并且调用签名中有一个名为 worker_local_directory 的参数,那么它将接收到工作者的 attr:local_directory 作为参数。

nthreads: int, 可选
local_directory: str, 可选

我们放置本地资源的目录

名称: str, 可选
memory_limit: int, float, string

此工作线程应使用的内存字节数。设置为零表示无限制。设置为 ‘auto’ 则按系统.MEMORY_LIMIT * min(1, nthreads / total_cores) 计算。可以使用字符串或数字,如 5GB 或 5e9。

memory_target_fraction: float 或 False

尝试保持在内存的分数(默认:从配置键 distributed.worker.memory.target 读取)

memory_spill_fraction: float 或 False

开始将数据溢出到磁盘的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)

memory_pause_fraction: float 或 False

我们停止运行新任务时的内存比例(默认值:从配置键 distributed.worker.memory.pause 读取)

max_spill: int, string 或 False

溢出到磁盘的字节数限制。(默认:从配置键 distributed.worker.memory.max-spill 读取)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
要使用的执行器。根据类型,它有以下含义:
  • 执行器实例:默认执行器。

  • Dict[str, Executor]: 将名称映射到 Executor 实例。如果字典中没有 “default” 键,将使用 ThreadPoolExecutor(nthreads) 创建一个 “default” 执行器。

  • 字符串 “offload”,它指的是用于卸载通信的同一个线程池。这导致用于反序列化和计算的是同一个线程。

资源: dict

该工作人员拥有的资源,例如 {'GPU': 2}

nanny: str

联系保姆的地址,如果存在

lifetime: str

在“1小时”等时间量之后,我们优雅地关闭工作线程。默认值为None,表示没有明确的关闭时间。

lifetime_stagger: str

时间量如“5分钟”来错开生命周期值 实际生命周期将在生命周期 +/- 生命周期错开范围内均匀随机选择

lifetime_restart: bool

是否在工作进程达到其生命周期后重新启动,默认值为 False

kwargs: 可选

ServerNode 构造函数的附加参数

示例

使用命令行启动一个工作进程:

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
batched_send(msg: dict[str, Any]) None[源代码]

实现 BaseWorker 抽象方法。

通过批量通信向调度器发送一个“发送即忘”的消息。

如果我们当前没有连接到调度器,消息将会被静默丢弃!

async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[源代码]

关闭工作线程

关闭工作线程上运行的异步操作,停止所有执行器和通信。如果请求,这也会关闭保姆。

参数
超时

关闭单个指令的超时时间(秒)

executor_wait

如果为 True,则同步关闭执行器,否则异步关闭。

保姆

如果为真,关闭保姆

原因

关闭工作者的理由

返回
str | None

如果工作线程已经在关闭状态或失败,则返回 None,否则返回 “OK”

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[源代码]

优雅地关闭一个工作进程

这首先通知调度器我们正在关闭,并请求它将我们的数据移动到其他地方。之后,我们正常关闭。

property data: collections.abc.MutableMapping[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

所有已完成任务的 task payload,无论这些任务是在此Worker上计算的,还是在其他地方计算并通过网络传输到这里的。

在使用默认配置时,这是一个 zict 缓冲区,当目标阈值超过时会自动溢出到磁盘。如果禁用了溢出,它将是一个普通的字典。它也可能是用户在初始化 Worker 或 Nanny 时传递的自定义类似字典的对象。Worker 逻辑应将其视为不透明对象,并坚持使用 MutableMapping API。

备注

这个相同的集合也可以在 self.state.dataself.memory_manager.data 中找到。

类型

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

digest_metric(name: collections.abc.Hashable, value: float) None[源代码]

通过调用 Server.digest_metric 实现 BaseWorker.digest_metric

async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

执行一个任务。实现 BaseWorker 抽象方法。

async gather(who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][源代码]

调度器使用的端点。由 Scheduler.rebalance() 和 Scheduler.replicate() 使用。

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

实现 BaseWorker 抽象方法

get_current_task() Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]][源代码]

获取我们当前正在运行的任务的键

这只能在任务中运行才有意义

参见

get_worker

示例

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[源代码]

覆盖 BaseWorker 方法以增加验证

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

参见

Client.log_event
async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[源代码]

等待一段时间,然后将一个对等工作线程从忙碌状态中取出。实现 BaseWorker 抽象方法。

async start_unsafe()[源代码]

尝试启动服务器。这不是幂等的,也没有防止并发启动尝试的保护措施。

这旨在被子类覆盖或调用。为了安全启动,请使用 Server.start 代替。

如果配置了 death_timeout,我们将要求此协程在该超时到达之前完成。如果达到超时,我们将关闭实例并引发 asyncio.TimeoutError

transfer_outgoing_bytes: int

当前向其他工作者开放的数据传输总大小

transfer_outgoing_bytes_total: int

传输到其他工作者的数据总量(包括进行中和失败的传输)

transfer_outgoing_count: int

当前向其他工作者开放的数据传输数量

transfer_outgoing_count_total: int

自工作进程启动以来,向其他工作进程传输的数据总数

trigger_profile() None[源代码]

从所有正在计算的线程中获取一个帧

将这些帧合并到现有的配置文件计数中

property worker_address

为了与Nanny的API兼容

保姆

Dask 工作进程默认由一个小型的 Nanny 进程启动、监控和管理。

class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[源代码]

管理工作进程的流程

保姆进程会启动工作进程,监视它们,并在必要时杀死或重启它们。如果你想使用 Client.restart 方法,或者在工作进程达到其内存限制的终止部分时自动重启它,这是必要的。

Nanny 的参数大部分与 Worker 的参数相同,以下列出例外情况。

参数
env: dict, 可选

在Nanny初始化时设置的环境变量将被确保在Worker进程中也设置。此参数允许覆盖或设置Worker的环境变量。也可以使用选项 distributed.nanny.environ 来设置环境变量。优先级如下

  1. 保姆参数

  2. 现有的环境变量

  3. Dask 配置

备注

一些环境变量,如 OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,如 MALLOC_TRIM_THRESHOLD_``(参见 :ref:`memtrim`),必须在启动 Linux 进程之前设置。如果在本处或 ``distributed.nanny.environ 中设置这些变量将无效;它们必须在 distributed.nanny.pre-spawn-environ 中设置,以便在生成子进程之前设置,即使这意味着会污染运行 Nanny 的进程。

出于同样的原因,请注意将 distributed.worker.multiprocessing-methodspawn 更改为 forkforkserver 可能会抑制某些环境变量;如果这样做,您应该在启动 dask-worker 之前在 shell 中自行设置这些变量。

参见

Worker
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][源代码]

关闭工作进程,停止所有通信。

close_gracefully(reason: str = 'nanny-close-gracefully') None[源代码]

如果我们不应该尝试在工作者消失时重新启动它们,这是一个信号。

这作为集群关闭过程的一部分使用。

async instantiate() distributed.core.Status[源代码]

启动一个本地工作进程

直到进程启动并且调度器正确通知为止,一直阻塞

async kill(timeout: float = 5, reason: str = 'nanny-kill') None[源代码]

终止本地工作进程

阻塞直到进程停止并且调度器被正确通知

log_event(topic, msg)[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

参见

Client.log_event
async start_unsafe()[源代码]

启动保姆进程,启动本地进程,开始监控