Shortcuts

远程引用协议

本笔记描述了远程引用协议的设计细节,并详细介绍了不同场景下的消息流程。在继续之前,请确保您熟悉分布式RPC框架

背景

RRef 代表远程引用。它是一个位于本地或远程工作器上的对象的引用,并且在底层透明地处理引用计数。从概念上讲,它可以被视为一个分布式共享指针。应用程序可以通过调用 remote() 来创建一个 RRef。每个 RRef 由调用 remote() 的被调用工作器(即所有者)拥有,并且可以被多个用户使用。所有者存储实际数据并跟踪全局引用计数。每个 RRef 可以通过一个全局 RRefId 唯一标识,该标识在调用 remote() 时在调用者上分配。

在所有者工作线程上,只有一个 OwnerRRef 实例,其中包含实际数据,而在用户工作线程上,可以根据需要存在多个 UserRRefs,并且 UserRRef 不持有数据。所有在所有者上的使用都将通过全局唯一的 RRefId 检索唯一的 OwnerRRef 实例。当 UserRRef 作为参数或返回值在 rpc_sync()rpc_async()remote() 调用中使用时,将创建一个 UserRRef,并且所有者将根据更新引用计数进行通知。当全局没有 UserRRef 实例并且所有者上也没有对 OwnerRRef 的引用时,OwnerRRef 及其数据将被删除。

假设

RRef协议的设计基于以下假设。

  • 瞬时网络故障:RRef 设计通过重试消息来处理瞬时网络故障。它无法处理节点崩溃或永久性网络分区。当这些事件发生时,应用程序应关闭所有工作节点,恢复到之前的检查点,并继续训练。

  • 非幂等UDF:我们假设提供给 rpc_sync()rpc_async()remote() 的用户定义函数(UDF)不是幂等的,因此不能重试。然而,内部RRef控制消息是幂等的,并且在消息失败时会重试。

  • 消息乱序传递:我们不假设任何节点对之间的消息传递顺序,因为发送方和接收方都使用多个线程。无法保证哪个消息会先被处理。

RRef 生命周期

该协议的目标是在适当的时间删除一个OwnerRRef。 删除OwnerRRef的正确时机是当没有存活的 UserRRef实例并且用户代码也没有持有对该 OwnerRRef的引用时。棘手的部分是确定是否存在任何存活的 UserRRef实例。

设计推理

用户可以在三种情况下获得UserRRef

  1. 从所有者接收一个 UserRRef

  2. 从另一个用户接收一个 UserRRef

  3. 创建一个由另一个工作线程拥有的新 UserRRef

案例1是最简单的,其中所有者将其RRef传递给用户,所有者调用rpc_sync()rpc_async(),或 remote()并使用其RRef作为参数。在这种情况下,用户上将创建一个新的UserRRef。由于所有者是调用者,它可以轻松更新其本地OwnerRRef的引用计数。

唯一的要求是任何 UserRRef 必须在销毁时通知所有者。因此,我们需要第一个保证:

G1. 当任何UserRRef被删除时,所有者将会收到通知。

由于消息可能会延迟或乱序到达,我们需要额外的保证来确保删除消息不会过早处理。如果A向B发送一条涉及RRef的消息,我们称A上的RRef(父RRef)和B上的RRef(子RRef)。

G2. 父RRef在子RRef被所有者确认之前不会被删除。

在情况2和3中,所有者可能对RRef分叉图只有部分或完全没有了解。例如,一个RRef可以在一个用户上构建,并且在所有者接收到任何RPC调用之前,创建者用户可能已经与其他用户共享了该RRef,而这些用户可能进一步共享该RRef。一个不变性是,任何RRef的分叉图总是一棵树,因为分叉一个RRef总是在被调用者上创建一个新的UserRRef实例(除非被调用者是所有者),因此每个RRef都有一个单一的父节点。

树中任何 UserRRef 的所有者的视图有三个阶段:

1) 未知 -> 2) 已知 -> 3) 已删除。

所有者的整个树的视图不断变化。当所有者认为没有存活的UserRRef实例时,它会删除其OwnerRRef实例,即当OwnerRRef被删除时,所有UserRRef实例可能确实被删除或未知。危险的情况是当某些分支未知而其他分支被删除时。

G2 trivially guarantees that no parent UserRRef can be deleted before the owner knows all of its children UserRRef instances. However, it is possible that the child UserRRef may be deleted before the owner knows its parent UserRRef.

考虑以下示例,其中 OwnerRRef 分叉到 A,然后 A 分叉到 Y,Y 分叉到 Z:

OwnerRRef -> A -> Y -> Z

如果所有Z的消息,包括删除消息,都在Y的消息之前被所有者处理,所有者将在知道Y存在之前得知Z的删除。尽管如此,这并不会引起任何问题。因为,至少Y的一个祖先(A)将是存活的,并且它将阻止所有者删除OwnerRRef。更具体地说,如果所有者不知道Y,由于G2,A不能被删除,并且所有者知道A,因为它是A的父节点。

