注意
Go to the end 下载完整示例代码
分布式链接预测
在本教程中,我们将逐步介绍如何为链接预测任务执行分布式GNN训练。本教程假设您已经阅读了分布式节点分类和GNN的随机训练用于链接预测。一般流程如下所示。

图的分区
在本教程中,我们将使用OGBL citation2图
作为示例来说明图分区。我们首先将图加载到DGL图中,并使用AsLinkPredDataset
将其转换为训练图、验证边和测试边。
import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.linkproppred import DglLinkPropPredDataset
data = DglLinkPropPredDataset(name='ogbl-citation2')
graph = data[0]
data = dgl.data.AsLinkPredDataset(data, [0.8, 0.1, 0.1])
graph_train = data[0]
dgl.distributed.partition_graph(graph_train, graph_name='ogbl-citation2', num_parts=4,
out_path='4part_data',
balance_edges=True)
然后,我们将验证和测试边与图分区一起存储。
import pickle
with open('4part_data/val.pkl', 'wb') as f:
pickle.dump(data.val_edges, f)
with open('4part_data/test.pkl', 'wb') as f:
pickle.dump(data.test_edges, f)
分布式训练脚本
分布式链接预测脚本与分布式节点分类脚本非常相似,只需进行少量修改。
初始化网络通信
我们首先初始化网络通信和Pytorch的分布式通信。
import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')
th.distributed.init_process_group(backend='gloo')
配置文件 ip_config.txt 的格式如下:
ip_addr1 [port1]
ip_addr2 [port2]
每一行代表一台机器。第一列是IP地址,第二列是用于连接到该机器上的DGL服务器的端口。端口是可选的,默认端口是30050。
分布式图的参考
DGL的服务器会自动加载图分区。服务器加载分区后,训练器连接到服务器,并可以开始引用集群中的分布式图,如下所示。
g = dgl.distributed.DistGraph('ogbl-citation2')
如代码所示,我们通过名称引用分布式图。这个名称基本上是传递给partition_graph函数的名称,如上节所示。
获取训练和验证节点ID
对于分布式训练,每个训练器可以运行自己的一组训练节点。我们可以通过调用node_split和edge_split来获取训练器中的当前图及其节点ID和边ID。我们还可以通过加载pickle文件来获取有效的边和测试边。
train_eids = dgl.distributed.edge_split(th.ones((g.num_edges(),), dtype=th.bool), g.get_partition_book(), force_even=True)
train_nids = dgl.distributed.node_split(th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book())
with open('4part_data/val.pkl', 'rb') as f:
global_valid_eid = pickle.load(f)
with open('4part_data/test.pkl', 'rb') as f:
global_test_eid = pickle.load(f)
定义一个GNN模型
对于分布式训练,我们定义GNN模型的方式与 mini-batch训练或 全图训练完全相同。 下面的代码定义了GraphSage模型。
import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes, n_layers):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
def forward(self, blocks, x):
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
x = layer(block, x)
if l != self.n_layers - 1:
x = F.relu(x)
return x
num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
对于分布式训练,我们需要将模型转换为使用Pytorch的DistributedDataParallel的分布式模型。
model = th.nn.parallel.DistributedDataParallel(model)
我们还定义了一个边预测器 EdgePredictor
来预测节点表示对的边分数
from dgl.nn import EdgePredictor
predictor = EdgePredictor('dot')
分布式小批量采样器
我们可以使用DistEdgeDataLoader
,它是EdgeDataLoader
的分布式版本,来创建一个用于链接预测的分布式小批量采样器。
Training loop
分布式训练的训练循环也与单进程训练完全相同。
import sklearn.metrics
import numpy as np
epoch = 0
for epoch in range(10):
for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(dataloader):
pos_graph = pos_graph
neg_graph = neg_graph
node_inputs = mfgs[0].srcdata[dgl.NID]
batch_inputs = g.ndata['feat'][node_inputs]
batch_pred = model(mfgs, batch_inputs)
pos_feature = batch_pred
pos_graph.ndata['h'] = batch_pred
pos_src, pos_dst = pos_graph.edges()
pos_score = predictor(pos_feature[pos_src], pos_feature[pos_dst])
neg_feature = batch_pred
neg_graph.ndata['h'] = batch_pred
neg_src, neg_dst = neg_graph.edges()
neg_score = predictor(neg_feature[pos_src], neg_feature[pos_dst])
score = th.cat([pos_score, neg_score])
label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
loss = F.binary_cross_entropy_with_logits(score, label)
optimizer.zero_grad()
loss.backward()
optimizer.step()
推理
在推理阶段,我们使用训练循环后的模型来获取节点的嵌入。
def inference(model, graph, node_features, args):
with th.no_grad():
sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
graph, th.arange(graph.num_nodes()), sampler,
batch_size=1024,
shuffle=False,
drop_last=False)
result = []
for input_nodes, output_nodes, mfgs in train_dataloader:
node_inputs = mfgs[0].srcdata[dgl.NID]
inputs = node_features[node_inputs]
result.append(model(mfgs, inputs))
return th.cat(result)
node_reprs = inference(model, g, g.ndata['feat'], args)
测试边被编码为 ((positive_edge_src, positive_edge_dst), (negative_edge_src, negative_edge_dst))。因此,我们可以通过正对和负对获得真实值。
test_pos_src = global_test_eid[0][0]
test_pos_dst = global_test_eid[0][1]
test_neg_src = global_test_eid[1][0]
test_neg_dst = global_test_eid[1][1]
test_labels = th.cat([th.ones_like(test_pos_src), th.zeros_like(test_neg_src)]).cpu().numpy()
然后,我们使用点积预测器来获取正负测试对的分数,以计算诸如AUC之类的指标:
h_pos_src = node_reprs[test_pos_src]
h_pos_dst = node_reprs[test_pos_dst]
h_neg_src = node_reprs[test_neg_src]
h_neg_dst = node_reprs[test_neg_dst]
score_pos = predictor(h_pos_src, h_pos_dst)
score_neg = predictor(h_neg_src, h_neg_dst)
test_preds = th.cat([score_pos, score_neg]).cpu().numpy()
auc = skm.roc_auc_score(test_labels, test_preds)
设置分布式训练环境
分布式训练环境的设置与分布式节点分类类似。更多详情请参考: 设置分布式训练环境
脚本的总运行时间: (0 分钟 0.000 秒)