• Docs >
  • Distributed RPC Framework
Shortcuts

分布式 RPC 框架

分布式 RPC 框架通过一组原语提供了多机模型训练的机制,以允许远程通信,并提供了一个更高级别的 API,用于自动区分分布在多台机器上的模型。

警告

RPC 包中的 API 是稳定的。有多个正在进行的工作项目,以改进性能和错误处理,这些将在未来的版本中发布。

警告

CUDA 支持在 PyTorch 1.9 中引入,并且仍然是一个 测试版 功能。 并非 RPC 包的所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。这些不支持的功能包括:RRefs、 JIT 兼容性、分布式自动求导和分布式优化器,以及性能分析。这些不足将在未来的版本中得到解决。

注意

请参阅PyTorch分布式概述 以简要了解与分布式训练相关的所有功能。

基础

分布式 RPC 框架使得远程运行函数变得容易,支持引用远程对象而不需要复制真实数据,并提供 autograd 和优化器 API 以透明地在 RPC 边界上运行反向传播和更新参数。这些功能可以分为四组 API。

  1. 远程过程调用 (RPC) 支持在指定的目标工作节点上运行带有给定参数的函数,并获取返回值或创建对返回值的引用。主要有三种 RPC API: rpc_sync()(同步), rpc_async()(异步),以及 remote()(异步并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续执行,请使用同步 API。否则,使用异步 API 获取一个 future,并在调用方需要返回值时等待该 future。remote() API 在需要在远程创建某些内容但不需要将其获取到调用方时非常有用。想象一下,一个驱动进程正在设置一个参数服务器和一个训练器。驱动程序可以在参数服务器上创建一个嵌入表,然后将该嵌入表的引用与训练器共享,但驱动程序本身永远不会在本地使用该嵌入表。在这种情况下,rpc_sync()rpc_async() 不再适用,因为它们总是意味着返回值将立即或在未来返回给调用方。

  2. 远程引用(RRef)作为指向本地或远程对象的分布式共享指针。它可以与其他工作节点共享,并且引用计数将透明地处理。每个RRef只有一个所有者,对象仅存在于该所有者上。持有RRef的非所有者工作节点可以通过显式请求从所有者获取对象的副本。当工作节点需要访问某些数据对象,但本身既不是创建者(调用remote()的调用者)也不是对象的所有者时,这非常有用。分布式优化器,正如我们将在下面讨论的那样,是这种用例的一个例子。

  3. 分布式自动求导将所有参与前向传播的工作节点上的本地自动求导引擎连接在一起,并在反向传播期间自动调用它们来计算梯度。这在进行分布式模型并行训练、参数服务器训练等需要跨多台机器的前向传播时特别有用。通过此功能,用户代码不再需要担心如何跨RPC边界发送梯度以及应以何种顺序启动本地自动求导引擎,这在存在嵌套和相互依赖的RPC调用时可能会变得非常复杂。

  4. 分布式优化器的构造函数接受一个 Optimizer()(例如,SGD()Adagrad()等)和一个参数RRefs列表,在每个不同的RRef所有者上创建一个 Optimizer()实例,并在运行step()时相应地更新参数。当你有 分布式前向和后向传播时,参数和梯度将分散在多个工作节点上,因此需要在每个涉及的工作节点上都有一个优化器。分布式优化器将所有这些本地优化器包装成一个,并提供一个简洁的构造函数和step() API。

RPC

在使用RPC和分布式自动求导原语之前,必须进行初始化。要初始化RPC框架,我们需要使用 init_rpc(),它将初始化RPC框架、RRef框架和分布式自动求导。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[源代码]

初始化RPC原语,例如本地RPC代理和分布式自动求导,这会立即使当前进程准备好发送和接收RPC。

Parameters
  • 名称 (字符串) – 此节点的全局唯一名称。(例如, Trainer3, ParameterServer2, Master, Worker1) 名称只能包含数字、字母、下划线、冒号、 和/或破折号,并且必须少于128个字符。

  • backend (BackendType, 可选) – RPC 后端实现的类型。支持的值是 BackendType.TENSORPIPE(默认值)。 更多信息请参见 后端

  • rank (int) – 该节点的全局唯一ID/rank。

  • world_size (int) – 组中的工作进程数量。

  • rpc_backend_options (RpcBackendOptions, 可选) – 传递给 RpcAgent 构造函数的选项。它必须是 RpcBackendOptions 的特定代理子类,并包含代理特定的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用 init_method = "env://" 初始化的底层进程组执行 rendezvous,这意味着需要正确设置环境变量 MASTER_ADDRMASTER_PORT。有关更多信息,请参阅 后端,并查找可用的选项。

以下API允许用户远程执行函数以及创建对远程数据对象的引用(RRefs)。在这些API中,当传递一个Tensor作为参数或返回值时,目标工作节点将尝试创建一个具有相同元信息(即形状、步幅等)的Tensor。我们故意禁止传输CUDA张量,因为如果源和目标工作节点上的设备列表不匹配,可能会导致崩溃。在这种情况下,应用程序可以始终在调用方显式地将输入张量移动到CPU,并在必要时在被调用方将其移动到所需的设备上。

警告

RPC 中的 TorchScript 支持是一个原型功能,可能会发生变化。自 v1.5.0 起,torch.distributed.rpc 支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用方的并行性,因为执行 TorchScript 函数不需要 GIL。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

向工作线程 to 发起一个阻塞的 RPC 调用以运行函数 func。RPC 消息的发送和接收与 Python 代码的执行是并行的。此方法是线程安全的。

Parameters
  • (strWorkerInfoint) – 目标工作节点的名称/秩/WorkerInfo

  • func (可调用对象) – 一个可调用函数,例如 Python 可调用对象、内置操作符(例如 add())和注解的 TorchScript 函数。

  • args (元组) – 用于 func 调用的参数元组。

  • kwargs (字典) – 是一个用于func调用的关键字参数字典。

  • timeout (float, 可选) – 用于此RPC的超时时间,单位为秒。如果RPC在此时间内未完成,将引发一个指示超时的异常。值为0表示无限超时,即永远不会引发超时错误。如果未提供,将使用初始化时设置的默认值或通过_set_rpc_timeout设置的值。

Returns

返回运行 funcargskwargs 的结果。

Example::

确保在所有工作节点上正确设置了MASTER_ADDRMASTER_PORT。请参考init_process_group() API以获取更多详细信息。例如,

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # 在 worker 0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

下面是一个使用RPC运行TorchScript函数的示例。

>>> # 在两个工作节点上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # 在 worker 0 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

进行一个非阻塞的RPC调用以在工作者to上运行函数func。RPC消息的发送和接收与Python代码的执行是并行的。此方法是线程安全的。此方法将立即返回一个Future,可以等待该Future。

Parameters
  • (strWorkerInfoint) – 目标工作节点的名称/等级/WorkerInfo

  • func (可调用对象) – 一个可调用函数,例如 Python 可调用对象、内置操作符(例如 add())和注解的 TorchScript 函数。

  • args (元组) – 用于 func 调用的参数元组。

  • kwargs (字典) – 是一个用于func调用的关键字参数字典。

  • timeout (float, 可选) – 用于此RPC的超时时间,单位为秒。如果RPC在此时间内未完成,将引发一个指示超时的异常。值为0表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化期间设置的默认值或通过_set_rpc_timeout设置的值。

Returns

返回一个可以等待的 Future 对象。当完成时,可以从 Future 对象中检索到 funcargskwargs 上的返回值。

警告

不支持使用 GPU 张量作为 func 的参数或返回值,因为我们不支持通过网络发送 GPU 张量。您需要在使用它们作为 func 的参数或返回值之前,显式地将 GPU 张量复制到 CPU。

警告

The rpc_async API 不会在发送参数张量之前复制它们的存储,这可能会由不同线程根据 RPC 后端类型完成。调用者应确保这些张量的内容在返回的 Future 完成之前保持完整。

Example::

确保在所有工作节点上正确设置了MASTER_ADDRMASTER_PORT。请参考init_process_group() API以获取更多详细信息。例如,

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # 在 worker 0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

下面是一个使用RPC运行TorchScript函数的示例。

>>> # 在两个工作节点上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # 在 worker 0 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

在工作者 to 上远程调用 func 并立即返回结果值的 RRef。 工作者 to 将是返回的 RRef 的所有者,而调用 remote 的工作者是用户。所有者管理其 RRef 的全局引用计数,并且只有当全局范围内没有对该 RRef 的存活引用时,所有者才会被销毁。

Parameters
  • (strWorkerInfoint) – 目标工作节点的名称/秩/WorkerInfo

  • func (可调用对象) – 一个可调用函数,例如 Python 可调用对象、内置操作符(例如 add())和注解的 TorchScript 函数。

  • args (元组) – 用于 func 调用的参数元组。

  • kwargs (字典) – 是一个用于func调用的关键字参数字典。

  • timeout (float, 可选) – 此远程调用的超时时间(以秒为单位)。如果在该超时时间内,此工作节点上未能成功处理在to工作节点上创建RRef的操作,则下次尝试使用该RRef(例如to_here())时,将引发超时错误,指示此失败。值为0表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化期间设置的默认值或通过_set_rpc_timeout设置的值。

Returns

一个指向结果值的用户 RRef 实例。使用阻塞 API torch.distributed.rpc.RRef.to_here() 在本地检索结果值。

警告

The remote API 不会在发送参数张量的存储之前复制它们,这可能会由不同的线程根据 RPC 后端类型完成。调用者应确保这些张量的内容在返回的 RRef 被所有者确认之前保持完整,这可以通过使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 进行检查。

警告

对于远程 API 的超时等错误,我们以最大努力为基础进行处理。这意味着当由远程发起的远程调用失败时,例如出现超时错误,我们会采取最大努力的方式进行错误处理。这意味着错误会被处理并异步设置在结果 RRef 上。如果在处理之前应用程序尚未使用该 RRef(例如to_here或 fork 调用),那么未来对该RRef的使用将会适当地引发错误。然而,用户应用程序有可能在错误被处理之前使用该RRef。在这种情况下,错误可能不会被引发,因为它们尚未被处理。

示例:

确保在所有工作节点上正确设置 ``MASTER_ADDR`` 和 ``MASTER_PORT``。
请参阅 :meth:`~torch.distributed.init_process_group` API 获取更多详细信息。例如,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # 在工作节点 0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # 在工作节点 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是使用 RPC 运行 TorchScript 函数的示例。

>>> # 在所有工作节点上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # 在工作节点 0 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # 在工作节点 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[源代码]

获取给定工作名称的 WorkerInfo。 使用此 WorkerInfo 以避免在每次调用时传递昂贵的字符串。

Parameters

worker_name (str) – 工作者的字符串名称。如果 None,返回当前工作者的ID。(默认 None

Returns

WorkerInfo 实例,对应给定的 worker_name 或当前工作者的 WorkerInfo,如果 worker_nameNone

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[源代码]

执行RPC代理的关闭操作,然后销毁RPC代理。这将阻止本地代理接受未完成的请求,并通过终止所有RPC线程来关闭RPC框架。如果graceful=True,这将阻塞直到所有本地和远程RPC进程到达此方法并等待所有未完成的工作完成。否则,如果graceful=False,这是一个本地关闭操作,并且不会等待其他RPC进程到达此方法。

警告

对于由Future对象返回的 rpc_async()future.wait()不应在shutdown()之后调用。

Parameters

graceful (bool) – 是否进行优雅关闭。如果为True,这将1) 等待直到没有待处理的系统消息用于UserRRefs并删除它们;2) 阻塞直到所有本地和远程RPC进程都已调用此方法,并等待所有未完成的工作完成。

