设备溢出
默认情况下,当GPU的内存利用率达到80%时,Dask-CUDA会启用从GPU到主机内存的溢出。
这可以根据工作负载的需求进行更改,或者通过显式设置device_memory_limit来完全禁用。
此参数接受整数或字符串形式的内存大小,或表示GPU总内存百分比的浮点数:
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(device_memory_limit=50000) # spilling after 50000 bytes
cluster = LocalCUDACluster(device_memory_limit="5GB") # spilling after 5 GB
cluster = LocalCUDACluster(device_memory_limit=0.3) # spilling after 30% memory utilization
可以通过将device_memory_limit设置为0来禁用内存溢出:
cluster = LocalCUDACluster(device_memory_limit=0) # spilling disabled
同样适用于 dask cuda worker,并且可以通过设置 --device-memory-limit 来控制溢出:
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --device-memory-limit 50000
$ dask cuda worker --device-memory-limit 5GB
$ dask cuda worker --device-memory-limit 0.3
$ dask cuda worker --device-memory-limit 0
JIT-不溢出
Dask 和 Dask-CUDA 中的常规溢出存在一些显著问题。它不是跟踪单个对象,而是跟踪任务输出。 这意味着返回 CUDA 对象集合的任务要么会溢出所有 CUDA 对象,要么一个也不会溢出。 其他问题包括对象重复、错误的溢出顺序和未跟踪共享设备缓冲区 (参见讨论)。
为了解决所有这些问题,Dask-CUDA 引入了 JIT-Unspilling,这可以显著提高性能和内存使用率。 对于需要大量溢出的工作负载 (例如在可用内存少于数据的基础设施上进行大型连接),我们经常 看到超过 50% 的改进(即,原本需要 300 秒的任务可能只需要 110 秒)。对于不需要溢出的工作负载, 我们预计不会看到太大差异。
为了启用JIT-Unspilling,请使用jit_unspill参数:
>>> import dask
>>> from distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB", jit_unspill=True)
>>> client = Client(cluster)
>>> with dask.config.set(jit_unspill=True):
... cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB")
... client = Client(cluster)
或者设置worker参数 --enable-jit-unspill
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --enable-jit-unspill
或者环境变量 DASK_JIT_UNSPILL=True
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ DASK_JIT_UNSPILL=True dask cuda worker
限制
JIT-Unspill 将 CUDA 对象(例如 cudf.Dataframe)包装在 ProxyObject 中。
由 ProxyObject 实例代理的对象在被访问时将进行 JIT 反序列化。该实例的行为与代理对象相同,并且可以像代理对象一样被访问/使用。
ProxyObject 有一些限制,并不能完美地模仿被代理的对象。
最明显的是,使用 instance() 进行类型检查可以按预期工作,但直接
类型检查则不行:
>>> import numpy as np
>>> from dask_cuda.proxy_object import asproxy
>>> x = np.arange(3)
>>> isinstance(asproxy(x), type(x))
True
>>> type(asproxy(x)) is type(x)
False
因此,如果遇到问题,请记住始终可以使用unproxy()直接访问代理对象,或者设置DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True以启用兼容模式,该模式会自动在所有函数输入上调用unproxy()。
cuDF 溢出
当使用Dask cuDF(即Dask DataFrame)执行ETL工作流时,通常最好利用cuDF中的本地溢出支持。
原生cuDF溢出与上述其他方法相比具有重要优势。当使用JIT-非溢出或默认溢出时,工作器只能溢出任务的输入或输出。这意味着在任务执行期间创建的任何数据在任务完成之前是完全无法访问的。然而,当使用cuDF溢出时,可以在任务执行期间根据需要溢出/非溢出单个设备缓冲区。
在部署LocalCUDACluster时,可以通过enable_cudf_spill参数启用cuDF溢出功能:
>>> from distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True)
>>> client = Client(cluster)
同样适用于 dask cuda worker:
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --enable-cudf-spill
统计
当启用 cuDF 溢出功能时,还可以让 cuDF 收集基本的溢出统计信息。收集这些信息是了解使用 cuDF 的内存密集型工作流程性能的有用方法。
当部署LocalCUDACluster时,可以通过cudf_spill_stats参数启用cuDF溢出功能:
>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True, cudf_spill_stats=1)
同样适用于 dask cuda worker:
$ dask cuda worker --enable-cudf-spill --cudf-spill-stats 1
为了让每个dask-cuda工作流中的工作节点打印溢出统计信息,可以这样做:
def spill_info():
from cudf.core.buffer.spill_manager import get_global_manager
print(get_global_manager().statistics)
client.submit(spill_info)
请参阅cuDF 溢出文档以获取有关可用溢出统计选项的更多信息。
限制
尽管cuDF溢出是大多数使用Dask cuDF的ETL工作流程的最佳选择,但如果该工作流程在cudf.DataFrame和其他数据格式(例如cupy.ndarray)之间进行转换,其效果将大大降低。一旦底层设备缓冲区“暴露”给外部内存引用,它们就会变得“不可溢出”由cuDF。在这种情况下(例如,Dask-CUDA + XGBoost),JIT-Unspill通常是更好的选择。