API

集群

class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[来源]

dask.distributed.LocalCluster 的一个变体,每个进程使用一个 GPU。

这为每个Dask工作进程分配了一个不同的CUDA_VISIBLE_DEVICES环境变量。

对于具有复杂架构映射CPU、GPU和网络硬件的机器,例如NVIDIA DGX-1和DGX-2,此类创建一个本地集群,尽可能尊重此硬件。

每个工作进程都会自动分配正确的CPU核心和网络接口卡,以最大化性能。如果UCX和UCX-Py可用,可以使用InfiniBand和NVLink连接来优化数据传输性能。

Parameters:
CUDA_VISIBLE_DEVICESstr, list of int, or None, default None

限制活动的GPU。可以是字符串(如"0,1,2,3")、列表(如[0, 1, 2, 3]),或None以使用所有可用的GPU。

n_workersint or None, default None

工作线程的数量。可以是一个整数或None以回退到由CUDA_VISIBLE_DEVICES指定的GPU。当指定了CUDA_VISIBLE_DEVICES时,n_workers的值必须小于或等于CUDA_VISIBLE_DEVICES中指定的GPU数量,如果小于,则只使用前n_workers个GPU。

threads_per_workerint, default 1

每个Dask工作进程使用的线程数。

memory_limitint, float, str, or None, default “auto”

主机LRU缓存的大小,用于确定工作进程何时开始将数据溢出到磁盘(如果启用了JIT-Unspill,则不可用)。可以是整数(字节)、浮点数(总系统内存的一部分)、字符串(如"5GB""5000M"),或"auto"、0,或None表示不进行内存管理。

device_memory_limitint, float, str, or None, default 0.8

CUDA设备LRU缓存的大小,用于确定工作进程何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB""5000M"),或"auto"、0,或None以禁用溢出到主机(即允许完全使用设备内存)。

enable_cudf_spillbool, default False

启用自动 cuDF 溢出。

警告

这不应与JIT-Unspill一起使用。

cudf_spill_statsint, default 0

设置cuDF溢出统计级别。如果enable_cudf_spill=False,此选项无效。

local_directorystr or None, default None

本地机器上存储临时文件的路径。可以是一个字符串(如 "path/to/files")或 None 以回退到本地 Dask 配置中的 dask.temporary-directory 值,如果未设置则使用当前工作目录。

shared_filesystem: bool or None, default None

上面的local_directory是否在所有工作节点之间共享。 如果None,则使用“jit-unspill-shared-fs”配置值,其 默认值为True。请注意,在所有其他情况下,此选项默认为False, 但在本地集群中,它默认为True——我们假设所有工作节点使用 相同的文件系统。

protocolstr or None, default None

用于通信的协议。可以是一个字符串(如 "tcp""ucx"),或者 None 以自动选择正确的协议。

enable_tcp_over_ucxbool, default None

设置环境变量以启用通过UCX的TCP,即使不支持或禁用了InfiniBand和NVLink。

enable_infinibandbool, default None

设置环境变量以启用InfiniBand上的UCX,需要 protocol="ucx" 并且当 True 时意味着 enable_tcp_over_ucx=True

enable_nvlinkbool, default None

设置环境变量以启用通过NVLink的UCX,需要protocol="ucx"并且当True时意味着enable_tcp_over_ucx=True

enable_rdmacmbool, default None

设置环境变量以启用UCX RDMA连接管理器支持, 需要 protocol="ucx"enable_infiniband=True

rmm_pool_sizeint, str or None, default None

初始化每个工作线程的RMM池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB""5000M"),或None以禁用RMM池。

注意

此大小是每个工作者的配置,而不是集群范围的。

rmm_maximum_pool_sizeint, str or None, default None

rmm_pool_size被设置时,此参数表示 最大池大小。 可以是整数(字节)、浮点数(总设备内存的比例)、字符串 (如"5GB""5000M")或None。默认情况下,使用GPU上的总可用 内存。必须指定rmm_pool_size以使用RMM池 并设置最大池大小。

注意

当与–enable-rmm-async配对时,由于碎片化,无法保证最大大小。

注意

