DGL
latest

开始使用

  • 安装和设置
  • DGL 快速入门

先进材料

  • 🆕 使用GraphBolt进行GNN的随机训练
  • 用户指南
  • 用户指南【包含过时信息】
  • 用户指南[过时的]
  • 🆕 教程: 图变换器
  • 教程: dgl.sparse
  • 在CPU上进行训练
  • 在多GPU上进行训练
  • 分布式训练
    • 分布式节点分类
    • 分布式链接预测
  • 使用DGL进行论文研究

API 参考

  • dgl
  • dgl.data
  • dgl.dataloading
  • dgl.DGLGraph
  • dgl.distributed
  • dgl.function
  • dgl.geometry
  • 🆕 dgl.graphbolt
  • dgl.nn (PyTorch)
  • dgl.nn.functional
  • dgl.ops
  • dgl.optim
  • dgl.sampling
  • dgl.sparse
  • dgl.multiprocessing
  • dgl.transforms
  • 用户自定义函数

注释

  • 为DGL做贡献
  • DGL 外部函数接口 (FFI)
  • 性能基准测试

杂项

  • 常见问题解答 (FAQ)
  • 环境变量
  • 资源
DGL
  • Distributed training
  • Distributed Link Prediction
  • Edit on GitHub

注意

Go to the end 下载完整示例代码

分布式链接预测

在本教程中,我们将逐步介绍如何为链接预测任务执行分布式GNN训练。本教程假设您已经阅读了分布式节点分类和GNN的随机训练用于链接预测。一般流程如下所示。

Imgur

图的分区

在本教程中,我们将使用OGBL citation2图 作为示例来说明图分区。我们首先将图加载到DGL图中,并使用AsLinkPredDataset将其转换为训练图、验证边和测试边。

import os
os.environ['DGLBACKEND'] = 'pytorch'
import dgl
import torch as th
from ogb.linkproppred import DglLinkPropPredDataset
data = DglLinkPropPredDataset(name='ogbl-citation2')
graph = data[0]
data = dgl.data.AsLinkPredDataset(data, [0.8, 0.1, 0.1])
graph_train = data[0]
dgl.distributed.partition_graph(graph_train, graph_name='ogbl-citation2', num_parts=4,
                            out_path='4part_data',
                            balance_edges=True)

然后,我们将验证和测试边与图分区一起存储。

import pickle
with open('4part_data/val.pkl', 'wb') as f:
    pickle.dump(data.val_edges, f)
with open('4part_data/test.pkl', 'wb') as f:
    pickle.dump(data.test_edges, f)

分布式训练脚本

分布式链接预测脚本与分布式节点分类脚本非常相似,只需进行少量修改。

初始化网络通信

我们首先初始化网络通信和Pytorch的分布式通信。

import dgl
import torch as th
dgl.distributed.initialize(ip_config='ip_config.txt')
th.distributed.init_process_group(backend='gloo')

配置文件 ip_config.txt 的格式如下:

ip_addr1 [port1]
ip_addr2 [port2]

每一行代表一台机器。第一列是IP地址,第二列是用于连接到该机器上的DGL服务器的端口。端口是可选的,默认端口是30050。

分布式图的参考

DGL的服务器会自动加载图分区。服务器加载分区后,训练器连接到服务器,并可以开始引用集群中的分布式图,如下所示。

g = dgl.distributed.DistGraph('ogbl-citation2')

如代码所示,我们通过名称引用分布式图。这个名称基本上是传递给partition_graph函数的名称,如上节所示。

获取训练和验证节点ID

对于分布式训练,每个训练器可以运行自己的一组训练节点。我们可以通过调用node_split和edge_split来获取训练器中的当前图及其节点ID和边ID。我们还可以通过加载pickle文件来获取有效的边和测试边。

train_eids = dgl.distributed.edge_split(th.ones((g.num_edges(),), dtype=th.bool), g.get_partition_book(), force_even=True)
train_nids = dgl.distributed.node_split(th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book())
with open('4part_data/val.pkl', 'rb') as f:
    global_valid_eid = pickle.load(f)
with open('4part_data/test.pkl', 'rb') as f:
    global_test_eid = pickle.load(f)

定义一个GNN模型

对于分布式训练,我们定义GNN模型的方式与 mini-batch训练或 全图训练完全相同。 下面的代码定义了GraphSage模型。