Example::

确保在所有工作节点上正确设置了MASTER_ADDRMASTER_PORT。请参考init_process_group() API以获取更多详细信息。例如,

导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # 在 worker 0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # 做一些工作
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # 准备关闭
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # 等待 worker 0 完成工作,然后关闭。
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo

一个封装系统中工人信息的结构。 包含工人的姓名和ID。这个类不是直接构造的,而是可以通过get_worker_info()获取实例,并且可以将结果传递给函数,如rpc_sync()rpc_async()remote(),以避免在每次调用时复制字符串。

property id

全局唯一ID,用于标识工作者。

property name

工人的名字。

RPC 包还提供了装饰器,允许应用程序指定在调用端如何处理给定的函数。

torch.distributed.rpc.functions.async_execution(fn)[源代码]

一个用于函数的装饰器,指示该函数的返回值保证是一个Future对象,并且此函数可以在RPC被调用方异步运行。更具体地说,被调用方提取由包装函数返回的Future,并将后续处理步骤作为回调安装到该Future。安装的回调将在Future完成时读取其值,并将该值作为RPC响应发送回去。这也意味着返回的Future仅存在于被调用方一侧,并且永远不会通过RPC发送。当包装函数(fn)的执行需要暂停和恢复时,例如由于包含rpc_async()或等待其他信号,此装饰器非常有用。

