7.3 编程API
本节涵盖了训练脚本中常用的核心Python组件。DGL提供了三种分布式数据结构和各种API,用于初始化、分布式采样和工作负载分配。
DistGraph
用于访问分布式存储图的结构和特征。DistTensor
用于访问跨机器分区的节点/边特征张量。DistEmbedding
用于访问跨机器分区的可学习节点/边嵌入张量。
DGL分布式模块的初始化
dgl.distributed.initialize()
初始化分布式模块。如果由训练器调用,此API会创建采样器进程并与图服务器建立连接;如果由图服务器调用,此API会启动一个服务循环以监听训练器/采样器的请求。此API必须在torch.distributed.init_process_group()
和任何其他dgl.distributed
API之前调用,如下所示:
dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')
注意
如果训练脚本包含需要在服务器上调用的用户定义函数(UDFs)(详见DistTensor和DistEmbedding部分),这些UDFs必须在initialize()
之前声明。
分布式图
DistGraph
是一个用于访问集群中图结构和节点/边特征的Python类。每台机器负责且仅负责一个分区。它加载分区数据(分区中的图结构以及节点数据和边数据),并使其对集群中的所有训练器可访问。DistGraph
提供了DGLGraph
API的一小部分用于数据访问。
分布式模式 vs 独立模式
DistGraph
可以在两种模式下运行:分布式模式 和 独立模式。
当用户在 Python 命令行或 Jupyter Notebook 中执行训练脚本时,它以独立模式运行。也就是说,它在单个进程中运行所有计算,并且不与其他任何进程通信。因此,独立模式要求输入图只有一个分区。此模式主要用于开发和测试(例如,在 Jupyter Notebook 中开发和运行代码)。
当用户使用启动脚本执行训练脚本时(参见启动脚本部分),DistGraph
以分布式模式运行。启动工具在后台启动服务器(节点/边特征访问和图采样),并自动加载每台机器上的分区数据。DistGraph
与集群中的服务器连接,并通过网络访问它们。
DistGraph 创建
在分布式模式下,创建DistGraph
需要提供图分区期间给出的图名称。图名称用于识别集群中加载的图。
import dgl
g = dgl.distributed.DistGraph('graph_name')
在独立模式下运行时,它会在本地机器上加载图数据。因此,用户需要提供分区配置文件,该文件包含有关输入图的所有信息。
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
注意
DGL 只允许一个单一的 DistGraph
对象。销毁一个 DistGraph 并创建一个新对象的行为是未定义的。
访问图结构
DistGraph
提供了一组API来访问图结构。目前,大多数API提供图信息,例如节点和边的数量。DistGraph的主要使用场景是运行采样API以支持小批量训练(参见分布式采样)。
print(g.num_nodes())
访问节点/边数据
与DGLGraph
类似,DistGraph
提供了ndata
和edata
来访问节点和边中的数据。
不同的是,DistGraph
中的ndata
/edata
返回的是DistTensor
,而不是底层框架的张量。
用户还可以将新的DistTensor
分配给DistGraph
作为节点数据或边数据。
g.ndata['train_mask'] # <dgl.distributed.dist_graph.DistTensor at 0x7fec820937b8>
g.ndata['train_mask'][0] # tensor([1], dtype=torch.uint8)
分布式张量
如前所述,DGL将节点/边特征分片并存储在一组机器中。 DGL提供了类似张量的接口来访问集群中分区的节点/边特征。在分布式设置中,DGL仅支持密集的节点/边特征。
DistTensor
管理着分区并存储在多个机器上的密集张量。目前,分布式张量必须与图的节点或边相关联。换句话说,DistTensor 中的行数必须与图中的节点数或边数相同。以下代码创建了一个分布式张量。除了张量的形状和数据类型外,用户还可以提供一个唯一的张量名称。如果用户想要引用一个持久的分布式张量(即使 DistTensor
对象消失,该张量仍然存在于集群中),这个名称非常有用。
tensor = dgl.distributed.DistTensor((g.num_nodes(), 10), th.float32, name='test')
注意
DistTensor
的创建是一个同步操作。所有训练器都必须调用创建操作,并且只有当所有训练器都调用时,创建才会成功。
用户可以将DistTensor
添加到DistGraph
对象中,作为节点数据或边数据之一。
g.ndata['feat'] = tensor
注意
节点数据名称和张量名称不必相同。前者从DistGraph
(在训练器进程中)识别节点数据,而后者识别DGL服务器中的分布式张量。
DistTensor
具有与常规张量相同的API来访问其元数据,例如形状和数据类型。它还支持索引读取和写入,但不支持计算操作符,例如求和和平均值。
data = g.ndata['feat'][[1, 2, 3]]
print(data)
g.ndata['feat'][[3, 4, 5]] = data
注意
目前,DGL 在机器运行多个服务器时,不提供对来自多个训练器的并发写入的保护。这可能导致数据损坏。避免对同一行数据进行并发写入的一种方法是在机器上运行一个服务器进程。
分布式DistEmbedding
DGL 提供了 DistEmbedding
来支持需要节点嵌入的传导模型。创建分布式嵌入与创建分布式张量非常相似。
def initializer(shape, dtype):
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
emb = dgl.distributed.DistEmbedding(g.num_nodes(), 10, init_func=initializer)
在内部,分布式嵌入是建立在分布式张量之上的,因此与分布式张量的行为非常相似。例如,当创建嵌入时,它们会被分片并存储在集群中的所有机器上。它可以通过一个名称唯一标识。
注意
初始化函数在服务器进程中调用。因此,它必须在dgl.distributed.initialize
之前声明。
因为嵌入是模型的一部分,用户需要将它们附加到一个优化器上进行小批量训练。目前,DGL提供了一个稀疏Adagrad优化器SparseAdagrad
(DGL将在以后添加更多用于稀疏嵌入的优化器)。用户需要从模型中收集所有分布式嵌入并将它们传递给稀疏优化器。如果一个模型同时具有节点嵌入和常规的密集模型参数,并且用户希望对嵌入执行稀疏更新,他们需要创建两个优化器,一个用于节点嵌入,另一个用于密集模型参数,如下面的代码所示:
sparse_optimizer = dgl.distributed.SparseAdagrad([emb], lr=lr1)
optimizer = th.optim.Adam(model.parameters(), lr=lr2)
feats = emb(nids)
loss = model(feats)
loss.backward()
optimizer.step()
sparse_optimizer.step()
注意
DistEmbedding
不继承 torch.nn.Module
,
所以我们建议在您自己的神经网络模块之外使用它。
分布式采样
DGL 提供了两个层次的 API 用于采样节点和边以生成小批量(参见小批量训练部分)。低层次的 API 要求用户编写代码来明确定义如何采样一层节点(例如,使用 dgl.sampling.sample_neighbors()
)。高层次的采样 API 实现了一些流行的采样算法,用于节点分类和链接预测任务(例如,NodeDataLoader
和 EdgeDataLoader
)。
分布式采样模块遵循相同的设计,并提供两个级别的采样API。对于较低级别的采样API,它提供了sample_neighbors()
用于在DistGraph
上进行分布式邻居采样。此外,DGL提供了一个分布式DataLoader(DistDataLoader
)用于分布式采样。分布式DataLoader与Pytorch DataLoader具有相同的接口,只是用户在创建dataloader时不能指定工作进程的数量。工作进程在dgl.distributed.initialize()
中创建。
注意
当在DistGraph
上运行dgl.distributed.sample_neighbors()
时,采样器无法在具有多个工作进程的Pytorch DataLoader中运行。主要原因是Pytorch DataLoader在每个epoch中创建新的采样工作进程,这导致多次创建和销毁DistGraph
对象。
使用低级API时,采样代码与单进程采样类似。唯一的区别是用户需要使用 dgl.distributed.sample_neighbors()
和
DistDataLoader
。
def sample_blocks(seeds):
seeds = th.LongTensor(np.asarray(seeds))
blocks = []
for fanout in [10, 25]:
frontier = dgl.distributed.sample_neighbors(g, seeds, fanout, replace=True)
block = dgl.to_block(frontier, seeds)
seeds = block.srcdata[dgl.NID]
blocks.insert(0, block)
return blocks
dataloader = dgl.distributed.DistDataLoader(dataset=train_nid,
batch_size=batch_size,
collate_fn=sample_blocks,
shuffle=True)
for batch in dataloader:
...
高级采样API(NodeDataLoader
和
EdgeDataLoader
)有分布式对应版本
(DistNodeDataLoader
和
DistEdgeDataLoader
)。代码与单进程采样完全相同。
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.sampling.DistNodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...
拆分工作负载
要训练模型,用户首先需要将数据集分割为训练集、验证集和测试集。对于分布式训练,这一步通常在我们调用dgl.distributed.partition_graph()
来分割图之前完成。我们建议将数据分割存储在布尔数组中,作为节点数据或边数据。对于节点分类任务,这些布尔数组的长度是图中的节点数量,每个元素表示一个节点是否存在于训练集/验证集/测试集中。对于链接预测任务,应使用类似的布尔数组。dgl.distributed.partition_graph()
会根据图分割结果分割这些布尔数组(因为它们存储为图的节点数据或边数据),并将它们与图分区一起存储。
在分布式训练过程中,用户需要将训练节点/边分配给每个训练器。同样,我们也需要以相同的方式分割验证集和测试集。DGL提供了node_split()
和edge_split()
来在运行时为分布式训练分割训练集、验证集和测试集。这两个函数以图分区前构建的布尔数组作为输入,将它们分割并返回本地训练器的一部分。默认情况下,它们确保所有部分具有相同数量的节点/边。这对于同步SGD非常重要,因为它假设每个训练器具有相同数量的小批量。
下面的示例将训练集分割,并返回本地进程的节点子集。
train_nids = dgl.distributed.node_split(g.ndata['train_mask'])