此大小是每个工作者的配置,而不是整个集群的。

rmm_managed_memorybool, default False

初始化每个工作线程时使用RMM,并将其设置为使用托管内存。如果禁用,仍然可以通过指定rmm_pool_size来使用RMM。

警告

托管内存目前与NVLink不兼容。尝试同时启用两者将导致异常。

rmm_async: bool, default False

使用RMM初始化每个工作器,并将其设置为使用RMM的异步分配器。 有关更多信息,请参见rmm.mr.CudaAsyncMemoryResource

警告

异步分配器需要CUDA Toolkit 11.2或更高版本。它也与RMM池和托管内存不兼容。尝试同时启用两者将导致异常。

rmm_allocator_external_lib_list: str, list or None, default None

设置RMM为分配器的外部库列表。 支持的选项有:["torch", "cupy"]。可以是逗号分隔的字符串 (如"torch,cupy")或字符串列表(如["torch", "cupy"])。 如果为None,则没有外部库会使用RMM作为其分配器。

rmm_release_threshold: int, str or None, default None

rmm.async is True 并且池大小超过此值时,池中未使用的内存将在下一个同步点释放。 可以是整数(字节)、浮点数(总设备内存的比例)、字符串(如 "5GB""5000M")或 None。默认情况下,此功能是禁用的。

注意

此大小是每个工作者的配置,而不是集群范围的。

rmm_log_directorystr or None, default None

用于写入每个工作线程的RMM日志文件的目录。客户端和调度程序不会记录在这里。可以是一个字符串(如"/path/to/logs/")或None来禁用日志记录。

注意

只有在指定了rmm_pool_sizermm_managed_memory=True时,日志记录才会启用。

rmm_track_allocationsbool, default False

如果为True,则使用rmm.mr.TrackingResourceAdaptor包装每个工作线程使用的内存资源,该适配器跟踪分配的内存量。

注意

此选项允许Dask仪表板收集和报告额外的诊断信息。然而,这会带来显著的开销,因此应仅用于调试和内存分析。

jit_unspillbool or None, default None

启用即时取消溢出。可以是布尔值或None以回退到本地Dask配置中的dask.jit-unspill值,如果未设置则禁用取消溢出。

注意

这是实验性的,不支持内存溢出到磁盘。有关更多信息,请参见 proxy_object.ProxyObjectproxify_host_file.ProxifyHostFile

log_spillingbool, default True

启用将溢出操作直接记录到distributed.Worker,并使用INFO日志级别。

pre_importstr, list or None, default None

预导入库作为Worker插件,以防止长时间导入影响后续的Dask操作。应该是一个逗号分隔的名称列表,例如“cudf,rmm”或一个字符串列表,例如[“cudf”,“rmm”]。

Raises:
TypeError

如果启用了InfiniBand或NVLink并且protocol!="ucx"

ValueError

如果请求了RMM池、RMM管理内存或RMM异步分配器,但无法导入RMM。 如果同时启用了RMM管理内存和异步分配器。 如果设置了RMM最大池大小但未设置RMM池大小。 如果设置了RMM最大池大小但使用了RMM异步分配器。 如果设置了RMM释放阈值但未使用RMM异步分配器。

另请参阅

LocalCluster

示例

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
new_worker_spec()[source]

返回下一个工作者的名称和规格

Returns:
d: dict mapping names to worker specs

另请参阅

scale

命令行界面

工作者

dask cuda

启动一个带有GPU的分布式工作器,并将其附加到现有的调度器上。

可以通过传递SCHEDULER参数的URI或通过--scheduler-file选项传递的调度程序文件来指定调度程序。

查看 https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html#dask-cuda-worker 以获取信息。

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

选项

--host <host>

服务主机的IP地址;应该对调度程序和其他工作者可见。可以是一个字符串(如"127.0.0.1")或None以回退到由--interface指定的接口地址或默认接口。

--nthreads <nthreads>

每个Dask工作进程使用的线程数。

Default:

1

--name <name>

工作线程的唯一名称。可以是一个字符串(如"worker-1")或 None表示一个无名的工作线程。

--memory-limit <memory_limit>

