规划器¶
TorchRec 规划器负责确定分布式训练和推理中最具性能、最平衡的分片计划。
生成分片计划的主要API是EmbeddingShardingPlanner.plan
- class torchrec.distributed.types.ShardingPlan(plan: Dict[str, ModuleShardingPlan])¶
分片计划的表示。这使用了较大包装模型的FQN(即使用DistributedModelParallel包装的模型) 当需要TorchRec可组合性时,应使用EmbeddingModuleShardingPlan。
- plan¶
按模块路径键控的字典 按参数名称键控的参数分片规范字典。
- Type:
Dict[str, EmbeddingModuleShardingPlan]
- get_plan_for_module(module_path: str) Optional[ModuleShardingPlan]¶
- Parameters:
module_path (str) –
- Returns:
按参数名称键控的参数分片规范字典。如果给定 module_path 不存在分片规范,则为 None。
- Return type:
可选[模块分片计划]
- class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True, callbacks: Optional[List[Callable[[List[ShardingOption]], List[ShardingOption]]]] = None)¶
根据提供的分片器、拓扑结构和约束条件,为具有可分片参数的给定模块提供优化的分片计划。
- Parameters:
topology (可选[Topology]) – 当前进程组的拓扑结构。
batch_size (可选[int]) – 模型的批量大小。
enumerator (可选[Enumerator]) – 要使用的枚举器
storage_reservation (可选[StorageReservation]) – 使用的存储预留
proposer (可选[Union[Proposer, List[Proposer]]]) – 要使用的提议者
partitioner (可选[Partitioner]) – 要使用的分区器
performance_model (可选[PerfModel]) – 要使用的性能模型
stats (可选[联合[Stats, 列表[Stats]]]) – 要使用的统计信息
constraints (Optional[Dict[str, ParameterConstraints]]) – 分片时每个表的约束条件。
debug (bool) – 是否打印调试信息。
示例:
ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta")) planner = EmbeddingShardingPlanner() plan = planner.plan( module=ebc, sharders=[EmbeddingBagCollectionSharder()], )
- collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan¶
在排名0上调用self.plan(…)并进行广播
- Parameters:
模块 (nn.Module) – 要分片的模块。
sharders (可选[列表[ModuleSharder[nn.Module]]]) – 用于分片的分片器
pg (可选[dist.ProcessGroup]) – 用于集体操作的进程组
- Returns:
模块的分片计划。
- Return type:
- plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan¶
根据提供的分片器、拓扑结构和约束条件,为具有可分片参数的给定模块提供优化的分片计划。
- Parameters:
模块 (nn.Module) – 要分片的模块。
sharders (List[ModuleSharder[nn.Module]]) – 用于分片的sharders。
- Returns:
模块的分片计划。
- Return type:
- class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)¶
为给定的nn.Module生成嵌入分片选项,考虑用户提供的约束。
- Parameters:
拓扑 (拓扑) – 设备拓扑。
batch_size (int) – 批量大小。
约束 (可选[字典[字符串, 参数约束]]) – 参数名称到提供的参数约束的字典。
estimator (可选[Union[ShardEstimator, List[ShardEstimator]]]) – 分片性能估计器。
use_exact_enumerate_order (bool) – 是否按照name_children枚举的确切顺序枚举可分割参数
- enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption]¶
根据模块和分片器生成相关的分片选项。
- Parameters:
模块 (nn.Module) – 需要分片的模块。
sharders (List[ModuleSharder[nn.Module]]) – 为模块提供的sharders。
- Returns:
填充了值的有效分片选项。
- Return type:
列表[分片选项]
- populate_estimates(sharding_options: List[ShardingOption]) None¶
参见类描述。
- class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)¶
贪婪分区器。
- Parameters:
sort_by (SortBy) – 按存储或性能对分片选项进行降序排序(即大表将首先放置)。
balance_modules (bool) – 是否首先按模块排序,其中较小的模块将首先排序。实际上,这将使每个模块中的表以平衡的方式排列。
- partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption]¶
根据每个分片选项的partition_by属性,在拓扑上放置分片选项。 在放置结束时,拓扑、存储和性能会被更新。
- Parameters:
提案 (列表[分片选项]) – 填充的分片选项列表。
storage_constraint (Topology) – 设备拓扑结构。
- Returns:
所选计划的分片选项列表。
- Return type:
列表[分片选项]
示例:
sharding_options = [ ShardingOption(partition_by="uniform", shards=[ Shards(storage=1, perf=1), Shards(storage=1, perf=1), ]), ShardingOption(partition_by="uniform", shards=[ Shards(storage=2, perf=2), Shards(storage=2, perf=2), ]), ShardingOption(partition_by="device", shards=[ Shards(storage=3, perf=3), Shards(storage=3, perf=3), ]) ShardingOption(partition_by="device", shards=[ Shards(storage=4, perf=4), Shards(storage=4, perf=4), ]), ] topology = Topology(world_size=2) # First [sharding_options[0] and sharding_options[1]] will be placed on the # topology with the uniform strategy, resulting in topology.devices[0].perf.total = (1,2) topology.devices[1].perf.total = (1,2) # Finally sharding_options[2] and sharding_options[3]] will be placed on the # topology with the device strategy (see docstring of `partition_by_device` for # more details). topology.devices[0].perf.total = (1,2) + (3,4) topology.devices[1].perf.total = (1,2) + (3,4) # The topology updates are done after the end of all the placements (the other # in the example is just for clarity).
- class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)¶
为模型预留存储空间,以便通过启发式计算进行分片。存储预留包括密集张量存储、KJT存储以及总存储的额外百分比。
- Parameters:
percentage (float) – 额外存储百分比,用于保留作为启发式存储计算之外的误差余量。
parameter_multiplier (float) – 总参数存储的启发式乘数。
dense_tensor_estimate (Optional[int]) – 密集张量的存储估计,如果未提供,则使用默认的启发式估计。
- class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)¶
以贪婪的方式提出分片计划。
按性能对每个可分片参数的分片选项进行排序。 在每次迭代中,找到当前存储使用量最大的参数,并尝试其下一个分片选项。
- Parameters:
use_depth (bool) – 当启用时,fqn的分片选项会根据max(shard.perf.total)进行排序,否则分片选项会根据sum(shard.perf.total)进行排序。
threshold (可选[int]) – 提前停止的阈值。当指定时,提议者在提案的连续perf_rating比best_perf_rating差时停止提议。
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None¶
向提案者提供反馈。
- Parameters:
partitionable (bool) – 计划是否可分区。
plan (可选[列表[ShardingOption]]) – 提供反馈的计划。
perf_rating (可选[float]) – 计划的性能评分。
storage_constraint (可选[Topology]) – 计划的存储约束。
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None¶
将搜索空间加载到提议器中。
- Parameters:
search_space (List[ShardingOption]) – 要加载的搜索空间。
enumerator (Enumerator) – 用于生成搜索空间的枚举器。
- propose() Optional[List[ShardingOption]]¶
提出一个分片计划。
- Returns:
提议的计划。
- Return type:
可选[List[ShardingOption]]
- class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)¶
嵌入墙时间性能估计器。此估计器估计给定分片选项的墙时间。
- Parameters:
拓扑 (拓扑) – 设备拓扑。
约束条件 (可选[字典[字符串, 参数约束]]) – 参数约束。
is_inference (bool) – 估计器是否用于推理。
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None¶
估计给定分片选项的墙时间。
- Parameters:
sharding_options (List[ShardingOption]) – 分片选项列表。
sharder_map (可选[字典[字符串, 模块分片器[神经网络模块]]]) – 分片器映射。
- classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf]¶
尝试将性能建模为相对墙时间的函数。
- Parameters:
shard_sizes (List[List[int]]) – 每个分片的(local_rows, local_cols)列表。
compute_kernel (str) – 计算内核。
compute_device (str) – 计算设备。
sharding_type (str) – tw, rw, cw, twrw, dp.
batch_sizes (List[int]) – 每个输入特征的批量大小。
world_size (int) – 所有主机的设备数量。
local_world_size (int) – 每个主机的设备数量。
input_lengths (List[float]) – 每个输入查询特征的平均查找次数的列表。
input_data_type_size (float) – 分布式数据并行输入的数据类型大小。
table_data_type_size (float) – 表的数据类型大小。
output_data_type_size (float) – 输出嵌入的数据类型大小。
fwd_comm_data_type_size (float) – 前向通信期间分布式数据并行输入的数据类型大小。
bwd_comm_data_type_size (float) – 反向通信期间分布式数据并行输入的数据类型大小。
num_poolings (List[float]) – 每个样本的池化次数,通常为1.0。
hbm_mem_bw (float) – 设备HBM的带宽。
ddr_mem_bw (float) – 系统DDR内存的带宽。
hbm_to_ddr_bw (float) – 设备HBM与系统DDR之间的带宽。
intra_host_bw (float) – 单个主机内的带宽,例如多个线程。
inter_host_bw (float) – 两台主机之间的带宽,例如多台机器。
is_pooled (bool) – 如果嵌入输出是池化的(即 EmbeddingBag),则为 True,如果是非池化/顺序的(即 Embedding),则为 False。
is_weighted (bool = False) – 如果模块是EBC并且是加权的,通常表示一个id分数列表特征。
is_inference (bool = False) – 如果用于推理规划。
caching_ratio (可选[float] = None) – 用于确定设备带宽的缓存比率。
prefetch_pipeline (bool = False) – 是否启用预取管道。
expected_cache_fetches (float) – 全局批次中预期的缓存获取次数
uneven_sharding_perf_multiplier (float = 1.0) – 用于考虑不均匀分片性能的乘数
- Returns:
每个分片的性能列表。
- Return type:
列表[浮点数]
- class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, run_embedding_at_peak_memory: bool = False, is_inference: bool = False)¶
嵌入存储使用量估算器
- Parameters:
拓扑 (拓扑) – 设备拓扑。
约束条件 (可选[字典[字符串, 参数约束]]) – 参数约束。
pipeline_type (PipelineType) – 管道的类型,如果有的话。将在内存估计期间确定输入复制因子。
run_embedding_at_peak_memory (bool) –
如果嵌入的前向/后向将在HBM使用达到峰值时执行。当设置为TRUE时,任何在嵌入前向/后向期间的临时内存分配,只要在output_dist之前的输出大小将被计入HBM存储成本。否则它们不会被计入,因为它们会被实际的内存峰值“隐藏”。
仅在pipeline_type设置为向后兼容时生效(不影响使用旧的无管道公式的模型)
默认设置为false,因为对于RecSys来说,这通常是false,因为内存峰值发生在密集前向的结束/密集后向的开始。
is_inference (bool) – 如果模型是推理模型。默认为 False。
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None¶
估算每个分片选项的存储成本。
- Parameters:
sharding_options (List[ShardingOption]) – 分片选项列表。
sharder_map (可选[字典[字符串, 模块分片器[nn.模块]]]) – 从模块类型到分片器的映射。