Shortcuts

torch.futures

此包提供了一个Future类型,该类型封装了异步执行和一组实用函数,以简化对Future对象的操作。目前,Future类型主要由分布式RPC框架使用。

class torch.futures.Future(*, devices=None)

封装了一个 torch._C.Future,它封装了一个可调用对象的异步执行,例如 rpc_async()。它还公开了一组用于添加回调函数和设置结果的API。

警告

GPU 支持是一个测试版功能,可能会发生变化。

add_done_callback(callback)[源代码]

将给定的回调函数附加到这个 Future,当 Future 完成时将运行该回调函数。可以向同一个 Future 添加多个回调,但不能保证它们的执行顺序。回调函数必须接受一个参数,即对该 Future 的引用。回调函数可以使用 value() 方法获取值。请注意,如果这个 Future 已经完成,给定的回调将在线运行。

我们建议您使用then()方法,因为它提供了一种在回调完成后进行同步的方式。add_done_callback 如果您的回调不返回任何内容,则可以更便宜。但两者 then()add_done_callback 在底层使用相同的回调注册API。

对于GPU张量,此方法的行为与 then()相同。

Parameters

回调 (Future) – 一个 Callable,它接受一个参数, 该参数是对这个 Future 的引用。

注意

请注意,如果回调函数抛出异常,无论是通过原始future以异常完成并调用fut.wait(),还是通过回调中的其他代码,都必须仔细处理错误。例如,如果此回调随后完成其他future,这些future不会被标记为以错误完成,用户需要独立处理这些future的完成/等待。

Example::
>>> def callback(fut):
...     print("这将在future完成后运行。")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
这将在future完成后运行。
5
done()[源码]

如果这个Future已完成,则返回True。一个Future在有结果或异常时被视为已完成。

如果值包含位于GPU上的张量,Future.done() 将返回True,即使填充这些张量的异步内核尚未在设备上完成运行, 因为在这种情况下结果已经可用,只要执行适当的同步(参见wait())。

Return type

bool

set_exception(result)[源代码]

为此 Future 设置一个异常,这将标记此 Future 为 以错误完成并触发所有附加的回调。请注意, 当在此 Future 上调用 wait()/value() 时,此处设置的异常 将在线引发。

Parameters

结果 (BaseException) – 此 Future 的异常。

Example::
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[源码]

设置此Future的结果,这将标记此Future为已完成并触发所有附加的回调。请注意,一个Future不能被标记为已完成两次。

如果结果包含位于GPU上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,也可以调用此方法,前提是调用此方法时,这些内核被排队的流被设置为当前流。简而言之,只要不在这之间更改流,就可以在启动这些内核后立即调用此方法,而无需任何额外的同步。此方法将在所有相关的当前流上记录事件,并将使用它们来确保为所有使用此Future的消费者进行适当的调度。

Parameters

结果 (对象) – 这个 Future 的结果对象。

Example::
>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[源代码]

将给定的回调函数附加到这个 Future,当 Future 完成时将运行该回调函数。可以向同一个 Future 添加多个回调,但不能保证它们的执行顺序(为了强制执行特定顺序,请考虑链式调用:fut.then(cb1).then(cb2))。回调函数必须接受一个参数,即对该 Future 的引用。回调函数可以使用 value() 方法来获取值。请注意,如果这个 Future 已经完成,给定的回调将立即在行内运行。

如果 Future 的值包含驻留在 GPU 上的张量,回调可能会在填充这些张量的异步内核尚未在设备上完成执行时被调用。然而,回调将以一些专用流(从全局池中获取)设置为当前流,这些流将与这些内核同步。因此,回调在这些张量上执行的任何操作都将在内核完成后在设备上调度。换句话说,只要回调不切换流,它就可以安全地操作结果,而无需任何额外的同步。这与 wait() 的非阻塞行为类似。

同样地,如果回调函数返回一个包含位于GPU上的张量的值,即使生成这些张量的内核仍在设备上运行,它也可以这样做,只要回调函数在执行期间没有更改流。如果想要更改流,必须小心地将其与原始流重新同步,即回调函数被调用时的当前流。

Parameters

回调 (Callable) – 一个 Callable,它将此 Future 作为唯一参数。

Returns

一个新的 Future 对象,它持有 callback 的返回值,并在给定的 callback 完成时标记为已完成。

Return type

未来[S]

注意

请注意,如果回调函数抛出异常,无论是通过原始future以异常完成并调用fut.wait(),还是通过回调中的其他代码,then返回的future将被适当地标记为遇到的错误。然而,如果此回调随后完成其他future,这些future不会被标记为已完成错误,用户需要独立处理这些future的完成/等待。

Example::
>>> def callback(fut):
...     print(f"RPC返回值是{fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # 插入的回调函数将在接收到来自“worker1”的响应时打印返回值
>>> # 接收到来自“worker1”的响应时
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"链式回调完成。{x.wait()}")
... )
>>> fut.set_result(5)
RPC返回值是5.
链式回调完成。None
value()[源码]

获取已完成的future的值。

此方法应在调用 wait() 完成后调用,或在传递给 then() 的回调函数内部调用。在其他情况下,此 Future 可能尚未持有值,调用 value() 可能会失败。

如果值包含位于GPU上的张量,则此方法将不会执行任何额外的同步。这应该事先通过调用wait()来单独完成(除非在回调中,对于回调,它已经由then()处理)。

Returns

这个 Future 持有的值。如果创建该值的函数(回调或 RPC)抛出了错误,这个 value() 方法也会抛出错误。

Return type

T

wait()[源代码]

阻塞直到这个Future的值准备好。

如果值包含位于GPU上的张量,则会与内核(在设备上执行)进行额外的同步,这些内核可能异步填充这些张量。这种同步是非阻塞的,这意味着wait()将在当前流中插入必要的指令,以确保在这些流上排队的进一步操作在异步内核之后被正确调度,但一旦完成,wait()将返回,即使这些内核仍在运行。只要不更改流,访问和使用这些值时不需要进一步的同步。

Returns

Future持有的值。如果创建该值的函数(回调或RPC)抛出了错误,此wait方法也将抛出错误。

Return type

T

torch.futures.collect_all(futures)[源代码]

将提供的 Future 对象收集到一个组合的 Future 中,该组合的 Future 在所有子 Future 完成时完成。

Parameters

futures (列表) – 一个包含 Future 对象的列表。

Returns

返回一个 Future 对象到传入的 Futures 列表中。

Return type

未来[列表[未来]]

Example::
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[源代码]

等待所有提供的 futures 完成,并返回已完成值的列表。如果任何一个 future 遇到错误,方法将提前退出并报告错误,不再等待其他 futures 完成。

Parameters

futures (列表) – 一个包含 Future 对象的列表。

Returns

已完成的结果列表 Future。如果对任何 Future 调用 wait 时抛出错误,此方法将抛出错误。

Return type

列表

优云智算