教程:在K8s集群上训练节点分类模型

GraphScope专为处理大型图数据而设计,这类数据通常难以容纳在单台机器的内存中。借助Vineyard作为分布式内存数据管理器,GraphScope支持运行在由Kubernetes(k8s)管理的集群上。

在本教程中,我们将回顾第一个教程中展示的示例,演示GraphScope如何在Kubernetes集群上处理节点分类任务。

请注意,由于本教程设计为在k8s集群上运行,您需要先配置好k8s环境才能运行示例。

在Kubernetes上创建会话并加载图

import graphscope
from graphscope.dataset import load_ogbn_mag

# enable logging
graphscope.set_option(show_log=True)  

# init the GraphScope session
sess = graphscope.session(with_dataset=True, k8s_service_type='LoadBalancer', k8s_image_pull_policy='Always')

在幕后,会话尝试启动一个协调器,这是后端引擎的入口。协调器管理一组k8s pod(默认2个pod),并在其上运行学习引擎。对于集群中的每个pod,都有一个vineyard实例在内存中为分布式数据提供服务。

日志显示GraphScope协调器服务已连接,意味着会话启动成功,当前Python客户端已连接到该会话。

您也可以通过这种方式检查会话的状态。

# check the status of the session
sess

运行此单元格,您可能会看到一个状态字段显示为“active”。除了状态信息外,它还会打印此会话的其他元信息,例如工作节点(pods)数量、用于连接的协调器端点等。

# load the obgn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/dataset/ogbn_mag_small/")

# print the schema of the graph
print(graph)

图神经网络 (GNNs)

# define the features for learning, we chose the original 128-dimension feature
i_features = []
for i in range(128):
    i_features.append("feat_" + str(i))

# launch a learning engine. here we split the dataset, 75% as train, 10% as validation and 15% as test.
lg = sess.graphlearn(
    graph,
    nodes=[("paper", i_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)

# Then we define the training process using the example EgoGraphSAGE model with tensorflow.
try:
    # https://www.tensorflow.org/guide/migrate
    import tensorflow.compat.v1 as tf
    tf.disable_v2_behavior()
except ImportError:
    import tensorflow as tf

import argparse
import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer

def parse_args():
  argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
  argparser.add_argument('--batch_size', type=int, default=128)
  argparser.add_argument('--features_num', type=int, default=128)
  argparser.add_argument('--hidden_dim', type=int, default=128)
  argparser.add_argument('--output_dim', type=int, default=128)
  argparser.add_argument('--nbrs_num', type=list, default=[5, 5])
  argparser.add_argument('--neg_num', type=int, default=5)
  argparser.add_argument('--learning_rate', type=float, default=0.0001)
  argparser.add_argument('--epochs', type=int, default=1)
  argparser.add_argument('--agg_type', type=str, default="mean")
  argparser.add_argument('--drop_out', type=float, default=0.0)
  argparser.add_argument('--sampler', type=str, default='random')
  argparser.add_argument('--neg_sampler', type=str, default='in_degree')
  argparser.add_argument('--temperature', type=float, default=0.07)
  return argparser.parse_args()

args = parse_args()

# define model
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)

# prepare the training dataset
train_data = EgoSAGEUnsupervisedDataLoader(lg, None, sampler=args.sampler, 
                                           neg_sampler=args.neg_sampler, batch_size=args.batch_size,
                                           node_type='paper', edge_type='cites', nbrs_num=args.nbrs_num)
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
    src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)

# Start training
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)

最后,会话(session)负责管理集群中的资源,因此在不再需要这些资源时及时释放非常重要。当所有图计算任务完成后,可以通过调用会话的close方法来释放资源。

# close the session
sess.close()