Integration with PyTorch#

This guide briefly explains how to integrate PyTorch with Mars.

Installation#

If you use Mars on a single machine, e.g. on your laptop, make sure PyTorch is installed.

You can install PyTorch via pip:

pip3 install torch torchvision torchaudio
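
To verify that the installation works, a quick sanity check like the following can be run (a minimal sketch; the printed values will vary with your setup):

import torch

# print the installed PyTorch version and whether the distributed package
# is available, since the script below relies on torch.distributed
print(torch.__version__)
print(torch.distributed.is_available())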

Visit the PyTorch installation guide for more information.

On the other hand, if you are going to use Mars on a cluster, make sure PyTorch is installed on every worker node.

Prepare data#

The dataset we use is the ionosphere dataset; click the link to download the data.
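
As a quick check that the file downloaded correctly, you can peek at its first rows with Mars DataFrame (a minimal sketch, assuming ionosphere.data is in the current working directory; the dataset has 34 numeric feature columns and a final 'g'/'b' label column):

import mars.dataframe as md

# read the CSV lazily; execute() materializes only this head() result
df = md.read_csv('ionosphere.data', header=None)
print(df.head(5).execute())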

Prepare PyTorch script#

Now we create a Python file named torch_demo.py that contains the PyTorch logic.

import os

import mars.dataframe as md
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
from sklearn.preprocessing import LabelEncoder


def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    return X, y


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train():
    dist.init_process_group(backend="gloo")
    torch.manual_seed(42)

    data, labels = prepare_data()
    data = torch.from_numpy(data)
    labels = torch.from_numpy(labels)
    train_dataset = torch.utils.data.TensorDataset(data, labels.float())
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for batch_data, batch_labels in train_loader:
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    train()

Mars libraries, including Mars DataFrame, can be used directly to process massive data and speed up preprocessing.
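
For instance, a min-max scaling step can be expressed directly on the Mars tensor, so it runs distributed before the data is fetched (a hedged sketch extending prepare_data() above, not part of the original script):

import mars.dataframe as md

df = md.read_csv('ionosphere.data', header=None)
X = df.iloc[:, :-1].to_tensor().astype('float32')

# scale every feature column to [0, 1]; the expression stays lazy
# and is only computed when to_numpy() triggers execution
X = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
X = X.to_numpy()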

Run PyTorch script via Mars#

Now you can submit the PyTorch script via run_pytorch_script(). Note that n_workers=2 starts two worker processes, each running the script, which is why every epoch's loss appears twice in the output below.

In [1]: from mars.learn.contrib.pytorch import run_pytorch_script

In [2]: run_pytorch_script("torch_demo.py", n_workers=2)
task: <Task pending coro=<Event.wait() running at ./mars-dev/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f04c5027cd0>()]>>
...
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[4]: Object <op=RunPyTorch, key=d5c40e502b77310ef359729692233d56>

Distributed training or inference#

For deployment, refer to the Run on Clusters section; for running Mars on Kubernetes, refer to the Run on Kubernetes section.

As torch_demo.py shows, Mars sets the required environment variables automatically, so you don't need to worry about the distributed setup; all you need to do is write a proper distributed PyTorch script.
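
Concretely, dist.init_process_group(backend="gloo") uses PyTorch's default env:// initialization, which reads rendezvous information from environment variables; these are the variables Mars is expected to fill in for each worker. A minimal sketch of what a script can inspect:

import os

# variables read by PyTorch's env:// initialization; Mars sets them
# per worker before the script starts
for name in ('MASTER_ADDR', 'MASTER_PORT', 'RANK', 'WORLD_SIZE'):
    print(name, os.environ.get(name))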

Once a cluster is running, you can set its session as the default so that the training and prediction above are submitted to that cluster, or you can specify session=*** explicitly.

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
from mars.learn.contrib.pytorch import run_pytorch_script

# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to cluster by default
run_pytorch_script('torch_demo.py', n_workers=2)

# Or, session could be specified as well
run_pytorch_script('torch_demo.py', n_workers=2, session=sess)

Mars Dataset#

In order to process data with Mars, we implemented MarsDataset, which can convert Mars objects (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series) into a torch.utils.data.Dataset.

import mars.tensor as mt
import torch.utils.data
from mars.learn.contrib.pytorch import MarsDataset, RandomSampler

data = mt.random.rand(1000, 32, dtype='f4')
labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')

train_dataset = MarsDataset(data, labels)
train_sampler = RandomSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=32,
                                           sampler=train_sampler)
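
Iterating over the loader then yields ordinary torch tensors, batched by the default collate function (a minimal sketch; with the shapes above, each batch should hold (32, 32) features and (32, 10) labels):

# fetch one batch to inspect its shape and dtype
batch_data, batch_labels = next(iter(train_loader))
print(batch_data.shape, batch_labels.shape)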

Now run_pytorch_script() also allows passing data into the script, so you can preprocess the data with Mars and then hand the results over to the script.

import mars.dataframe as md
from mars.learn.contrib.pytorch import run_pytorch_script
from sklearn.preprocessing import LabelEncoder


df = md.read_csv('ionosphere.data', header=None)
feature_data = df.iloc[:, :-1].astype('float32')
feature_data.execute()
labels = df.iloc[:, -1]
labels = LabelEncoder().fit_transform(labels.execute().fetch())
labels = labels.astype('float32')

run_pytorch_script(
    "torch_script.py", n_workers=2, data={'feature_data': feature_data, 'labels': labels},
    port=9945, session=sess)

torch_script.py

from mars.learn.contrib.pytorch import DistributedSampler
from mars.learn.contrib.pytorch import MarsDataset
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train(feature_data, labels):
    dist.init_process_group(backend='gloo')
    torch.manual_seed(42)

    # feature_data and labels are passed in via run_pytorch_script's
    # data argument and can be fed to MarsDataset directly
    train_dataset = MarsDataset(feature_data, labels)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for batch_data, batch_labels in train_loader:
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    # feature_data and labels are injected into the script's globals()
    # by the data argument of run_pytorch_script
    feature_data = globals()['feature_data']
    labels = globals()['labels']
    train(feature_data, labels)

Output:

epoch 147, running_loss is 0.29225416854023933
epoch 147, running_loss is 0.28132784366607666
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[7]: Object <op=RunPyTorch, key=dc3c7ab3a54a7289af15e8be5b334cf0>