注意

要启用异步执行,应用程序必须将此装饰器返回的函数对象传递给RPC API。如果RPC检测到此装饰器安装的属性,它就知道该函数返回一个Future对象,并将相应地处理它。然而,这并不意味着在定义函数时,此装饰器必须是外层的。例如,当与@staticmethod@classmethod结合使用时,@rpc.functions.async_execution需要是内部装饰器,以允许目标函数被识别为静态或类函数。该目标函数仍然可以异步执行,因为当访问时,静态或类方法保留了@rpc.functions.async_execution安装的属性。

Example::

返回的 Future 对象可以来自 rpc_async()then()Future 构造函数。下面的示例展示了直接使用由 then() 返回的 Future

>>> from torch.distributed import rpc
>>>
>>> # 省略设置和关闭RPC
>>>
>>> # 在所有工作节点上
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # 此函数在“worker1”上运行,并通过`then(cb)` API安装回调后立即返回。
>>>     # 同时,到“worker2”的`rpc_async`可以并发运行。
>>>     # 当该`rpc_async`的返回值到达“worker1”时,“worker1”将根据lambda函数运行
>>>     # 并设置之前返回的`Future`的值,然后触发RPC将结果发送回“worker0”。
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # 在工作节点0上
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # 输出 tensor([3., 3.])