主机LRU缓存的大小,用于确定工作进程何时开始将数据溢出到磁盘(如果启用了JIT-Unspill,则不可用)。可以是整数(字节)、浮点数(总系统内存的一部分)、字符串(如"5GB""5000M"),或"auto"、0,或None表示不进行内存管理。

Default:

'auto'

--device-memory-limit <device_memory_limit>

CUDA设备LRU缓存的大小,用于确定工作进程何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB""5000M"),或"auto"或0以禁用溢出到主机(即允许完全使用设备内存)。

Default:

'0.8'

--enable-cudf-spill, --disable-cudf-spill

启用自动 cuDF 溢出。警告:不应与 JIT-Unspill 一起使用。

Default:

False

--cudf-spill-stats <cudf_spill_stats>

设置cuDF溢出统计级别。如果未指定–enable-cudf-spill,则此选项无效。

--rmm-pool-size <rmm_pool_size>

初始化每个工作线程的RMM池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB""5000M"),或None以禁用RMM池。

注意

此大小是每个工作者的配置,而不是整个集群的。

--rmm-maximum-pool-size <rmm_maximum_pool_size>

当指定--rmm-pool-size时,此参数表示最大池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB""5000M")或None。默认情况下,使用GPU上的总可用内存。必须指定rmm_pool_size以使用RMM池并设置最大池大小。

注意

当与–enable-rmm-async配对时,由于碎片化,无法保证最大大小。

注意

此大小是每个工作者的配置,而不是整个集群的。

--rmm-managed-memory, --no-rmm-managed-memory

使用RMM初始化每个工作器,并将其设置为使用托管内存。如果禁用,仍然可以通过指定--rmm-pool-size来使用RMM。

警告

托管内存目前与NVLink不兼容。尝试同时启用两者将导致失败。

Default:

False

--rmm-async, --no-rmm-async

使用RMM初始化每个工作器,并将其设置为使用RMM的异步分配器。有关更多信息,请参见rmm.mr.CudaAsyncMemoryResource

警告

异步分配器需要CUDA Toolkit 11.2或更新版本。它还与RMM池和托管内存不兼容,尝试同时启用两者将导致失败。

Default:

False

--set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>

将RMM设置为外部库的分配器。提供一个以逗号分隔的库列表进行设置,例如,“torch,cupy”。

Options:

cupy | torch

--rmm-release-threshold <rmm_release_threshold>

rmm.asyncTrue 且池大小超过此值时,池中未使用的内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的比例)、字符串(如 "5GB""5000M")或 None。默认情况下,此功能是禁用的。

注意

此大小是每个工作者的配置,而不是整个集群的。

--rmm-log-directory <rmm_log_directory>

用于写入每个工作线程的RMM日志文件的目录。客户端和调度程序不会记录在这里。可以是一个字符串(如"/path/to/logs/")或None来禁用日志记录。

注意

只有在指定了--rmm-pool-size--rmm-managed-memory时,日志记录才会启用。

--rmm-track-allocations, --no-rmm-track-allocations

跟踪由RMM进行的内存分配。如果True,则使用rmm.mr.TrackingResourceAdaptor包装每个工作进程的内存资源,该适配器允许查询RMM分配的内存量。

Default:

False

--pid-file <pid_file>

写入进程PID的文件。

--resources <resources>

任务约束的资源,如 "GPU=2 MEM=10e9"

--dashboard, --no-dashboard

启动仪表板。

Default:

True

--dashboard-address <dashboard_address>

提供仪表板的相对地址(如果启用)。

Default:

':0'

--local-directory <local_directory>

本地机器上存储临时文件的路径。可以是一个字符串(如 "path/to/files")或 None 以回退到本地 Dask 配置中的 dask.temporary-directory 值,如果未设置则使用当前工作目录。

--shared-filesystem, --no-shared-filesystem

如果指定了–shared-filesystem,则通知JIT-Unspilllocal_directory是一个可供所有工作节点使用的共享文件系统,而–no-shared-filesystem则通知它不能假设这是一个共享文件系统。如果两者都未指定,JIT-Unspill将根据“jit-unspill-shared-fs”指定的Dask配置值来决定。请注意,共享文件系统必须支持os.link()操作。

