工人
内容
工人¶
概述¶
工作者提供两个功能:
根据调度器的指示执行计算任务
存储并提供计算结果给其他工作者或客户端
每个工作节点包含一个线程池,用于根据调度器的请求执行任务。它将这些任务的结果本地存储,并根据需求向其他工作节点或客户端提供这些结果。如果工作节点被要求执行一个它没有所有必要数据的任务,那么它将联系其对等工作节点以收集必要的依赖项。
调度器和两个工人Alice和Bob之间的一次典型对话可能如下所示:
Scheduler -> Alice: Compute ``x <- add(1, 2)``!
Alice -> Scheduler: I've computed x and am holding on to it!
Scheduler -> Bob: Compute ``y <- add(x, 10)``!
You will need x. Alice has x.
Bob -> Alice: Please send me x.
Alice -> Bob: Sure. x is 3!
Bob -> Scheduler: I've computed y and am holding on to it!
存储数据¶
数据本地存储在一个字典中,位于 .data 属性中,该字典将键映射到函数调用的结果。
>>> worker.data
{'x': 3,
'y': 13,
...
'(df, 0)': pd.DataFrame(...),
...
}
这个 .data 属性是一个 MutableMapping,通常是内存和磁盘存储的组合,并采用LRU策略在它们之间移动数据。
了解更多:Worker 内存管理
线程池¶
每个工作线程将计算任务发送给 concurrent.futures.ThreadPoolExecutor 中的一个线程进行计算。这些计算任务与工作线程通信服务器在同一进程中进行,以便它们能够高效地相互访问和共享数据。为了数据局部性的目的,工作线程内的所有线程都被视为同一个工作线程。
如果你的计算主要是数值计算(例如 NumPy 和 Pandas 计算)并且完全释放了 GIL,那么建议使用多个线程和一个进程来运行 dask worker 进程。这样可以减少通信成本并简化部署。
如果你的计算主要是 Python 代码并且不释放 GIL,那么建议使用多个进程和每个进程一个线程来运行 dask worker 进程:
$ dask worker scheduler:8786 --nworkers 8 --nthreads 1
这将启动8个工作进程,每个进程都有自己的大小为1的ThreadPoolExecutor。
如果你的计算是外部于Python且运行时间长且不释放GIL,那么要注意在计算运行时,工作进程将无法与其他工作进程或调度器通信。这种情况应避免。如果你没有链接自己的自定义C/Fortran代码,那么这个话题可能不适用。
API 文档¶
- class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, **kwargs)[源代码]¶
Dask 分布式集群中的工作节点
工人执行两项功能:
从本地字典提供数据
对这些数据以及来自同行的数据进行计算
工作者会向调度器报告他们的数据,并在必要时使用该调度器从其他工作者那里收集数据以执行计算。
你可以使用
dask worker命令行应用程序启动一个工作进程:$ dask worker scheduler-ip:port
使用
--help标志查看更多选项:$ dask worker --help
此文档字符串的其余部分是关于工作线程用于管理和跟踪内部计算的内部状态。
状态
信息状态
这些属性在执行过程中不会显著变化。
- nthreads:
int: 此工作进程使用的 nthreads 数量
- nthreads:
- 执行器:
dict[str, concurrent.futures.Executor]: 执行器用于执行计算。始终包含默认执行器。
- 执行器:
- 本地目录:
路径: 本地机器上存储临时文件的路径
- 本地目录:
- 调度器:
PooledRPCCall: 调度器的位置。参见
.ip/.port属性。
- 调度器:
- 名称:
字符串: 别名
- 名称:
- 服务:
{str: Server}: 在此工作节点上运行的辅助Web服务器
- 服务:
service_ports:
{str: port}:- transfer_outgoing_count_limit:
int 并发传出数据传输的最大数量。另请参见
distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit。
- transfer_outgoing_count_limit:
- batched_stream:
BatchedSend 我们与调度器通信的批处理流
- batched_stream:
- 日志:
[(消息)] 一个结构化且可查询的日志。请参见
Worker.story
- 日志:
易失状态
这些属性跟踪此工作程序尝试完成的任务的进度。在下面的描述中,
key是我们想要计算的任务的名称,而dep是我们想要从他人那里收集的依赖数据的名称。- 线程:
{key: int} 任务运行的线程ID
- 线程:
- active_threads:
{int: key} 当前在活动线程上运行的键
- active_threads:
- 状态:
WorkerState 封装的状态机。参见
BaseWorker和WorkerState
- 状态:
- 参数
- scheduler_ip: str, 可选
- scheduler_port: int, 可选
- scheduler_file: str, 可选
- host: str, 可选
- 数据: MutableMapping, 类型, 无
用于存储的对象,默认构建一个基于磁盘的LRU字典。
如果提供了一个可调用对象来构造存储对象,并且调用签名中有一个名为
worker_local_directory的参数,那么它将接收到工作者的 attr:local_directory作为参数。- nthreads: int, 可选
- local_directory: str, 可选
我们放置本地资源的目录
- 名称: str, 可选
- memory_limit: int, float, string
此工作线程应使用的内存字节数。设置为零表示无限制。设置为 ‘auto’ 则按系统.MEMORY_LIMIT * min(1, nthreads / total_cores) 计算。可以使用字符串或数字,如 5GB 或 5e9。
- memory_target_fraction: float 或 False
尝试保持在内存的分数(默认:从配置键 distributed.worker.memory.target 读取)
- memory_spill_fraction: float 或 False
开始将数据溢出到磁盘的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)
- memory_pause_fraction: float 或 False
我们停止运行新任务时的内存比例(默认值:从配置键 distributed.worker.memory.pause 读取)
- max_spill: int, string 或 False
溢出到磁盘的字节数限制。(默认:从配置键 distributed.worker.memory.max-spill 读取)
- executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
- 要使用的执行器。根据类型,它有以下含义:
执行器实例:默认执行器。
Dict[str, Executor]: 将名称映射到 Executor 实例。如果字典中没有 “default” 键,将使用
ThreadPoolExecutor(nthreads)创建一个 “default” 执行器。字符串 “offload”,它指的是用于卸载通信的同一个线程池。这导致用于反序列化和计算的是同一个线程。
- 资源: dict
该工作人员拥有的资源,例如
{'GPU': 2}- nanny: str
联系保姆的地址,如果存在
- lifetime: str
在“1小时”等时间量之后,我们优雅地关闭工作线程。默认值为None,表示没有明确的关闭时间。
- lifetime_stagger: str
时间量如“5分钟”来错开生命周期值 实际生命周期将在生命周期 +/- 生命周期错开范围内均匀随机选择
- lifetime_restart: bool
是否在工作进程达到其生命周期后重新启动,默认值为 False
- kwargs: 可选
ServerNode 构造函数的附加参数
示例
使用命令行启动一个工作进程:
$ dask scheduler Start scheduler at 127.0.0.1:8786 $ dask worker 127.0.0.1:8786 Start worker at: 127.0.0.1:1234 Registered with scheduler at: 127.0.0.1:8786
- batched_send(msg: dict[str, Any]) None[源代码]¶
实现 BaseWorker 抽象方法。
通过批量通信向调度器发送一个“发送即忘”的消息。
如果我们当前没有连接到调度器,消息将会被静默丢弃!
- async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[源代码]¶
关闭工作线程
关闭工作线程上运行的异步操作,停止所有执行器和通信。如果请求,这也会关闭保姆。
- 参数
- 超时
关闭单个指令的超时时间(秒)
- executor_wait
如果为 True,则同步关闭执行器,否则异步关闭。
- 保姆
如果为真,关闭保姆
- 原因
关闭工作者的理由
- 返回
- str | None
如果工作线程已经在关闭状态或失败,则返回 None,否则返回 “OK”
- async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[源代码]¶
优雅地关闭一个工作进程
这首先通知调度器我们正在关闭,并请求它将我们的数据移动到其他地方。之后,我们正常关闭。
- property data: collections.abc.MutableMapping[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]¶
所有已完成任务的 task payload,无论这些任务是在此Worker上计算的,还是在其他地方计算并通过网络传输到这里的。
在使用默认配置时,这是一个 zict 缓冲区,当目标阈值超过时会自动溢出到磁盘。如果禁用了溢出,它将是一个普通的字典。它也可能是用户在初始化 Worker 或 Nanny 时传递的自定义类似字典的对象。Worker 逻辑应将其视为不透明对象,并坚持使用 MutableMapping API。
备注
这个相同的集合也可以在
self.state.data和self.memory_manager.data中找到。- 类型
toctree是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。
- digest_metric(name: collections.abc.Hashable, value: float) None[源代码]¶
通过调用 Server.digest_metric 实现 BaseWorker.digest_metric
- async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]¶
执行一个任务。实现 BaseWorker 抽象方法。
- async gather(who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][源代码]¶
调度器使用的端点。由 Scheduler.rebalance() 和 Scheduler.replicate() 使用。
- async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]¶
实现 BaseWorker 抽象方法
- get_current_task() Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]][源代码]¶
获取我们当前正在运行的任务的键
这只能在任务中运行才有意义
参见
get_worker
示例
>>> from dask.distributed import get_worker >>> def f(): ... return get_worker().get_current_task()
>>> future = client.submit(f) >>> future.result() 'f-1234'
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[源代码]¶
覆盖 BaseWorker 方法以增加验证
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None[源代码]¶
在给定主题下记录事件
- 参数
- 主题str, list[str]
记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。
- 消息
事件消息以记录。注意这必须是 msgpack 可序列化的。
参见
Client.log_event
- async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[源代码]¶
等待一段时间,然后将一个对等工作线程从忙碌状态中取出。实现 BaseWorker 抽象方法。
- async start_unsafe()[源代码]¶
尝试启动服务器。这不是幂等的,也没有防止并发启动尝试的保护措施。
这旨在被子类覆盖或调用。为了安全启动,请使用
Server.start代替。如果配置了
death_timeout,我们将要求此协程在该超时到达之前完成。如果达到超时,我们将关闭实例并引发asyncio.TimeoutError。
- property worker_address¶
为了与Nanny的API兼容
保姆¶
Dask 工作进程默认由一个小型的 Nanny 进程启动、监控和管理。
- class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[源代码]¶
管理工作进程的流程
保姆进程会启动工作进程,监视它们,并在必要时杀死或重启它们。如果你想使用
Client.restart方法,或者在工作进程达到其内存限制的终止部分时自动重启它,这是必要的。Nanny 的参数大部分与 Worker 的参数相同,以下列出例外情况。
- 参数
- env: dict, 可选
在Nanny初始化时设置的环境变量将被确保在Worker进程中也设置。此参数允许覆盖或设置Worker的环境变量。也可以使用选项
distributed.nanny.environ来设置环境变量。优先级如下保姆参数
现有的环境变量
Dask 配置
备注
一些环境变量,如
OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,如MALLOC_TRIM_THRESHOLD_``(参见 :ref:`memtrim`),必须在启动 Linux 进程之前设置。如果在本处或 ``distributed.nanny.environ中设置这些变量将无效;它们必须在distributed.nanny.pre-spawn-environ中设置,以便在生成子进程之前设置,即使这意味着会污染运行 Nanny 的进程。出于同样的原因,请注意将
distributed.worker.multiprocessing-method从spawn更改为fork或forkserver可能会抑制某些环境变量;如果这样做,您应该在启动dask-worker之前在 shell 中自行设置这些变量。
参见
Worker
- close_gracefully(reason: str = 'nanny-close-gracefully') None[源代码]¶
如果我们不应该尝试在工作者消失时重新启动它们,这是一个信号。
这作为集群关闭过程的一部分使用。