当与 TorchScript 装饰器结合使用时,此装饰器必须是外层的。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # 省略设置和关闭RPC
>>>
>>> # 在所有工作节点上
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # 在工作节点0上
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # 输出 tensor([2., 2.])

当与静态方法或类方法结合使用时,此装饰器必须是内部的。

>>> from torch.distributed import rpc
>>>
>>> # 省略设置和关闭RPC
>>>
>>> # 在所有工作节点上
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # 在工作节点0上
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # 输出 tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # 输出 tensor([4., 4.])

这个装饰器也适用于 RRef 助手,即, torch.distributed.rpc.RRef.rpc_sync(), torch.distributed.rpc.RRef.rpc_async(), 和 torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # 重用上面的AsyncExecutionClass类
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # 输出 tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # 输出 tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # 输出 tensor([4., 4.])

后端

RPC 模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在 init_rpc() 函数中指定,通过传递 BackendType 枚举的某个值。无论使用哪种后端,RPC API 的其余部分都不会改变。每个后端还定义了自己的 RpcBackendOptions 类的子类,其实例也可以传递给 init_rpc() 以配置后端的行为。

class torch.distributed.rpc.BackendType(value)

可用的后端枚举类。

PyTorch 自带一个内置的 BackendType.TENSORPIPE 后端。 其他后端可以通过使用 register_backend() 函数来注册。

