Shortcuts

torch.futures 的源代码

from __future__ import annotations

from typing import cast, Callable, Generic, List, Optional, Type, TypeVar, Union

import torch

__all__ = ['Future', 'collect_all', 'wait_all']

T = TypeVar("T")
S = TypeVar("S")


class _PyFutureMeta(type(torch._C.Future), type(Generic)):  # type: ignore[misc, no-redef]
    pass


class Future(torch._C.Future, Generic[T], metaclass=_PyFutureMeta):
    r"""
    封装一个 ``torch._C.Future``,它封装了一个可调用对象的异步执行,例如 :meth:`~torch.distributed.rpc.rpc_async`。它还公开了一组用于添加回调函数和设置结果的API。

    .. 警告:: GPU支持是一个测试版功能,可能会发生变化。
    """

    def __init__(self, *, devices: Optional[List[Union[int, str, torch.device]]] = None):
        r"""
        创建一个未设置的空 ``Future``。如果未来打算持有包含CUDA张量的值,则必须在构造时指定它们的CUDA设备(这是仅在 ``torch.cuda.is_available()`` 返回 ``True`` 时支持的)。这是为了确保适当的CUDA流同步。通过 ``then`` 方法返回的子未来将继承这些设备。

        参数:
            devices(``List[Union[int, str, torch.device]]``, 可选): 允许驻留在此未来值中的张量的设备集,以及允许在其上操作回调的设备集。
        """
        if devices is None:
            devices = []
        super().__init__([torch.device(d) for d in devices])

[docs] def done(self) -> bool: r""" 如果此 ``Future`` 已完成,则返回 ``True``。如果它有结果或异常,则 ``Future`` 已完成。 如果值包含驻留在GPU上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,``Future.done()`` 也会返回 ``True``,因为在这种情况下结果已经可用,只要进行适当的同步(参见 :meth:`wait`)。 """ return super().done()
[docs] def wait(self) -> T: r""" 阻塞直到此 ``Future`` 的值准备就绪。 如果值包含驻留在GPU上的张量,则执行与填充这些张量的内核(在设备上执行)的额外同步。这种同步是非阻塞的,这意味着 ``wait()`` 将在当前流中插入必要的指令,以确保在异步内核之后正确调度在这些流上排队的进一步操作,但一旦完成,``wait()`` 将返回,即使这些内核仍在运行。当访问和使用值时,不需要进一步的同步,只要不更改流。 返回: 此 ``Future`` 持有的值。如果创建值的函数(回调或RPC)抛出错误,此 ``wait`` 方法也将抛出错误。 """ return super().wait()
[docs] def value(self) -> T: r""" 获取已完成的未来的值。 此方法只能在调用 :meth:`wait` 完成后或在传递给 :meth:`then` 的回调函数内调用。在其他情况下,此 ``Future`` 可能尚未持有值,调用 ``value()`` 可能会失败。 如果值包含驻留在GPU上的张量,则此方法不会执行任何额外的同步。这应该事先通过调用 :meth:`wait` 单独完成(除了在回调中,:meth:`then` 已经处理了这种情况)。 返回: 此 ``Future`` 持有的值。如果创建值的函数(回调或RPC)抛出错误,此 ``value()`` 方法也将抛出错误。 """ return super().value()
[docs] def then(self, callback: Callable[[Future[T]], S]) -> Future[S]: r""" 将给定的回调函数附加到此 ``Future``,当 ``Future`` 完成时将运行该回调。可以向同一个 ``Future`` 添加多个回调,但不能保证它们的执行顺序(要强制执行特定顺序,请考虑链接:``fut.then(cb1).then(cb2)``)。回调必须接受一个参数,即对此 ``Future`` 的引用。回调函数可以使用 :meth:`value` 方法获取值。请注意,如果此 ``Future`` 已经完成,则给定的回调将立即内联运行。 如果 ``Future`` 的值包含驻留在GPU上的张量,回调可能会在内核填充这些张量尚未在设备上完成执行时被调用。但是,回调将以一些专用流设置为当前流(从全局池中获取),这些流将与这些内核同步。因此,回调在这些张量上执行的任何操作都将在内核完成后在设备上进行调度。换句话说,只要回调不切换流,它就可以安全地操作结果,而无需任何额外的同步。这与 :meth:`wait` 的非阻塞行为类似。 同样,如果回调返回一个包含驻留在GPU上的张量的值,即使生成这些张量的内核仍在设备上运行,它也可以这样做,只要回调在执行期间没有更改流。如果想要更改流,必须小心地与原始流重新同步,即回调被调用时当前的流。 参数: callback(``Callable``): 一个 ``Callable``,接受此 ``Future`` 作为唯一参数。 返回: 一个新的 ``Future`` 对象,持有 ``callback`` 的返回值,并在给定的 ``callback`` 完成后标记为完成。 .. 注意:: 请注意,如果回调函数抛出异常,无论是通过原始未来完成时抛出异常并调用 ``fut.wait()``,还是通过回调中的其他代码,``then`` 返回的未来将相应地标记为遇到错误。但是,如果此回调后来完成了其他未来,这些未来不会被标记为完成时出错,用户负责独立处理这些未来的完成/等待。 示例:: >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_FUTURES) >>> def callback(fut): ... print(f"RPC返回值是 {fut.wait()}。") >>> fut = torch.futures.Future() >>> # 插入的回调将在从 "worker1" 接收到响应时打印返回值 >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"链式回调完成。{x.wait()}") ... ) >>> fut.set_result(5) RPC返回值是 5。 Chained cb done. None """ return cast(Future[S], super().then(callback))
[docs] def add_done_callback(self, callback: Callable[[Future[T</
优云智算