torch_geometric.distributed

DistContext

当前进程的上下文信息。

LocalFeatureStore

实现FeatureStore接口,作为分布式训练的本地特征存储。

LocalGraphStore

实现GraphStore接口,作为分布式训练的本地图存储。

Partitioner

DataHeteroData对象的图及其特征进行分区。

DistNeighborSampler

一个分布式和异步邻居采样器的实现,由DistNeighborLoaderDistLinkNeighborLoader使用。

DistLoader

用于创建分布式数据加载例程的基类。

DistNeighborLoader

一个分布式加载器,用于从节点执行采样。

DistLinkNeighborLoader

一个分布式加载器,用于从边进行采样。

class DistContext(rank: int, global_rank: int, world_size: int, global_world_size: int, group_name: str, role: DistRole = DistRole.WORKER)[source]

当前进程的上下文信息。

class LocalFeatureStore[source]

实现FeatureStore接口,作为分布式训练的本地特征存储。

get_all_tensor_attrs() List[LocalTensorAttr][source]

返回所有已注册的张量属性。

Return type:

List[LocalTensorAttr]

lookup_features(index: Tensor, is_node_feat: bool = True, input_type: Optional[Union[str, Tuple[str, str, str]]] = None) Future[source]

查找本地/远程功能。

Return type:

Future

classmethod from_data(node_id: Tensor, x: Optional[Tensor] = None, y: Optional[Tensor] = None, edge_id: Optional[Tensor] = None, edge_attr: Optional[Tensor] = None) LocalFeatureStore[source]

从同质的 张量创建本地特征存储。

Parameters:
Return type:

LocalFeatureStore

classmethod from_hetero_data(node_id_dict: Dict[str, Tensor], x_dict: Optional[Dict[str, Tensor]] = None, y_dict: Optional[Dict[str, Tensor]] = None, edge_id_dict: Optional[Dict[Tuple[str, str, str], Tensor]] = None, edge_attr_dict: Optional[Dict[Tuple[str, str, str], Tensor]] = None) LocalFeatureStore[source]

从异构的 张量创建本地图存储。