--scheduler-file <scheduler_file>

文件名到JSON编码的调度程序信息。与等效的dask scheduler选项一起使用。

--protocol <protocol>

协议如 tcp, tls, 或 ucx

--interface <interface>

用于连接到调度程序的外部接口。通常使用以太网接口进行连接,而不是InfiniBand接口(如果有的话)。可以是一个字符串(如"eth0"用于NVLink或"ib0"用于InfiniBand)或None以回退到默认接口。

--preload <preload>

应该由每个工作进程加载的模块,例如 "foo.bar""/path/to/foo.py"

--death-timeout <death_timeout>

关闭前等待调度程序的秒数

--dashboard-prefix <dashboard_prefix>

仪表板的前缀。可以是一个字符串(如…)或None表示没有前缀。

--tls-ca-file <tls_ca_file>

用于TLS的CA证书文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有证书。

--tls-cert <tls_cert>

TLS的证书文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有证书。

--tls-key <tls_key>

TLS的私钥文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有私钥。

--enable-tcp-over-ucx, --disable-tcp-over-ucx

设置环境变量以启用UCX上的TCP,即使不支持或禁用InfiniBand和NVLink。

--enable-infiniband, --disable-infiniband

设置环境变量以启用InfiniBand上的UCX,启用时意味着--enable-tcp-over-ucx

设置环境变量以启用通过NVLink的UCX,启用时意味着--enable-tcp-over-ucx

--enable-rdmacm, --disable-rdmacm

设置环境变量以启用UCX RDMA连接管理器支持,需要--enable-infiniband

--enable-jit-unspill, --disable-jit-unspill

启用即时取消溢出。可以是布尔值或None以回退到本地Dask配置中的dask.jit-unspill值,如果未设置则禁用取消溢出。

注意

这是实验性的,不支持内存溢出到磁盘。有关更多信息,请参见 proxy_object.ProxyObjectproxify_host_file.ProxifyHostFile

--worker-class <worker_class>

使用与Distributed默认类不同的类(distributed.Worker)来生成distributed.Nanny

--pre-import <pre_import>

预导入库作为Worker插件,以防止长时间导入影响后续的Dask操作。应该是一个逗号分隔的名称列表,例如“cudf,rmm”。

--multiprocessing-method <multiprocessing_method>

用于启动多进程新进程的方法

Options:

spawn | fork | forkserver

参数

SCHEDULER

可选参数

PRELOAD_ARGV

可选参数

集群配置

dask cuda

查询现有GPU集群的配置。

可以通过传递到SCHEDULER参数的URI或通过--scheduler-file选项传递的调度器文件来指定集群。

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

选项

--scheduler-file <scheduler_file>

文件名到JSON编码的调度程序信息。与等效的dask scheduler选项一起使用。

--tls-ca-file <tls_ca_file>

用于TLS的CA证书文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有证书。

--tls-cert <tls_cert>

TLS的证书文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有证书。

--tls-key <tls_key>

TLS的私钥文件(PEM格式)。可以是一个字符串(如"path/to/certs"),或者None表示没有私钥。

参数

SCHEDULER

可选参数

PRELOAD_ARGV

可选参数

客户端初始化

dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, protocol='ucx')[source]

创建CUDA上下文并根据用户参数初始化UCX-Py。

有时初始化CUDA上下文很方便,特别是在启动创建各种线程的Dask工作进程之前。

为了确保UCX正常工作,重要的是要确保它使用正确的选项进行初始化。这对于客户端尤其重要,因为客户端无法配置为使用像LocalCUDAClusterdask cuda worker这样的参数来使用UCX。此函数将确保根据用户传递的标志和选项提供UCX配置。

此函数也可用于工作线程预加载脚本中,用于配置主线 Dask.distributed 的 UCX。 https://docs.dask.org/en/latest/setup/custom-startup.html

您可以使用以下YAML将其添加到您的全局配置中:

distributed:
  worker:
    preload:
      - dask_cuda.initialize