如果RRef是在用户端创建的,情况会稍微复杂一些:

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 调用 to_here()UserRRef 上,至少在 Z 被删除时,所有者知道 A,否则,to_here() 不会完成。如果 Z 没有调用 to_here(),则所有者可能在收到来自 A 和 Y 的任何消息之前收到来自 Z 的所有消息。在这种情况下,由于 OwnerRRef 的实际数据尚未创建,因此也没有什么可以删除的。这就像 Z 根本不存在一样。因此,这仍然是可行的。

实现

G1 是通过在 UserRRef 析构函数中发送删除消息来实现的。为了提供 G2,父 UserRRef 在每次被分叉时都会放入一个上下文中,由新的 ForkId 索引。父 UserRRef 只有在收到子节点的确认消息(ACK)后才会从上下文中移除,而子节点只有在得到所有者的确认后才会发送 ACK。

协议场景

现在让我们讨论上述设计如何在四种场景中转化为协议。

用户共享RRef与所有者作为返回值

import torch
import torch.distributed.rpc as rpc

# 在 worker A 上
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# 假设 rref 的 RRefId 为 100 且 ForkId 为 1
rref.to_here()

在这种情况下,UserRRef 是在用户工作节点 A 上创建的,然后它与远程消息一起传递给所有者工作节点 B,接着 B 创建了 OwnerRRef。方法 remote() 会立即返回,这意味着 UserRRef 可以在所有者知道它之前被分叉/使用。

在所有者上,当接收到 remote() 调用时,它将创建 OwnerRRef,并返回一个ACK以确认 {100, 1}RRefIdForkId)。只有在接收到此ACK后,A才能删除其 UserRRef。这涉及 G1G2G1 是显而易见的。对于 G2OwnerRRefUserRRef 的子节点,并且 UserRRef 在接收到所有者的ACK之前不会被删除。

user_to_owner_ret.png

上图展示了消息流,其中实线箭头包含用户函数,虚线箭头是内置消息。请注意,从A到B的前两条消息(remote()to_here())可能以任何顺序到达B,但最终的删除消息只有在以下情况下才会发送:

  • B确认UserRRef {100, 1}(G2),并且

  • Python GC 同意删除本地的 UserRRef 实例。这发生在 RRef 不再在作用域内并且有资格进行垃圾回收时。

用户共享RRef,所有者作为参数

import torch
import torch.distributed.rpc as rpc

# 在 worker A 和 worker B 上
def func(rref):
  pass

# 在 worker A 上
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# 假设 rref 的 RRefId 为 100 且 ForkId 为 1
rpc.rpc_async('B', func, args=(rref, ))

在这种情况下,在A上创建了UserRRef之后,A将其作为参数用于后续对B的RPC调用中。A将保持UserRRef {100, 1}的活动状态,直到收到来自B的确认(G2,不是RPC调用的返回值)。这是必要的,因为A在收到所有先前消息之前不应发送删除消息,否则,OwnerRRef可能会在使用之前被删除,因为我们不保证消息传递的顺序。这是通过创建RRef的子ForkId,将它们保存在一个映射中,直到收到所有者确认子ForkId来完成的。下图显示了消息流。

user_to_owner_arg.png

请注意,UserRRef 可能在 func 完成之前甚至在开始之前就在 B 上被删除了。然而,这是可以的,因为当 B 为子 ForkId 发送 ACK 时,它已经获取了 OwnerRRef 实例,这将防止它被过早删除。

所有者与用户共享RRef

所有者到用户的场景是最简单的情况,所有者可以在本地更新引用计数,而不需要任何额外的控制消息来通知其他人。关于G2,这与父节点立即从所有者接收到ACK的情况相同,因为父节点本身就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# 在 worker B 和 worker C 上
def func(rref):
  pass

# 在 worker B 上,创建一个本地 RRef
rref = RRef("data")
# 假设 rref 的 RRefId 是 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上图显示了消息流。请注意,当 OwnerRRef 在 rpc_async 调用后退出作用域时,它不会被删除,因为内部有一个映射会在存在任何已知分支的情况下保持其存活,在这种情况下是 UserRRef {100, 1}。(G2)

用户共享RRef给用户

这是最复杂的情况,其中调用者用户(父 UserRRef)、被调用者用户(子 UserRRef)和所有者都需要参与。

import torch
import torch.distributed.rpc as rpc

# 在 worker A 和 worker C 上
def func(rref):
  pass

# 在 worker A 上
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# 假设 rref 的 RRefId 为 100 且 ForkId 为 1
rpc.rpc_async('C', func, args=(rref, ))
用户到用户的示意图

当C从A接收到子UserRRef时,它会向所有者B发送一个fork请求。之后,当B在C上确认UserRRef时,C将并行执行两个操作:1) 向A发送子ACK,以及2) 运行用户提供的函数。在此期间,父节点(A)将保持其UserRRef {100, 1}以实现G2

优云智算