Parameters:
  • node_id_dict (Dict[NodeType, torch.Tensor]) – 每个节点类型的每个本地节点的全局标识符。

  • x_dict (Dict[NodeType, torch.Tensor], optional) – 每个节点类型的节点特征。(默认值:None

  • y_dict (Dict[NodeType, torch.Tensor], optional) – 每个节点类型的节点标签。(默认值:None

  • edge_id_dict (Dict[EdgeType, torch.Tensor], optional) – 每种边类型的每个本地边的全局标识符。 (默认: None)

  • edge_attr_dict (Dict[EdgeType, torch.Tensor], optional) – 每种边类型的边特征。(默认值:None

Return type:

LocalFeatureStore

class LocalGraphStore[source]

实现GraphStore接口,作为分布式训练的本地图存储。

get_partition_ids_from_nids(ids: Tensor, node_type: Optional[str] = None) Tensor[source]

返回特定节点类型的节点ID的分区ID。

Return type:

Tensor

get_partition_ids_from_eids(eids: Tensor, edge_type: Optional[Tuple[str, str, str]] = None)[source]

返回特定边类型的边ID的分区ID。

get_all_edge_attrs() List[EdgeAttr][source]

返回所有已注册的边缘属性。

Return type:

List[EdgeAttr]

classmethod from_data(edge_id: Tensor, edge_index: Tensor, num_nodes: int, is_sorted: bool = False) LocalGraphStore[source]

从同质或异质的图中创建一个本地图存储。

Parameters:
  • edge_id (torch.Tensor) – 每个本地边的全局标识符。

  • edge_index (torch.Tensor) – 本地边的索引。

  • num_nodes (int) – 本地图中的节点数量。

  • is_sorted (bool) – 边是否按列/目标节点排序(CSC格式)。(默认: False)

Return type:

LocalGraphStore

classmethod from_hetero_data(edge_id_dict: Dict[Tuple[str, str, str], Tensor], edge_index_dict: Dict[Tuple[str, str, str], Tensor], num_nodes_dict: Dict[str, int], is_sorted: bool = False) LocalGraphStore[source]

从异质的图中创建一个本地图存储。

Parameters:
  • edge_id_dict (Dict[EdgeType, torch.Tensor]) – 每种边类型的每个本地边的全局标识符。

  • edge_index_dict (Dict[EdgeType, torch.Tensor]) – 每种边类型的本地边索引。

  • num_nodes_dict (Dict[str, int]) – (Dict[str, int]): 每种节点类型的节点数量。

  • is_sorted (bool) – 边是否按列/目标节点排序(CSC格式)。(默认: False)

Return type:

LocalGraphStore

class Partitioner(data: Union[Data, HeteroData], num_parts: int, root: str, recursive: bool = False)[source]

对图及其特征进行分区,这些特征属于 DataHeteroData 对象。

分区数据输出将如下所示。

同构图:

root/
|-- META.json
|-- node_map.pt
|-- edge_map.pt
|-- part0/
    |-- graph.pt
    |-- node_feats.pt
    |-- edge_feats.pt
|-- part1/
    |-- graph.pt
    |-- node_feats.pt
    |-- edge_feats.pt

异构图:

root/
|-- META.json
|-- node_map/
    |-- ntype1.pt
    |-- ntype2.pt
|-- edge_map/
    |-- etype1.pt
    |-- etype2.pt
|-- part0/
    |-- graph.pt
    |-- node_feats.pt
    |-- edge_feats.pt
|-- part1/
    |-- graph.pt
    |-- node_feats.pt
    |-- edge_feats.pt
Parameters:
  • data (DataHeteroData) – 数据对象。

  • num_parts (int) – 分区数量。

  • recursive (bool, 可选) – 如果设置为 True,将使用多级递归二分法而不是多级k路分区。 (默认: False)

  • root (str) – 分区数据集应保存的根目录。

generate_partition()[source]

生成分区。

class DistNeighborSampler(current_ctx: DistContext, data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[NumNeighbors, List[int], Dict[Tuple[str, str, str], List[int]]], channel: Optional[Queue] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', time_attr: Optional[str] = None, concurrency: int = 1, device: Optional[device] = None, **kwargs)[source]

分布式和异步邻居采样器的实现,由DistNeighborLoaderDistLinkNeighborLoader使用。

async node_sample(inputs: Union[NodeSamplerInput, DistEdgeHeteroSamplerInput]) Union[SamplerOutput, HeteroSamplerOutput][source]

NodeSamplerInputDistEdgeHeteroSamplerInput执行逐层分布式采样,并返回采样过程的输出。

注意

在分布式训练的情况下,需要在每层之后同步机器之间的结果。

Return type:

Union[SamplerOutput, HeteroSamplerOutput]

async edge_sample(inputs: EdgeSamplerInput, sample_fn: Callable, num_nodes: Union[int, Dict[str, int]], disjoint: bool, node_time: Optional[Union[Tensor, Dict[str, Tensor]]] = None, neg_sampling: Optional[NegativeSampling] = None) Union[SamplerOutput, HeteroSamplerOutput][source]

EdgeSamplerInput执行逐层分布式采样,并返回采样过程的输出。

注意

在分布式训练的情况下,需要在每层之后同步机器之间的结果。

Return type:

Union[SamplerOutput, HeteroSamplerOutput]

async sample_one_hop(srcs: Tensor, one_hop_num: int, seed_time: Optional[Tensor] = None, src_batch: Optional[Tensor] = None, edge_type: Optional[Tuple[str, str, str]] = None) SamplerOutput[source]

srcs中的一组种子节点进行一跳邻居采样。 如果种子节点位于本地分区,则在当前机器上评估采样函数。如果种子节点来自远程分区,则向包含该分区的远程机器发送请求。

Return type:

SamplerOutput

class DistLoader(current_ctx: DistContext, master_addr: Optional[str] = None, master_port: Optional[Union[int, str]] = None, channel: Optional[Queue] = None, num_rpc_threads: int = 16, rpc_timeout: int = 180, dist_sampler: Optional[DistNeighborSampler] = None, **kwargs)[source]

用于创建分布式数据加载例程的基类。

Parameters:
  • current_ctx (DistContext) – 当前进程的分布式上下文信息。

  • master_addr (str, optional) – 用于分布式加载器通信的RPC地址。 指的是主节点的IP地址。(默认值:None

  • master_port (intstr, 可选) – 用于与主节点进行RPC通信的开放端口。(默认值:None

  • channel (mp.Queue, optional) – 用于消息传递的通信通道。 (默认值: None)

  • num_rpc_threads (int, optional) – 由TensorPipeAgent使用的线程池中的线程数量,用于执行请求。(默认值:16

  • rpc_timeout (int, optional) – RPC请求的默认超时时间(以秒为单位)。 如果RPC在此时间范围内未完成,将引发异常。 调用者可以在必要时为单个RPC在rpc_sync()rpc_async()中覆盖此超时时间。 (默认值:180

class DistNeighborLoader(data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], master_addr: str, master_port: Union[int, str], current_ctx: DistContext, input_nodes: Union[Tensor, None, str, Tuple[str, Optional[Tensor]]] = None, input_time: Optional[Tensor] = None, dist_sampler: Optional[DistNeighborSampler] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', time_attr: Optional[str] = None, transform: Optional[Callable] = None, concurrency: int = 1, num_rpc_threads: int = 16, filter_per_worker: Optional[bool] = False, async_sampling: bool = True, device: Optional[device] = None, **kwargs)[source]

一个分布式加载器,用于从节点执行采样。

Parameters:
  • data (tuple) – 一个 (FeatureStore, GraphStore) 数据对象。

  • num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – 每次迭代中为每个节点采样的邻居数量。 如果某个条目设置为 -1,则将包含所有邻居。 在异质图中,也可以接受一个字典,表示为每种边类型采样的邻居数量。

  • master_addr (str) – 用于分布式加载器通信的RPC地址, 主节点的IP地址。

  • master_port (Union[int, str]) – 用于与主节点进行RPC通信的开放端口。

  • current_ctx (DistContext) – 当前进程的分布式上下文信息。

  • 并发 (int, 可选) – 用于定义异步处理队列最大大小的RPC并发数。 (默认: 1)

所有其他参数遵循 torch_geometric.loader.NeighborLoader 的接口。

class DistLinkNeighborLoader(data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], master_addr: str, master_port: Union[int, str], current_ctx: DistContext, edge_label_index: Union[Tensor, None, Tuple[str, str, str], Tuple[Tuple[str, str, str], Optional[Tensor]]] = None, edge_label: Optional[Tensor] = None, edge_label_time: Optional[Tensor] = None, dist_sampler: Optional[DistNeighborSampler] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', neg_sampling: Optional[NegativeSampling] = None, neg_sampling_ratio: Optional[Union[int, float]] = None, time_attr: Optional[str] = None, transform: Optional[Callable] = None, concurrency: int = 1, num_rpc_threads: int = 16, filter_per_worker: Optional[bool] = False, async_sampling: bool = True, device: Optional[device] = None, **kwargs)[source]

一个分布式加载器,用于从边进行采样。

Parameters:
  • data (tuple) – 一个 (FeatureStore, GraphStore) 数据对象。

  • num_neighbors (List[int] 或 Dict[Tuple[str, str, str], List[int]]) – 每次迭代中为每个节点采样的邻居数量。 如果某个条目设置为 -1,则将包含所有邻居。 在异质图中,也可以接受一个字典,表示要为每种边类型采样的邻居数量。

  • master_addr (str) – RPC address for distributed loader communication, i.e. the IP address of the master node.

  • master_port (Union[int, str]) – 用于与主节点进行RPC通信的开放端口。

  • current_ctx (DistContext) – Distributed context information of the current process.

  • 并发 (int, 可选) – 用于定义异步处理队列最大大小的RPC并发数。 (默认: 1)

所有其他参数遵循 torch_geometric.loader.LinkNeighborLoader 的接口。