class torch.distributed.rpc.RpcBackendOptions

一个封装了传递给RPC后端的选项的抽象结构。可以将此类的实例传递给 init_rpc() 以使用特定配置初始化RPC,例如RPC超时和 init_method 的使用。

property init_method

指定如何初始化进程组的URL。 默认是 env://

property rpc_timeout

一个浮点数,表示用于所有RPC的超时时间。如果RPC在此时间范围内未完成,它将完成并抛出一个异常,指示其已超时。

TensorPipe 后端

TensorPipe 代理(默认)利用 TensorPipe 库,该库提供了一种原生的点对点通信原语,特别适用于机器学习,从根本上解决了 Gloo 的一些局限性。与 Gloo 相比,它具有异步的优势,允许大量传输同时进行,每个传输以自己的速度进行,而不会相互阻塞。它只会在需要时按需打开节点对之间的管道,当一个节点失败时,只有与其相关的管道会被关闭,而所有其他管道将继续正常工作。此外,它能够支持多种不同的传输方式(当然包括 TCP,还有共享内存、NVLink、InfiniBand 等),并能自动检测它们的可用性并协商每条管道使用的最佳传输方式。

TensorPipe 后端在 PyTorch v1.6 中引入,并且正在积极开发中。目前,它仅支持 CPU 张量,GPU 支持即将推出。它带有基于 TCP 的传输,就像 Gloo 一样。它还能够自动将大张量分块并通过多个套接字和线程进行多路复用,以实现非常高的带宽。代理将能够自行选择最佳传输,无需干预。

示例:

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20秒超时
>>>     )
>>> )
>>>
>>> # 省略worker2上的init_rpc调用
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[源代码]

用于 TensorPipeAgent 的后端选项,继承自 RpcBackendOptions

Parameters
  • num_worker_threads (int, 可选) – 线程池中线程的数量,由 TensorPipeAgent 用于执行 请求(默认值:16)。

  • rpc_timeout (float, 可选) – RPC请求的默认超时时间,单位为秒(默认值:60秒)。如果RPC在此时间段内未完成,将引发指示此情况的异常。调用者可以在rpc_sync()rpc_async()中为单个RPC覆盖此超时时间(如果需要)。

  • init_method (字符串, 可选) – 用于初始化分布式存储以进行汇合的URL。它可以接受与init_process_group()相同参数的任何值(默认值:env://)。

  • device_maps (Dict[str, Dict], 可选) – 从当前工作进程到被调用者的设备映射。键是被调用者的工作进程名称,值是一个字典(Dict 类型,包含 intstrtorch.device),用于将当前工作进程的设备映射到被调用者的工作进程的设备。(默认值:None

  • 设备(List[int, str, 或 torch.device],可选)—— RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将初始化为从其自身的 device_maps 和其对等方的 device_maps 中的相应设备的所有本地设备。在处理 CUDA RPC 请求时,代理将正确同步此 List 中所有设备的 CUDA 流。

property device_maps

设备地图位置。

property devices

本地代理使用的所有设备。

property init_method

指定如何初始化进程组的URL。 默认是 env://

property num_worker_threads

TensorPipeAgent 用于执行请求的线程池中的线程数量。

property rpc_timeout

一个浮点数,表示用于所有RPC的超时时间。如果RPC在此时间范围内未完成,它将完成并抛出一个异常,指示其已超时。

set_device_map(to, device_map)[源代码]

设置每个RPC调用者和被调用者之间的设备映射。此函数可以被多次调用以逐步添加设备布局配置。

Parameters
  • (str) – 被调用者名称。

  • device_map (字典int, str, 或 torch.device) – 从当前工作进程到被调用者的设备映射。此映射必须是可逆的。

示例

>>> # 两个工作线程
>>> def add(x, y):
>>>     print(x)  # 张量([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # 在工作线程 0 上
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # 将 worker0 的 cuda:0 映射到 worker1 的 cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # 将 worker0 的 cuda:1 映射到 worker1 的 cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # 第一个参数将被移动到 worker1 的 cuda:1 上。当
>>> # 发送返回值时,它将遵循设备映射的反向,因此将被移动回 cuda:0 和
>>> # cuda:1 在 worker0 上
>>> print(rets[0])  # 张量([2., 2.], device='cuda:0')
>>> print(rets[1])  # 张量([2., 2.], device='cuda:1')
set_devices(devices)[源代码]

设置由 TensorPipe RPC 代理使用的本地设备。当处理 CUDA RPC 请求时,TensorPipe RPC 代理将正确同步此 List 中所有设备的 CUDA 流。

Parameters

设备列表int, str, 或 torch.device)– 由 TensorPipe RPC 代理使用的本地设备。

注意

RPC框架不会自动重试任何 rpc_sync()rpc_async()remote() 调用。原因是RPC框架无法确定操作是否是幂等的,以及是否可以安全地重试。因此,处理失败并在必要时重试是应用程序的责任。RPC通信基于TCP,因此可能会由于网络故障或间歇性网络连接问题而导致失败。在这种情况下,应用程序需要适当重试,并合理地进行退避,以确保网络不会被激进的重试所淹没。

RRef

警告

当前不支持在使用 CUDA 张量时使用 RRefs

一个 RRef(远程引用)是对远程工作线程上某种类型 T(例如 Tensor)的值的引用。这个句柄保持所有者上的引用远程值的活动状态,但不意味着该值将来会被传输到本地工作线程。RRef 可以在多机训练中使用,通过持有对存在于其他工作线程上的 nn.Modules 的引用,并在训练期间调用适当的功能来检索或修改它们的参数。有关更多详细信息,请参阅 远程引用协议

class torch.distributed.rpc.PyRRef(RRef)

一个封装了对远程工作线程上某种类型值的引用的类。此句柄将保持远程引用值在工作线程上的存活。当满足以下任一条件时,UserRRef 将被删除:1) 在应用程序代码和本地 RRef 上下文中都没有对该引用的引用,或者 2) 应用程序已调用优雅关闭。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现仅提供尽力而为的错误检测,应用程序不应在 rpc.shutdown() 之后使用 UserRRefs

