保存TensorDict和tensorclass对象¶
虽然我们可以使用save()
保存一个tensordict,但这将创建一个包含整个数据结构内容的单一文件。可以很容易想象到这种情况可能不是最优的!
TensorDict 序列化 API 主要依赖于 MemoryMappedTensor
,它用于在磁盘上独立写入张量,并使用模仿 TensorDict 的数据结构。
TensorDict的序列化速度可以比PyTorch的快一个数量级,这得益于save()
对pickle的依赖。本文档解释了如何使用TensorDict创建和与存储在磁盘上的数据进行交互。
保存内存映射的TensorDicts¶
当tensordict被转储为mmap数据结构时,每个条目对应一个*.memmap
文件,目录结构由键结构决定:通常,嵌套的键对应于子目录。
将数据结构保存为结构化的一组内存映射张量具有以下优势:
保存的数据可以部分加载。如果一个大模型保存在磁盘上,但只需要将其部分权重加载到在单独脚本中创建的模块上,那么只有这些权重会被加载到内存中。
保存数据是安全的:使用pickle库序列化大数据结构可能不安全,因为反序列化可以执行任意代码。TensorDict的加载API仅从保存的json文件和磁盘上保存的内存缓冲区中读取预选字段。
保存速度快:因为数据被写入多个独立的文件中,我们可以通过启动多个并发线程来分摊IO开销,每个线程独立访问一个专用文件。
保存数据的结构是显而易见的:目录树表明了数据内容。
然而,这种方法也有一些缺点:
并非每种数据类型都能被保存。
tensorclass
允许保存任何非张量数据:如果这些数据可以用json文件表示,将使用json格式。否则,非张量数据将作为备用方案,使用save()
独立保存。NonTensorData
类可用于在常规的TensorDict
实例中表示非张量数据。
tensordict 的内存映射 API 依赖于四个核心方法:
memmap_()
, memmap()
,
memmap_like()
和 load_memmap()
.
memmap_()
和 memmap()
方法将在磁盘上写入数据,无论是否修改包含数据的 tensordict 实例。这些方法可用于将模型序列化到磁盘上(我们使用多线程来加速序列化):
>>> model = nn.Transformer()
>>> weights = TensorDict.from_module(model)
>>> weights_disk = weights.memmap("/path/to/saved/dir", num_threads=32)
>>> new_weights = TensorDict.load_memmap("/path/to/saved/dir")
>>> assert (weights_disk == new_weights).all()
当需要在磁盘上预分配数据集时,应使用memmap_like()
,典型用法如下:
>>> def make_datum(): # used for illustration purposes
... return TensorDict({"image": torch.randint(255, (3, 64, 64)), "label": 0}, batch_size=[])
>>> dataset_size = 1_000_000
>>> datum = make_datum() # creates a single instance of a TensorDict datapoint
>>> data = datum.expand(dataset_size) # does NOT require more memory usage than datum, since it's only a view on datum!
>>> data_disk = data.memmap_like("/path/to/data") # creates the two memory-mapped tensors on disk
>>> del data # data is not needed anymore
如上所示,当将TensorDict`
的条目转换为MemoryMappedTensor
时,可以控制内存映射在磁盘上的保存位置,以便它们可以持久化并在以后加载。另一方面,也可以使用文件系统。要使用此功能,只需在上述三种序列化方法中丢弃prefix
参数。
当指定了prefix
时,数据结构遵循TensorDict的结构:
>>> import torch
>>> from tensordict import TensorDict
>>> td = TensorDict({"a": torch.rand(10), "b": {"c": torch.rand(10)}}, [10])
>>> td.memmap_(prefix="tensordict")
生成以下目录结构
tensordict
├── a.memmap
├── b
│ ├── c.memmap
│ └── meta.json
└── meta.json
meta.json
文件包含重建 tensordict 所需的所有相关信息,例如设备、批量大小,以及 tensordict 的子类型。
这意味着 load_memmap()
将能够重建复杂的嵌套结构,其中子 tensordict 的类型与父 tensordict 不同:
>>> from tensordict import TensorDict, tensorclass, TensorDictBase
>>> from tensordict.utils import print_directory_tree
>>> import torch
>>> import tempfile
>>> td_list = [TensorDict({"item": i}, batch_size=[]) for i in range(4)]
>>> @tensorclass
... class MyClass:
... data: torch.Tensor
... metadata: str
>>> tc = MyClass(torch.randn(3), metadata="some text", batch_size=[])
>>> data = TensorDict({"td_list": torch.stack(td_list), "tensorclass": tc}, [])
>>> with tempfile.TemporaryDirectory() as tempdir:
... data.memmap_(tempdir)
...
... loaded_data = TensorDictBase.load_memmap(tempdir)
... assert (loaded_data == data).all()
... print_directory_tree(tempdir)
tmpzy1jcaoq/
tensorclass/
_tensordict/
data.memmap
meta.json
meta.json
td_list/
0/
item.memmap
meta.json
1/
item.memmap
meta.json
3/
item.memmap
meta.json
2/
item.memmap
meta.json
meta.json
meta.json
处理现有的MemoryMappedTensor
¶
如果 TensorDict`
已经包含
MemoryMappedTensor
条目,有几种
可能的行为。
如果未指定
prefix
并且memmap()
被调用两次,生成的TensorDict将包含与原始数据相同的数据。>>> td = TensorDict({"a": 1}, []) >>> td0 = td.memmap() >>> td1 = td0.memmap() >>> td0["a"] is td1["a"] True
如果指定了
prefix
并且与现有的MemoryMappedTensor
实例的前缀不同,则会引发异常,除非传递了copy_existing=True:>>> with tempfile.TemporaryDirectory() as tmpdir_0: ... td0 = td.memmap(tmpdir_0) ... td0 = td.memmap(tmpdir_0) # works, results are just overwritten ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td1 = td0.memmap(tmpdir_1) ... td_load = TensorDict.load_memmap(tmpdir_1) # works! ... assert (td_load == td).all() ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td_load = TensorDict.load_memmap(tmpdir_1) # breaks!
此功能旨在防止用户无意中将内存映射张量从一个位置复制到另一个位置。
TorchSnapshot 兼容性¶
警告
由于torchsnapshot的维护即将停止。因此,我们将不会为tensordict与该库的兼容性实现新功能。
TensorDict 与 torchsnapshot 兼容,
这是一个 PyTorch 的检查点库。
TorchSnapshot 会独立保存每个张量,并使用一个模仿你的 tensordict 或 tensorclass 的数据结构。
此外,TensorDict 自然内置了必要的工具,可以在不将完整张量加载到内存中的情况下保存和加载大型数据集:换句话说,tensordict + torchsnapshot 的组合使得可以将数百 Gb 大小的张量加载到预分配的 MemmapTensor
上,而无需将其一次性加载到 RAM 中。
主要有两种使用场景:保存和加载适合内存的tensordicts,以及使用MemmapTensor
保存和加载存储在磁盘上的tensordicts。
通用用例:内存加载¶
如果您的目标tensordict没有预先分配,则此方法适用。 这提供了灵活性(您可以将任何tensordict加载到您的tensordict上,您 不需要提前知道其内容),并且此方法在编码上比其他方法稍微 容易一些。 然而,如果您的张量非常大且无法放入内存,则此方法可能会失败。 此外,它不允许您直接加载到您选择的设备上。
保存操作中需要记住的两个主要命令是:
>>> state = {"state": tensordict_source}
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path="/path/to/my/snapshot")
要加载到目标tensordict上,您可以简单地加载快照并更新tensordict。在底层,此方法将调用tensordict_target.load_state_dict(state_dict)
,这意味着state_dict
将首先完全放入内存,然后加载到目标tensordict上:
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> state_target = {"state": tensordict_target}
>>> snapshot.restore(app_state=state_target)
这是一个完整的示例:
>>> import uuid
>>> import torchsnapshot
>>> from tensordict import TensorDict
>>> import torch
>>>
>>> tensordict_source = TensorDict({"a": torch.randn(3), "b": {"c": torch.randn(3)}}, [])
>>> state = {"state": tensordict}
>>> path = f"/tmp/{uuid.uuid4()}"
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path=path)
>>> # later
>>> snapshot = torchsnapshot.Snapshot(path=path)
>>> tensordict2 = TensorDict()
>>> target_state = {
>>> "state": tensordict2
>>> }
>>> snapshot.restore(app_state=target_state)
>>> assert (tensordict == tensordict2).all()
保存和加载大数据集¶
如果数据集太大无法放入内存,上述方法可能会轻易崩溃。 我们利用torchsnapshot的功能,将张量以小块形式加载到预分配的目标位置。 这需要您知道目标数据的形状、设备等信息,但为了能够检查点您的模型或数据加载,这是一个值得付出的小代价!
与前面的例子相比,我们将不会使用load_state_dict()
方法
的TensorDict
,而是使用从目标对象获取的state_dict
,
我们将用保存的数据重新填充它。
再次,两行代码足以保存数据:
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_source.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path="/path/to/my/snapshot")
我们一直在使用torchsnapshot.StateDict
,并且我们明确调用了my_tensordict_source.state_dict(keep_vars=True)
,与之前的例子不同。现在,要将这个加载到目标tensordict上:
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_target.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
在这个例子中,加载完全由torchsnapshot处理,即没有调用TensorDict.load_state_dict()
。
注意
这有两个重要的含义:
由于
LazyStackedTensorDict.state_dict()
(以及其他惰性tensordict类)在执行某些操作后返回数据的副本,加载到state-dict不会更新原始类。然而,由于支持state_dict()操作,这不会引发错误。同样地,由于状态字典是原地更新的,但使用
TensorDict.update()
或TensorDict.set()
时,目标tensordict中的缺失键将不会被注意到。
这是一个完整的示例:
>>> td = TensorDict({"a": torch.randn(3), "b": TensorDict({"c": torch.randn(3, 1)}, [3, 1])}, [3])
>>> td.memmap_()
>>> assert isinstance(td["b", "c"], MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>>
>>> td_dest = TensorDict({"a": torch.zeros(3), "b": TensorDict({"c": torch.zeros(3, 1)}, [3, 1])}, [3])
>>> td_dest.memmap_()
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>> # sanity check
>>> assert (td_dest == td).all()
>>> assert (td_dest["b"].batch_size == td["b"].batch_size)
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
最后,tensorclass 也支持这个功能。代码与上面的代码非常相似:
>>> from __future__ import annotations
>>> import uuid
>>> from typing import Union, Optional
>>>
>>> import torchsnapshot
>>> from tensordict import TensorDict, MemmapTensor
>>> import torch
>>> from tensordict.prototype import tensorclass
>>>
>>> @tensorclass
>>> class MyClass:
... x: torch.Tensor
... y: Optional[MyClass]=None
...
>>> tc = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc.memmap_()
>>> assert isinstance(tc.y.x, MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>> tc_dest = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc_dest.memmap_()
>>> assert isinstance(tc_dest.y.x, MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>>
>>> assert (tc_dest == tc).all()
>>> assert (tc_dest.y.batch_size == tc.y.batch_size)
>>> assert isinstance(tc_dest.y.x, MemmapTensor)