import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn
import torch.optim as optim

class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))

    def forward(self, blocks, x):
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            x = layer(block, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

num_hidden = 256
num_labels = len(th.unique(g.ndata['labels'][0:g.num_nodes()]))
num_layers = 2
lr = 0.001
model = SAGE(g.ndata['feat'].shape[1], num_hidden, num_labels, num_layers)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

对于分布式训练,我们需要将模型转换为使用Pytorch的DistributedDataParallel的分布式模型。

model = th.nn.parallel.DistributedDataParallel(model)

我们还定义了一个边预测器 EdgePredictor 来预测节点表示对的边分数

from dgl.nn import EdgePredictor
predictor = EdgePredictor('dot')

分布式小批量采样器

我们可以使用DistEdgeDataLoader,它是EdgeDataLoader的分布式版本,来创建一个用于链接预测的分布式小批量采样器。

Training loop

分布式训练的训练循环也与单进程训练完全相同。

import sklearn.metrics
import numpy as np

epoch = 0
for epoch in range(10):
    for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(dataloader):
        pos_graph = pos_graph
        neg_graph = neg_graph
        node_inputs = mfgs[0].srcdata[dgl.NID]
        batch_inputs = g.ndata['feat'][node_inputs]

        batch_pred = model(mfgs, batch_inputs)
        pos_feature = batch_pred
        pos_graph.ndata['h'] = batch_pred
        pos_src, pos_dst = pos_graph.edges()
        pos_score = predictor(pos_feature[pos_src], pos_feature[pos_dst])

        neg_feature = batch_pred
        neg_graph.ndata['h'] = batch_pred
        neg_src, neg_dst = neg_graph.edges()
        neg_score = predictor(neg_feature[pos_src], neg_feature[pos_dst])

        score = th.cat([pos_score, neg_score])
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
        loss = F.binary_cross_entropy_with_logits(score, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

推理

在推理阶段,我们使用训练循环后的模型来获取节点的嵌入。

def inference(model, graph, node_features, args):
    with th.no_grad():
        sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
        train_dataloader = dgl.dataloading.DistNodeDataLoader(
            graph, th.arange(graph.num_nodes()), sampler,
            batch_size=1024,
            shuffle=False,
            drop_last=False)

        result = []
        for input_nodes, output_nodes, mfgs in train_dataloader:
            node_inputs = mfgs[0].srcdata[dgl.NID]
            inputs = node_features[node_inputs]
            result.append(model(mfgs, inputs))

        return th.cat(result)

node_reprs = inference(model, g, g.ndata['feat'], args)

测试边被编码为 ((positive_edge_src, positive_edge_dst), (negative_edge_src, negative_edge_dst))。因此,我们可以通过正对和负对获得真实值。

test_pos_src = global_test_eid[0][0]
test_pos_dst = global_test_eid[0][1]
test_neg_src = global_test_eid[1][0]
test_neg_dst = global_test_eid[1][1]
test_labels = th.cat([th.ones_like(test_pos_src), th.zeros_like(test_neg_src)]).cpu().numpy()

然后,我们使用点积预测器来获取正负测试对的分数,以计算诸如AUC之类的指标:

h_pos_src = node_reprs[test_pos_src]
h_pos_dst = node_reprs[test_pos_dst]
h_neg_src = node_reprs[test_neg_src]
h_neg_dst = node_reprs[test_neg_dst]
score_pos = predictor(h_pos_src, h_pos_dst)
score_neg = predictor(h_neg_src, h_neg_dst)

test_preds = th.cat([score_pos, score_neg]).cpu().numpy()
auc = skm.roc_auc_score(test_labels, test_preds)

设置分布式训练环境

分布式训练环境的设置与分布式节点分类类似。更多详情请参考: 设置分布式训练环境

脚本的总运行时间: (0 分钟 0.000 秒)

下载 Jupyter 笔记本: 2_link_prediction.ipynb

下载 Python 源代码: 2_link_prediction.py

Gallery generated by Sphinx-Gallery

Previous Next

© Copyright 2018, DGL Team. Revision 2ee440a6.

Built with Sphinx using a theme provided by Read the Docs.
Read the Docs v: latest
Versions
latest
2.2.x
2.1.x
2.0.x
1.1.x
1.0.x
0.9.x
0.8.x
0.7.x
0.6.x
Downloads
On Read the Docs
Project Home
Builds