Integrate with PyTorch
This guide briefly demonstrates how to integrate PyTorch with Mars.
Installation
If you are using Mars on a single machine, e.g. on your laptop, make sure PyTorch is installed.
You can install PyTorch via pip:
pip3 install torch torchvision torchaudio
Visit the PyTorch installation guide for more information.
On the other hand, if you are about to use Mars on a cluster, make sure PyTorch is installed on every worker node.
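If you want to verify that every node can import PyTorch before submitting a job, one option is to run a tiny remote function on the cluster. Below is a minimal sketch using mars.remote.spawn; check_torch is a hypothetical helper, and the snippet assumes a session to the cluster has already been created.

import mars.remote as mr

def check_torch():
    # runs on a Mars worker; fails loudly if torch is missing there
    import torch
    return torch.__version__

print(mr.spawn(check_torch).execute().fetch())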
Prepare data
The dataset we use here is the ionosphere dataset; click the link to download the data.
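Before writing the training script, you can take a quick look at the file with Mars DataFrame. A minimal sketch, assuming the file has been downloaded to the working directory; the ionosphere data consists of 34 numeric feature columns followed by a string label column:

import mars.dataframe as md

df = md.read_csv('ionosphere.data', header=None)
# 34 numeric feature columns plus the 'g'/'b' label in the last column
print(df.head(5).execute())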
Prepare the PyTorch script
Now we create a Python file named torch_demo.py that contains the PyTorch logic.
import os

import mars.dataframe as md
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
from sklearn.preprocessing import LabelEncoder


def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)
    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()
    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()
    # encode string to integer
    y = LabelEncoder().fit_transform(y)
    return X, y


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train():
    dist.init_process_group(backend="gloo")
    torch.manual_seed(42)

    data, labels = prepare_data()
    data = torch.from_numpy(data)
    labels = torch.from_numpy(labels)

    train_dataset = torch.utils.data.TensorDataset(data, labels.float())
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(),
                           lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for _, (batch_data, batch_labels) in enumerate(train_loader):
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    train()
Mars libraries, including Mars DataFrame, can be used directly here to process massive data and accelerate preprocessing.
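For example, feature standardization can be expressed directly on Mars tensors before the data is materialized as a numpy ndarray. A sketch of how prepare_data() above could be extended (same imports as torch_demo.py; the scaling step is an illustrative addition, not part of the original script):

def prepare_data_scaled():
    df = md.read_csv('ionosphere.data', header=None)
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()
    # standardize features with Mars tensor ops, evaluated in parallel
    X = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-8)
    # materialize and encode as before
    X, y = X.to_numpy(), y.to_numpy()
    y = LabelEncoder().fit_transform(y)
    return X, y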
Run the PyTorch script via Mars
The PyTorch script can now be submitted via run_pytorch_script().
In [1]: from mars.learn.contrib.pytorch import run_pytorch_script
In [2]: run_pytorch_script("torch_demo.py", n_workers=2)
task: <Task pending coro=<Event.wait() running at ./mars-dev/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f04c5027cd0>()]>>
...
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[4]: Object <op=RunPyTorch, key=d5c40e502b77310ef359729692233d56>
Distributed training or inference
For deployment, refer to the Run on Clusters section; for running Mars on Kubernetes, refer to the Run on Kubernetes section.
As you can see from torch_demo.py, Mars sets up the environment variables automatically, so you do not need to worry about the distributed setup; all you need to do is write a proper distributed PyTorch script.
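As a hedged illustration, assuming Mars exports the standard torch.distributed environment variables (MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE), a script can inspect the topology it was launched with:

import os
import torch.distributed as dist

def report_topology():
    # with the default env:// init method, init_process_group reads
    # MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the environment
    dist.init_process_group(backend="gloo")
    print(f"rank {dist.get_rank()} of {dist.get_world_size()}, "
          f"master at {os.environ.get('MASTER_ADDR')}:{os.environ.get('MASTER_PORT')}")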
Once a cluster is running, you can either set the session as the default one, so that the training and prediction shown above are submitted to that cluster, or specify session=*** explicitly.
# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
from mars.learn.contrib.pytorch import run_pytorch_script

# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to the cluster by default
run_pytorch_script('torch_demo.py', n_workers=2)

# or, the session could be specified explicitly
run_pytorch_script('torch_demo.py', n_workers=2, session=sess)
Mars Dataset
In order to process data with Mars, we implemented MarsDataset, which can convert Mars objects (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series) into a torch.utils.data.Dataset.
import mars.tensor as mt
import torch.utils.data
from mars.learn.contrib.pytorch import MarsDataset, RandomSampler

data = mt.random.rand(1000, 32, dtype='f4')
labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')

train_dataset = MarsDataset(data, labels)
train_sampler = RandomSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=32,
                                           sampler=train_sampler)
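As a quick sanity check, pulling one batch from the loader should yield batches whose shapes follow from the random data above. A sketch, assuming the default collate function converts the per-item numpy values to torch tensors:

batch_data, batch_labels = next(iter(train_loader))
print(batch_data.shape)    # expected: torch.Size([32, 32])
print(batch_labels.shape)  # expected: torch.Size([32, 10])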
Now run_pytorch_script() allows passing data to the script, so you can preprocess the data with Mars and then pass it to the script.
import mars.dataframe as md
from sklearn.preprocessing import LabelEncoder

df = md.read_csv('ionosphere.data', header=None)
feature_data = df.iloc[:, :-1].astype('float32')
feature_data.execute()

labels = df.iloc[:, -1]
labels = LabelEncoder().fit_transform(labels.execute().fetch())
labels = labels.astype('float32')

run_pytorch_script(
    "torch_script.py", n_workers=2, data={'feature_data': feature_data, 'labels': labels},
    port=9945, session=sess)
torch_script.py
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
from mars.learn.contrib.pytorch import DistributedSampler
from mars.learn.contrib.pytorch import MarsDataset


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train(feature_data, labels):
    dist.init_process_group(backend='gloo')
    torch.manual_seed(42)

    train_dataset = MarsDataset(feature_data, labels)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(),
                           lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for _, (batch_data, batch_labels) in enumerate(train_loader):
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    # data passed via run_pytorch_script(..., data={...}) is injected
    # into the script's global namespace
    feature_data = globals()['feature_data']
    labels = globals()['labels']
    train(feature_data, labels)
Result:
epoch 147, running_loss is 0.29225416854023933
epoch 147, running_loss is 0.28132784366607666
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[7]: Object <op=RunPyTorch, key=dc3c7ab3a54a7289af15e8be5b334cf0>