torch_geometric.distributed
当前进程的上下文信息。 |
|
实现 |
|
实现 |
|
对 |
|
一个分布式和异步邻居采样器的实现,由 |
|
用于创建分布式数据加载例程的基类。 |
|
一个分布式加载器,用于从节点执行采样。 |
|
一个分布式加载器,用于从边进行采样。 |
- class DistContext(rank: int, global_rank: int, world_size: int, global_world_size: int, group_name: str, role: DistRole = DistRole.WORKER)[source]
当前进程的上下文信息。
- class LocalFeatureStore[source]
实现
FeatureStore接口,作为分布式训练的本地特征存储。- get_all_tensor_attrs() List[LocalTensorAttr][source]
返回所有已注册的张量属性。
- Return type:
List[LocalTensorAttr]
- lookup_features(index: Tensor, is_node_feat: bool = True, input_type: Optional[Union[str, Tuple[str, str, str]]] = None) Future[source]
查找本地/远程功能。
- Return type:
- classmethod from_data(node_id: Tensor, x: Optional[Tensor] = None, y: Optional[Tensor] = None, edge_id: Optional[Tensor] = None, edge_attr: Optional[Tensor] = None) LocalFeatureStore[source]
从同质的 PyG 张量创建本地特征存储。
- Parameters:
node_id (torch.Tensor) – 每个本地节点的全局标识符。
x (torch.Tensor, optional) – 节点特征。 (默认:
None)y (torch.Tensor, optional) – 节点标签。(默认值:
None)edge_id (torch.Tensor, optional) – 每个本地边的全局标识符。(默认值:
None)edge_attr (torch.Tensor, optional) – The edge features. (default:
None)
- Return type:
- classmethod from_hetero_data(node_id_dict: Dict[str, Tensor], x_dict: Optional[Dict[str, Tensor]] = None, y_dict: Optional[Dict[str, Tensor]] = None, edge_id_dict: Optional[Dict[Tuple[str, str, str], Tensor]] = None, edge_attr_dict: Optional[Dict[Tuple[str, str, str], Tensor]] = None) LocalFeatureStore[source]
从异构的 PyG 张量创建本地图存储。
- Parameters:
node_id_dict (Dict[NodeType, torch.Tensor]) – 每个节点类型的每个本地节点的全局标识符。
x_dict (Dict[NodeType, torch.Tensor], optional) – 每个节点类型的节点特征。(默认值:
None)y_dict (Dict[NodeType, torch.Tensor], optional) – 每个节点类型的节点标签。(默认值:
None)edge_id_dict (Dict[EdgeType, torch.Tensor], optional) – 每种边类型的每个本地边的全局标识符。 (默认:
None)edge_attr_dict (Dict[EdgeType, torch.Tensor], optional) – 每种边类型的边特征。(默认值:
None)
- Return type:
- class LocalGraphStore[source]
实现
GraphStore接口,作为分布式训练的本地图存储。- get_partition_ids_from_nids(ids: Tensor, node_type: Optional[str] = None) Tensor[source]
返回特定节点类型的节点ID的分区ID。
- Return type:
- get_partition_ids_from_eids(eids: Tensor, edge_type: Optional[Tuple[str, str, str]] = None)[source]
返回特定边类型的边ID的分区ID。
- classmethod from_data(edge_id: Tensor, edge_index: Tensor, num_nodes: int, is_sorted: bool = False) LocalGraphStore[source]
从同质或异质的PyG图中创建一个本地图存储。
- Parameters:
edge_id (torch.Tensor) – 每个本地边的全局标识符。
edge_index (torch.Tensor) – 本地边的索引。
num_nodes (int) – 本地图中的节点数量。
- Return type:
- classmethod from_hetero_data(edge_id_dict: Dict[Tuple[str, str, str], Tensor], edge_index_dict: Dict[Tuple[str, str, str], Tensor], num_nodes_dict: Dict[str, int], is_sorted: bool = False) LocalGraphStore[source]
从异质的PyG图中创建一个本地图存储。
- Parameters:
edge_id_dict (Dict[EdgeType, torch.Tensor]) – 每种边类型的每个本地边的全局标识符。
edge_index_dict (Dict[EdgeType, torch.Tensor]) – 每种边类型的本地边索引。
num_nodes_dict (
Dict[str,int]) – (Dict[str, int]): 每种节点类型的节点数量。
- Return type:
- class Partitioner(data: Union[Data, HeteroData], num_parts: int, root: str, recursive: bool = False)[source]
对图及其特征进行分区,这些特征属于
Data或HeteroData对象。分区数据输出将如下所示。
同构图:
root/ |-- META.json |-- node_map.pt |-- edge_map.pt |-- part0/ |-- graph.pt |-- node_feats.pt |-- edge_feats.pt |-- part1/ |-- graph.pt |-- node_feats.pt |-- edge_feats.pt异构图:
root/ |-- META.json |-- node_map/ |-- ntype1.pt |-- ntype2.pt |-- edge_map/ |-- etype1.pt |-- etype2.pt |-- part0/ |-- graph.pt |-- node_feats.pt |-- edge_feats.pt |-- part1/ |-- graph.pt |-- node_feats.pt |-- edge_feats.pt- Parameters:
- class DistNeighborSampler(current_ctx: DistContext, data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[NumNeighbors, List[int], Dict[Tuple[str, str, str], List[int]]], channel: Optional[Queue] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', time_attr: Optional[str] = None, concurrency: int = 1, device: Optional[device] = None, **kwargs)[source]
分布式和异步邻居采样器的实现,由
DistNeighborLoader和DistLinkNeighborLoader使用。- async node_sample(inputs: Union[NodeSamplerInput, DistEdgeHeteroSamplerInput]) Union[SamplerOutput, HeteroSamplerOutput][source]
从
NodeSamplerInput或DistEdgeHeteroSamplerInput执行逐层分布式采样,并返回采样过程的输出。注意
在分布式训练的情况下,需要在每层之后同步机器之间的结果。
- Return type:
- async edge_sample(inputs: EdgeSamplerInput, sample_fn: Callable, num_nodes: Union[int, Dict[str, int]], disjoint: bool, node_time: Optional[Union[Tensor, Dict[str, Tensor]]] = None, neg_sampling: Optional[NegativeSampling] = None) Union[SamplerOutput, HeteroSamplerOutput][source]
从
EdgeSamplerInput执行逐层分布式采样,并返回采样过程的输出。注意
在分布式训练的情况下,需要在每层之后同步机器之间的结果。
- Return type:
- class DistLoader(current_ctx: DistContext, master_addr: Optional[str] = None, master_port: Optional[Union[int, str]] = None, channel: Optional[Queue] = None, num_rpc_threads: int = 16, rpc_timeout: int = 180, dist_sampler: Optional[DistNeighborSampler] = None, **kwargs)[source]
用于创建分布式数据加载例程的基类。
- Parameters:
current_ctx (DistContext) – 当前进程的分布式上下文信息。
master_addr (str, optional) – 用于分布式加载器通信的RPC地址。 指的是主节点的IP地址。(默认值:
None)channel (mp.Queue, optional) – 用于消息传递的通信通道。 (默认值:
None)num_rpc_threads (int, optional) – 由
TensorPipeAgent使用的线程池中的线程数量,用于执行请求。(默认值:16)rpc_timeout (int, optional) – RPC请求的默认超时时间(以秒为单位)。 如果RPC在此时间范围内未完成,将引发异常。 调用者可以在必要时为单个RPC在
rpc_sync()和rpc_async()中覆盖此超时时间。 (默认值:180)
- class DistNeighborLoader(data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], master_addr: str, master_port: Union[int, str], current_ctx: DistContext, input_nodes: Union[Tensor, None, str, Tuple[str, Optional[Tensor]]] = None, input_time: Optional[Tensor] = None, dist_sampler: Optional[DistNeighborSampler] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', time_attr: Optional[str] = None, transform: Optional[Callable] = None, concurrency: int = 1, num_rpc_threads: int = 16, filter_per_worker: Optional[bool] = False, async_sampling: bool = True, device: Optional[device] = None, **kwargs)[source]
一个分布式加载器,用于从节点执行采样。
- Parameters:
data (tuple) – 一个 (
FeatureStore,GraphStore) 数据对象。num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – 每次迭代中为每个节点采样的邻居数量。 如果某个条目设置为
-1,则将包含所有邻居。 在异质图中,也可以接受一个字典,表示为每种边类型采样的邻居数量。master_addr (str) – 用于分布式加载器通信的RPC地址, 即主节点的IP地址。
current_ctx (DistContext) – 当前进程的分布式上下文信息。
并发 (int, 可选) – 用于定义异步处理队列最大大小的RPC并发数。 (默认:
1)
所有其他参数遵循
torch_geometric.loader.NeighborLoader的接口。
- class DistLinkNeighborLoader(data: Tuple[LocalFeatureStore, LocalGraphStore], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], master_addr: str, master_port: Union[int, str], current_ctx: DistContext, edge_label_index: Union[Tensor, None, Tuple[str, str, str], Tuple[Tuple[str, str, str], Optional[Tensor]]] = None, edge_label: Optional[Tensor] = None, edge_label_time: Optional[Tensor] = None, dist_sampler: Optional[DistNeighborSampler] = None, replace: bool = False, subgraph_type: Union[SubgraphType, str] = 'directional', disjoint: bool = False, temporal_strategy: str = 'uniform', neg_sampling: Optional[NegativeSampling] = None, neg_sampling_ratio: Optional[Union[int, float]] = None, time_attr: Optional[str] = None, transform: Optional[Callable] = None, concurrency: int = 1, num_rpc_threads: int = 16, filter_per_worker: Optional[bool] = False, async_sampling: bool = True, device: Optional[device] = None, **kwargs)[source]
一个分布式加载器,用于从边进行采样。
- Parameters:
data (tuple) – 一个 (
FeatureStore,GraphStore) 数据对象。num_neighbors (List[int] 或 Dict[Tuple[str, str, str], List[int]]) – 每次迭代中为每个节点采样的邻居数量。 如果某个条目设置为
-1,则将包含所有邻居。 在异质图中,也可以接受一个字典,表示要为每种边类型采样的邻居数量。master_addr (str) – RPC address for distributed loader communication, i.e. the IP address of the master node.
current_ctx (DistContext) – Distributed context information of the current process.
并发 (int, 可选) – 用于定义异步处理队列最大大小的RPC并发数。 (默认:
1)
所有其他参数遵循
torch_geometric.loader.LinkNeighborLoader的接口。