API
集群
- class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[来源]
dask.distributed.LocalCluster的一个变体,每个进程使用一个 GPU。这为每个Dask工作进程分配了一个不同的
CUDA_VISIBLE_DEVICES环境变量。对于具有复杂架构映射CPU、GPU和网络硬件的机器,例如NVIDIA DGX-1和DGX-2,此类创建一个本地集群,尽可能尊重此硬件。
每个工作进程都会自动分配正确的CPU核心和网络接口卡,以最大化性能。如果UCX和UCX-Py可用,可以使用InfiniBand和NVLink连接来优化数据传输性能。
- Parameters:
- CUDA_VISIBLE_DEVICESstr, list of int, or None, default None
限制活动的GPU。可以是字符串(如
"0,1,2,3")、列表(如[0, 1, 2, 3]),或None以使用所有可用的GPU。- n_workersint or None, default None
工作线程的数量。可以是一个整数或
None以回退到由CUDA_VISIBLE_DEVICES指定的GPU。当指定了CUDA_VISIBLE_DEVICES时,n_workers的值必须小于或等于CUDA_VISIBLE_DEVICES中指定的GPU数量,如果小于,则只使用前n_workers个GPU。- threads_per_workerint, default 1
每个Dask工作进程使用的线程数。
- memory_limitint, float, str, or None, default “auto”
主机LRU缓存的大小,用于确定工作进程何时开始将数据溢出到磁盘(如果启用了JIT-Unspill,则不可用)。可以是整数(字节)、浮点数(总系统内存的一部分)、字符串(如
"5GB"或"5000M"),或"auto"、0,或None表示不进行内存管理。- device_memory_limitint, float, str, or None, default 0.8
CUDA设备LRU缓存的大小,用于确定工作进程何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如
"5GB"或"5000M"),或"auto"、0,或None以禁用溢出到主机(即允许完全使用设备内存)。- enable_cudf_spillbool, default False
启用自动 cuDF 溢出。
警告
这不应与JIT-Unspill一起使用。
- cudf_spill_statsint, default 0
设置cuDF溢出统计级别。如果
enable_cudf_spill=False,此选项无效。- local_directorystr or None, default None
本地机器上存储临时文件的路径。可以是一个字符串(如
"path/to/files")或None以回退到本地 Dask 配置中的dask.temporary-directory值,如果未设置则使用当前工作目录。- shared_filesystem: bool or None, default None
上面的local_directory是否在所有工作节点之间共享。 如果
None,则使用“jit-unspill-shared-fs”配置值,其 默认值为True。请注意,在所有其他情况下,此选项默认为False, 但在本地集群中,它默认为True——我们假设所有工作节点使用 相同的文件系统。- protocolstr or None, default None
用于通信的协议。可以是一个字符串(如
"tcp"或"ucx"),或者None以自动选择正确的协议。- enable_tcp_over_ucxbool, default None
设置环境变量以启用通过UCX的TCP,即使不支持或禁用了InfiniBand和NVLink。
- enable_infinibandbool, default None
设置环境变量以启用InfiniBand上的UCX,需要
protocol="ucx"并且当True时意味着enable_tcp_over_ucx=True。- enable_nvlinkbool, default None
设置环境变量以启用通过NVLink的UCX,需要
protocol="ucx"并且当True时意味着enable_tcp_over_ucx=True。- enable_rdmacmbool, default None
设置环境变量以启用UCX RDMA连接管理器支持, 需要
protocol="ucx"和enable_infiniband=True。- rmm_pool_sizeint, str or None, default None
初始化每个工作线程的RMM池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如
"5GB"或"5000M"),或None以禁用RMM池。注意
此大小是每个工作者的配置,而不是集群范围的。
- rmm_maximum_pool_sizeint, str or None, default None
当
rmm_pool_size被设置时,此参数表示 最大池大小。 可以是整数(字节)、浮点数(总设备内存的比例)、字符串 (如"5GB"或"5000M")或None。默认情况下,使用GPU上的总可用 内存。必须指定rmm_pool_size以使用RMM池 并设置最大池大小。注意
当与–enable-rmm-async配对时,由于碎片化,无法保证最大大小。
注意
此大小是每个工作者的配置,而不是整个集群的。
- rmm_managed_memorybool, default False
初始化每个工作线程时使用RMM,并将其设置为使用托管内存。如果禁用,仍然可以通过指定
rmm_pool_size来使用RMM。警告
托管内存目前与NVLink不兼容。尝试同时启用两者将导致异常。
- rmm_async: bool, default False
使用RMM初始化每个工作器,并将其设置为使用RMM的异步分配器。 有关更多信息,请参见
rmm.mr.CudaAsyncMemoryResource。警告
异步分配器需要CUDA Toolkit 11.2或更高版本。它也与RMM池和托管内存不兼容。尝试同时启用两者将导致异常。
- rmm_allocator_external_lib_list: str, list or None, default None
设置RMM为分配器的外部库列表。 支持的选项有:
["torch", "cupy"]。可以是逗号分隔的字符串 (如"torch,cupy")或字符串列表(如["torch", "cupy"])。 如果为None,则没有外部库会使用RMM作为其分配器。- rmm_release_threshold: int, str or None, default None
当
rmm.async is True并且池大小超过此值时,池中未使用的内存将在下一个同步点释放。 可以是整数(字节)、浮点数(总设备内存的比例)、字符串(如"5GB"或"5000M")或None。默认情况下,此功能是禁用的。注意
此大小是每个工作者的配置,而不是集群范围的。
- rmm_log_directorystr or None, default None
用于写入每个工作线程的RMM日志文件的目录。客户端和调度程序不会记录在这里。可以是一个字符串(如
"/path/to/logs/")或None来禁用日志记录。注意
只有在指定了
rmm_pool_size或rmm_managed_memory=True时,日志记录才会启用。- rmm_track_allocationsbool, default False
如果为True,则使用
rmm.mr.TrackingResourceAdaptor包装每个工作线程使用的内存资源,该适配器跟踪分配的内存量。注意
此选项允许Dask仪表板收集和报告额外的诊断信息。然而,这会带来显著的开销,因此应仅用于调试和内存分析。
- jit_unspillbool or None, default None
启用即时取消溢出。可以是布尔值或
None以回退到本地Dask配置中的dask.jit-unspill值,如果未设置则禁用取消溢出。注意
这是实验性的,不支持内存溢出到磁盘。有关更多信息,请参见
proxy_object.ProxyObject和proxify_host_file.ProxifyHostFile。- log_spillingbool, default True
启用将溢出操作直接记录到
distributed.Worker,并使用INFO日志级别。- pre_importstr, list or None, default None
预导入库作为Worker插件,以防止长时间导入影响后续的Dask操作。应该是一个逗号分隔的名称列表,例如“cudf,rmm”或一个字符串列表,例如[“cudf”,“rmm”]。
- Raises:
- TypeError
如果启用了InfiniBand或NVLink并且
protocol!="ucx"。- ValueError
如果请求了RMM池、RMM管理内存或RMM异步分配器,但无法导入RMM。 如果同时启用了RMM管理内存和异步分配器。 如果设置了RMM最大池大小但未设置RMM池大小。 如果设置了RMM最大池大小但使用了RMM异步分配器。 如果设置了RMM释放阈值但未使用RMM异步分配器。
另请参阅
LocalCluster
示例
>>> from dask_cuda import LocalCUDACluster >>> from dask.distributed import Client >>> cluster = LocalCUDACluster() >>> client = Client(cluster)
命令行界面
工作者
dask cuda
启动一个带有GPU的分布式工作器,并将其附加到现有的调度器上。
可以通过传递SCHEDULER参数的URI或通过--scheduler-file选项传递的调度程序文件来指定调度程序。
查看 https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html#dask-cuda-worker 以获取信息。
dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
选项
- --host <host>
服务主机的IP地址;应该对调度程序和其他工作者可见。可以是一个字符串(如
"127.0.0.1")或None以回退到由--interface指定的接口地址或默认接口。
- --nthreads <nthreads>
每个Dask工作进程使用的线程数。
- Default:
1
- --name <name>
工作线程的唯一名称。可以是一个字符串(如
"worker-1")或None表示一个无名的工作线程。
- --memory-limit <memory_limit>
主机LRU缓存的大小,用于确定工作进程何时开始将数据溢出到磁盘(如果启用了JIT-Unspill,则不可用)。可以是整数(字节)、浮点数(总系统内存的一部分)、字符串(如
"5GB"或"5000M"),或"auto"、0,或None表示不进行内存管理。- Default:
'auto'
- --device-memory-limit <device_memory_limit>
CUDA设备LRU缓存的大小,用于确定工作进程何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如
"5GB"或"5000M"),或"auto"或0以禁用溢出到主机(即允许完全使用设备内存)。- Default:
'0.8'
- --enable-cudf-spill, --disable-cudf-spill
启用自动 cuDF 溢出。警告:不应与 JIT-Unspill 一起使用。
- Default:
False
- --cudf-spill-stats <cudf_spill_stats>
设置cuDF溢出统计级别。如果未指定–enable-cudf-spill,则此选项无效。
- --rmm-pool-size <rmm_pool_size>
初始化每个工作线程的RMM池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如
"5GB"或"5000M"),或None以禁用RMM池。注意
此大小是每个工作者的配置,而不是整个集群的。
- --rmm-maximum-pool-size <rmm_maximum_pool_size>
当指定
--rmm-pool-size时,此参数表示最大池大小。可以是整数(字节)、浮点数(总设备内存的一部分)、字符串(如"5GB"或"5000M")或None。默认情况下,使用GPU上的总可用内存。必须指定rmm_pool_size以使用RMM池并设置最大池大小。注意
当与–enable-rmm-async配对时,由于碎片化,无法保证最大大小。
注意
此大小是每个工作者的配置,而不是整个集群的。
- --rmm-managed-memory, --no-rmm-managed-memory
使用RMM初始化每个工作器,并将其设置为使用托管内存。如果禁用,仍然可以通过指定
--rmm-pool-size来使用RMM。警告
托管内存目前与NVLink不兼容。尝试同时启用两者将导致失败。
- Default:
False
- --rmm-async, --no-rmm-async
使用RMM初始化每个工作器,并将其设置为使用RMM的异步分配器。有关更多信息,请参见
rmm.mr.CudaAsyncMemoryResource。警告
异步分配器需要CUDA Toolkit 11.2或更新版本。它还与RMM池和托管内存不兼容,尝试同时启用两者将导致失败。
- Default:
False
- --set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>
将RMM设置为外部库的分配器。提供一个以逗号分隔的库列表进行设置,例如,“torch,cupy”。
- Options:
cupy | torch
- --rmm-release-threshold <rmm_release_threshold>
当
rmm.async为True且池大小超过此值时,池中未使用的内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的比例)、字符串(如"5GB"或"5000M")或None。默认情况下,此功能是禁用的。注意
此大小是每个工作者的配置,而不是整个集群的。
- --rmm-log-directory <rmm_log_directory>
用于写入每个工作线程的RMM日志文件的目录。客户端和调度程序不会记录在这里。可以是一个字符串(如
"/path/to/logs/")或None来禁用日志记录。注意
只有在指定了
--rmm-pool-size或--rmm-managed-memory时,日志记录才会启用。
- --rmm-track-allocations, --no-rmm-track-allocations
跟踪由RMM进行的内存分配。如果
True,则使用rmm.mr.TrackingResourceAdaptor包装每个工作进程的内存资源,该适配器允许查询RMM分配的内存量。- Default:
False
- --pid-file <pid_file>
写入进程PID的文件。
- --resources <resources>
任务约束的资源,如
"GPU=2 MEM=10e9"。
- --dashboard, --no-dashboard
启动仪表板。
- Default:
True
- --dashboard-address <dashboard_address>
提供仪表板的相对地址(如果启用)。
- Default:
':0'
- --local-directory <local_directory>
本地机器上存储临时文件的路径。可以是一个字符串(如
"path/to/files")或None以回退到本地 Dask 配置中的dask.temporary-directory值,如果未设置则使用当前工作目录。
如果指定了–shared-filesystem,则通知JIT-Unspilllocal_directory是一个可供所有工作节点使用的共享文件系统,而–no-shared-filesystem则通知它不能假设这是一个共享文件系统。如果两者都未指定,JIT-Unspill将根据“jit-unspill-shared-fs”指定的Dask配置值来决定。请注意,共享文件系统必须支持os.link()操作。
- --scheduler-file <scheduler_file>
文件名到JSON编码的调度程序信息。与等效的
dask scheduler选项一起使用。
- --protocol <protocol>
协议如 tcp, tls, 或 ucx
- --interface <interface>
用于连接到调度程序的外部接口。通常使用以太网接口进行连接,而不是InfiniBand接口(如果有的话)。可以是一个字符串(如
"eth0"用于NVLink或"ib0"用于InfiniBand)或None以回退到默认接口。
- --preload <preload>
应该由每个工作进程加载的模块,例如
"foo.bar"或"/path/to/foo.py"。
- --death-timeout <death_timeout>
关闭前等待调度程序的秒数
- --dashboard-prefix <dashboard_prefix>
仪表板的前缀。可以是一个字符串(如…)或
None表示没有前缀。
- --tls-ca-file <tls_ca_file>
用于TLS的CA证书文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有证书。
- --tls-cert <tls_cert>
TLS的证书文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有证书。
- --tls-key <tls_key>
TLS的私钥文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有私钥。
- --enable-tcp-over-ucx, --disable-tcp-over-ucx
设置环境变量以启用UCX上的TCP,即使不支持或禁用InfiniBand和NVLink。
- --enable-infiniband, --disable-infiniband
设置环境变量以启用InfiniBand上的UCX,启用时意味着
--enable-tcp-over-ucx。
- --enable-nvlink, --disable-nvlink
设置环境变量以启用通过NVLink的UCX,启用时意味着
--enable-tcp-over-ucx。
- --enable-rdmacm, --disable-rdmacm
设置环境变量以启用UCX RDMA连接管理器支持,需要
--enable-infiniband。
- --enable-jit-unspill, --disable-jit-unspill
启用即时取消溢出。可以是布尔值或
None以回退到本地Dask配置中的dask.jit-unspill值,如果未设置则禁用取消溢出。注意
这是实验性的,不支持内存溢出到磁盘。有关更多信息,请参见
proxy_object.ProxyObject和proxify_host_file.ProxifyHostFile。
- --worker-class <worker_class>
使用与Distributed默认类不同的类(
distributed.Worker)来生成distributed.Nanny。
- --pre-import <pre_import>
预导入库作为Worker插件,以防止长时间导入影响后续的Dask操作。应该是一个逗号分隔的名称列表,例如“cudf,rmm”。
- --multiprocessing-method <multiprocessing_method>
用于启动多进程新进程的方法
- Options:
spawn | fork | forkserver
参数
- SCHEDULER
可选参数
- PRELOAD_ARGV
可选参数
集群配置
dask cuda
查询现有GPU集群的配置。
可以通过传递到SCHEDULER参数的URI或通过--scheduler-file选项传递的调度器文件来指定集群。
dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
选项
- --scheduler-file <scheduler_file>
文件名到JSON编码的调度程序信息。与等效的
dask scheduler选项一起使用。
- --tls-ca-file <tls_ca_file>
用于TLS的CA证书文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有证书。
- --tls-cert <tls_cert>
TLS的证书文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有证书。
- --tls-key <tls_key>
TLS的私钥文件(PEM格式)。可以是一个字符串(如
"path/to/certs"),或者None表示没有私钥。
参数
- SCHEDULER
可选参数
- PRELOAD_ARGV
可选参数
客户端初始化
- dask_cuda.initialize.initialize(create_cuda_context=True, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, protocol='ucx')[source]
创建CUDA上下文并根据用户参数初始化UCX-Py。
有时初始化CUDA上下文很方便,特别是在启动创建各种线程的Dask工作进程之前。
为了确保UCX正常工作,重要的是要确保它使用正确的选项进行初始化。这对于客户端尤其重要,因为客户端无法配置为使用像
LocalCUDACluster和dask cuda worker这样的参数来使用UCX。此函数将确保根据用户传递的标志和选项提供UCX配置。此函数也可用于工作线程预加载脚本中,用于配置主线 Dask.distributed 的 UCX。 https://docs.dask.org/en/latest/setup/custom-startup.html
您可以使用以下YAML将其添加到您的全局配置中:
distributed: worker: preload: - dask_cuda.initialize
有关Dask配置的更多信息,请参见https://docs.dask.org/en/latest/configuration.html。
- Parameters:
- create_cuda_contextbool, default True
在初始化时创建CUDA上下文。
- enable_tcp_over_ucxbool, default None
设置环境变量以启用通过UCX的TCP,即使不支持或禁用了InfiniBand和NVLink。
- enable_infinibandbool, default None
设置环境变量以启用InfiniBand上的UCX,当
True时,意味着enable_tcp_over_ucx=True。- enable_nvlinkbool, default None
设置环境变量以启用通过NVLink的UCX,当
True时,意味着enable_tcp_over_ucx=True。- enable_rdmacmbool, default None
设置环境变量以启用UCX RDMA连接管理器支持, 需要
enable_infiniband=True。
显式通信
- class dask_cuda.explicit_comms.comms.CommsContext(client: Client | None = None)[源代码]
显式通信的通信处理器
- Parameters:
- client: Client, optional
指定用于通信的客户端。如果为None,则使用默认客户端。
- run(coroutine, *args, workers=None, lock_workers=False)[来源]
在多个工作线程上运行一个协程
协程将工作者的状态字典作为第一个参数,并将
*args作为后续参数。- Parameters:
- coroutine: coroutine
在每个工作节点上运行的函数
- *args:
coroutine的参数- workers: list, optional
工人列表。默认是所有工人
- lock_workers: bool, optional
使用distributed.MultiLock来获取对工作者的独占访问权限。使用此标志以支持并行运行。
- Returns:
- ret: list
每个工作者的输出列表
- stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set][source]
在给定名称下的工作节点上暂存密钥
在显式通信任务中,使用 pop_staging_area(…, name) 来访问暂存的键和相关的数据。
- Parameters:
- name: str
暂存区域的名称
- keys: iterable
阶段的关键
- Returns:
- dict
将每个工作节点等级映射到其暂存键集的字典
注释
在显式通信的上下文中,暂存是指复制Dask键的责任的行为。当暂存一个键时,拥有该键的工作者(由Dask调度器分配)将键的引用和关联数据保存到其本地暂存区域。从这一点开始,如果调度器取消了该键,工作者(以及在该工作者上运行的任务)现在对该键和关联数据拥有独占访问权。这样,暂存使得长时间运行的显式通信任务能够尽快释放输入数据。
- submit(worker, coroutine, *args, wait=False)[source]
在单个工作线程上运行一个协程
协程将工作者的状态字典作为第一个参数,并将
*args作为后续参数。- Parameters:
- worker: str
运行
coroutine的工作者- coroutine: coroutine
在工作节点上运行的函数
- *args:
coroutine的参数- wait: boolean, optional
如果为True,则在返回之前等待协程完成。
- Returns:
- ret: object or Future
如果 wait=True,coroutine 的结果 如果 wait=False,可以在稍后等待的 Future。
- dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame[源代码]
对DataFrame的划分进行排序,以便列中的所有值对齐
这使用显式通信执行基于任务的洗牌。它需要完整的数据集读取、序列化和洗牌。这是昂贵的。如果可能,您应该避免洗牌。
这不会保留有意义的索引/分区方案。如果在并行情况下执行,这是不确定的。
需要一个激活的客户端。
- Parameters:
- df: dask.dataframe.DataFrame
要打乱的数据框
- column_names: list of strings
我们想要拆分的列名列表。
- npartitions: int or None
期望的输出分区数量。如果为None,则输出分区的数量等于df.npartitions
- ignore_index: bool
在洗牌时忽略索引。如果为True,性能可能会提高,但索引值将不会被保留。
- batchsize: int
洗牌由多轮组成,每轮中每个工作节点分区,然后进行全对全通信,交换其数据帧分区的一部分。批量大小是每个工作节点在每轮中处理的分区数量。 如果为-1,每个工作节点将在单轮中处理其所有分区,并且所有减少内存使用的技术都将被禁用,这在内存压力不是问题时可能会更快。 如果为None,则使用DASK_EXPLICIT_COMMS_BATCHSIZE的值,如果未设置,则默认为1,因此默认情况下,我们优先考虑鲁棒性而非性能。
- Returns:
- df: dask.dataframe.DataFrame
打乱的数据框