API

horovod.tensorflow

class horovod.tensorflow.Compression[源代码]

可选梯度压缩算法,用于allreduce过程中。

none

将所有浮点梯度压缩至16位。

horovod.tensorflow.compression.NoneCompressor 的别名

fp16

horovod.tensorflow.compression.FP16Compressor 的别名

horovod.tensorflow.allgather_object(obj, session=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]

从所有其他进程中序列化并聚合一个对象。

Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • session – 用于 TensorFlow v1 兼容性的会话。

  • name – 用于allgather操作时的可选名称,默认将使用类类型。

  • process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

Returns

在所有层级上全部收集的对象列表。

horovod.tensorflow.broadcast_object(obj, root_rank=0, session=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[source]

将对象从根等级序列化并广播到进程组中的所有其他进程(默认为所有Horovod进程)。

Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • root_rank – 从此进程的排名,参数将被广播到所有其他进程。

  • session – 用于 TensorFlow v1 兼容性的会话。

  • name – 广播期间使用的可选名称,默认值为类类型。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

root_rank广播的对象。

horovod.tensorflow.broadcast_variables(variables, root_rank, process_set=<horovod.common.process_sets.ProcessSet object>, inplace=False)[source]

将变量从根等级广播到进程组中的所有其他进程(默认为所有Horovod进程)。

可选地,广播可以原地执行,这避免了临时内存分配和碎片化。这仅在 TensorFlow 2.6 或更高版本中受支持。引用变量(TF 2 中的传统支持)必须全部为相同的数据类型。对于资源变量(TF 2 中的默认设置)没有此类限制。

Parameters
  • variables – 用于广播的变量

  • root_rank – 将全局变量广播到所有其他进程的进程的rank。

  • process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

  • inplace – 是否执行原地广播操作

horovod.tensorflow.allgather(tensor, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个操作,它将输入张量与所有其他Horovod进程上的相同输入张量进行连接。

拼接操作在第一个维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一个维度可以不同之外。

Returns

一个与tensor类型相同的张量,在所有进程的零维上进行拼接。其形状与输入形状相同,除了第一维度可能更大,并且是不同Horovod进程中所有张量第一维度的总和。

horovod.tensorflow.grouped_allgather(tensors, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个操作,它将输入张量与所有其他Horovod进程上的相同输入张量进行连接。

拼接操作在第一个维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一个维度允许不同之外。

Returns

一个与tensor具有相同秩和类型的张量列表,在所有进程的零维上进行拼接。对于每个返回的张量,其形状与输入形状相同,除了第一维度可能更大,并且是不同Horovod进程中所有张量第一维度的总和。

horovod.tensorflow.broadcast(tensor, root_rank, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[source]

一个操作,它将根节点上的输入张量广播到所有其他Horovod进程上的相同输入张量。

广播操作通过操作名称进行标识。对于给定名称,张量类型和形状在所有Horovod进程中必须相同。只有当所有进程都准备好发送和接收张量时,广播才会开始。

Returns

一个与tensor形状和类型相同的张量,其值从根等级广播而来。

horovod.tensorflow.broadcast_(variables, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个操作,它将输入变量从根等级广播到所有其他Horovod进程中的相同输入变量。该操作是原地执行的。

广播操作由操作名称与变量名称组合而成。对于任何给定名称,变量类型和形状在所有Horovod进程中必须相同。在所有进程都准备好发送和接收所有变量之前,广播不会开始。在每个进程中,所有变量都需要位于同一设备(CPU或GPU)上。

注意:这仅在 TensorFlow 2.6 或更高版本中受支持。

Returns

从根等级广播的更新变量的张量值。

horovod.tensorflow.alltoall(tensor, splits=None, name=None, ignore_name_scope=False, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个操作,将输入张量的切片分散到所有其他Horovod进程,并返回从所有其他Horovod进程收集的切片张量。

切片操作在第一维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • tensor – 一个用于通过alltoall进行分发的张量。

  • splits – 一个按秩排序的整数张量,描述将tensor中的多少个元素发送给每个工作进程。分割操作沿着tensor的第一个维度进行。如果未提供splits,则第一个维度将按Horovod进程数量进行均等分割。

  • name – alltoall操作的名称。

  • ignore_name_scope – 如果为True,则忽略TensorFlow在Horovod操作使用的名称中应用的任何外部名称作用域。

Returns

  1. 一个与tensor类型相同的张量,在所有进程的零维度上进行拼接。其形状与输入形状相同,除了第一维度可能更大,并且是从不同Horovod进程收集的张量切片的所有第一维度之和。

  2. 如果提供了splits:一个按等级排序的整数张量,描述了输出张量中有多少元素是从每个工作节点接收的。

horovod.tensorflow.shutdown()

一个关闭 Horovod 的函数。

horovod.tensorflow.is_initialized()

如果 Horovod 已初始化,则返回 True

horovod.tensorflow.start_timeline(file_path, mark_cycles=False)

file_path 处创建时间线文件并开始记录。

Parameters
  • file_path – 时间线文件的字符串路径。

  • mark_cycles – 布尔值,指示是否应在时间线上标记循环(默认值:False)。

Raises a ValueError if Horovod is not initialized.

horovod.tensorflow.stop_timeline()

停止活动时间线记录并关闭文件。

Raises a ValueError if Horovod is not initialized.

horovod.tensorflow.size()

一个返回Horovod进程数量的函数。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.tensorflow.local_size()

一个返回当前进程所在节点内Horovod进程数量的函数。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.tensorflow.cross_size()

一个返回当前Horovod进程本地等级节点数量的函数。例如,如果作业中有2个节点:一个运行2个进程,另一个运行1个进程,那么每个节点上的第一个进程将具有交叉大小2,而第一个节点上的第二个进程将具有交叉大小1。

Returns

一个整数标量,包含跨 Horovod 进程的数量。

horovod.tensorflow.rank()

一个返回调用进程的Horovod等级的函数。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.tensorflow.local_rank()

一个返回调用进程在其运行节点内的本地Horovod等级的函数。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含两端)。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.tensorflow.cross_rank()

一个返回调用进程在作业中跨节点间的跨Horovod等级的函数。进程的跨等级对应其运行所在节点的等级。例如,如果作业中有7个节点,跨等级将是从零到六(包含)。

Returns

一个整数标量,表示调用进程的跨 Horovod 等级。

horovod.tensorflow.is_homogeneous()

如果集群是同构的,则返回 True。

Returns

一个布尔值,表示集群中的每个节点是否拥有相同数量的秩。

horovod.tensorflow.rank_op(name=None)[源代码]

一个返回调用进程的Horovod等级的运算操作。

此操作确定在图执行时的返回值,而非在图构建时,因此允许图在与其执行环境不同的环境中构建。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.tensorflow.local_rank_op(name=None)[源代码]

一个操作,返回调用进程在其运行节点内的本地Horovod等级。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含)。

此操作确定图执行时的返回值,而非图构建时的返回值,因此允许图在一个与执行环境不同的环境中构建。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.tensorflow.size_op(process_set_id=0, name=None)[source]

一个返回Horovod进程数量的操作。

此操作确定在图执行时的返回值,而非在图构建时,因此允许图在一个与其执行环境不同的环境中构建。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.tensorflow.local_size_op(name=None)[源代码]

一个操作,返回当前进程所在节点内的Horovod进程数量。

此操作确定图执行时的返回值,而非图构建时的返回值,因此允许图在一个与执行环境不同的环境中构建。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.tensorflow.process_set_included_op(process_set_id=0, name=None)[源代码]

An op that 0 or 1 depending on whether the current process is included in the specified process set or an error code: HOROVOD_PROCESS_SET_ERROR_INIT if Horovod is not initialized, HOROVOD_PROCESS_SET_ERROR_UNKNOWN_SET if the process set is unknown.

此操作确定图执行时的返回值,而非图构建时的返回值,因此允许图在与其执行环境不同的环境中构建。

Returns

一个值为0、1或错误代码的整数标量。

horovod.tensorflow.mpi_threads_supported()

一个返回标志的函数,指示是否支持MPI多线程。

如果支持MPI多线程,用户可以混合使用Horovod与其他MPI库,例如mpi4py

Returns

一个布尔值,表示是否支持MPI多线程。

horovod.tensorflow.mpi_enabled()

如果当前在运行时启用了MPI模式,则返回True。

如果启用了MPI,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了MPI。

horovod.tensorflow.mpi_built()

如果 Horovod 编译时支持 MPI,则返回 True。

Returns

一个布尔值,表示是否编译了MPI支持。

horovod.tensorflow.gloo_enabled()

如果运行时启用了Gloo模式,则返回True。

如果启用了Gloo,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了Gloo。

horovod.tensorflow.gloo_built()

如果 Horovod 编译时支持 Gloo,则返回 True。

Returns

一个布尔值,表示是否编译了Gloo支持。

horovod.tensorflow.nccl_built()

用于检查Horovod是否使用NCCL支持编译的函数。

Returns

一个整数值,表示是否编译了NCCL支持。 如果编译了NCCL支持,则返回NCCL_VERSION_CODE。否则, 返回0。

horovod.tensorflow.ddl_built()

如果 Horovod 编译时支持 DDL,则返回 True。

Returns

一个布尔值,指示是否编译了DDL支持。

horovod.tensorflow.ccl_built()

如果 Horovod 编译时支持 oneCCL,则返回 True。

Returns

一个布尔值,表示是否编译了oneCCL支持。

horovod.tensorflow.cuda_built()

如果 Horovod 编译时支持 CUDA,则返回 True。

Returns

一个布尔值,指示是否编译了CUDA支持。

horovod.tensorflow.rocm_built()

如果 Horovod 编译时支持 ROCm,则返回 True。

Returns

一个布尔值,表示是否编译了ROCm支持。

class horovod.tensorflow.ProcessSet(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[源代码]

一组将共同运行集体操作的Horovod进程的表示

使用进程等级列表或MPI通信器初始化一个进程集。然后将此实例传递给hvd.init()或hvd.add_process_set()。如果已初始化有效的进程集,process_set_id将被设置为一个数值。

size()Optional[int][源代码]

返回进程集的大小,如果未初始化则返回 None。

rank()Optional[int][源代码]

返回相对于此进程集的排名,如果未初始化则返回None。

这很有用,例如,用于处理 hvd.allgather() 的结果。

请注意,即使使用进程集,Horovod操作(如hvd.broadcast())也不是通过这个相对等级参数化的,而是通过从hvd.rank()获取的全局等级参数化的。

included()Optional[bool][源代码]

返回当前进程是否属于此进程集,如果未初始化则返回None。

horovod.tensorflow.add_process_set(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]])horovod.common.process_sets.ProcessSet[源代码]

在Horovod初始化后添加一个新的process_set并返回它。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。不能存在已包含相同等级的进程集。 返回的进程集将被完全初始化。

horovod.tensorflow.remove_process_set(process_set: horovod.common.process_sets.ProcessSet)bool[源代码]

尝试移除进程集并返回此尝试是否成功。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。如果移除成功,我们将使进程集对象失效。

horovod.tensorflow.join()[源代码]

一个操作,表示该智能体已完成数据处理。

所有未调用join()的等级继续处理allreduce操作。在所有等级都加入之前,此操作不会完成。

Returns

一个整数标量,包含最后加入的等级。

class horovod.tensorflow.LocalGradientAggregationHelper(backward_passes_per_step, allreduce_func, sparse_as_dense, average_aggregated_gradients, rank, optimizer_type, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True, name='')[源代码]

LocalGradientAggregationHelper 在本地聚合梯度更新, 并且仅在每 backward_passes_per_step 时跨机器通信更新一次。 仅支持图模式执行。

register_local_var(var)[源代码]

将一个源/变量注册为工作器本地。Horovod不会对这些源对应的梯度执行任何全局操作,而是返回本地梯度。

compute_gradients(grads, vars)[源代码]

将新的梯度更新应用于本地聚合的梯度,并在每调用backward_passes_per_step次数时执行跨机器通信。

apply_gradients(apply_grads_closure, optimizer, *args, **kwargs)[源代码]

每 backward_passes_per_step 应用更新,这与我们通信本地聚合梯度的批次对齐。

horovod.tensorflow.allreduce(tensor, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, name=None, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False)[源代码]

对 tf.Tensor 或 tf.IndexedSlices 执行 allreduce 操作。

此函数对输入张量执行带宽最优环形全归约操作。如果输入是tf.IndexedSlices,该函数将对值和索引执行全收集操作,实质上对表示的张量进行全归约。

Parameters
  • tensor – 要规约的 tf.Tensor、tf.Variable 或 tf.IndexedSlices。 所有进程上的输入形状必须完全相同。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0中移除。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

  • name – allreduce操作的名称

  • ignore_name_scope – 如果为True,则忽略TensorFlow在Horovod操作使用的名称中应用的任何外部名称作用域。

Returns

一个与tensor具有相同形状和类型的张量,在所有进程中求和。

horovod.tensorflow.reducescatter(tensor, device_dense='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, name=None, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False, prescale_factor=1.0, postscale_factor=1.0)[源代码]

在 tf.Tensor 上执行 reducescatter 操作。

此函数对输入张量执行带宽最优的归约和分散操作。

Parameters
  • tensor – 需要归约的 tf.Tensor 或 tf.Variable。 所有进程的输入形状必须完全相同。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_REDUCESCATTER构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • name – reduce_scatter操作的名称

  • ignore_name_scope – 如果为True,则忽略由TensorFlow在Horovod操作使用的名称中应用的任何外部名称作用域。

  • prescale_factor – 在reduce-scatter操作前对张量进行缩放的乘法因子。

  • postscale_factor – 在reducescatter操作后对张量进行缩放的乘法因子。

Returns

一个与tensor具有相同秩和类型的张量,在所有进程中进行求和。 形状与输入形状相同,除了第一维度, 该维度将在不同的Horovod进程之间进行划分。

horovod.tensorflow.grouped_allreduce(tensors, average=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, ignore_name_scope=False, name=None)[源代码]

对一系列 tf.Tensor 或 tf.IndexedSlices 执行分组式 allreduce 操作。

Parameters
  • tensors – 要规约的tf.Tensor、tf.Variable或tf.IndexedSlices序列。 对于在tensors中共享位置的张量,其张量类型和形状在所有Horovod进程中必须保持一致。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前用于缩放张量的乘法因子。

  • postscale_factor – 全规约操作后张量的缩放乘数因子。

  • process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

  • name – reduce_scatter操作的名称

  • ignore_name_scope – 如果为True,则忽略TensorFlow在Horovod操作使用的名称中应用的任何外部名称作用域。

Returns

一个与tensors中张量形状和类型相同的张量列表,在所有进程间进行了归约。

horovod.tensorflow.grouped_reducescatter(tensors, device_dense='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

对一系列 tf.Tensor 执行分组式 reducescatter 操作。

Parameters
  • tensors – 要规约的tf.Tensor或tf.Variable序列。 对于在tensors中共享位置的输入,所有Horovod进程上的形状必须相同。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。如果未提供则默认为平均值。

  • process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前用于缩放张量的乘法因子。

  • postscale_factor – 在reducescatter操作后用于缩放张量的乘法因子。

Returns

一个与tensors中张量具有相同秩和类型的张量列表, 在所有进程间进行归约。对于每个返回的张量,其形状与对应的输入形状相同, 除了第一个维度,该维度将在不同的Horovod进程间进行划分。

horovod.tensorflow.broadcast_global_variables(root_rank)[源代码]

将根进程的所有全局变量广播到所有其他进程。

注意:在 TensorFlow 2.0 中已弃用。

Parameters

root_rank – 用于将全局变量广播到所有其他进程的进程的等级。

class horovod.tensorflow.BroadcastGlobalVariablesHook(*args, **kw)[源代码]

在初始化期间,将从根等级广播所有全局变量到所有其他进程的SessionRunHook。

这是必要的,以确保在训练开始时使用随机权重或从检查点恢复时,所有工作节点都能进行一致的初始化。

注意:在 TensorFlow 2.0 中已弃用。

horovod.tensorflow.DistributedOptimizer(optimizer, name=None, use_locking=False, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, backward_passes_per_step=1, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, gradient_predivide_factor=1.0, average_aggregated_gradients=False, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[源代码]

构建一个新的分布式优化器,它在底层使用另一个优化器来计算单进程梯度值,并在所有Horovod节点间的梯度值合并后应用梯度更新。

Parameters
  • optimizer – 用于计算梯度和应用更新的优化器。

  • name – 应用梯度时为创建的操作设置的可选名称前缀。默认为"Distributed"后跟所提供的优化器类型。

  • use_locking – 更新变量时是否使用锁定。 更多信息请参见 Optimizer.__init__。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 在所有reduce操作期间使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • sparse_as_dense – 将所有稀疏梯度视为密集张量。如果原始稀疏梯度具有高密度,这有助于提高性能和内存利用率。默认为 false。

  • backward_passes_per_step – 在调用hvd.allreduce之前要执行的反向传播次数。 这允许在归约和应用更新之前,通过多个小批次累积更新。

  • op – 在不同等级间组合梯度时使用的归约操作。

  • gradient_predivide_factor – 如果操作符为平均值,gradient_predivide_factor 会在求和前后拆分平均值计算。 梯度在求和前按 1.0 / gradient_predivide_factor 进行缩放, 在求和后按 gradient_predivide_factor / 规模 进行缩放。

  • average_aggregated_gradients – 是否对在多个小批次中累积的聚合梯度进行平均。如果为true,则将梯度更新除以backward_passes_per_step。仅适用于backward_passes_per_step > 1的情况。

  • num_groups – 为显式分组分配梯度全归约操作的分组数量。默认为无显式分组。

  • groups – 用于分组梯度allreduce操作的参数。可接受的值为非负整数或tf.Variable的列表的列表。 如果groups是一个非负整数,它表示用于显式分组的梯度allreduce操作的分组数量。 如果groups是tf.Variable的列表的列表,同一内部列表中的变量将被分配到同一组,而未出现在任何列表中的参数将自行形成一个组。 默认为None,表示没有显式分组。

  • process_set – 梯度将仅在与该进程集相关的Horovod进程间进行归约。默认为全局进程集。

  • scale_local_gradients – 是否对局部变量的梯度进行缩放。默认设置为True。

horovod.tensorflow.DistributedGradientTape(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, gradient_predivide_factor=1.0, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[源代码]

一个包装另一个 tf.GradientTape 的磁带,使用 allreduce 在将梯度应用于模型权重之前合并梯度值。

Parameters
  • gradtape – 用于计算梯度和应用更新的GradientTape。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 在所有reduce操作期间使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • sparse_as_dense – 将所有稀疏梯度视为密集张量。如果原始稀疏梯度具有高密度,这有助于提高性能和内存利用率。默认为 false。

  • op – 在不同等级间合并梯度时使用的归约操作。

  • gradient_predivide_factor – 如果 op == Average,gradient_predivide_factor 会在求和前后分别进行平均分割。梯度在求和前会按 1.0 / gradient_predivide_factor 进行缩放,在求和后会按 gradient_predivide_factor / size 进行缩放。

  • num_groups – 用于显式分组的梯度allreduce操作分配组数。默认为无显式分组。

  • groups – 用于分组梯度allreduce操作的参数。可接受的值为非负整数或tf.Variable的列表的列表。 如果groups是一个非负整数,它表示用于显式分组的梯度allreduce操作分配到的组数。 如果groups是tf.Variable的列表的列表,同一内部列表中的变量将被分配到同一组,而未出现在任何列表中的参数将自行形成一个组。 默认为None,表示没有显式分组。

  • process_set – 梯度将仅在与该进程集相关的Horovod进程间进行归约。默认为全局进程集。

  • scale_local_gradients – 是否缩放局部变量的梯度。默认设置为True。

horovod.tensorflow.PartialDistributedGradientTape(gradtape, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, gradient_predivide_factor=1.0, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, local_layers=None, scale_local_gradients=True)[源代码]

一个包装另一个tf.GradientTape的磁带,使用allreduce操作在将梯度应用于模型权重之前合并梯度值,类似于DistributedGradientTape,但它会跳过对传入local_layers参数中的本地层梯度进行allreduce操作。

Parameters
  • gradtape – 用于计算梯度和应用更新的GradientTape。

  • local_layers – 一组类型为 tf.keras.layers.Layer 的本地层,它们的梯度无需跨节点同步,而是本地保留和应用。 如果未提供,PartialDistributedGradientTape 的功能与 DistributedGradientTape 相同。

其余参数与 DistributedGradientTape 类似。

class horovod.tensorflow.elastic.TensorFlowKerasState(model, optimizer=None, backend=None, **kwargs)[源代码]

TensorFlow Keras 模型和优化器的状态表示。

支持 TensorFlow 2 模型和优化器,以及 kerastf.keras

Parameters
  • model – TensorFlow Keras 模型。

  • optimizer – 可选的优化器,也可以编译进模型中。

  • backend – 对于 TensorFlow v1,Keras 用于获取会话的后端。

  • kwargs – 用于同步的附加属性,将作为对象的属性公开。

save()[源代码]

将状态保存到主机内存。

restore()[源代码]

恢复最后提交的状态,撤销任何未提交的修改。

sync()[源代码]

跨工作节点同步状态。

class horovod.tensorflow.elastic.TensorFlowState(variables=None, session=None, **kwargs)[源代码]

TensorFlow 变量列表的状态表示。

支持 TensorFlow v1 和 v2 两个版本。对于 TensorFlow v2,仅在启用即时执行时可用。

Parameters
  • variables – 要追踪的 tf.Variable 对象列表(默认:tf.global_variables())。

  • session – 对于 TensorFlow v1,用于实例化变量的会话(默认:ops.get_default_session())。

  • kwargs – 用于同步的附加属性,将作为对象的属性公开。

save()[源代码]

将状态保存到主机内存。

restore()[源代码]

恢复最后提交的状态,撤销任何未提交的修改。

sync()[源代码]

跨工作节点同步状态。

horovod.tensorflow.elastic.run(func)[源代码]

用于运行弹性训练过程的装饰器。

此装饰器的目的是允许被包装函数在多个工作者之间并行不间断执行,因为工作者会动态加入或离开系统。当新增一个工作者时,需要将其状态提升到与其他工作者相同的进度点,这是通过在执行func前同步状态对象来实现的。

当添加或移除一个工作节点时,其他工作节点将引发异常,使它们回到这样的同步点,然后再次执行func。这确保了当此类重置事件发生时,工作节点不会出现分歧。

需要注意的是,集合操作(例如广播、全局归约)不能作为包装函数的调用。否则,新工作进程在初始化期间可能会执行这些操作,而其他工作进程正尝试同步状态,从而导致死锁。

Parameters

func – 一个包装函数,接受任意数量的参数或关键字参数。第一个参数 必须是一个 horovod.common.elastic.State 对象,用于在多个 工作节点之间同步状态。

horovod.tensorflow.keras

horovod.tensorflow.keras.shutdown()

一个关闭 Horovod 的函数。

horovod.tensorflow.keras.is_initialized()

如果 Horovod 已初始化,则返回 True

horovod.tensorflow.keras.start_timeline(file_path, mark_cycles=False)

file_path 处创建时间线文件并开始记录。

Parameters
  • file_path – 时间线文件的字符串路径。

  • mark_cycles – 布尔值,指示是否应在时间线上标记循环(默认值:False)。

Raises a ValueError if Horovod is not initialized.

horovod.tensorflow.keras.stop_timeline()

停止活动时间线记录并关闭文件。

Raises a ValueError if Horovod is not initialized.

horovod.tensorflow.keras.size()

一个返回Horovod进程数量的函数。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.tensorflow.keras.local_size()

一个返回当前进程所在节点内Horovod进程数量的函数。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.tensorflow.keras.cross_size()

一个返回当前Horovod进程本地等级节点数量的函数。例如,如果作业中有2个节点:一个运行2个进程,另一个运行1个进程,那么每个节点上的第一个进程将具有交叉大小2,而第一个节点上的第二个进程将具有交叉大小1。

Returns

一个整数标量,包含跨 Horovod 进程的数量。

horovod.tensorflow.keras.rank()

一个返回调用进程的Horovod等级的函数。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.tensorflow.keras.local_rank()

一个返回调用进程在其运行节点内的本地Horovod等级的函数。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含两端)。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.tensorflow.keras.cross_rank()

一个返回调用进程在作业中跨节点间的跨Horovod等级的函数。进程的跨等级对应其运行所在节点的等级。例如,如果作业中有7个节点,跨等级将是从零到六(包含)。

Returns

一个整数标量,表示调用进程的跨 Horovod 等级。

horovod.tensorflow.keras.mpi_threads_supported()

一个返回标志的函数,指示是否支持MPI多线程。

如果支持MPI多线程,用户可以混合使用Horovod与其他MPI库,例如mpi4py

Returns

一个布尔值,表示是否支持MPI多线程。

horovod.tensorflow.keras.mpi_enabled()

如果当前在运行时启用了MPI模式,则返回True。

如果启用了MPI,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了MPI。

horovod.tensorflow.keras.mpi_built()

如果 Horovod 编译时支持 MPI,则返回 True。

Returns

一个布尔值,表示是否编译了MPI支持。

horovod.tensorflow.keras.gloo_enabled()

如果运行时启用了Gloo模式,则返回True。

如果启用了Gloo,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了Gloo。

horovod.tensorflow.keras.gloo_built()

如果 Horovod 编译时支持 Gloo,则返回 True。

Returns

一个布尔值,表示是否编译了Gloo支持。

class horovod.tensorflow.keras.ProcessSet(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[源代码]

一组将共同运行集体操作的Horovod进程的表示

使用进程等级列表或MPI通信器初始化一个进程集。然后将此实例传递给hvd.init()或hvd.add_process_set()。如果已初始化有效的进程集,process_set_id将被设置为一个数值。

size()Optional[int][源代码]

返回进程集的大小,如果未初始化则返回 None。

rank()Optional[int][源代码]

返回相对于此进程集的排名,如果未初始化则返回None。

这很有用,例如,用于处理 hvd.allgather() 的结果。

请注意,即使使用进程集,Horovod操作(如hvd.broadcast())也不是通过这个相对等级参数化的,而是通过从hvd.rank()获取的全局等级参数化的。

included()Optional[bool][源代码]

返回当前进程是否属于此进程集,如果未初始化则返回None。

horovod.tensorflow.keras.add_process_set(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]])horovod.common.process_sets.ProcessSet[源代码]

在Horovod初始化后添加一个新的process_set并返回它。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。不能存在已包含相同等级的进程集。 返回的进程集将被完全初始化。

horovod.tensorflow.keras.remove_process_set(process_set: horovod.common.process_sets.ProcessSet)bool[源代码]

尝试移除进程集并返回此尝试是否成功。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。如果移除成功,我们将使进程集对象失效。

horovod.tensorflow.keras.nccl_built()

用于检查Horovod是否使用NCCL支持编译的函数。

Returns

一个整数值,表示是否编译了NCCL支持。 如果编译了NCCL支持,则返回NCCL_VERSION_CODE。否则, 返回0。

horovod.tensorflow.keras.ddl_built()

如果 Horovod 编译时支持 DDL,则返回 True。

Returns

一个布尔值,指示是否编译了DDL支持。

horovod.tensorflow.keras.ccl_built()

如果 Horovod 编译时支持 oneCCL,则返回 True。

Returns

一个布尔值,表示是否编译了oneCCL支持。

horovod.tensorflow.keras.cuda_built()

如果 Horovod 编译时支持 CUDA,则返回 True。

Returns

一个布尔值,指示是否编译了CUDA支持。

horovod.tensorflow.keras.rocm_built()

如果 Horovod 编译时支持 ROCm,则返回 True。

Returns

一个布尔值,表示是否编译了ROCm支持。

class horovod.tensorflow.keras.Compression[源代码]

可选梯度压缩算法,用于allreduce过程中。

none

将所有浮点梯度压缩至16位。

horovod.tensorflow.compression.NoneCompressor 的别名

fp16

horovod.tensorflow.compression.FP16Compressor 的别名

horovod.tensorflow.keras.DistributedOptimizer(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, backward_passes_per_step=1, average_aggregated_gradients=False, num_groups=0, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, scale_local_gradients=True)[源代码]

一个优化器,它包装了另一个keras.optimizers.Optimizer,在将梯度应用于模型权重之前使用allreduce来平均梯度值。

Parameters
  • optimizer – 用于计算梯度和应用更新的优化器。

  • name – 应用梯度时为创建的操作设置的可选名称前缀。默认为"Distributed"后跟所提供的优化器类型。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • sparse_as_dense – 将所有稀疏梯度视为密集张量。如果原始稀疏梯度具有高密度,这有助于提高性能和内存利用率。默认为 false。

  • gradient_predivide_factor – gradient_predivide_factor 在求和前后进行梯度预分割。 梯度在求和前按 1.0 / gradient_predivide_factor 进行缩放, 在求和后按 gradient_predivide_factor / size 进行缩放。

  • op – 在不同等级间合并梯度时使用的归约操作。默认为平均值。

  • backward_passes_per_step – 在调用hvd.allreduce之前要执行的反向传播次数。这允许在减少和应用更新之前,在多个小批量上累积更新。

  • average_aggregated_gradients – 是否对多个小批次累积的聚合梯度进行平均。 如果为真,则将梯度更新除以backward_passes_per_step。 仅适用于backward_passes_per_step > 1的情况。

  • num_groups – 为显式分组分配梯度全归约操作的分组数量。默认为无显式分组。

  • groups – 用于分组梯度allreduce操作的参数。可接受的值为非负整数或tf.Variable的列表的列表。 如果groups是一个非负整数,它是指定梯度allreduce操作到显式分组的组数。 如果groups是tf.Variable的列表的列表,相同内部列表中的变量将被分配到同一组,而未出现在任何列表中的参数将自行形成一个组。 默认为None,表示没有显式分组。

  • process_set – 梯度将仅在属于此进程集的Horovod进程间进行规约。默认为全局进程集。

  • scale_local_gradients – 是否对局部变量的梯度进行缩放。默认设置为True。

horovod.tensorflow.keras.broadcast_global_variables(root_rank)[源代码]

将根进程的所有全局变量广播到所有其他进程。

Parameters

root_rank – 用于将全局变量广播到所有其他进程的进程等级。

horovod.tensorflow.keras.allreduce(value, name=None, average=None, prescale_factor=1.0, postscale_factor=1.0, op=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[源代码]

对张量兼容值执行allreduce操作。

Parameters
  • value – 一个可兼容张量的待归约值。 所有进程的输入形状必须完全相同。

  • name – 此操作所创建常量的可选名称。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v0.21.0中移除。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • op – 用于跨不同等级合并张量的归约操作。如果未提供则默认为平均值。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

horovod.tensorflow.keras.allgather(value, name=None)[源代码]

对张量兼容值执行allgather操作。

连接操作在第一维度上进行,因此不同进程上的输入值必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • value – 用于聚合的张量兼容值。

  • name – 此操作所创建常量的可选名称前缀。

horovod.tensorflow.keras.broadcast(value, root_rank, name=None)[源代码]

对张量兼容值执行广播操作。

Parameters
  • value – 一个可进行张量兼容的待归约值。 所有计算节点上的输入形状必须完全一致。

  • root_rank – 从此进程的等级开始,全局变量将被广播到所有其他进程。

  • name – 此操作所创建常量的可选名称。

horovod.tensorflow.keras.reducescatter(value, name=None, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>)[源代码]

对张量兼容值执行reduce-scatter操作。

Parameters
  • value – 一个张量兼容的值,用于进行归约和分散操作。 所有进程上的输入形状必须完全相同。

  • name – 此操作所创建常量的可选名称。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

horovod.tensorflow.keras.load_model(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, legacy_opts=False)[源代码]

加载一个使用 Horovod 分布式优化器保存的 Keras 模型。

分布式优化器将包装用于训练已保存模型的基础优化器,以便优化器状态(参数和权重)能够被拾取用于重新训练。

默认情况下,模块 keras.optimizers 中的所有优化器将被加载 并包装,无需指定任何 custom_optimizerscustom_objects

Parameters
  • filepath – 以下选项之一: - 字符串,保存模型的路径,或 - 用于加载模型的 h5py.File 对象

  • custom_optimizers – 加载过程中支持的可选优化器子类列表

  • custom_objects – 可选字典,将名称(字符串)映射到自定义类或函数,这些类或函数将在反序列化过程中被考虑。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • legacy_opts – 如果为True,模型将使用tf.keras.optimizers.legacy.*优化器

Returns

一个 Keras 模型实例。

Raises
  • ImportError – If h5py is not available.

  • ValueError – In case of an invalid savefile.

class horovod.tensorflow.keras.callbacks.BroadcastGlobalVariablesCallback(*args, **kw)[源代码]

Keras回调函数,在初始化期间会将所有全局变量从根等级广播到所有其他进程。

这是必要的,以确保在训练开始时使用随机权重或从检查点恢复时,所有工作节点都能进行一致的初始化。

__init__(root_rank, device='', local_variables=None)[源代码]

构建一个新的 BroadcastGlobalVariablesCallback,它将在初始化期间将所有全局变量从根等级广播到所有其他进程。

Parameters
  • root_rank – 将发送数据的等级,其他等级将接收数据。

  • device – 用于广播的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • local_variables – 一组不需要广播的变量集合。

class horovod.tensorflow.keras.callbacks.MetricAverageCallback(*args, **kw)[源代码]

Keras回调函数,用于在epoch结束时对所有进程的指标进行平均。与ReduceLROnPlateau、TensorBoard及其他基于指标的回调函数结合使用时非常有用。

注意:此回调必须在ReduceLROnPlateau、TensorBoard或其他基于指标的回调之前添加到回调列表中。

__init__(device='')[源代码]

构建一个新的MetricAverageCallback,它将在每个周期结束时对所有进程的指标进行平均。

Parameters

device – 用于allreduce操作的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

class horovod.tensorflow.keras.callbacks.LearningRateScheduleCallback(*args, **kw)[源代码]

LearningRateScheduleCallback 在 start_epochend_epoch 之间的周期内将学习率设置为 initial_lr * multipliermultiplier 可以是一个常数或一个函数 f(epoch) = lr’

如果 multiplier 是一个函数且 staircase=True,学习率调整将在每个周期的开始时发生,并且传递给 multiplier 函数的周期将是一个整数。

如果 multiplier 是一个函数且 staircase=False,学习率调整将在每个批次的开始时发生,传递给 multiplier 函数的轮次将是一个浮点数:epoch’ = epoch + batch / steps_per_epoch。此功能对于平滑的学习率调整调度器非常有用,例如 LearningRateWarmupCallback

initial_lr 是模型优化器在训练开始时的学习率。

__init__(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)[源代码]

构建一个新的学习率调度回调。

Parameters
  • initial_lr – 训练开始时的初始学习率。

  • multiplier – 一个常数乘数或函数 f(epoch) = lr’

  • start_epoch – 该调整将应用到的首个周期。默认为0。

  • end_epoch – 该调整将停止应用的轮次(不包含结束值)。 默认为 None。

  • staircase – 是否在训练周期开始时调整学习率 (staircase=True) 或在每个批次开始时调整学习率 (staircase=False)。

  • momentum_correction – 对具有动量的优化器应用动量校正。 默认为 True。

  • steps_per_epoch – 对于Keras版本>=2.0.0,该回调函数将尝试自动检测每个epoch的批次数。如果您使用的是较旧版本的Keras,请提供此值。

class horovod.tensorflow.keras.callbacks.LearningRateWarmupCallback(*args, **kw)[源代码]

实现渐进式学习率预热:

lr = initial_lr / hvd.size() —> lr = initial_lr

initial_lr 是模型优化器在训练开始时的学习率。

该技术在论文《精确、大批次SGD:1小时内训练ImageNet》中有所描述。详情请见https://arxiv.org/pdf/1706.02677.pdf

数学回顾:

\[ \begin{align}\begin{aligned}epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch}\\lr'(epoch) &= \frac{lr}{size} * (\frac{size - 1}{warmup} * epoch + 1)\\lr'(epoch = 0) &= \frac{lr}{size}\\lr'(epoch = warmup) &= lr\end{aligned}\end{align} \]
__init__(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)[源代码]

构建一个新的学习率预热回调函数,该函数将逐步预热学习率。

Parameters
  • initial_lr – 训练开始时的初始学习率。

  • warmup_epochs – 预热阶段的周期数。默认为5。

  • momentum_correction – 对具有动量的优化器应用动量校正。 默认为 True。

  • steps_per_epoch – 该回调函数将尝试自动检测每个epoch的批次数,适用于Keras版本>=2.0.0。如果您使用的是较旧版本的Keras,请提供此值。

  • verbose – 详细模式,0 或 1。

class horovod.tensorflow.keras.elastic.KerasState(model, optimizer=None, **kwargs)[源代码]

tf.keras 模型和优化器的状态表示。

Parameters
  • model – Keras 模型。

  • optimizer – 可选的优化器,也可以编译进模型中。

  • kwargs – 用于同步的附加属性,将作为对象的属性公开。

class horovod.tensorflow.keras.elastic.CommitStateCallback(*args, **kw)[源代码]

Keras 回调函数,将在每批次结束时每 batches_per_commit 批次提交 state 对象。

class horovod.tensorflow.keras.elastic.UpdateBatchStateCallback(*args, **kw)[源代码]

Keras回调函数,将在每个批次结束时使用当前批次编号更新state.batch的值。在每个周期结束时,批次将重置为0。

如果设置了steps_per_epoch,那么这个回调函数还会确保在重置后的第一个周期中,已处理的批次数量会缩短步骤数。

class horovod.tensorflow.keras.elastic.UpdateEpochStateCallback(*args, **kw)[源代码]

Keras回调函数,将在每个周期结束时使用当前周期编号更新state.epoch的值。

horovod.tensorflow.keras.elastic.run(func)[源代码]

用于运行弹性训练过程的装饰器。

此装饰器的目的是允许被包装函数在多个工作者之间并行不间断执行,因为工作者会动态加入或离开系统。当新增一个工作者时,需要将其状态提升到与其他工作者相同的进度点,这是通过在执行func前同步状态对象来实现的。

当添加或移除一个工作节点时,其他工作节点将引发异常,使它们回到这样的同步点,然后再次执行func。这确保了当此类重置事件发生时,工作节点不会出现分歧。

需要注意的是,集合操作(例如广播、全局归约)不能作为包装函数的调用。否则,新工作进程在初始化期间可能会执行这些操作,而其他工作进程正尝试同步状态,从而导致死锁。

Parameters

func – 一个包装函数,接受任意数量的参数或关键字参数。第一个参数 必须是一个 horovod.common.elastic.State 对象,用于在多个 工作节点之间同步状态。

horovod.keras

horovod.keras.shutdown()

一个关闭 Horovod 的函数。

horovod.keras.size()

一个返回Horovod进程数量的函数。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.keras.local_size()

一个返回当前进程所在节点内Horovod进程数量的函数。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.keras.cross_size()

一个返回当前Horovod进程本地等级节点数量的函数。例如,如果作业中有2个节点:一个运行2个进程,另一个运行1个进程,那么每个节点上的第一个进程将具有交叉大小2,而第一个节点上的第二个进程将具有交叉大小1。

Returns

一个整数标量,包含跨 Horovod 进程的数量。

horovod.keras.rank()

一个返回调用进程的Horovod等级的函数。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.keras.local_rank()

一个返回调用进程在其运行节点内的本地Horovod等级的函数。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含两端)。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.keras.cross_rank()

一个返回调用进程在作业中跨节点间的跨Horovod等级的函数。进程的跨等级对应其运行所在节点的等级。例如,如果作业中有7个节点,跨等级将是从零到六(包含)。

Returns

一个整数标量,表示调用进程的跨 Horovod 等级。

horovod.keras.is_initialized()

如果 Horovod 已初始化,则返回 True

horovod.keras.start_timeline(file_path, mark_cycles=False)

file_path 处创建时间线文件并开始记录。

Parameters
  • file_path – 时间线文件的字符串路径。

  • mark_cycles – 布尔值,指示是否应在时间线上标记循环(默认值:False)。

Raises a ValueError if Horovod is not initialized.

horovod.keras.stop_timeline()

停止活动时间线记录并关闭文件。

Raises a ValueError if Horovod is not initialized.

horovod.keras.mpi_threads_supported()

一个返回标志的函数,指示是否支持MPI多线程。

如果支持MPI多线程,用户可以混合使用Horovod与其他MPI库,例如mpi4py

Returns

一个布尔值,表示是否支持MPI多线程。

horovod.keras.mpi_enabled()

如果当前在运行时启用了MPI模式,则返回True。

如果启用了MPI,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了MPI。

horovod.keras.mpi_built()

如果 Horovod 编译时支持 MPI,则返回 True。

Returns

一个布尔值,表示是否编译了MPI支持。

horovod.keras.gloo_enabled()

如果运行时启用了Gloo模式,则返回True。

如果启用了Gloo,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了Gloo。

horovod.keras.gloo_built()

如果 Horovod 编译时支持 Gloo,则返回 True。

Returns

一个布尔值,表示是否编译了Gloo支持。

horovod.keras.nccl_built()

用于检查Horovod是否使用NCCL支持编译的函数。

Returns

一个整数值,表示是否编译了NCCL支持。 如果编译了NCCL支持,则返回NCCL_VERSION_CODE。否则, 返回0。

horovod.keras.ddl_built()

如果 Horovod 编译时支持 DDL,则返回 True。

Returns

一个布尔值,指示是否编译了DDL支持。

horovod.keras.ccl_built()

如果 Horovod 编译时支持 oneCCL,则返回 True。

Returns

一个布尔值,表示是否编译了oneCCL支持。

horovod.keras.cuda_built()

如果 Horovod 编译时支持 CUDA,则返回 True。

Returns

一个布尔值,指示是否编译了CUDA支持。

horovod.keras.rocm_built()

如果 Horovod 编译时支持 ROCm,则返回 True。

Returns

一个布尔值,表示是否编译了ROCm支持。

class horovod.keras.Compression[源代码]

可选梯度压缩算法,用于allreduce过程中。

none

将所有浮点梯度压缩至16位。

horovod.tensorflow.compression.NoneCompressor 的别名

fp16

horovod.tensorflow.compression.FP16Compressor 的别名

horovod.keras.DistributedOptimizer(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, num_groups=0, groups=None)[源代码]

一个优化器,它包装了另一个keras.optimizers.Optimizer,在将梯度应用于模型权重之前使用allreduce来平均梯度值。

Parameters
  • optimizer – 用于计算梯度和应用更新的优化器。

  • name – 应用梯度时为创建的操作设置的可选名称前缀。默认为"Distributed"后跟所提供的优化器类型。

  • device_dense – 用于密集张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • device_sparse – 用于稀疏张量的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • sparse_as_dense – 将所有稀疏梯度视为密集张量。如果原始稀疏梯度具有高密度,这有助于提高性能和内存利用率。默认为 false。

  • gradient_predivide_factor – gradient_predivide_factor 在求和前后进行梯度预分割。 梯度在求和前按 1.0 / gradient_predivide_factor 进行缩放, 在求和后按 gradient_predivide_factor / size 进行缩放。

  • op – 在不同等级间合并梯度时使用的归约操作。默认为平均值。

  • num_groups – 用于显式分组的梯度allreduce操作分配组数。默认为无显式分组。

  • groups – 用于分组梯度allreduce操作的参数。可接受的值为非负整数或tf.Variable的列表的列表。 如果groups是一个非负整数,它是指定梯度allreduce操作到显式分组的组数。 如果groups是tf.Variable的列表的列表,相同内部列表中的变量将被分配到同一组,而未出现在任何列表中的参数将自行形成一个组。 默认为None,表示没有显式分组。

horovod.keras.PartialDistributedOptimizer(optimizer, name=None, device_dense='', device_sparse='', compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, sparse_as_dense=False, gradient_predivide_factor=1.0, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>, backward_passes_per_step=1, average_aggregated_gradients=False, groups=None, process_set=<horovod.common.process_sets.ProcessSet object>, local_layers=None, scale_local_gradients=True)[源代码]

一个优化器,它包装了另一个keras.optimizers.Optimizer,在将梯度应用于模型权重之前使用allreduce来平均梯度值。

Parameters
  • optimizer – 用于计算梯度和应用更新的优化器。

  • process_set – 梯度将仅在属于此进程集的Horovod进程间进行规约。默认为全局进程集。

  • backward_passes_per_step – 在调用hvd.allreduce之前要执行的反向传播次数。这允许在减少和应用更新之前,在多个小批量上累积更新。

  • average_aggregated_gradients – 是否对多个小批次累积的聚合梯度进行平均。 如果为真,则将梯度更新除以backward_passes_per_step。 仅适用于backward_passes_per_step > 1的情况。

  • local_layers – 一组类型为 tf.keras.layers.Layer 的本地层,它们的梯度无需跨等级同步,而是本地保持和应用。 如果未提供,PartialDistributedOptimizer 的功能与 DistributedOptimizer 相同。

  • scale_local_gradients – 是否对局部变量的梯度进行缩放。默认设置为True。

  • 其余参数与分布式优化器类似。 (这些) –

horovod.keras.broadcast_global_variables(root_rank)[源代码]

将根进程的所有全局变量广播到所有其他进程。

Parameters

root_rank – 用于将全局变量广播到所有其他进程的进程等级。

horovod.keras.allreduce(value, name=None, average=True, prescale_factor=1.0, postscale_factor=1.0, op=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>)[源代码]

对张量兼容值执行allreduce操作。

Parameters
  • value – 一个可兼容张量的待归约值。 所有进程的输入形状必须完全相同。

  • name – 此操作所创建常量的可选名称。

  • average – 如果为True,则计算所有等级的平均值。 否则,计算所有等级的总和。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • op – 用于跨不同等级合并张量的归约操作。

  • compression – 在所有归约操作期间使用的梯度压缩算法。 默认为 Compression.none。

horovod.keras.allgather(value, name=None)[源代码]

对张量兼容值执行allgather操作。

连接操作在第一维度上进行,因此不同进程上的输入值必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • value – 用于聚合的张量兼容值。

  • name – 此操作所创建常量的可选名称前缀。

horovod.keras.broadcast(value, root_rank, name=None)[源代码]

对张量兼容值执行广播操作。

Parameters
  • value – 一个可兼容张量的待归约值。 所有进程的输入形状必须完全相同。

  • root_rank – 从此进程的等级开始,全局变量将被广播到所有其他进程。

  • name – 此操作所创建常量的可选名称。

horovod.keras.reducescatter(value, name=None, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629550132336'>)[源代码]

对张量兼容值执行reduce-scatter操作。

Parameters
  • value – 一个张量兼容的值,用于归约和分散。 所有进程的输入形状必须完全相同。

  • name – 此操作所创建常量的可选名称。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

horovod.keras.load_model(filepath, custom_optimizers=None, custom_objects=None, compression=<class 'horovod.tensorflow.compression.NoneCompressor'>, legacy_opts=False)[源代码]

加载一个使用 Horovod 分布式优化器保存的 Keras 模型。

分布式优化器将包装用于训练已保存模型的基础优化器,以便优化器状态(参数和权重)能够被拾取用于重新训练。

默认情况下,模块 keras.optimizers 中的所有优化器将被加载 并包装,无需指定任何 custom_optimizerscustom_objects

Parameters
  • filepath – 以下选项之一: - 字符串,保存模型的路径,或 - 用于加载模型的 h5py.File 对象

  • custom_optimizers – 加载过程中支持的可选优化器子类列表

  • custom_objects – 可选字典映射名称(字符串)到自定义类或函数,这些将在反序列化过程中被考虑。

  • compression – 用于减少每个工作节点发送和接收数据量的压缩算法。默认为不使用压缩。

  • legacy_opts – 如果为True,模型将使用tf.keras.optimizers.legacy.*优化器

Returns

一个 Keras 模型实例。

Raises
  • ImportError – If h5py is not available.

  • ValueError – In case of an invalid savefile.

class horovod.keras.callbacks.BroadcastGlobalVariablesCallback(*args, **kw)[源代码]

Keras回调函数,在初始化期间会将所有全局变量从根等级广播到所有其他进程。

这是必要的,以确保在训练开始时使用随机权重或从检查点恢复时,所有工作节点都能进行一致的初始化。

__init__(root_rank, device='', local_variables=None)[源代码]

构建一个新的 BroadcastGlobalVariablesCallback,它将在初始化期间将所有全局变量从根等级广播到所有其他进程。

Parameters
  • root_rank – 将发送数据的等级,其他等级将接收数据。

  • device – 用于广播的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

  • local_variables – 一组不需要广播的变量集合。

class horovod.keras.callbacks.MetricAverageCallback(*args, **kw)[源代码]

Keras回调函数,用于在epoch结束时对所有进程的指标进行平均。与ReduceLROnPlateau、TensorBoard及其他基于指标的回调函数结合使用时非常有用。

注意:此回调必须在ReduceLROnPlateau、TensorBoard或其他基于指标的回调之前添加到回调列表中。

__init__(device='')[源代码]

构建一个新的MetricAverageCallback,它将在每个周期结束时对所有进程的指标进行平均。

Parameters

device – 用于allreduce操作的设备。如果Horovod是使用HOROVOD_GPU_OPERATIONS构建的,则默认使用GPU。

class horovod.keras.callbacks.LearningRateScheduleCallback(*args, **kw)[源代码]

LearningRateScheduleCallback 在 start_epochend_epoch 之间的周期内将学习率设置为 initial_lr * multipliermultiplier 可以是一个常数或一个函数 f(epoch) = lr’

如果 multiplier 是一个函数且 staircase=True,学习率调整将在每个周期的开始时发生,并且传递给 multiplier 函数的周期将是一个整数。

如果 multiplier 是一个函数且 staircase=False,学习率调整将在每个批次的开始时发生,传递给 multiplier 函数的轮次将是一个浮点数:epoch’ = epoch + batch / steps_per_epoch。此功能对于平滑的学习率调整调度器非常有用,例如 LearningRateWarmupCallback

initial_lr 是模型优化器在训练开始时的学习率。

__init__(initial_lr, multiplier, start_epoch=0, end_epoch=None, staircase=True, momentum_correction=True, steps_per_epoch=None)[源代码]

构建一个新的学习率调度回调。

Parameters
  • initial_lr – 训练开始时的初始学习率。

  • multiplier – 一个常数乘数或函数 f(epoch) = lr’

  • start_epoch – 该调整将应用到的第一个周期。默认为0。

  • end_epoch – 该调整将停止应用的轮次(不包含结束值)。 默认为 None。

  • staircase – 是否在训练周期开始时调整学习率 (staircase=True) 或在每个批次开始时调整学习率 (staircase=False)。

  • momentum_correction – 对具有动量的优化器应用动量校正。 默认为 True。

  • steps_per_epoch – 该回调函数将尝试在 Keras >= 2.0.0 版本中自动检测每个周期的批次数量。如果您使用的是较旧版本的 Keras,请提供此值。

class horovod.keras.callbacks.LearningRateWarmupCallback(*args, **kw)[源代码]

实现渐进式学习率预热:

lr = initial_lr / hvd.size() —> lr = initial_lr

initial_lr 是模型优化器在训练开始时的学习率。

该技术在论文《精确、大批次SGD:一小时训练ImageNet》中有所描述。详情请见https://arxiv.org/pdf/1706.02677.pdf

数学回顾:

\[ \begin{align}\begin{aligned}epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch}\\lr'(epoch) &= \frac{lr}{size} * (\frac{size - 1}{warmup} * epoch + 1)\\lr'(epoch = 0) &= \frac{lr}{size}\\lr'(epoch = warmup) &= lr\end{aligned}\end{align} \]
__init__(initial_lr, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None, verbose=0)[源代码]

构建一个新的学习率预热回调函数,它将逐步预热学习率。

Parameters
  • initial_lr – 训练开始时的初始学习率。

  • warmup_epochs – 预热阶段的周期数。默认为5。

  • momentum_correction – 对具有动量的优化器应用动量校正。 默认为 True。

  • steps_per_epoch – 对于 Keras >= 2.0.0 版本,该回调函数将尝试自动检测每个周期的批次数量。如果您使用的是较旧版本的 Keras,请提供此值。

  • verbose – 详细模式,0 或 1。

class horovod.keras.elastic.KerasState(model, optimizer=None, **kwargs)[源代码]

keras 模型和优化器的状态表示。

Parameters
  • model – Keras 模型。

  • optimizer – 可选的优化器,也可以编译进模型中。

  • kwargs – 用于同步的附加属性,将作为对象的属性公开。

class horovod.keras.elastic.CommitStateCallback(*args, **kw)[源代码]

Keras回调函数,将在每批次结束时每batches_per_commit批次提交state对象。

class horovod.keras.elastic.UpdateBatchStateCallback(*args, **kw)[源代码]

Keras 回调函数,将在每个批次结束时更新 state.batch 的值为当前批次编号。每个周期结束时,批次编号将重置为 0。

如果设置了steps_per_epoch,那么此回调函数还将确保在重置后的第一个周期中,已处理的批次数量会缩短步骤数。

class horovod.keras.elastic.UpdateEpochStateCallback(*args, **kw)[源代码]

Keras回调函数,将在每个周期结束时使用当前周期编号更新state.epoch的值。

horovod.keras.elastic.run(func)[源代码]

用于运行弹性训练过程的装饰器。

此装饰器的目的是允许被包装函数在多个工作者之间并行不间断执行,因为工作者会动态加入或离开系统。当新增一个工作者时,需要将其状态提升到与其他工作者相同的进度点,这是通过在执行func前同步状态对象来实现的。

当添加或移除一个工作节点时,其他工作节点将引发异常,使它们回到这样的同步点,然后再次执行func。这确保了当此类重置事件发生时,工作节点不会出现分歧。

需要注意的是,集合操作(例如广播、全局归约)不能作为包装函数的调用。否则,新工作进程在初始化期间可能会执行这些操作,而其他工作进程正尝试同步状态,从而导致死锁。

Parameters

func – 一个包装函数,接受任意数量的参数或关键字参数。第一个参数 必须是一个 horovod.common.elastic.State 对象,用于在多个 工作节点之间同步状态。

horovod.torch

class horovod.torch.Compression[源代码]

可选梯度压缩算法,用于allreduce过程中。

none

将所有浮点梯度压缩至16位。

horovod.torch.compression.NoneCompressor 的别名

fp16

horovod.torch.compression.FP16Compressor 的别名

horovod.torch.allgather_object(obj, name=None)[源代码]

从所有其他进程中序列化并聚合一个对象。

Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • name – 用于allgather操作时的可选名称,默认将使用类类型。

Returns

在所有层级上全部收集的对象列表。

horovod.torch.broadcast_object(obj, root_rank=0, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

将对象从根等级序列化并广播到所有其他进程。 典型用法是广播optimizer.state_dict(),例如:

state_dict = broadcast_object(optimizer.state_dict(), 0)
if hvd.rank() > 0:
    optimizer.load_state_dict(state_dict)
Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • root_rank – 从此进程的排名,参数将被广播到所有其他进程。

  • name – 广播期间使用的可选名称,默认值为类类型。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

root_rank广播的对象。

horovod.torch.broadcast_optimizer_state(optimizer, root_rank, model=None)[源代码]

将优化器状态从根等级广播到所有其他进程。

Parameters
  • optimizer – 一个优化器。

  • root_rank – 从该进程的排名开始,优化器将被广播到所有其他进程。

  • model – (可选) 模型,用于识别稀疏参数。

horovod.torch.broadcast_parameters(params, root_rank)[源代码]

将参数从根等级广播到所有其他进程。 典型用法是广播 model.state_dict()model.named_parameters()model.parameters()

Parameters
  • params – 以下之一: - 要广播的参数列表 - 要广播的参数字典

  • root_rank – 从此进程的排名,参数将被广播到所有其他进程。

horovod.torch.allreduce(tensor, average=None, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行平均或求和操作的函数。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 需要归约的张量。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 归约操作的名称。

  • compression – 在所有reduce操作期间使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor具有相同形状和类型的张量,在所有进程中进行平均或求和。

horovod.torch.allreduce_async(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行异步平均或求和操作的函数。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensor – 需要归约的张量。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 归约操作的名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于poll()synchronize()的allreduce操作句柄。

horovod.torch.allreduce_(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行原地平均或求和的函数。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensor – 需要归约的张量。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 归约操作的名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor形状和类型相同的张量,在所有进程中进行平均或求和。

horovod.torch.allreduce_async_(tensor, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行异步原地平均或求和的函数。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensor – 需要归约的张量。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 归约操作的名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于poll()synchronize()的allreduce操作句柄。

horovod.torch.grouped_allreduce(tensors, average=None, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量列表执行平均或求和操作的函数。输入张量不会被修改。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动递增生成的基础名称。归约操作在同一列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

这充当了自动梯度函数的一个薄包装层。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensors – 需要归约的张量列表。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 用于组归约操作的基础名称。

  • compression – 在所有reduce操作期间使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同形状和类型的张量列表,在所有进程中进行平均或求和。

horovod.torch.grouped_allreduce_async(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量列表执行异步平均或求和操作的函数。输入张量不会被修改。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动递增生成的基础名称。归约操作在相同列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要归约的张量列表。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 用于组归约操作的基础名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个指向组内全归约操作(group allreduce operation)的句柄,可与 poll()synchronize() 配合使用。

horovod.torch.grouped_allreduce_(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行原地平均或求和操作的函数。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动递增生成的基础名称。归约操作在相同列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要归约的张量列表。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 用于组归约操作的基础名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同形状和类型的张量列表,在所有进程中进行平均或求和。

horovod.torch.grouped_allreduce_async_(tensors, average=None, name=None, op=None, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个在所有Horovod进程中对输入张量执行异步原地平均或求和的函数。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动生成的递增基础名称。归约操作在同一列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要归约的张量列表。

  • average

    警告

    自版本0.19.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • name – 用于组归约操作的基础名称。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子。

  • postscale_factor – 全规约后张量的缩放乘数因子。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个指向组内全归约操作的句柄,可与poll()synchronize()配合使用。

horovod.torch.allgather(tensor, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于将输入张量与所有其他Horovod进程上的相同输入张量进行拼接。输入张量不会被修改。

拼接操作在第一个维度上进行,因此不同进程上的对应输入张量必须具有相同的秩和形状,除了第一个维度允许不同。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 一个需要全局收集的张量。

  • name – allgather操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor类型相同的张量,在所有进程的零维上进行拼接。其形状与输入形状相同,除了第一维度可能更大,且等于不同Horovod进程中所有张量的第一维度之和。

horovod.torch.allgather_async(tensor, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个异步地将输入张量与所有其他Horovod进程上的相同输入张量进行拼接的函数。输入张量不会被修改。

拼接操作在第一个维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一个维度允许不同之外。

Parameters
  • tensor – 一个需要全局收集的张量。

  • name – allgather操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于poll()synchronize()的allgather操作句柄。

horovod.torch.grouped_allgather(tensors, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于将每个输入张量与所有其他Horovod进程上的对应输入张量连接起来,针对输入张量列表。 输入张量不会被修改。

拼接操作在第一维度上进行,因此不同进程上的对应输入张量必须具有相同的秩和形状,除了第一维度允许不同。

Parameters
  • tensors – 需要全局收集的张量列表。

  • name – 用于组内全收集操作的基础名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同类型张量的列表。每个张量 在所有进程的零维上进行拼接。其形状与对应的输入形状相同, 除了第一维度可能更大,它是不同Horovod进程中对应张量 所有第一维度的总和。

horovod.torch.grouped_allgather_async(tensors, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个异步函数,用于将每个输入张量与所有其他Horovod进程上对应的输入张量连接起来,适用于输入张量列表。输入张量不会被修改。

拼接操作在第一个维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一个维度允许不同之外。

Parameters
  • tensors – 需要全局收集的张量列表。

  • name – 用于组内全收集操作的基础名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个指向组内全收集操作的句柄,可与poll()synchronize()配合使用。

horovod.torch.broadcast(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,将根节点上的输入张量广播到所有其他Horovod进程上的相同输入张量。输入张量不会被修改。

广播操作由名称进行标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,广播不会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源排名。

  • name – 广播操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor形状和类型相同的张量,其值从根等级广播而来。

horovod.torch.broadcast_async(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于异步将根节点上的输入张量广播到所有其他Horovod进程中的相同输入张量。输入张量不会被修改。

广播操作通过名称进行标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,张量类型和形状在所有Horovod进程中必须相同。在所有进程准备好发送和接收张量之前,广播不会开始。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源排名。

  • name – 广播操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于 poll()synchronize() 的广播操作句柄。

horovod.torch.broadcast_(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,将根节点上的输入张量广播到所有其他Horovod进程中的相同输入张量。该操作是原地执行的。

广播操作通过名称进行标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,广播不会开始。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源等级。

  • name – 广播操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor形状和类型相同的张量,其值从根等级广播而来。

horovod.torch.broadcast_async_(tensor, root_rank, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于异步将根节点上的输入张量广播到所有其他Horovod进程中的相同输入张量。该操作是原地执行的。

广播操作通过名称进行标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,广播不会开始。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源排名。

  • name – 广播操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于poll()synchronize()的广播操作句柄。

horovod.torch.alltoall(tensor, splits=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个将输入张量的切片分散到所有其他Horovod进程并返回从所有其他Horovod进程收集的切片张量的函数。输入张量不会被修改。

切片操作在第一维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一维度允许不同之外。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 一个用于通过alltoall进行分发的张量。

  • splits – 一个按秩排序的整数张量,描述将tensor中的多少个元素发送给每个工作进程。分割操作沿着tensor的第一个维度进行。如果未提供splits,则第一个维度将按Horovod进程数量进行均等分割。

  • name – alltoall操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

  1. 一个包含从所有工作者收集的张量数据的张量。

  2. 如果提供了splits:一个按等级排序的整数张量,描述输出张量中从每个工作者接收了多少个元素。

horovod.torch.alltoall_async(tensor, splits=None, name=None, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个将输入张量的切片分散到所有其他Horovod进程并返回从所有其他Horovod进程收集的切片张量的函数。输入张量不会被修改。

切片操作在第一维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • tensor – 一个用于通过alltoall进行分发的张量。

  • splits – 一个按秩排序的整数张量,描述将tensor中的多少个元素发送给每个工作进程。分割操作沿着tensor的第一个维度进行。如果未提供splits,则第一个维度将按Horovod进程数量进行均等分割。

  • name – alltoall操作的名称。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个可用于poll()synchronize()的alltoall操作句柄。

horovod.torch.reducescatter(tensor, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629543345696'>, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个函数,用于在所有Horovod进程中对输入张量执行归约操作,然后将结果分散到所有Horovod进程中。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensor – 一个用于求平均/求和并分散的张量。

  • name – 归约操作的名称。

  • compression – 在reducescatter过程中使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前对张量进行缩放的乘法因子。

  • postscale_factor – 在reducescatter操作后对张量进行缩放的乘法因子。

Returns

一个在所有进程中与tensor具有相同秩和类型的张量。其形状与输入形状相同,除了第一维度,该维度将在不同的Horovod进程之间进行划分。

horovod.torch.reducescatter_async(tensor, name=None, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629543345696'>, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个在所有Horovod进程中对输入张量执行异步归约操作,然后将结果分散到所有Horovod进程的函数。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

不同进程上的输入张量必须具有相同的秩和形状。输出张量在所有进程上具有相同的秩,但第一个维度可能不同。

Parameters
  • tensor – 需要求平均值和总和的张量。

  • name – 归约操作的名称。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前对张量进行缩放的乘法因子。

  • postscale_factor – 在reducescatter操作后对张量进行缩放的乘法因子。

Returns

一个可与poll()synchronize()一起使用的reducescatter操作句柄。

horovod.torch.grouped_reducescatter(tensors, name=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629543345696'>, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个函数,用于在所有Horovod进程中对输入张量列表执行归约操作,然后将结果分散到所有Horovod进程中。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要求平均和求和的张量列表。

  • name – 用于组归约操作的基础名称。

  • compression – 在reducescatter过程中使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前用于缩放张量的乘法因子。

  • postscale_factor – 在reducescatter操作后用于缩放张量的乘法因子。

Returns

一个包含与tensors中相同秩和类型的张量列表。对于每个张量,其形状与输入形状相同,除了第一个维度,该维度将在不同的Horovod进程之间进行划分。

horovod.torch.grouped_reducescatter_async(tensors, name=None, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629543345696'>, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个在所有Horovod进程间对输入张量列表执行异步归约操作,然后将结果分散到所有Horovod进程的函数。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于每个输入张量,在给定名称下,所有Horovod进程中的类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

列表张量中某处的输入张量在不同进程上必须具有相同的秩和形状。对应的输出张量在所有进程上具有相同的秩,但第一个维度可能不同。

Parameters
  • tensors – 需要求平均和求和的张量列表。

  • name – 用于组归约操作的基础名称。

  • op – 用于跨不同等级合并张量的归约操作。默认为平均值。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前用于缩放张量的乘法因子。

  • postscale_factor – 在reducescatter操作后用于缩放张量的乘法因子。

Returns

一个指向组内归约分散操作的句柄,可与poll()synchronize()配合使用。

horovod.torch.join(device=- 1)int[源代码]

一个表示该智能体已完成数据处理工作的函数。

所有未调用join()的等级继续处理allreduce操作。 此函数会阻塞Python线程,直到所有等级加入。

Parameters

device – 用于创建临时零张量的设备ID(默认为-1,CPU)

Returns

最后加入的排名ID。

horovod.torch.barrier(process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个作为指定进程组(默认为全局组)中排名的简单同步点的函数。到达此函数调用的排名将停滞,直到所有其他排名都已到达。

Parameters

process_set – 进程集对象,用于将此操作限制在Horovod进程的子集内。默认为全局进程集。

horovod.torch.poll(handle)[源代码]

轮询一个allreduce、allgather、alltoall、broadcast或reducescatter句柄,以确定底层异步操作是否已完成。当poll()返回True后,synchronize()将无需阻塞即可返回。

Parameters

handle – 由 allreduce、allgather、alltoall、broadcast 或 reducescatter 异步操作返回的句柄。

Returns

一个标志,指示操作是否已完成。

horovod.torch.synchronize(handle)[源代码]

同步异步allreduce、allgather、alltoall、broadcast或reducescatter操作,直到操作完成。返回操作结果。

Parameters

handle – 由 allreduce、allgather、alltoall、broadcast 或 reducescatter 异步操作返回的句柄。

Returns

操作的一个输出张量或多个输出张量的元组。

horovod.torch.is_initialized()

如果 Horovod 已初始化,则返回 True

horovod.torch.start_timeline(file_path, mark_cycles=False)

file_path 处创建时间线文件并开始记录。

Parameters
  • file_path – 时间线文件的字符串路径。

  • mark_cycles – 布尔值,指示是否应在时间线上标记循环(默认值:False)。

Raises a ValueError if Horovod is not initialized.

horovod.torch.stop_timeline()

停止活动时间线记录并关闭文件。

Raises a ValueError if Horovod is not initialized.

horovod.torch.size()

一个返回Horovod进程数量的函数。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.torch.local_size()

一个返回当前进程所在节点内Horovod进程数量的函数。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.torch.cross_size()

一个返回当前Horovod进程本地等级节点数量的函数。例如,如果作业中有2个节点:一个运行2个进程,另一个运行1个进程,那么每个节点上的第一个进程将具有交叉大小2,而第一个节点上的第二个进程将具有交叉大小1。

Returns

一个整数标量,包含跨 Horovod 进程的数量。

horovod.torch.rank()

一个返回调用进程的Horovod等级的函数。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.torch.local_rank()

一个返回调用进程在其运行节点内的本地Horovod等级的函数。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含两端)。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.torch.cross_rank()

一个返回调用进程在作业中跨节点间的跨Horovod等级的函数。进程的跨等级对应其运行所在节点的等级。例如,如果作业中有7个节点,跨等级将是从零到六(包含)。

Returns

一个整数标量,表示调用进程的跨 Horovod 等级。

horovod.torch.mpi_threads_supported()

一个返回标志的函数,指示是否支持MPI多线程。

如果支持MPI多线程,用户可以混合使用Horovod与其他MPI库,例如mpi4py

Returns

一个布尔值,表示是否支持MPI多线程。

horovod.torch.mpi_enabled()

如果当前在运行时启用了MPI模式,则返回True。

如果启用了MPI,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了MPI。

horovod.torch.mpi_built()

如果 Horovod 编译时支持 MPI,则返回 True。

Returns

一个布尔值,表示是否编译了MPI支持。

horovod.torch.gloo_enabled()

如果运行时启用了Gloo模式,则返回True。

如果启用了Gloo,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了Gloo。

horovod.torch.gloo_built()

如果 Horovod 编译时支持 Gloo,则返回 True。

Returns

一个布尔值,表示是否编译了Gloo支持。

horovod.torch.nccl_built()

用于检查Horovod是否使用NCCL支持编译的函数。

Returns

一个整数值,表示是否编译了NCCL支持。 如果编译了NCCL支持,则返回NCCL_VERSION_CODE。否则, 返回0。

horovod.torch.ddl_built()

如果 Horovod 编译时支持 DDL,则返回 True。

Returns

一个布尔值,指示是否编译了DDL支持。

horovod.torch.ccl_built()

如果 Horovod 编译时支持 oneCCL,则返回 True。

Returns

一个布尔值,表示是否编译了oneCCL支持。

horovod.torch.cuda_built()

如果 Horovod 编译时支持 CUDA,则返回 True。

Returns

一个布尔值,指示是否编译了CUDA支持。

horovod.torch.rocm_built()

如果 Horovod 编译时支持 ROCm,则返回 True。

Returns

一个布尔值,表示是否编译了ROCm支持。

class horovod.torch.ProcessSet(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[源代码]

一组将共同运行集体操作的Horovod进程的表示

使用进程等级列表或MPI通信器初始化一个进程集。然后将此实例传递给hvd.init()或hvd.add_process_set()。如果已初始化有效的进程集,process_set_id将被设置为一个数值。

size()Optional[int][源代码]

返回进程集的大小,如果未初始化则返回 None。

rank()Optional[int][源代码]

返回相对于此进程集的排名,如果未初始化则返回None。

这很有用,例如,用于处理 hvd.allgather() 的结果。

请注意,即使使用进程集,Horovod操作(如hvd.broadcast())也不是通过这个相对等级参数化的,而是通过从hvd.rank()获取的全局等级参数化的。

included()Optional[bool][源代码]

返回当前进程是否属于此进程集,如果未初始化则返回None。

horovod.torch.add_process_set(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]])horovod.common.process_sets.ProcessSet[源代码]

在Horovod初始化后添加一个新的process_set并返回它。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。不能存在已包含相同等级的进程集。 返回的进程集将被完全初始化。

horovod.torch.remove_process_set(process_set: horovod.common.process_sets.ProcessSet)bool[源代码]

尝试移除进程集并返回此尝试是否成功。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。如果移除成功,我们将使进程集对象失效。

exception horovod.torch.HorovodInternalError[源代码]

当Horovod集合操作(例如,allreduce)失败时引发的内部错误。

这在弹性模式下被视为可恢复错误,并将导致重置事件。

horovod.torch.DistributedOptimizer(optimizer, named_parameters=None, compression=<class 'horovod.torch.compression.NoneCompressor'>, backward_passes_per_step=1, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629543345696'>, gradient_predivide_factor=1.0, num_groups=0, groups=None, sparse_as_dense=False, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个优化器,它包装了另一个 torch.optim.Optimizer,在将梯度应用于模型权重之前,使用 allreduce 操作来合并梯度值。

Allreduce操作在每个梯度通过loss.backward()计算后并行执行。step()方法确保所有allreduce操作在将梯度应用到模型之前完成。

DistributedOptimizer 暴露了 synchronize() 方法,该方法强制所有reduce操作在继续执行之前完成。它与梯度裁剪或在执行 step() 之前原地修改梯度的其他操作结合使用时非常有用。如果在代码中调用 synchronize(),请确保使用 optimizer.skip_synchronize()

梯度裁剪示例:

output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.synchronize()
torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
with optimizer.skip_synchronize():
    optimizer.step()
Parameters
  • optimizer – 用于计算梯度和应用更新的优化器。

  • named_parameters – 参数名称与值之间的映射。用于命名allreduce操作。通常只需使用 model.named_parameters()

  • compression – 在所有reduce操作期间使用的压缩算法,用于减少每个参数更新步骤中发送的数据量。默认为不使用压缩。

  • backward_passes_per_step – 在调用 step()/synchronize() 之前预期执行的反向传播次数。这允许在多个小批次上累积梯度,然后再进行规约和应用。

  • op – 在不同等级间合并梯度时使用的归约操作。

  • gradient_predivide_factor – 如果 op == Average,gradient_predivide_factor 会在求和前后分别进行平均分割。梯度在求和前会按 1.0 / gradient_predivide_factor 进行缩放,在求和后会按 gradient_predivide_factor / size 进行缩放。

  • num_groups – 用于显式分组的梯度allreduce操作分配组数。默认为无显式分组。

  • groups – 用于分组梯度allreduce操作的参数。可接受的值为非负整数或torch.Tensor的列表的列表。 如果groups是一个非负整数,它表示用于显式分组的梯度allreduce操作的分组数量。 如果groups是torch.Tensor的列表的列表,相同内部列表中的张量将被分配到同一组,而未出现在任何列表中的参数将自行形成一个组。 默认为None,表示没有显式分组。

  • sparse_as_dense

    如果设为True,将所有稀疏梯度转换为密集梯度并执行allreduce操作,然后

    在应用更新前转换回稀疏梯度。

    process_set: 梯度将仅在属于此进程集的Horovod进程间进行规约

    默认为全局进程集。

class horovod.torch.SyncBatchNorm(*args, **kw)[源代码]

应用同步版本的N维批量归一化。

在此版本中,前向传播过程中归一化参数会在工作节点间同步。 这在每个GPU只能容纳少量样本的情况下非常有用。

有关 BatchNorm 的更多详情,请参阅 https://pytorch.org/docs/stable/nn.html#batchnorm2d

Parameters
  • num_features – 通道数 C,来自形状 (N, C, …)

  • eps – 添加到分母中用于数值稳定性的值。默认值:1e-5

  • momentum – 用于计算running_mean和running_var的值。可设置为None以使用累积移动平均(即简单平均)。默认值:0.1

  • affine – 一个布尔值,当设置为 True 时,此模块具有可学习的仿射参数。默认值:True

  • track_running_stats – 一个布尔值,当设置为 True 时,该模块会跟踪运行中的均值和方差;当设置为 False 时,该模块不会跟踪这些统计信息,并且在训练和评估模式下始终使用批次统计信息。默认值:True

注意

在训练模式下,仅支持GPU输入张量。

class horovod.torch.elastic.TorchState(model=None, optimizer=None, **kwargs)[源代码]

PyTorch 训练过程的状态表示。

通过将它们作为关键字参数提供,支持多个模型和优化器。在初始化期间,TorchState将为每个关键字参数分配属性,并处理其状态同步。

Parameters
  • model – 可选的PyTorch模型。

  • optimizer – 可选的PyTorch优化器。

  • kwargs – 属性同步,将作为对象的属性公开。如果存在对应属性类型的处理器,将使用它来同步对象,否则将作为普通Python对象处理。

save()[源代码]

将状态保存到主机内存。

restore()[源代码]

恢复最后提交的状态,撤销任何未提交的修改。

sync()[源代码]

跨工作节点同步状态。

class horovod.torch.elastic.ElasticSampler(*args, **kw)[源代码]

采样器,在重置事件后跨智能体划分数据集并重新划分。

工作方式类似于DistributedSampler,但具备可选功能,可记录每批次处理过的数据集索引。当由TorchState对象跟踪时,采样器会自动在新工作节点集合中重新分配未处理的索引。

为了成功使用此对象,建议用户:

  1. 将此对象包含在TorchState中。

  2. 在处理一组样本后调用record_batch

  3. 在每个epoch结束时调用set_epoch以清除已处理的索引。

Parameters
  • dataset – 用于采样的数据集(假设大小恒定)。

  • shuffle – 如果为 True(默认值),则打乱索引顺序。

  • seed – 当shuffle=True时用于随机打乱采样器的随机种子。 该数值在所有进程中应当保持一致(默认值:0)。

set_epoch(epoch)[源代码]

设置此采样器的周期。

shuffle=True 时,这确保所有副本在每个周期使用不同的随机排序。

将在下一个周期清除并重置processed_indices。重要的是 这个操作必须在周期结束时(而非开始时)调用,以确保 部分完成的周期不会重新处理样本。

Parameters

epoch – 轮次数。

record_batch(batch_idx, batch_size)[源代码]

记录已处理的样本数量。

horovod.torch.elastic.run(func)[源代码]

用于运行弹性训练过程的装饰器。

此装饰器的目的是允许被包装函数在多个工作者之间并行不间断执行,因为工作者会动态加入或离开系统。当新增一个工作者时,需要将其状态提升到与其他工作者相同的进度点,这是通过在执行func前同步状态对象来实现的。

当添加或移除一个工作节点时,其他工作节点将引发异常,使它们回到这样的同步点,然后再次执行func。这确保了当此类重置事件发生时,工作节点不会出现分歧。

需要注意的是,集合操作(例如广播、全局归约)不能作为包装函数的调用。否则,新工作进程在初始化期间可能会执行这些操作,而其他工作进程正尝试同步状态,从而导致死锁。

Parameters

func – 一个包装函数,接受任意数量的参数或关键字参数。第一个参数 必须是一个 horovod.common.elastic.State 对象,用于在多个 工作节点之间同步状态。

horovod.mxnet

class horovod.mxnet.Compression[源代码]

可选梯度压缩算法,用于allreduce过程中。

none

将所有浮点梯度压缩至16位。

horovod.mxnet.compression.NoneCompressor 的别名

fp16

horovod.mxnet.compression.FP16Compressor 的别名

horovod.mxnet.allgather_object(obj, name=None)[源代码]

从所有其他进程中序列化并聚合一个对象。

Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • name – 用于allgather操作时的可选名称,默认将使用类类型。

Returns

在所有层级上全部收集的对象列表。

horovod.mxnet.broadcast_object(obj, root_rank=0, name=None)[源代码]

将对象从根等级序列化并广播到所有其他进程。

Parameters
  • obj – 一个能够在不丢失任何上下文的情况下被序列化的对象。

  • root_rank – 从此进程的排名,参数将被广播到所有其他进程。

  • name – 广播期间使用的可选名称,默认值为类类型。

Returns

root_rank广播的对象。

horovod.mxnet.allgather(tensor, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于将输入张量与所有其他Horovod进程上的相同输入张量进行拼接。输入张量不会被修改。

拼接操作在第一维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • tensor – 一个需要全局收集的张量。

  • name – allgather操作的名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor类型相同的张量,在所有进程的零维上进行拼接。其形状与输入形状相同,除了第一维度可能更大,且等于不同Horovod进程中所有张量的第一维度之和。

horovod.mxnet.grouped_allgather(tensors, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,用于将每个输入张量与所有其他Horovod进程上的对应输入张量连接起来,针对输入张量列表。 输入张量不会被修改。

拼接操作在第一维度上进行,因此不同进程上的对应输入张量必须具有相同的秩和形状,除了第一维度允许不同。

Parameters
  • tensors – 需要全局收集的张量列表。

  • name – 用于组内全收集操作的基础名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同类型张量的列表。每个张量 在所有进程的零维上进行拼接。其形状与对应的输入形状相同, 除了第一维度可能更大,它是不同Horovod进程中对应张量 所有第一维度的总和。

horovod.mxnet.allreduce(tensor, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[源代码]

一个在所有Horovod进程中对输入张量执行平均或求和操作的函数。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,张量类型和形状在所有Horovod进程中必须保持一致。只有当所有进程都准备好发送和接收张量时,归约操作才会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 一个用于求平均值或求和的张量。

  • average

    警告

    自版本0.24.0起已弃用。

    请改用 op。将在v1.0版本中移除。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • name – 归约操作的名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子

  • postscale_factor – 全规约后张量的缩放乘数因子

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor具有相同形状和类型的张量,在所有进程中求平均或求和。

horovod.mxnet.allreduce_(tensor, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[源代码]

一个在所有Horovod进程中对输入张量执行原地平均或求和操作的函数。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。只有当所有进程都准备好发送和接收张量时,归约操作才会开始。

Parameters
  • tensor – 一个用于求平均值或求和的张量。

  • average

    警告

    自版本0.24.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • name – 归约操作的名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • prescale_factor – 全归约操作前对张量进行缩放的乘法因子

  • postscale_factor – 全规约后缩放张量的乘法因子

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor具有相同形状和类型的张量,在所有进程中求平均或求和。

horovod.mxnet.grouped_allreduce(tensors, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[源代码]

一个在所有Horovod进程中对输入张量执行平均或求和操作的函数。输入张量不会被修改。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动生成的递增基础名称。归约操作在同一列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要求平均或求和的张量列表。

  • average

    警告

    自版本0.24.0起已弃用。

    请改用op。将在v1.0版本中移除。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • name – 用于组归约操作的基础名称

  • priority – 此操作的优先级。较高优先级的操作很可能在其他操作之前执行。

  • prescale_factor – 全规约操作前对张量进行缩放的乘法因子

  • postscale_factor – 全规约后张量的缩放乘数因子

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同形状和类型的张量列表,在所有进程中进行平均或求和。

horovod.mxnet.grouped_allreduce_(tensors, average=None, name=None, priority=0, prescale_factor=1.0, postscale_factor=1.0, process_set=<horovod.common.process_sets.ProcessSet object>, op=None)[源代码]

一个在所有Horovod进程中对输入张量执行原地平均或求和操作的函数。

归约操作通过基础名称进行键控。如果未提供基础名称,则使用自动生成的递增基础名称。归约操作在同一列表位置的张量之间执行。对于输入张量列表中共享位置的张量,所有Horovod进程上的张量类型和形状必须相同。在所有进程准备好发送和接收张量之前,归约操作不会开始。

Parameters
  • tensors – 需要求平均或求和的张量列表。

  • average

    警告

    自版本0.24.0起已弃用。

    请改用 op。将在v1.0版本中移除。

  • op – 用于跨不同等级合并张量的归约操作。 支持的op值包括Sum、Average、Min、Max和Product。如果未提供值,则默认为Average。

  • name – 用于组归约操作的基础名称

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • prescale_factor – 全归约操作前对张量进行缩放的乘法因子

  • postscale_factor – 全规约后张量的缩放乘数因子

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个包含与tensors中相同形状和类型的张量列表,在所有进程中进行平均或求和。

horovod.mxnet.alltoall(tensor, splits=None, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个将输入张量的切片分散到所有其他Horovod进程并返回从所有其他Horovod进程收集的切片张量的函数。输入张量不会被修改。

切片操作在第一维度上进行,因此不同进程上的输入张量必须具有相同的秩和形状,除了第一维度允许不同之外。

Parameters
  • tensor – 一个用于通过alltoall进行分发的张量。

  • splits – 一个按秩排序的整数张量,描述将tensor中的多少个元素发送给每个工作进程。分割操作沿着tensor的第一个维度进行。如果未提供splits,则第一个维度将按Horovod进程数量进行均等分割。

  • name – alltoall操作的名称。

  • priority – 此操作的优先级。较高优先级的操作很可能在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

  1. 一个包含从所有工作者收集的张量数据的张量。

  2. 如果提供了splits:一个按等级排序的整数张量,描述输出张量中从每个工作者接收了多少个元素。

horovod.mxnet.broadcast(tensor, root_rank, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,将根节点上的输入张量广播到所有其他Horovod进程中的相同输入张量。输入张量不会被修改。

广播操作由名称标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,广播不会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源等级。

  • name – 广播操作的名称。

  • priority – 此操作的优先级。较高优先级的操作很可能在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor形状和类型相同的张量,其值从根等级广播而来。

horovod.mxnet.broadcast_(tensor, root_rank, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>)[源代码]

一个函数,将根节点上的输入张量广播到所有其他Horovod进程中相同的输入张量。该操作是原地执行的。

广播操作由名称进行标识。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,广播不会开始。

Parameters
  • tensor – 要广播的张量。

  • root_rank – 广播值的来源等级。

  • name – 广播操作的名称。

  • priority – 此操作的优先级。较高优先级的操作很可能在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

Returns

一个与tensor形状和类型相同的张量,其值从根等级广播而来。

horovod.mxnet.reducescatter(tensor, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629538882800'>, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个函数,用于在所有Horovod进程间对输入张量执行异步平均或求和操作,然后将结果分散到所有Horovod进程中。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,张量类型和形状在所有Horovod进程中必须保持一致。只有当所有进程都准备好发送和接收张量时,归约操作才会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensor – 一个用于求平均/求和并分散的张量。

  • op – 用于跨不同等级合并张量的归约操作。可以是平均值(默认)或求和。

  • name – 归约操作的名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前对张量进行缩放的乘法因子。

  • postscale_factor – 在reducescatter操作后对张量进行缩放的乘法因子。

Returns

一个在所有进程中与tensor具有相同秩和类型的张量。 其形状与输入形状相同,除了第一维度, 该维度将在不同的Horovod进程之间划分。

horovod.mxnet.grouped_reducescatter(tensors, op=<MagicMock name='mock().horovod_reduce_op_average()' id='137629538882800'>, name=None, priority=0, process_set=<horovod.common.process_sets.ProcessSet object>, prescale_factor=1.0, postscale_factor=1.0)[源代码]

一个函数,用于在所有Horovod进程中对输入张量列表执行归约操作,然后将结果分散到所有Horovod进程中。输入张量不会被修改。

归约操作通过名称进行索引。如果未提供名称,则使用自动生成的递增名称。对于给定名称,所有Horovod进程中的张量类型和形状必须相同。在所有进程都准备好发送和接收张量之前,归约操作不会开始。

这是一个对自动梯度函数的轻量级封装。如果您的输入张量需要梯度,那么调用此函数将允许计算梯度并进行反向传播。

Parameters
  • tensors – 需要求平均和求和的张量列表。

  • op – 用于跨不同等级合并张量的归约操作。 可以是平均值(默认)或求和。

  • name – 用于组归约操作的基础名称。

  • priority – 此操作的优先级。较高优先级的操作可能会在其他操作之前执行。

  • process_set – 用于将此操作限制在Horovod进程子集的进程集对象。默认为全局进程集。

  • prescale_factor – 在reduce-scatter操作前用于缩放张量的乘法因子。

  • postscale_factor – 在reducescatter操作后用于缩放张量的乘法因子。

Returns

一个包含与tensors中相同秩和类型的张量列表。对于每个张量,其形状与输入形状相同,除了第一个维度,该维度将在不同的Horovod进程之间进行划分。

horovod.mxnet.shutdown()

一个关闭 Horovod 的函数。

horovod.mxnet.is_initialized()

如果 Horovod 已初始化,则返回 True

horovod.mxnet.start_timeline(file_path, mark_cycles=False)

file_path 处创建时间线文件并开始记录。

Parameters
  • file_path – 时间线文件的字符串路径。

  • mark_cycles – 布尔值,指示是否应在时间线上标记循环(默认值:False)。

Raises a ValueError if Horovod is not initialized.

horovod.mxnet.stop_timeline()

停止活动时间线记录并关闭文件。

Raises a ValueError if Horovod is not initialized.

horovod.mxnet.size()

一个返回Horovod进程数量的函数。

Returns

一个整数标量,包含Horovod进程的数量。

horovod.mxnet.local_size()

一个返回当前进程所在节点内Horovod进程数量的函数。

Returns

一个整数标量,包含本地 Horovod 进程的数量。

horovod.mxnet.cross_size()

一个返回当前Horovod进程本地等级节点数量的函数。例如,如果作业中有2个节点:一个运行2个进程,另一个运行1个进程,那么每个节点上的第一个进程将具有交叉大小2,而第一个节点上的第二个进程将具有交叉大小1。

Returns

一个整数标量,包含跨 Horovod 进程的数量。

horovod.mxnet.rank()

一个返回调用进程的Horovod等级的函数。

Returns

一个整数标量,表示调用进程的Horovod等级。

horovod.mxnet.local_rank()

一个返回调用进程在其运行节点内的本地Horovod等级的函数。例如,如果一个节点上有七个进程在运行,它们的本地等级将是从零到六(包含两端)。

Returns

一个整数标量,表示调用进程的本地 Horovod 等级。

horovod.mxnet.cross_rank()

一个返回调用进程在作业中跨节点间的跨Horovod等级的函数。进程的跨等级对应其运行所在节点的等级。例如,如果作业中有7个节点,跨等级将是从零到六(包含)。

Returns

一个整数标量,表示调用进程的跨 Horovod 等级。

horovod.mxnet.mpi_threads_supported()

一个返回标志的函数,指示是否支持MPI多线程。

如果支持MPI多线程,用户可以混合使用Horovod与其他MPI库,例如mpi4py

Returns

一个布尔值,表示是否支持MPI多线程。

horovod.mxnet.mpi_enabled()

如果当前在运行时启用了MPI模式,则返回True。

如果启用了MPI,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了MPI。

horovod.mxnet.mpi_built()

如果 Horovod 编译时支持 MPI,则返回 True。

Returns

一个布尔值,表示是否编译了MPI支持。

horovod.mxnet.gloo_enabled()

如果运行时启用了Gloo模式,则返回True。

如果启用了Gloo,用户可以使用它进行控制器或数据传输操作。

Returns

一个布尔值,表示是否启用了Gloo。

horovod.mxnet.gloo_built()

如果 Horovod 编译时支持 Gloo,则返回 True。

Returns

一个布尔值,表示是否编译了Gloo支持。

horovod.mxnet.nccl_built()

用于检查Horovod是否使用NCCL支持编译的函数。

Returns

一个整数值,表示是否编译了NCCL支持。 如果编译了NCCL支持,则返回NCCL_VERSION_CODE。否则, 返回0。

horovod.mxnet.ddl_built()

如果 Horovod 编译时支持 DDL,则返回 True。

Returns

一个布尔值,指示是否编译了DDL支持。

horovod.mxnet.ccl_built()

如果 Horovod 编译时支持 oneCCL,则返回 True。

Returns

一个布尔值,表示是否编译了oneCCL支持。

horovod.mxnet.cuda_built()

如果 Horovod 编译时支持 CUDA,则返回 True。

Returns

一个布尔值,指示是否编译了CUDA支持。

horovod.mxnet.rocm_built()

如果 Horovod 编译时支持 ROCm,则返回 True。

Returns

一个布尔值,表示是否编译了ROCm支持。

class horovod.mxnet.ProcessSet(ranks_or_comm: Union[Sequence[int], horovod.common.process_sets.MPI.Comm])[源代码]

一组将共同运行集体操作的Horovod进程的表示

使用进程等级列表或MPI通信器初始化一个进程集。然后将此实例传递给hvd.init()或hvd.add_process_set()。如果已初始化有效的进程集,process_set_id将被设置为一个数值。

size()Optional[int][源代码]

返回进程集的大小,如果未初始化则返回 None。

rank()Optional[int][源代码]

返回相对于此进程集的排名,如果未初始化则返回None。

这很有用,例如,用于处理 hvd.allgather() 的结果。

请注意,即使使用进程集,Horovod操作(如hvd.broadcast())也不是通过这个相对等级参数化的,而是通过从hvd.rank()获取的全局等级参数化的。

included()Optional[bool][源代码]

返回当前进程是否属于此进程集,如果未初始化则返回None。

horovod.mxnet.add_process_set(process_set: Union[horovod.common.process_sets.ProcessSet, Sequence[int]])horovod.common.process_sets.ProcessSet[源代码]

在Horovod初始化后添加一个新的process_set并返回它。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。不能存在已包含相同等级的进程集。 返回的进程集将被完全初始化。

horovod.mxnet.remove_process_set(process_set: horovod.common.process_sets.ProcessSet)bool[源代码]

尝试移除进程集并返回此尝试是否成功。

需要以HOROVOD_DYNAMIC_PROCESS_SETS=1运行。如果移除成功,我们将使进程集对象失效。

class horovod.mxnet.OrderedDict[源代码]

记住插入顺序的字典

clear()None.  Remove all items from od.
popitem(last=True)

从字典中移除并返回一个(键,值)对。

如果last为true,则按LIFO顺序返回对;如果为false,则按FIFO顺序返回。

move_to_end(key, last=True)

将现有元素移至末尾(如果 last 为 false,则移至开头)。

Raise KeyError if the element does not exist.

update([E, ]**F)None.  Update D from dict/iterable E and F.

如果 E 存在且具有 .keys() 方法,则执行:for k in E: D[k] = E[k] 如果 E 存在但缺少 .keys() 方法,则执行:for k, v in E: D[k] = v 无论哪种情况,之后都会执行:for k in F: D[k] = F[k]

keys()a set-like object providing a view on D’s keys
items()a set-like object providing a view on D’s items
values()an object providing a view on D’s values
pop(k[, d])v, remove specified key and return the corresponding

value. If key is not found, d is returned if given, otherwise KeyError is raised.

setdefault(key, default=None)

如果键不在字典中,则插入键并赋予默认值。

如果键在字典中,则返回键对应的值,否则返回默认值。

copy()a shallow copy of od
fromkeys(value=None)

创建一个新的有序字典,其键来自可迭代对象,值设置为指定值。

class horovod.mxnet.defaultdict

defaultdict(default_factory[, …]) –> 带默认工厂的字典

当键不存在时,默认工厂被无参数调用以生成新值,仅在 __getitem__ 中生效。 defaultdict 与包含相同项的 dict 相等。 所有剩余参数的处理方式与传递给 dict 构造器时相同,包括关键字参数。

copy()a shallow copy of D.
default_factory

由 __missing__() 调用的默认值工厂。

horovod.mxnet.broadcast_parameters(params, root_rank=0, prefix=None)[源代码]

将参数从根等级广播到所有其他进程。 典型用法是广播Module.get_params()Block.collect_params()

Parameters
  • params – 以下之一: - 要广播的参数字典 - 要广播的ParameterDict

  • root_rank – 从此进程的排名,参数将被广播到所有其他进程。

  • prefix – 需要广播的参数前缀。 如果在同一程序中多次调用broadcast_parameters, 必须使用不同的前缀以避免张量名称冲突。

horovod.spark

horovod.spark.run(fn, args=(), kwargs={}, num_proc=None, start_timeout=None, use_mpi=None, use_gloo=None, extra_mpi_args=None, env=None, stdout=None, stderr=None, verbose=1, nics=None, prefix_output_with_timestamp=False, executable=None)[源代码]

在Spark上运行Horovod。使用相同数量的Spark任务运行num_proc个进程执行fn

Parameters
  • fn – 要运行的函数。

  • args – 传递给 fn 的参数。

  • kwargs – 传递给 fn 的关键字参数。

  • num_proc – Horovod进程数量。默认为spark.default.parallelism

  • start_timeout – Spark任务生成、注册并开始运行代码的超时时间,单位为秒。 如果未设置,则回退到HOROVOD_SPARK_START_TIMEOUT环境变量值。 如果该环境变量也未设置,则默认为600秒。

  • extra_mpi_args – mpi_run的额外参数。默认为无额外参数。

  • env – 在Horovod运行中使用的环境字典。

  • stdout – Horovod 标准输出被重定向到此流。与 MPI 一起使用时,默认为 sys.stdout。

  • stderr – Horovod 标准错误输出被重定向到此流。与 MPI 一起使用时,默认为 sys.stderr。

  • verbose – 调试输出详细程度(0-2)。默认为1。

  • nics – 用于TCP网络通信的NIC列表。

  • prefix_output_with_timestamp – 在驱动程序的stdout/stderr转发中显示时间戳

  • executable – 启动工作进程时运行的可选可执行文件。默认为 sys.executable

Returns

在每个等级上运行fn返回的结果列表。

horovod.spark.run_elastic(fn, args=(), kwargs={}, num_proc=None, min_num_proc=None, max_num_proc=None, start_timeout=None, elastic_timeout=None, reset_limit=None, env=None, stdout=None, stderr=None, verbose=1, nics=None, prefix_output_with_timestamp=False, min_np=None, max_np=None)[源代码]

在Spark上运行弹性Horovod。使用相同数量的Spark任务运行num_proc个进程执行fn

Parameters
  • fn – 要运行的函数。

  • args – 传递给 fn 的参数。

  • kwargs – 传递给 fn 的关键字参数。

  • num_proc – Horovod进程数量。 默认为 spark.default.parallelism

  • min_num_proc – 维持训练所需的最小进程数。 若可用进程数低于此阈值, 训练将等待更多实例变为可用。

  • max_num_proc – 训练进程的最大数量,超过此数量将不会创建额外的进程。如果未指定,则数量无上限。

  • start_timeout – Spark任务生成、注册并开始运行代码的超时时间,单位为秒。 如果未设置,则回退到HOROVOD_SPARK_START_TIMEOUT环境变量值。 如果该环境变量也未设置,则默认为600秒。

  • elastic_timeout – 集群重新扩缩容后弹性初始化的超时时间。 如果未设置,则回退到 HOROVOD_ELASTIC_TIMEOUT 环境变量值。 如果该环境变量也未设置,则默认为 600 秒。

  • reset_limit – 最大重置次数,超过该次数后作业将被终止。

  • env – 在Horovod运行中使用的环境字典。默认为 os.environ

  • stdout – Horovod 标准输出被重定向到此流。

  • stderr – Horovod 标准错误输出被重定向到此流。

  • verbose – 调试输出详细程度(0-2)。默认为1。

  • nics – 用于TCP网络通信的NIC列表。

  • prefix_output_with_timestamp – 在驱动程序的stdout/stderr转发中显示时间戳

Returns

在每个等级上运行fn返回的结果列表。

horovod.spark.keras

class horovod.spark.keras.KerasEstimator(*args, **kwargs)[源代码]

基类: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable

用于将Keras模型拟合到DataFrame的Spark Estimator。

支持独立的 kerastf.keras,以及 TensorFlow 1.X 和 2.X。

Parameters
  • num_proc – Horovod进程数量。 默认为 spark.default.parallelism

  • data_module – (可选) 用于训练和验证的DataModule类,如果未设置,默认为PetastormDataModule。

  • model – 要训练的Keras模型。

  • backend – 用于运行分布式训练函数的可选后端对象。默认为带有 num_proc 工作进程的 SparkBackend。如果同时提供了 num_proc,则无法指定此参数。

  • store – 存储对象,用于抽象化中间数据和运行结果的读写操作。

  • custom_objects – 可选的字典映射,将名称(字符串)映射到自定义类或函数,这些类或函数在序列化/反序列化过程中会被考虑。

  • optimizer – 将被转换为 hvd.DistributedOptimizer 用于训练的 Keras 优化器。

  • loss – Keras 损失函数或损失函数列表。

  • loss_weights – 可选的浮点权重值列表,用于为每个损失分配权重。

  • sample_weight_col – 可选列,用于指示每个样本的权重。

  • gradient_compression – 由hvd.DistributedOptimizer使用的梯度压缩方法。

  • metrics – 可选的记录指标。

  • feature_cols – 用作模型特征输入的列名。必须是一个列表,其中每个特征对应模型forward()函数中的一个顺序参数。

  • label_cols – 用作标签的列名。必须是一个列表,其中每个标签对应模型的一个输出。

  • validation – 可选的验证列名称(字符串),该列中的每一行值为1/True或0/False, 或验证分割比例(浮点数),表示随机选择用于验证的数据百分比。

  • callbacks – Keras 回调函数。

  • batch_size – 每个批次从DataFrame中读取的行数。

  • val_batch_size – 每个验证批次从DataFrame中读取的行数,如果未设置,将使用batch_size。

  • epochs – 训练的轮数。

  • verbose – 详细程度级别 [0, 2] (默认: 1)。

  • random_seed – 可选随机种子,用于Tensorflow。默认值:None。

  • shuffle_buffer_size – (已弃用) 可选的内存中洗牌缓冲区大小(按训练数据行数计)。 分配更大的缓冲区大小会增加洗牌的随机性,但代价是消耗更多主机内存。默认值基于每台主机4GB内存的假设进行估算。设置 shuffle_buffer_size=0 将关闭洗牌功能。

  • shuffle – (可选) 是否打乱训练样本。默认为 True。

  • partitions_per_process – 从num_proc分配给每个工作进程的Parquet分区数量(默认值:10)。

  • run_id – 用于在存储库中组织此运行的可选唯一ID。如果未提供,将自动分配。

  • train_steps_per_epoch – 每个训练周期要训练的步数。对于测试模型是否成功训练很有用。 默认为每个周期训练整个数据集。

  • validation_steps_per_epoch – 每个训练周期执行的验证步骤数量。

  • transformation_fn – 可选函数,接收一行数据作为参数,并返回经过修改的行数据,随后传入训练或验证步骤。此转换在批处理之后应用。更多详情请参阅 Petastorm [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py)。请注意,此函数会构造另一个应执行转换的函数。

  • train_reader_num_workers – 此参数指定从数据存储中读取训练数据并对其应用数据转换的并行进程数量。增加此数值通常会提高读取速率,但也会增加内存占用。如果到数据存储的带宽不够高,或者用户需要在原始数据上应用解压缩或数据增强等转换,则更多进程特别有用。

  • val_reader_num_workers – 类似于 train_reader_num_workers。

  • reader_pool_type – 用于并行读取数据集数据的Petastorm工作池类型。 应为['thread', 'process', 'dummy']其中之一。默认为'thread'。

  • inmemory_cache_all – 布尔值。在内存中缓存训练和验证数据。默认值:False。

  • backend_env – 要添加到后端环境中的字典。默认设置为通过petastorm为libhdfs设置Java堆大小为最小和最大2G

  • use_gpu – 是否使用GPU进行训练。默认为True。

  • mp_start_method – 用于启动多进程的方法。默认为 None。

  • tensorflow_dataset_prefetch_buffer_size – TensorFlow 数据集的预取缓冲区大小。默认为 0。

class horovod.spark.keras.KerasModel(*args, **kwargs)[源代码]

基类: horovod.spark.common.estimator.HorovodModel, horovod.spark.keras.estimator.KerasEstimatorParamsReadable, horovod.spark.keras.estimator.KerasEstimatorParamsWritable

Spark Transformer包装一个Keras模型,用于在DataFrame上进行预测。

通过调用keras_model.getModel()获取底层的Keras模型。

Parameters
  • history – 指标列表,训练期间每个周期一个条目。

  • model – 训练好的 Keras 模型。

  • feature_columns – 特征列名称列表。

  • label_columns – 标签列名称的列表。

  • custom_objects – Keras 自定义对象。

  • run_id – 用于训练模型的运行ID。

horovod.spark.torch

class horovod.spark.torch.TorchEstimator(*args, **kwargs)[源代码]

基类: horovod.spark.common.estimator.HorovodEstimator, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable

用于将PyTorch模型拟合到DataFrame的Spark Estimator。

Parameters
  • num_proc – Horovod进程数量。 默认为 spark.default.parallelism

  • data_module – (可选) 用于训练和验证的DataModule类,如果未设置,默认为PetastormDataModule。

  • model – 用于训练的PyTorch模型。

  • backend – 用于运行分布式训练函数的可选后端对象。默认为带有 num_proc 工作进程的 SparkBackend。如果同时提供了 num_proc,则无法指定此参数。

  • store – 存储对象,用于抽象化中间数据和运行结果的读写操作。

  • optimizer – 需要转换为 hvd.DistributedOptimizer 进行训练的 PyTorch 优化器。

  • loss – PyTorch 损失函数或损失函数列表。

  • loss_constructors – 可选函数,用于生成损失值。

  • metrics – 可选的记录指标。

  • loss_weights – 可选的浮点权重值列表,用于为每个损失分配权重。

  • sample_weight_col – 可选列,用于指示每个样本的权重。

  • gradient_compression – 由hvd.DistributedOptimizer使用的梯度压缩方法。

  • feature_cols – 用作模型特征输入的列名。必须是一个列表,其中每个特征对应模型forward()函数中的一个顺序参数。

  • continuous_cols – 包含连续特征的所有列的列名。

  • categorical_cols – 包含类别特征的所有列的列名。

  • input_shapes – 模型每个输入张量的形状列表。

  • validation – 可选的验证列名称(字符串),该列中的每一行值为1/True或0/False, 或验证分割比例(浮点数),给出随机选择用于验证的数据百分比。

  • label_cols – 用作标签的列名。必须是一个列表,其中每个标签对应模型的一个输出。

  • batch_size – 每个批次从DataFrame中读取的行数。

  • val_batch_size – 每个验证批次从DataFrame中读取的行数,如果未设置,将使用batch_size。

  • epochs – 训练的轮数。

  • verbose – 详细程度级别 [0, 2] (默认: 1)。

  • random_seed – 可选随机种子,用于Torch。默认值:None。

  • shuffle_buffer_size – (已弃用) 可选的内存中洗牌缓冲区大小(按训练数据行数计)。 分配更大的缓冲区大小会增加洗牌的随机性,但代价是消耗更多主机内存。默认值基于每台主机4GB内存的假设进行估算。设置 shuffle_buffer_size=0 将关闭洗牌功能。

  • shuffle – (可选) 是否打乱训练样本。默认为 True。

  • partitions_per_process – 从num_proc分配给每个工作进程的Parquet分区数量(默认值:10)。

  • run_id – 可选参数,用于在存储库中组织此次运行的唯一标识符。若未提供,将自动分配。

  • train_minibatch_fn – 可选的自定义函数,在训练循环中执行。默认为标准梯度下降过程。

  • train_steps_per_epoch – 每个训练周期要训练的步数。对于测试模型是否成功训练非常有用。 默认为每个周期训练整个数据集。

  • validation_steps_per_epoch – 每个训练周期执行的验证步骤数量。

  • transformation_fn – 可选函数,接收一行数据作为参数,并返回修改后的行数据,随后传入训练或验证步骤。此转换在批处理之后应用。更多详情请参阅 Petastorm 的 [TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py)。请注意,此函数会构造另一个应执行转换的函数。

  • train_reader_num_workers – 此参数指定从数据存储中读取训练数据并对其应用数据转换的并行进程数量。增加此数值通常会提高读取速率,但也会增加内存占用。如果到数据存储的带宽不够高,或者用户需要对原始数据应用解压缩或数据增强等转换,则更多进程特别有用。

  • val_reader_num_workers – 类似于 train_reader_num_workers。

  • reader_pool_type – 用于并行从数据集中读取数据的Petastorm工作池类型。 应为['thread', 'process', 'dummy']其中之一。默认为'thread'。

  • inmemory_cache_all – (可选) 在内存中缓存训练和验证数据。

  • use_gpu – 是否使用GPU进行训练。默认为True。

  • mp_start_method – 用于启动多进程的方法。默认为None。

  • backward_passes_per_step – 在调用hvd.allreduce之前要执行的反向传播次数。 这允许在减少和应用更新之前,在多个小批次上累积更新。默认为1。

class horovod.spark.torch.TorchModel(*args, **kwargs)[源代码]

基类: horovod.spark.common.estimator.HorovodModel, horovod.spark.torch.estimator.TorchEstimatorParamsWritable, horovod.spark.torch.estimator.TorchEstimatorParamsReadable

Spark Transformer包装了一个PyTorch模型,用于对DataFrame进行预测。

通过调用torch_model.getModel()获取底层的PyTorch模型。

Parameters
  • history – 指标列表,训练期间每个周期对应一个条目。

  • model – 训练好的PyTorch模型。

  • feature_columns – 特征列名称列表。

  • label_columns – 标签列名称的列表。

  • optimizer – 训练期间使用的PyTorch优化器,包含更新后的状态。

  • run_id – 用于训练模型的运行ID。

  • loss – PyTorch 损失函数。

  • loss_constructors – PyTorch 损失函数构造器。

horovod.spark.common

class horovod.spark.common.estimator.HorovodEstimator[源代码]
fit(df, params=None)[源代码]

将模型拟合到DataFrame。

Parameters
  • df – 输入数据集,是 pyspark.sql.DataFrame 的一个实例。

  • params – 一个可选参数映射,用于覆盖嵌入参数。

Returns

HorovodModel 转换器包装训练好的模型。

fit_on_parquet(params=None, dataset_idx=None)[源代码]

在保存的Parquet文件store.get_train_path()上训练模型。

Parameters

params – 一个可选参数映射,用于覆盖嵌入参数。

Returns

训练好的HorovodModel转换器,属于适当的子类,包装了已训练的模型。

class horovod.spark.common.estimator.HorovodModel[源代码]
transform(df, params=None)[源代码]

将包含代表模型预测的预测列的输入数据集进行转换。

预测列名默认为 __output。通过调用 transformer.setOutputCols(col_names) 来覆盖列名。

Parameters
  • df – 输入数据集,是 pyspark.sql.DataFrame 的一个实例。

  • params – 一个可选参数映射,用于覆盖嵌入参数。

Returns

转换后的数据集。

class horovod.spark.common.backend.Backend[源代码]

基类:object

分布式训练函数远程执行的接口。

当运行Horovod的训练环境与运行HorovodEstimator的Spark应用不同时,可以使用自定义后端。

run(fn, args=(), kwargs={}, env=None)[源代码]

执行训练函数 fn 并返回每个工作节点的结果(按升序排列)。

Parameters
  • fn – 要运行的函数。

  • args – 传递给 fn 的参数。

  • kwargs – 传递给 fn 的关键字参数。

  • env – 在Horovod运行中使用的环境字典。默认为 os.environ

Returns

在每个等级上运行fn返回的结果列表。

num_processes()[源代码]

返回用于训练的进程数量。

class horovod.spark.common.backend.SparkBackend(num_proc=None, env=None, **kwargs)[源代码]

基类:horovod.spark.common.backend.Backend

使用 horovod.spark.run 执行分布式训练 fn

run(fn, args=(), kwargs={}, env=None)[源代码]

执行训练函数 fn 并返回每个工作节点的结果(按升序排列)。

Parameters
  • fn – 要运行的函数。

  • args – 传递给 fn 的参数。

  • kwargs – 传递给 fn 的关键字参数。

  • env – 在Horovod运行中使用的环境字典。默认为 os.environ

Returns

在每个等级上运行fn返回的结果列表。

num_processes()[源代码]

返回用于训练的进程数量。

horovod.spark.common.store.host_hash(salt=None)[源代码]

通过调用 horovod.runner.common.util.host_hash.host_hash 计算此主机的哈希值。

考虑环境变量 CONTAINER_ID,该变量在通过 YARN 运行 Spark 时存在。 YARN 容器不与同一主机上的其他容器共享内存, 因此从 host_hash 的角度来看,它必须被视为一个主机

Parameters

salt – 包含在哈希中的额外信息,忽略 Falsy 值

Returns

主机哈希

class horovod.spark.common.store.Store[源代码]

基类:object

用于中间文件(物化数据帧)和训练产物(检查点、日志)的存储层。

存储(Store)提供了对文件系统(例如本地文件系统与HDFS)或对象存储数据库的抽象。它提供了读写对象的基本语义,以及如何通过特定定义访问对象。

存储暴露了一个通用接口,不与特定的数据帧、模型或运行时耦合。每次运行估计器都应产生一个独立的运行目录,其中包含检查点和日志,而数据集的每个变化都应生成一个独立的中级数据路径。

为了允许缓存但防止过度使用中间数据的磁盘空间,中间数据集以确定性序列命名。当数据集完成训练使用后,中间文件可以被回收以释放磁盘空间,但不会自动删除,以便在需要时能够重复使用。这是为了支持在多个DataFrame上使用相同存储的并行训练过程,以及在不同模型变体上使用相同DataFrame的迭代训练。

is_parquet_dataset(path)[源代码]

如果路径是Parquet数据集的根目录,则返回True。

get_parquet_dataset(path)[源代码]

从路径返回一个 pyarrow.parquet.ParquetDataset

get_train_data_path(idx=None)[源代码]

返回训练数据集的路径。

get_val_data_path(idx=None)[源代码]

返回验证数据集的路径。

get_test_data_path(idx=None)[源代码]

返回测试数据集的路径。

saving_runs()[源代码]

如果训练期间应保存运行输出,则返回 True。

get_runs_path()[源代码]

返回所有运行的基础路径。

get_run_path(run_id)[源代码]

返回具有给定ID的运行路径。

get_checkpoint_path(run_id)[源代码]

返回给定运行的检查点文件的路径。

get_checkpoints(run_id, suffix='.ckpt')[源代码]

返回本次运行中保存的所有检查点的路径列表。

get_logs_path(run_id)[源代码]

返回给定运行的日志目录路径。

get_checkpoint_filename()[源代码]

返回已保存检查点文件的基本名称。

get_logs_subdir()[源代码]

返回日志目录的子目录名称。

exists(path)[源代码]

如果路径存在于存储中,则返回 True。

read(path)[源代码]

以字节形式返回路径的内容。

write_text(path, text)[源代码]

将文本文件写入路径。

sync_fn(run_id)[源代码]

返回一个函数,该函数将给定路径递归同步到run_id的运行路径中。

to_remote(run_id, dataset_idx)[源代码]

返回一个可以在远程环境中执行而无需Horoovd依赖的存储视图。

class horovod.spark.common.store.AbstractFilesystemStore(prefix_path, train_path=None, val_path=None, test_path=None, runs_path=None, save_runs=True, storage_options=None, checkpoint_filename=None, **kwargs)[源代码]

基类: horovod.spark.common.store.Store

使用文件系统作为底层存储的存储抽象类。

exists(path)[源代码]

如果路径存在于存储中,则返回 True。

read(path)[源代码]

以字节形式返回路径的内容。

read_serialized_keras_model(ckpt_path, model, custom_objects)[源代码]

读取keras模型的检查点文件到模型字节,并返回base 64编码的模型字节。 :param ckpt_path: 检查点文件路径的字符串。 :param model: 一个keras模型。当ckpt_path仅包含模型权重时,此参数将用于DBFSLocalStore.read_serialized_keras_model()。 :param custom_objects: 加载keras模型时,此参数将用于DBFSLocalStore.read_serialized_keras_model()。 :return: 检查点模型的base 64编码模型字节。

write_text(path, text)[源代码]

将文本文件写入路径。

is_parquet_dataset(path)[源代码]

如果路径是Parquet数据集的根目录,则返回True。

get_parquet_dataset(path)[源代码]

从路径返回一个 pyarrow.parquet.ParquetDataset

get_train_data_path(idx=None)[源代码]

返回训练数据集的路径。

get_val_data_path(idx=None)[源代码]

返回验证数据集的路径。

get_test_data_path(idx=None)[源代码]

返回测试数据集的路径。

saving_runs()[源代码]

如果训练期间应保存运行输出,则返回 True。

get_runs_path()[源代码]

返回所有运行的基础路径。

get_run_path(run_id)[源代码]

返回具有给定ID的运行路径。

get_checkpoint_path(run_id)[源代码]

返回给定运行的检查点文件的路径。

get_checkpoints(run_id, suffix='.ckpt')[源代码]

返回本次运行保存的所有检查点的路径列表。

get_logs_path(run_id)[源代码]

返回给定运行的日志目录路径。

get_checkpoint_filename()[源代码]

返回已保存检查点文件的基本名称。

get_logs_subdir()[源代码]

返回日志目录的子目录名称。

class horovod.spark.common.store.FilesystemStore(prefix_path, *args, **kwargs)[源代码]

基类:horovod.spark.common.store.AbstractFilesystemStore

具体文件系统存储,委托给 fsspec

sync_fn(run_id)[源代码]

返回一个函数,该函数将给定路径递归同步到run_id的运行路径中。

copy(lpath, rpath, recursive=False, callback=<MagicMock id='137629530667184'>, **kwargs)[源代码]

此方法将本地源目录的内容复制到目标目录。 这与fsspec的put()不同,因为它不会在目标目录已存在的情况下将源文件夹复制到目标目录。

class horovod.spark.common.store.LocalStore(*args, **kwargs)[源代码]

基类: horovod.spark.common.store.FilesystemStore

使用本地文件系统作为中间数据和训练产物的存储。

该类已弃用,现在仅解析为FilesystemStore。

class horovod.spark.common.store.HDFSStore(prefix_path, host=None, port=None, user=None, kerb_ticket=None, driver='libhdfs', extra_conf=None, *args, **kwargs)[源代码]

基类: horovod.spark.common.store.AbstractFilesystemStore

使用HDFS作为中间数据和训练产物的存储。

prefix_path 初始化,该路径可以采用以下形式之一:

  1. “hdfs://namenode01:8020/user/test/horovod”

  2. “hdfs:///user/test/horovod”

  3. “/user/test/horovod”

完整路径(包括前缀、主机和端口)将用于通过Spark对HDFS的所有读写操作。如果未提供主机和端口,它们将被省略。如果未提供前缀(情况3),无论怎样它都会被添加到完整路径前。

本地化路径(不含前缀、主机和端口)将用于与PyArrow交互。若未通过初始化器的hostport参数提供,则解析到的主机和端口信息将用于初始化PyArrow HadoopFilesystem。若路径URL和参数均未提供此信息,这些参数将默认为default0

sync_fn(run_id)[源代码]

返回一个函数,该函数将给定路径递归同步到run_id的运行路径中。

class horovod.spark.common.store.DBFSLocalStore(prefix_path, *args, **kwargs)[源代码]

基类:horovod.spark.common.store.FilesystemStore

使用 Databricks 文件系统 (DBFS) 本地文件 API 作为中间数据和训练产物的存储。

从以 prefix_path 开头的路径初始化,路径以 /dbfs/…file:///dbfs/…file:/dbfs/…dbfs:/… 开头,请参阅 https://docs.databricks.com/data/databricks-file-system.html#local-file-apis

static normalize_path(path)[源代码]

将路径标准化为 /dbfs/… 的形式

exists(path)[源代码]

如果路径存在于存储中,则返回 True。

get_checkpoint_filename()[源代码]

返回已保存检查点文件的基本名称。

read_serialized_keras_model(ckpt_path, model, custom_objects)[源代码]

返回序列化的keras模型。 参数model用于在检查点文件仅包含模型权重时提供模型结构。

horovod.ray

class horovod.ray.RayExecutor(settings, num_workers: Optional[int] = None, num_hosts: Optional[int] = None, num_workers_per_host: int = 1, cpus_per_worker: int = 1, use_gpu: bool = False, gpus_per_worker: Optional[int] = None, use_current_placement_group: bool = True, min_workers: Optional[int] = None, max_workers: Optional[int] = None, reset_limit: Optional[int] = None, cooldown_range: Optional[List[int]] = None, elastic_timeout: int = 600, override_discovery: bool = True)[源代码]

用于 Horovod + Ray 集成的作业类。

Parameters
  • settings (horovod.Settings) – 用于作业设置的配置。您可以使用标准的Horovod设置对象,或直接从RayExecutor.create_settings创建一个。

  • num_workers (int) – 用于训练的worker数量。

  • cpus_per_worker (int) – 分配给每个工作进程的CPU资源数量。

  • use_gpu (bool) – 是否使用GPU进行分配。待办事项:这个 可以被移除。

  • gpus_per_worker (int) – 分配给每个工作进程的GPU资源数量。

  • num_hosts (int) – num_workers 的替代 API。执行作业的机器数量。用于强制每台机器上的工作进程数量相同。

  • num_workers_per_host (int) – num_workers 的替代 API。每台机器上要放置的工作进程数量。 用于强制每台机器上的工作进程数量相等。仅与 num_hosts 结合使用。

  • use_current_placement_group (bool) – 是否使用当前的放置组而不是创建新的。默认为 True。

  • min_workers (int) – 维持训练所需的最小进程数。如果可用进程数量低于此阈值,训练将等待更多可用实例。

  • max_workers (int) – 训练进程的最大数量, 超过此数量将不会创建额外的进程。 如果未指定,则数量将不受限制。

  • reset_limit (int) – 训练作业可以增加或减少工作器数量的最大次数,超过该次数后作业将被终止。

  • elastic_timeout (int) – 集群重新调整规模后弹性初始化的超时时间。默认值为600秒。 或者,也可以使用环境变量 HOROVOD_ELASTIC_TIMEOUT。

  • override_discovery (bool) – 是否让ElasticRayExecutor自动为ElasticSettings提供发现机制。

classmethod create_settings(timeout_s=30, ssh_identity_file=None, ssh_str=None, placement_group_timeout_s=100, nics=None)[源代码]

创建一个迷你设置对象。

Parameters
  • timeout_s (int) – Gloo集合点的超时参数。

  • ssh_identity_file (str) – 用于通过ssh连接到集群中不同主机的身份文件路径。

  • ssh_str (str) – 使用此参数时请谨慎。私钥文件内容。将私钥写入ssh_identity_file。

  • placement_group_timeout_s (int) – Ray放置组创建的超时参数。

  • nics (set) – 可用于通信的网络接口。

Returns

MiniSettings 对象。

start(executable_cls: Optional[type] = None, executable_args: Optional[List] = None, executable_kwargs: Optional[Dict] = None, extra_env_vars: Optional[Dict] = None)[源代码]

启动工作器并将它们共同部署在所有机器上。

我们实现节点分组是因为看起来我们的实现对于不平衡节点不太适用。 此外,同地部署性能通常远优于非同地部署的工作节点。

Parameters
  • executable_cls (type) – 将在智能体内部创建的类(BaseHorovodWorker)。这将允许Horovod建立其连接并设置环境变量。

  • executable_args (List) – 初始化时传递给工作类的参数。

  • executable_kwargs (Dict) – 在初始化时传递给工作器类的关键字参数。

  • extra_env_vars (Dict) – 在初始化前需要设置在智能体(工作进程)上的环境变量。

execute(fn: Callable[[executable_cls], Any], callbacks: Optional[List[Callable]] = None)List[Any][源代码]

在所有工作节点上执行提供的函数。

Parameters
  • fn – 在每个对象上调用的目标函数。

  • callbacks – 可调用对象列表。每个回调必须为 可调用函数或实现了__call__的类。 每个回调将在rank 0工作节点记录的每个值上被调用。

Returns

从目标函数反序列化的返回值。

run(fn: Callable[[Any], Any], args: Optional[List] = None, kwargs: Optional[Dict] = None, callbacks: Optional[List[Callable]] = None)List[Any][源代码]

在所有工作节点上执行提供的函数。

Parameters
  • fn – 可使用任意参数和关键字参数执行的目标函数。

  • args – 传递给目标函数的参数列表。

  • kwargs – 传递给目标函数的关键字参数字典。

  • callbacks – 可调用对象列表。每个回调必须为 可调用函数或实现了 __call__ 的类。 每个回调都会由 rank 0 工作节点记录的每个值调用。

Returns

从目标函数反序列化的返回值。

run_remote(fn: Callable[[Any], Any], args: Optional[List] = None, kwargs: Optional[Dict] = None)List[Any][源代码]

在所有工作节点上执行提供的函数。

Parameters
  • fn – 可使用任意参数和关键字参数执行的目标函数。

  • args – 传递给目标函数的参数列表。

  • kwargs – 传递给目标函数的关键字参数字典。

Returns

你可以运行ray.get的对象引用列表,用于

检索值。

Return type

列表

execute_single(fn: Callable[[executable_cls], Any])List[Any][源代码]

在排名为0的工作节点(主节点)上执行所提供的函数。

Parameters

fn – 在主要对象上调用的目标函数。

Returns

从目标函数反序列化的返回值。

shutdown()[源代码]

销毁提供的智能体。

class horovod.ray.ElasticRayExecutor(settings: horovod.runner.elastic.settings.ElasticSettings, use_gpu: bool = False, cpus_per_slot: int = 1, gpus_per_slot: Optional[int] = None, env_vars: Optional[dict] = None, override_discovery=True)[源代码]

使用Ray执行弹性作业的执行器。

利用Ray全局状态检测可用主机和插槽。假设整个Ray集群可供执行器使用。

Parameters
  • settings – 弹性作业配置设置。您可以使用标准的Horovod ElasticSettings对象,或直接从ElasticRayExecutor.create_settings创建。

  • use_gpu (bool) – 是否使用GPU进行分配。

  • cpus_per_slot (int) – 分配给每个工作进程的CPU资源数量。

  • gpus_per_slot (int) – 分配给每个工作器的GPU资源数量。

  • env_vars (Dict) – 初始化前需要在智能体(工作进程)上设置的环境变量。

  • override_discovery (bool) – 是否让ElasticRayExecutor自动为ElasticSettings提供发现机制。

示例:

import ray
ray.init(address="auto")
settings = ElasticRayExecutor.create_settings(verbose=True)
executor = ElasticRayExecutor(
    settings, use_gpu=True, cpus_per_slot=2)
executor.start()
executor.run(train_fn)

警告:.. 已弃用:0.25.0

static create_settings(min_num_proc: int = 1, max_num_proc: Optional[int] = None, reset_limit: Optional[int] = None, elastic_timeout: int = 600, timeout_s: int = 30, ssh_identity_file: Optional[str] = None, nics: Optional[str] = None, min_np=None, max_np=None, **kwargs)[源代码]

返回一个用于ElasticRayExecutor的Settings对象。

请注意,discovery属性将在运行时设置。

Parameters
  • min_num_proc (int) – 维持训练所需的最小进程数。当可用进程数量低于此阈值时,训练将等待更多实例变为可用。

  • max_num_proc (int) – 最大训练进程数, 超过此数值后将不再创建新进程。 若未指定,则无上限限制。

  • reset_limit (int) – 训练作业可以增加或减少工作节点数量的最大次数,超过该次数后作业将被终止。

  • elastic_timeout (int) – 集群重新扩缩容后弹性初始化的超时时间。默认值为600秒。 或者,也可以使用环境变量 HOROVOD_ELASTIC_TIMEOUT。

  • timeout_s (int) – Horovod 在指定超时时间之前执行所有检查并启动进程。 默认值为30秒。

  • ssh_identity_file (str) – 驱动节点上用于读取身份认证(私钥)的文件。

  • nics (set) – 可用于通信的网络接口。

start()[源代码]

启动 Horovod 驱动和服务。

run(worker_fn: Callable, callbacks: Optional[List[Callable]] = None)List[Any][源代码]

在所有工作节点上执行提供的函数。

Parameters
  • worker_fn – 可执行的目标弹性函数。

  • callbacks – 可调用对象列表。每个回调必须是一个可调用函数或实现了 __call__ 的类。 每个回调都会由 rank 0 工作节点记录的每个值触发。

Returns

来自每个已完成工作节点的返回值列表。

horovod.run

启动一个Horovod作业来运行指定的进程函数并获取返回值。

param func

在Horovod作业进程中运行的函数。该函数的返回值将被收集为对应Horovod进程的返回值。 此函数必须与pickle兼容。

param args

传递给 func 的参数。

param kwargs

传递给 func 的关键字参数。

param num_proc

Horovod进程数量。

param min_num_proc

继续训练所需的最小进程数。如果可用进程数量低于此阈值,训练将等待更多实例变为可用。默认为 num_proc

param max_num_proc

训练进程的最大数量,超过此数量将不会创建额外的进程。如果未指定,则无上限。

param slots

每台主机上进程的插槽数量。通常每台主机每个GPU对应1个插槽。 如果插槽数量由主机发现脚本的输出提供,则该值将覆盖此参数。

param reset_limit

训练任务可以增加或减少工作节点数量的最大次数,超过该次数后任务将被终止。重置事件发生在初始注册后工作节点被添加或移除时。因此,reset_limit为0意味着任务在初始工作节点集之后无法更改成员资格。reset_limit为1意味着它最多可以调整大小一次,等等。

param cooldown_range

失败主机在黑名单中保留的时间范围(最小值,最大值),单位为秒。

param hosts

主机名称列表及每台主机上可运行进程的可用插槽数量,格式为:: (例如:host1:2,host2:4,host3:1 表示 host1 可运行2个进程,host2 可运行4个,host3 可运行1个)。如果未指定,默认使用 localhost:

param hostfile

主机文件路径,包含主机名列表和可用插槽数量。文件的每一行必须符合以下格式: slots=

param host_discovery_script

用于弹性训练(自动扩缩容和容错)。 一个可执行脚本,将向标准输出打印所有可用的主机(每行一个),这些主机可用于运行工作进程。可选地,在同一行上指定主机名上的插槽数量,格式为:“主机名:插槽数”。 提供发现脚本可启用弹性训练。 如果脚本在首次调用时返回非零退出码,作业将立即失败。后续调用将重试直到超时。

param start_timeout

Horovodrun 必须在指定超时时间内完成所有检查并启动进程。默认值为30秒。 此外,也可使用环境变量 HOROVOD_START_TIMEOUT 来指定初始化超时时间。

param ssh_port

所有主机上的SSH端口。

param ssh_identity_file

SSH 身份(私钥)文件。

param disable_cache

如果未设置该标志,horovodrun将仅每60分钟执行一次初始化检查——前提是检查成功通过。否则,每次调用horovodrun时都会运行所有检查。

param output_filename

对于Gloo,将所有进程的stdout / stderr写入到格式为/rank./的文件名中。将用0字符填充以确保字典顺序。 对于MPI,将其行为委托给mpirun。

param verbose

如果设置了此标志,将打印额外消息。

param use_gloo

使用Gloo控制器运行Horovod。如果Horovod未构建MPI支持,这将是默认选项。

param use_mpi

使用MPI控制器运行Horovod。如果Horovod构建时支持MPI,这将是默认选项。

param mpi_args

MPI控制器的额外参数。仅在use_mpi为True时使用。

param network_interfaces

用于通信的网络接口列表。如果未指定,Horovod将在所有工作节点间寻找通用网卡。 示例:[“eth0”, “eth1”]。

param executable

启动工作进程时可选的可执行文件。默认为 sys.executable

return

返回一个包含所有Horovod进程返回值的列表。 列表的索引对应每个Horovod进程的排名。 如果设置了min_num_proc参数,则仅返回前min_num_proc个结果。