警告

RRefs只能由RPC模块进行序列化和反序列化。 在不使用RPC的情况下序列化和反序列化RRefs(例如,Python pickle, torch save() / load(), JIT save() / load()等)将 导致错误。

Parameters
  • (对象) – 要被此 RRef 包装的值。

  • type_hint (类型, 可选) – 应传递给 TorchScript 编译器的 Python 类型,作为 value 的类型提示。

Example::

为了简洁起见,以下示例跳过了RPC初始化和关闭代码。有关这些细节,请参阅RPC文档。

  1. 使用 rpc.remote 创建一个 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # 从RRef获取值的副本
>>> x = rref.to_here()
  1. 从本地对象创建一个RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 与其他工作者共享一个RRef

>>> # 在 worker0 和 worker1 上:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # 在 worker0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # 以下 RPC 将 rref 与 worker1 共享,引用计数会自动更新。
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None

使用 RRef 作为反向传播的根节点运行反向传播。如果提供了 dist_autograd_ctx_id, 我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传播。在这种情况下, 应使用 get_gradients() 来检索梯度。如果 dist_autograd_ctx_idNone,则假定这是一个本地自动求导图, 并且我们只执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。 RRef 的值应为标量张量。

Parameters
  • dist_autograd_ctx_id (int, 可选) – 我们应该检索梯度的分布式自动求导上下文ID(默认值:-1)。

  • retain_graph (bool, 可选) – 如果False,用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为True是不必要的,通常可以通过更有效的方式解决。通常,您需要将其设置为True以多次运行反向传播(默认值:False)。

Example::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool

返回此 RRef 是否已被所有者确认。 OwnerRRef 始终返回 true,而 UserRRef 仅在所有者知道此 UserRRef 时返回 true。

is_owner(self: torch._C._distributed_rpc.PyRRef) bool

