教程:在K8s集群上训练节点分类模型¶
GraphScope专为处理大型图数据而设计,这类数据通常难以容纳在单台机器的内存中。借助Vineyard作为分布式内存数据管理器,GraphScope支持运行在由Kubernetes(k8s)管理的集群上。
在本教程中,我们将回顾第一个教程中展示的示例,演示GraphScope如何在Kubernetes集群上处理节点分类任务。
请注意,由于本教程设计为在k8s集群上运行,您需要先配置好k8s环境才能运行示例。
在Kubernetes上创建会话并加载图¶
import graphscope
from graphscope.dataset import load_ogbn_mag
# enable logging
graphscope.set_option(show_log=True)
# init the GraphScope session
sess = graphscope.session(with_dataset=True, k8s_service_type='LoadBalancer', k8s_image_pull_policy='Always')
在幕后,会话尝试启动一个协调器,这是后端引擎的入口。协调器管理一组k8s pod(默认2个pod),并在其上运行学习引擎。对于集群中的每个pod,都有一个vineyard实例在内存中为分布式数据提供服务。
日志显示GraphScope协调器服务已连接,意味着会话启动成功,当前Python客户端已连接到该会话。
您也可以通过这种方式检查会话的状态。
# check the status of the session
sess
运行此单元格,您可能会看到一个状态字段显示为“active”。除了状态信息外,它还会打印此会话的其他元信息,例如工作节点(pods)数量、用于连接的协调器端点等。
# load the obgn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/dataset/ogbn_mag_small/")
# print the schema of the graph
print(graph)
图神经网络 (GNNs)¶
# define the features for learning, we chose the original 128-dimension feature
i_features = []
for i in range(128):
i_features.append("feat_" + str(i))
# launch a learning engine. here we split the dataset, 75% as train, 10% as validation and 15% as test.
lg = sess.graphlearn(
graph,
nodes=[("paper", i_features)],
edges=[("paper", "cites", "paper")],
gen_labels=[
("train", "paper", 100, (0, 75)),
("val", "paper", 100, (75, 85)),
("test", "paper", 100, (85, 100)),
],
)
# Then we define the training process using the example EgoGraphSAGE model with tensorflow.
try:
# https://www.tensorflow.org/guide/migrate
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
except ImportError:
import tensorflow as tf
import argparse
import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer
def parse_args():
argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
argparser.add_argument('--batch_size', type=int, default=128)
argparser.add_argument('--features_num', type=int, default=128)
argparser.add_argument('--hidden_dim', type=int, default=128)
argparser.add_argument('--output_dim', type=int, default=128)
argparser.add_argument('--nbrs_num', type=list, default=[5, 5])
argparser.add_argument('--neg_num', type=int, default=5)
argparser.add_argument('--learning_rate', type=float, default=0.0001)
argparser.add_argument('--epochs', type=int, default=1)
argparser.add_argument('--agg_type', type=str, default="mean")
argparser.add_argument('--drop_out', type=float, default=0.0)
argparser.add_argument('--sampler', type=str, default='random')
argparser.add_argument('--neg_sampler', type=str, default='in_degree')
argparser.add_argument('--temperature', type=float, default=0.07)
return argparser.parse_args()
args = parse_args()
# define model
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)
# prepare the training dataset
train_data = EgoSAGEUnsupervisedDataLoader(lg, None, sampler=args.sampler,
neg_sampler=args.neg_sampler, batch_size=args.batch_size,
node_type='paper', edge_type='cites', nbrs_num=args.nbrs_num)
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)
# Start training
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)
最后,会话(session)负责管理集群中的资源,因此在不再需要这些资源时及时释放非常重要。当所有图计算任务完成后,可以通过调用会话的close方法来释放资源。
# close the session
sess.close()