Experiment Tracking#

Note

This guide applies to all trainers that define a custom training loop, including TorchTrainer and TensorflowTrainer.

Most experiment tracking libraries work out of the box with Ray Train. This guide explains how to set up your code so that your favorite experiment tracking library works with Ray Train's distributed training. Common errors are listed at the end of the guide to help with debugging the setup.

The following pseudocode demonstrates how to use native experiment tracking library calls inside Ray Train:

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Training code and native experiment tracking library calls go here.
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()

Ray Train lets you use native experiment tracking libraries by customizing the tracking logic inside the train_func function. This way, you can port your experiment tracking logic to Ray Train with minimal changes.

Getting Started#

Let's start with some code snippets.

The following examples use Weights & Biases (W&B) and MLflow, but they can be adapted to other frameworks.

import ray
from ray import train
import wandb

# Step 1
# This ensures that all ray worker processes have `WANDB_API_KEY` set.
ray.init(runtime_env={"env_vars": {"WANDB_API_KEY": "your_api_key"}})

def train_func():
    # Step 1 and 2
    if train.get_context().get_world_rank() == 0:
        wandb.init(
            name=...,
            project=...,
            # ...
        )

    # ...
    loss = optimize()
    metrics = {"loss": loss}

    # Step 3
    if train.get_context().get_world_rank() == 0:
        wandb.log(metrics)

    # ...

    # Step 4
    # Make sure that all loggings are uploaded to the W&B backend.
    if train.get_context().get_world_rank() == 0:
        wandb.finish()
from ray import train
import mlflow
import os

# Run the following on the head node:
# $ databricks configure --token
# mv ~/.databrickscfg YOUR_SHARED_STORAGE_PATH
# This function assumes `databricks_config_file` is specified in the Trainer's `train_loop_config`.
def train_func(config):
    # Step 1 and 2
    os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
    mlflow.set_tracking_uri("databricks")
    mlflow.set_experiment_id(...)
    mlflow.start_run()

    # ...

    loss = optimize()

    metrics = {"loss": loss}
    # Only report the results from the first worker to MLflow
    # to avoid duplication.

    # Step 3
    if train.get_context().get_world_rank() == 0:
        mlflow.log_metrics(metrics)
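
    # Step 4 (added for completeness, mirroring the W&B example above):
    # finish the run so that all cached logs are flushed to the MLflow backend.
    if train.get_context().get_world_rank() == 0:
        mlflow.end_run()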

Tip

A key difference between distributed and non-distributed training is that in distributed training, multiple processes run in parallel, and under certain setups they produce identical results. If every process reports results to the tracking backend, you end up with duplicated entries. To address this issue, Ray Train lets you apply the logging logic only on the rank 0 worker with ray.train.get_context().get_world_rank():

from ray import train
def train_func():
    ...
    if train.get_context().get_world_rank() == 0:
        # Add your logging logic only for the rank 0 worker.
        ...
    ...

There are four logical steps for interacting with an experiment tracking backend inside train_func:

  1. Set up the connection to the tracking backend

  2. Configure and start the run

  3. Log metrics

  4. Finish the run

More details for each step follow.

Step 1: Connect to your tracking backend#

First, decide which tracking backend to use: W&B, MLflow, TensorBoard, Comet, etc. If applicable, make sure that you properly set up credentials on each training worker.

W&B offers online and offline modes.

Online

For online mode, because you are logging to W&B's tracking service, make sure that you set the credentials inside train_func. See Set up credentials for more information.

# This is equivalent to `os.environ["WANDB_API_KEY"] = "your_api_key"`
wandb.login(key="your_api_key")

Offline

For offline mode, because you are logging to the local file system, point the offline directory to a shared storage path that all nodes can write to. See Set up a shared file system for more information.

os.environ["WANDB_MODE"] = "offline"
wandb.init(dir="some_shared_storage_path/wandb")

MLflow offers local and remote (for example, to Databricks' MLflow service) modes.

Local

For local mode, because you are logging to the local file system, point the tracking URI to a shared storage path that all nodes can write to. See Set up a shared file system for more information.

mlflow.set_tracking_uri("file:some_shared_storage_path/mlruns")
mlflow.start_run()

Remote, hosted by Databricks

Make sure that all nodes have access to the Databricks config file. See Set up credentials for more information.

# The MLflow client looks for a Databricks config file
# at the location specified by `os.environ["DATABRICKS_CONFIG_FILE"]`.
os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
mlflow.set_tracking_uri("databricks")
mlflow.start_run()

Set up credentials#

Refer to each tracking library's API documentation for setting up credentials. This step usually involves setting an environment variable or accessing a config file.

The easiest way to pass environment-variable credentials to the training workers is through runtime environments, which you can initialize with the following code:

import ray
# This makes sure that training workers have the same env var set
ray.init(runtime_env={"env_vars": {"SOME_API_KEY": "your_api_key"}})

For config files, make sure that all nodes can access the file. One way is to set up shared storage; another is to save a copy on each node.

Set up a shared file system#

Set up a network file system that all nodes in the cluster can access, for example AWS EFS or Google Cloud Filestore.
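
To confirm that every worker can actually see the mount before relying on it for tracking data, you can run a quick check. The following is a minimal sketch; the /mnt/cluster_storage mount point is a placeholder for your shared storage path:

import os
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Placeholder mount point; replace with your shared storage path.
SHARED_PATH = "/mnt/cluster_storage"

def check_mount():
    # Fails fast on any worker that cannot see the shared file system.
    assert os.path.isdir(SHARED_PATH), f"{SHARED_PATH} is not mounted on this node."

trainer = TorchTrainer(check_mount, scaling_config=ScalingConfig(num_workers=2))
trainer.fit()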

Step 2: Configure and start the run#

This step usually includes choosing an identifier for the run and associating it with a project. Refer to the tracking library's documentation for the exact semantics.
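
For instance, a minimal sketch of configuring a named run with each library (the project, experiment, and run names below are placeholders):

import wandb
import mlflow

# W&B: associate the run with a project and give it a readable name.
wandb.init(project="my-project", name="run-001")

# MLflow: select an experiment, then start a named run.
mlflow.set_experiment("my-experiment")
mlflow.start_run(run_name="run-001")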

Tip

When performing fault-tolerant training with auto-restoration, use a consistent ID to configure all tracking runs that logically belong to the same training run. One way to get a unique ID is ray.train.get_context().get_trial_id():

import ray
import wandb
from ray.train import ScalingConfig, RunConfig, FailureConfig
from ray.train.torch import TorchTrainer

def train_func():
    if ray.train.get_context().get_world_rank() == 0:
        wandb.init(id=ray.train.get_context().get_trial_id())
    ...

trainer = TorchTrainer(
    train_func,
    run_config=RunConfig(failure_config=FailureConfig(max_failures=3))
)

trainer.fit()

Step 3: Log metrics#

You can customize how you log parameters, metrics, models, or media contents inside train_func, just as in a non-distributed training script. You can also use native integrations between a particular tracking framework and a particular training framework, for example mlflow.pytorch.autolog(), lightning.pytorch.loggers.MLFlowLogger, and so on.
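
For example, a minimal sketch of combining rank-0 gating with mlflow.pytorch.autolog() inside a training function (the tracking URI and experiment name are placeholders, and the actual training code is omitted):

import mlflow
import mlflow.pytorch
import ray.train

def train_func():
    if ray.train.get_context().get_world_rank() == 0:
        mlflow.set_tracking_uri("file:some_shared_storage_path/mlruns")
        mlflow.set_experiment("autolog-demo")
        # Automatically log metrics, params, and models from PyTorch Lightning training.
        mlflow.pytorch.autolog()
        mlflow.start_run()

    # ... model definition and pl.Trainer.fit() go here ...

    if ray.train.get_context().get_world_rank() == 0:
        mlflow.end_run()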

Step 4: Finish the run#

This step ensures that all logs are synced to the tracking service. Depending on the implementation of the tracking library, logs are sometimes cached locally first and only synced to the tracking service asynchronously. Finishing the run ensures that all logs are synced by the time the training workers exit.

# https://docs.wandb.ai/ref/python/finish
wandb.finish()
# https://mlflow.org/docs/1.2.0/python_api/mlflow.html
mlflow.end_run()
# https://www.comet.com/docs/v2/api-and-sdk/python-sdk/reference/Experiment/#experimentend
Experiment.end()

Examples#

The following are runnable examples for PyTorch and PyTorch Lightning.

PyTorch#

Logging to W&B
from filelock import FileLock
import os

import torch
import wandb

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.models import resnet18

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Run the following script with the WANDB_API_KEY env var set.
assert os.environ.get("WANDB_API_KEY", None), "Please set WANDB_API_KEY env var."

# This makes sure that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)


def train_func(config):
    if ray.train.get_context().get_world_rank() == 0:
        wandb.init()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if ray.train.get_context().get_world_rank() == 0:
                wandb.log({"loss": loss, "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
Logging to file-based MLflow
# Run the following script with the SHARED_STORAGE_PATH env var set.
# The MLflow offline logs are saved to SHARED_STORAGE_PATH/mlruns.

from filelock import FileLock
import mlflow
import os
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
from torchvision import datasets, transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader

assert os.environ.get(
    "SHARED_STORAGE_PATH", None
), "Please set SHARED_STORAGE_PATH env var."


# Assumes you are passing a `save_dir` in `config`
def train_func(config):
    save_dir = config["save_dir"]
    if ray.train.get_context().get_world_rank() == 0:
        mlflow.set_tracking_uri(f"file:{save_dir}")
        mlflow.set_experiment("my_experiment")
        mlflow.start_run()

    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model = ray.train.torch.prepare_model(model)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.module.parameters(), lr=0.001)

    # Data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
    )
    with FileLock("./data.lock"):
        train_data = datasets.FashionMNIST(
            root="./data", train=True, download=True, transform=transform
        )
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(1):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if ray.train.get_context().get_world_rank() == 0:
                mlflow.log_metrics({"loss": loss.item(), "epoch": epoch})

    if ray.train.get_context().get_world_rank() == 0:
        mlflow.end_run()


trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()

PyTorch Lightning#

When using Ray Train's TorchTrainer, you can use the native logging integrations in PyTorch Lightning for W&B, CometML, MLflow, and TensorBoard.

The following examples walk you through the process. The code here is runnable.

W&B
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
import wandb
from pytorch_lightning.loggers.wandb import WandbLogger
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = WandbLogger(name="demo-run", project="demo-project")

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)
    if ray.train.get_context().get_world_rank() == 0:
        wandb.finish()


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "WANDB_API_KEY" in os.environ
), 'Please set WANDB_API_KEY="abcde" when running this script.'

# This ensures that all workers have this env var set.
ray.init(
    runtime_env={"env_vars": {"WANDB_API_KEY": os.environ["WANDB_API_KEY"]}}
)
trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)

trainer.fit()
MLflow
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.mlflow import MLFlowLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):

    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = MLFlowLogger(
            experiment_name="demo-project",
            tracking_uri=f"file:{save_dir}",
        )

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please do SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "mlruns")
    },
    scaling_config=scaling_config,
)

trainer.fit()
Comet
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.comet import CometLogger
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = CometLogger(api_key=os.environ["COMET_API_KEY"])

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "COMET_API_KEY" in os.environ
), 'Please set COMET_API_KEY="abcde" when running this script.'
# This makes sure that all workers have this env var set.
ray.init(runtime_env={"env_vars": {"COMET_API_KEY": os.environ["COMET_API_KEY"]}})
trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
)

trainer.fit()
TensorBoard
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Create dummy data
X = torch.randn(128, 3)  # 128 samples, 3 features
y = torch.randint(0, 2, (128,))  # 128 binary labels

# Create a TensorDataset to wrap the data
dataset = TensorDataset(X, y)

# Create a DataLoader to iterate over the dataset
batch_size = 8
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define a dummy model
class DummyModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(3, 1)

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.binary_cross_entropy_with_logits(y_hat.flatten(), y.float())

        # The metrics below will be reported to Loggers
        self.log("train_loss", loss)
        self.log_dict({
            "metric_1": 1 / (batch_idx + 1), "metric_2": batch_idx * 100
        })
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
import os
import pytorch_lightning as pl
from pytorch_lightning.loggers.tensorboard import TensorBoardLogger

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):

    save_dir = config["save_dir"]
    logger = None
    if ray.train.get_context().get_world_rank() == 0:
        logger = TensorBoardLogger(name="demo-run", save_dir=f"file:{save_dir}")

    ptl_trainer = pl.Trainer(
        max_epochs=5,
        accelerator="cpu",
        logger=logger,
        log_every_n_steps=1,
    )
    model = DummyModel()
    ptl_trainer.fit(model, train_dataloaders=dataloader)


scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

assert (
    "SHARED_STORAGE_PATH" in os.environ
), "Please do SHARED_STORAGE_PATH=/a/b/c when running this script."

trainer = TorchTrainer(
    train_func,
    train_loop_config={
        "save_dir": os.path.join(os.environ["SHARED_STORAGE_PATH"], "tensorboard")
    },
    scaling_config=scaling_config,
)

trainer.fit()

Common Errors#

Missing credentials#

I have already called the `wandb login` CLI, but I still get

wandb: ERROR api_key not configured (no-tty). call wandb.login(key=[your_api_key]).

This is likely because the wandb credentials are not set properly on the worker nodes. Make sure that you run wandb.login or pass WANDB_API_KEY to every training function. See Set up credentials for more details.

Missing configurations#

I have already run `databricks configure`, but I still get

databricks_cli.utils.InvalidConfigurationError: You haven't configured the CLI yet!

This usually happens because running databricks configure only generates the ~/.databrickscfg file on the head node. Move this file to a shared location or copy it to every node. See Set up credentials for more details.