返回当前节点是否是此RRef的所有者。

local_value(self: torch._C._distributed_rpc.PyRRef) object

如果当前节点是所有者,则返回对本地值的引用。否则,抛出异常。

owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo

返回拥有此RRef的节点的工人信息。

owner_name(self: torch._C._distributed_rpc.PyRRef) str

返回拥有此RRef的节点的worker名称。

remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便轻松启动一个远程,使用RRef的所有者作为目标,在该RRef引用的对象上运行函数。更具体地说,rref.remote().func_name(*args, **kwargs) 等同于以下内容:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
Parameters

timeout (浮点数, 可选) – 用于 rref.remote() 的超时时间。如果在超时时间内未能成功创建此 RRef,则下次尝试使用该 RRef 时(例如 to_here),将引发超时错误。如果未提供,将使用默认的 RPC 超时时间。请参阅 rpc.remote() 以了解 RRef 的具体超时语义。

Example::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # 返回 torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # 返回 tensor([[1., 1., 1., 1.]])
rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便轻松使用 rpc_async 启动,使用 RRef 的所有者作为目标,在 RRef 引用的对象上运行函数。更具体地说,rref.rpc_async().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
Parameters

timeout (float, 可选) – 用于 rref.rpc_async() 的超时时间。 如果在该时间段内未完成调用,将引发一个指示此情况的异常。如果未提供此参数,将使用默认的 RPC 超时时间。

Example::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # 返回 torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # 返回 tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便轻松使用 rpc_sync 启动,使用 RRef 的所有者作为目标,在 RRef 引用的对象上运行函数。更具体地说,rref.rpc_sync().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
Parameters

超时 (浮点数, 可选) – 用于 rref.rpc_sync() 的超时时间。 如果在该时间段内未完成调用,将引发相应的异常。如果未提供此参数,将使用默认的 RPC 超时时间。

Example::
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # 返回 torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # 返回 tensor([[1., 1., 1., 1.]])
to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

阻塞调用,将RRef的值从所有者节点复制到本地节点并返回。如果当前节点是所有者,则返回对本地值的引用。

Parameters

超时 (浮点数, 可选) – 用于to_here的超时时间。如果在该时间段内调用未完成,将引发一个指示此情况的异常。如果未提供此参数,将使用默认的RPC超时时间(60秒)。

远程模块

警告

当前不支持在使用 CUDA 张量时使用 RemoteModule

RemoteModule 是一种在不同进程中远程创建 nn.Module 的简便方法。实际模块位于远程主机上,但本地主机有一个该模块的句柄,并可以像常规的 nn.Module 一样调用该模块。然而,调用会涉及 RPC 调用到远程端,并且可以通过 RemoteModule 支持的额外 API 异步执行。

class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[源代码]

RemoteModule 实例只能在 RPC 初始化后创建。

它在指定的远程节点上创建一个用户指定的模块。 它的行为类似于普通的 nn.Module,除了 forward 方法是在远程节点上执行的。 它会处理自动梯度记录,以确保反向传播将梯度传播回相应的远程模块。

它基于module_clsforward方法的签名生成两个方法forward_asyncforwardforward_async异步运行并返回一个Future。forward_asyncforward的参数与module_cls返回的模块的forward方法的参数相同。

例如,如果 module_cls 返回一个 nn.Linear 的实例, 该实例具有 forward 方法签名:def forward(input: Tensor) -> Tensor:, 生成的 RemoteModule 将具有 2 个方法,其签名如下:

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
Parameters
  • remote_device (str) – 目标工作线程上的设备,我们希望将此模块放置在该设备上。 格式应为“<工作线程名称>/<设备>”,其中设备字段可以解析为torch.device类型。 例如,“trainer0/cpu”,“trainer0”,“ps0/cuda:0”。 此外,设备字段是可选的,默认值为“cpu”。

  • module_cls (nn.Module) –

    要远程创建的模块的类。例如,

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • args (序列, 可选) – 传递给 module_cls 的参数。

  • kwargs (字典, 可选) – 传递给 module_cls 的 kwargs。