有关Dask配置的更多信息,请参见https://docs.dask.org/en/latest/configuration.html

Parameters:
create_cuda_contextbool, default True

在初始化时创建CUDA上下文。

enable_tcp_over_ucxbool, default None

设置环境变量以启用通过UCX的TCP,即使不支持或禁用了InfiniBand和NVLink。

enable_infinibandbool, default None

设置环境变量以启用InfiniBand上的UCX,当True时,意味着enable_tcp_over_ucx=True

enable_nvlinkbool, default None

设置环境变量以启用通过NVLink的UCX,当True时,意味着enable_tcp_over_ucx=True

enable_rdmacmbool, default None

设置环境变量以启用UCX RDMA连接管理器支持, 需要 enable_infiniband=True

显式通信

class dask_cuda.explicit_comms.comms.CommsContext(client: Client | None = None)[源代码]

显式通信的通信处理器

Parameters:
client: Client, optional

指定用于通信的客户端。如果为None,则使用默认客户端。

run(coroutine, *args, workers=None, lock_workers=False)[来源]

在多个工作线程上运行一个协程

协程将工作者的状态字典作为第一个参数,并将*args作为后续参数。

Parameters:
coroutine: coroutine

在每个工作节点上运行的函数

*args:

coroutine 的参数

workers: list, optional

工人列表。默认是所有工人

lock_workers: bool, optional

使用distributed.MultiLock来获取对工作者的独占访问权限。使用此标志以支持并行运行。

Returns:
ret: list

每个工作者的输出列表

stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set][source]

在给定名称下的工作节点上暂存密钥

在显式通信任务中,使用 pop_staging_area(…, name) 来访问暂存的键和相关的数据。

Parameters:
name: str

暂存区域的名称

keys: iterable

阶段的关键

Returns:
dict

将每个工作节点等级映射到其暂存键集的字典

注释

在显式通信的上下文中,暂存是指复制Dask键的责任的行为。当暂存一个键时,拥有该键的工作者(由Dask调度器分配)将键的引用和关联数据保存到其本地暂存区域。从这一点开始,如果调度器取消了该键,工作者(以及在该工作者上运行的任务)现在对该键和关联数据拥有独占访问权。这样,暂存使得长时间运行的显式通信任务能够尽快释放输入数据。

submit(worker, coroutine, *args, wait=False)[source]

在单个工作线程上运行一个协程

协程将工作者的状态字典作为第一个参数,并将*args作为后续参数。

Parameters:
worker: str

运行coroutine的工作者

coroutine: coroutine

在工作节点上运行的函数

*args:

coroutine 的参数

wait: boolean, optional

如果为True,则在返回之前等待协程完成。

Returns:
ret: object or Future

如果 wait=True,coroutine 的结果 如果 wait=False,可以在稍后等待的 Future。

dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame[源代码]

对DataFrame的划分进行排序,以便列中的所有值对齐

这使用显式通信执行基于任务的洗牌。它需要完整的数据集读取、序列化和洗牌。这是昂贵的。如果可能,您应该避免洗牌。

这不会保留有意义的索引/分区方案。如果在并行情况下执行,这是不确定的。

需要一个激活的客户端。

Parameters:
df: dask.dataframe.DataFrame

要打乱的数据框

column_names: list of strings

我们想要拆分的列名列表。

npartitions: int or None

期望的输出分区数量。如果为None,则输出分区的数量等于df.npartitions

ignore_index: bool

在洗牌时忽略索引。如果为True,性能可能会提高,但索引值将不会被保留。

batchsize: int

洗牌由多轮组成,每轮中每个工作节点分区,然后进行全对全通信,交换其数据帧分区的一部分。批量大小是每个工作节点在每轮中处理的分区数量。 如果为-1,每个工作节点将在单轮中处理其所有分区,并且所有减少内存使用的技术都将被禁用,这在内存压力不是问题时可能会更快。 如果为None,则使用DASK_EXPLICIT_COMMS_BATCHSIZE的值,如果未设置,则默认为1,因此默认情况下,我们优先考虑鲁棒性而非性能。

Returns:
df: dask.dataframe.DataFrame

打乱的数据框