Shortcuts

torch.distributed.rpc 的源代码

```python
from datetime import timedelta
import logging
import os
import threading
import warnings
from typing import Generator, Tuple
from urllib.parse import urlparse

import torch
import torch.distributed as dist

logger = logging.getLogger(__name__)


# Counter of how many times RPC has been initialized in this process;
# guarded by _init_counter_lock so concurrent initializers don't race.
_init_counter = 0
_init_counter_lock = threading.Lock()

# Public API surface of this module; extended below when RPC is available.
__all__ = ["is_available"]

def is_available() -> bool:
    """Return ``True`` if this PyTorch build was compiled with RPC support.

    Availability is detected by the presence of the ``_rpc_init`` hook on
    the C extension module ``torch._C``.
    """
    return getattr(torch._C, "_rpc_init", None) is not None


# Eagerly initialize the RPC C++ bindings at import time. ``_rpc_init``
# returns False on failure, which we surface as an import-time error
# rather than deferring a broken state to first use.
if is_available() and not torch._C._rpc_init():
    raise RuntimeError("Failed to initialize torch.distributed.rpc")


# Only populate the RPC API when the C extension supports it; on builds
# without RPC the module exposes just ``is_available``.
if is_available():
    from torch._C._distributed_c10d import Store
    # Private C-extension entry points and public types backing the RPC API.
    from torch._C._distributed_rpc import (
        _disable_jit_rref_pickle,
        _enable_jit_rref_pickle,
        _disable_server_process_global_profiler,
        _enable_server_process_global_profiler,
        _set_and_start_rpc_agent,
        _reset_current_rpc_agent,
        _delete_all_user_and_unforked_owner_rrefs,
        _destroy_rref_context,
        _set_profiler_node_id,
        _is_current_rpc_agent_set,
        _rref_context_get_debug_info,
        _cleanup_python_rpc_handler,
        _invoke_rpc_builtin,
        _invoke_rpc_python_udf,
        _invoke_rpc_torchscript,
        _invoke_remote_builtin,
        _invoke_remote_python_udf,
        _invoke_remote_torchscript,
        _set_rpc_timeout,
        _get_current_rpc_agent,
        get_rpc_timeout,
        enable_gil_profiling,
        RpcBackendOptions,
        _TensorPipeRpcBackendOptionsBase,
        RpcAgent,
        PyRRef,
        TensorPipeAgent,
        RemoteProfilerManager,
        WorkerInfo,
        _DEFAULT_INIT_METHOD,
        _DEFAULT_NUM_WORKER_THREADS,
        _UNSET_RPC_TIMEOUT,
        _DEFAULT_RPC_TIMEOUT_SEC,
    )  # noqa: F401

    # Re-export the Python-level RPC API (rpc_sync, rpc_async, remote, ...).
    from . import api, backend_registry, functions
    from .api import *  # noqa: F401,F403
    import numbers

    import torch.distributed.autograd as dist_autograd

    from .backend_registry import BackendType
    from .options import TensorPipeRpcBackendOptions  # noqa: F401
    from .server_process_global_profiler import (
        _server_process_global_profile,
    )

    # Module-level annotation for the rendezvous generator used during
    # init_rpc: yields (store, rank, world_size) tuples.
    rendezvous_iterator: Generator[Tuple[Store, int, int], None, None]

    # Extend the public API with names that only exist when RPC is built in.
    __all__ += ["init_rpc", "BackendType", "TensorPipeRpcBackendOptions"]
    __all__ = __all__ + api.__all__ + backend_registry.__all__  # noqa: PLE0605

[docs] def init_rpc( name, backend=None, rank=-1, world_size=None, rpc_backend_options=None, ): r""" 初始化RPC原语,例如本地RPC代理和分布式自动求导,这会立即使当前进程准备好发送和接收RPC。 参数: name (str): 该节点的全局唯一名称。(例如,``Trainer3``, ``ParameterServer2``, ``Master``, ``Worker1``) 名称只能包含数字、字母、下划线、冒号和/或破折号,并且必须少于128个字符。 backend (BackendType, 可选): RPC后端实现的类型。支持的值是``BackendType.TENSORPIPE``(默认)。 有关更多信息,请参阅 :ref:`rpc-backends`。 rank (int): 该节点的全局唯一id/rank。 world_size (int): 组中的工作节点数量。 rpc_backend_options (RpcBackendOptions, 可选): 传递给RpcAgent构造函数的选项。它必须是特定代理的 :class:`~torch.distributed.rpc.RpcBackendOptions`的子类,并包含代理特定的初始化配置。默认情况下, 对于所有代理,它将默认超时设置为60秒,并使用``init_method = "env://"``初始化底层进程组, 这意味着需要正确设置环境变量``MASTER_ADDR``和``MASTER_PORT``。有关更多信息,请参阅 :ref:`rpc-backends`,并查找可用的选项。 """ torch._C._log_api_usage_once("torch.distributed.init_rpc") if backend is not None and not isinstance( backend, backend_registry.BackendType ): raise TypeError("Argument backend must be a member of BackendType") if rpc_backend_options is not None and not isinstance( rpc_backend_options, RpcBackendOptions ): raise TypeError( "Argument rpc_backend_options must be an instance of RpcBackendOptions" ) # 尝试从选项中检测后端 if backend is None and rpc_backend_options is not None: for candidate_backend in BackendType: if isinstance( rpc_backend_options, type( backend_registry<span