Returns

一个远程模块实例,它包装了用户提供的 Module 创建的 module_cls,它有一个阻塞的 forward 方法和一个异步的 forward_async 方法,该方法返回在远程端用户提供的模块上 forward 调用的未来。

Example::

在两个不同的进程中运行以下代码:

>>> # 在 worker 0 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,可以在这个教程中找到一个与DistributedDataParallel (DDP) 结合的更实用的示例。

get_module_rref()

返回一个指向远程模块的 RRef (RRef[nn.Module])。

Return type

RRef[模块]

remote_parameters(recurse=True)

返回一个指向远程模块参数的 RRef 列表。

这通常可以与DistributedOptimizer结合使用。

Parameters

递归 (布尔值) – 如果为True,则返回远程模块及其所有子模块的参数。否则,仅返回远程模块的直接成员参数。

Returns

远程模块参数的RRef列表(List[RRef[nn.Parameter]])。

Return type

列表[RRef[参数]]

分布式自动求导框架

警告

当前不支持在使用 CUDA 张量时使用分布式自动求导

此模块提供了一个基于RPC的分布式自动求导框架,可用于模型并行训练等应用。简而言之,应用程序可以通过RPC发送和接收梯度记录张量。在前向传播过程中,我们记录梯度记录张量何时通过RPC发送,在反向传播过程中,我们使用此信息通过RPC执行分布式反向传播。更多详情请参见分布式自动求导设计

torch.distributed.autograd.backward(context_id: int, roots: List[张量], retain_graph=False) None

使用提供的根节点启动分布式反向传播。目前实现了快速模式算法,该算法假设在同一分布式自动求导上下文中,跨工作节点发送的所有RPC消息在反向传播过程中都将是自动求导图的一部分。

我们使用提供的根节点来发现自动求导图并计算适当的依赖关系。此方法会阻塞,直到整个自动求导计算完成。

我们在每个节点上的适当 torch.distributed.autograd.context 中累积梯度。要使用的自动求导上下文是根据调用 torch.distributed.autograd.backward() 时传入的 context_id 查找的。如果没有与给定ID对应的有效自动求导上下文,我们将抛出一个错误。您可以使用 get_gradients() API 检索累积的梯度。

Parameters
  • context_id (int) – 我们应该检索梯度的自动微分上下文ID。

  • roots (列表) – 表示自动求导计算根的张量。所有张量都应为标量。

  • retain_graph (bool, 可选) – 如果为False,用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为True是不必要的,通常可以通过更有效的方式解决。通常,您需要将其设置为True以多次运行反向传播。

Example::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
class torch.distributed.autograd.context[源代码]

用于在使用分布式自动求导时包装前向和后向传递的上下文对象。在 with 语句中生成的 context_id 需要在所有工作节点上唯一标识一个分布式后向传递。每个工作节点存储与此 context_id 关联的元数据,这些元数据是正确执行分布式自动求导传递所必需的。

Example::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) Dict[张量, 张量]

从张量到在该张量上累积的适当梯度的映射,该梯度是在分布式自动求导反向传播过程中,根据给定的 context_id 在提供的上下文中检索到的。

Parameters

context_id (int) – 我们应该检索梯度的自动微分上下文ID。

Returns

一个映射,其中键是张量,值是与该张量相关的梯度。

Example::
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分布式优化器

请参阅torch.distributed.optim页面以获取关于分布式优化器的文档。

设计笔记

分布式自动求导设计笔记涵盖了基于RPC的分布式自动求导框架的设计,该框架对于模型并行训练等应用非常有用。

RRef 设计笔记涵盖了框架用于引用远程工作者上值的 RRef(远程引用)协议的设计。

教程

RPC教程向用户介绍RPC框架,提供了几个使用torch.distributed.rpc API的示例应用程序,并展示了如何使用分析器来分析基于RPC的工作负载。

优云智算