Shortcuts

worker.replay_buffer

回放缓冲区

IBuffer

class ding.worker.replay_buffer.base_buffer.IBuffer[源代码]
Overview:

缓冲区接口

Interfaces:

default_config, push, update, sample, clear, count, state_dict, load_state_dict

abstract clear() None[source]
Overview:

清除所有数据并重置相关变量。

abstract count() int[source]
Overview:

计算缓冲区中有多少有效数据。

Returns:
  • count (int): 有效数据的数量。

classmethod default_config() EasyDict[source]
Overview:

此缓冲类的默认配置。

Returns:
  • 默认配置 (EasyDict)

abstract load_state_dict(_state_dict: Dict[str, Any]) None[source]
Overview:

加载状态字典以重现缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。

abstract push(data: List[Any] | Any, cur_collector_envstep: int) None[来源]
Overview:

将数据推入缓冲区。

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (在Any类型中),或多个(int List[Any]类型)。

  • cur_collector_envstep (int): 收集器当前的环境步骤。

abstract sample(batch_size: int, cur_learner_iter: int) list[source]
Overview:

长度为 batch_size 的样本数据。

Arguments:
  • size (int): 将要采样的数据数量。

  • cur_learner_iter (int): 学习者的当前迭代次数。

Returns:
  • sampled_data (list): 一个长度为 batch_size 的数据列表。

abstract state_dict() Dict[str, Any][source]
Overview:

提供一个状态字典来记录当前缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。 通过这个字典,可以轻松地重现缓冲区。

abstract update(info: Dict[str, list]) None[source]
Overview:

更新数据信息,例如优先级。

Arguments:
  • 信息 (Dict[str, list]): 信息字典。键取决于特定的缓冲区类型。

NaiveReplayBuffer

class ding.worker.replay_buffer.naive_buffer.NaiveReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

简单的回放缓冲区,可以存储和采样数据。 这是一个没有优先级或其他高级功能的简单回放缓冲区实现。 该缓冲区涉及多线程/多进程,并保证线程安全,这意味着像 sample, push, clear 这样的方法都是相互互斥的。

Interface:

开始, 关闭, 推送, 更新, 采样, 清除, 计数, 状态字典, 加载状态字典, 默认配置

Property:

回放缓冲区大小, 推送计数

clear() None[source]
Overview:

清除所有数据并重置相关变量。

close() None[source]
Overview:

清除缓冲区;如果启用了track_used_data,则加入缓冲区的used_data_remover线程。

count() int[source]
Overview:

计算缓冲区中有多少有效数据。

Returns:
  • count (int): 有效数据的数量。

classmethod default_config() EasyDict
Overview:

此缓冲类的默认配置。

Returns:
  • 默认配置 (EasyDict)

load_state_dict(_state_dict: dict) None[source]
Overview:

加载状态字典以重现缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。

push(data: List[Any] | Any, cur_collector_envstep: int) None[source]
Overview:

将数据推入缓冲区。

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (在Any类型中),或多个(int List[Any]类型)。

  • cur_collector_envstep (int): Collector’s current env step.

    在简单缓冲区中未使用,但为了兼容性而保留。

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None[source]
Overview:

样本数据长度为 size

Arguments:
  • size (int): 将要采样的数据数量。

  • cur_learner_iter (int): 学习者的当前迭代。在简单缓冲区中未使用,但为了兼容性而保留。

  • sample_range (slice): 用于采样的缓冲区切片,例如 slice(-10, None),这意味着仅在最后10个数据中进行采样

  • replace (bool): 是否进行有放回的抽样

Returns:
  • sample_data (list): 一个长度为 size 的数据列表。

start() None[source]
Overview:

如果启用了 track_used_data,则启动缓冲区的 used_data_remover 线程。

state_dict() dict[source]
Overview:

提供一个状态字典来记录当前缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。 通过这个字典,可以轻松地重现缓冲区。

update(info: dict) None[source]
Overview:

Naive Buffer 不需要更新任何信息,但此方法为了兼容性而保留。

高级回放缓冲区

class ding.worker.replay_buffer.advanced_buffer.AdvancedReplayBuffer(cfg: dict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

优先回放缓冲区源自 NaiveReplayBuffer。 此回放缓冲区增加了:

  1. 通过段树实现的优先经验回放。

  2. 数据质量监控。监控每个数据的使用次数和过时情况。

  3. 吞吐量监控和控制。

  4. 在tensorboard或文本中记录2)和3)。

Interface:

开始, 关闭, 推送, 更新, 采样, 清除, 计数, 状态字典, 加载状态字典, 默认配置

Property:

beta, replay_buffer_size, push_count

clear() None[source]
Overview:

清除所有数据并重置相关变量。

close() None[source]
Overview:

清除缓冲区;如果启用了track_used_data,则加入缓冲区的used_data_remover线程。加入周期性吞吐量监视器,刷新tensorboard记录器。

count() int[source]
Overview:

计算缓冲区中有多少有效数据。

Returns:
  • count (int): 有效数据的数量。

classmethod default_config() EasyDict
Overview:

此缓冲类的默认配置。

Returns:
  • 默认配置 (EasyDict)

load_state_dict(_state_dict: dict, deepcopy: bool = False) None[source]
Overview:

加载状态字典以重现缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。

push(data: List[Any] | Any, cur_collector_envstep: int) None[source]
Overview:

将数据推入缓冲区。

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (在Any类型中),或多个(int List[Any]类型)。

  • cur_collector_envstep (int): 收集器当前的环境步骤。

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None) list | None[source]
Overview:

样本数据长度为 size

Arguments:
  • size (int): 将要采样的数据数量。

  • cur_learner_iter (int): 学习者的当前迭代次数,用于计算陈旧性。

  • sample_range (slice): 用于采样的缓冲区切片,例如 slice(-10, None),这意味着仅在最后10个数据中进行采样

Returns:
  • sample_data (list): 一个长度为 size 的数据列表

ReturnsKeys:
  • 必要的:原始键(例如obsactionnext_obsrewardinfo),replay_unique_idreplay_buffer_idx

  • 可选(如果使用优先级):IS, priority

start() None[source]
Overview:

如果启用了 track_used_data,则启动缓冲区的 used_data_remover 线程。

state_dict() dict[source]
Overview:

提供一个状态字典来记录当前缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。 通过这个字典,可以轻松地重现缓冲区。

update(info: dict) None[source]
Overview:

更新数据的优先级。使用repaly_buffer_idx进行定位,并使用replay_unique_id进行验证。

Arguments:
  • 信息 (dict): 包含优先级更新所需所有键的信息字典。

ArgumentsKeys:
  • 必要的:replay_unique_id, replay_buffer_idx, priority。所有值都是长度相同的列表。

EpisodeReplayBuffer

class ding.worker.replay_buffer.episode_buffer.EpisodeReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

剧集回放缓冲区是一个用于存储完整剧集的缓冲区,即剧集缓冲区中的每个元素都是一个剧集。 一些算法不希望采样batch_size个完整剧集,而是希望采样一些具有固定长度的转换。因此,sample应该根据这些需求进行重写。

Interface:

开始, 关闭, 推送, 更新, 采样, 清除, 计数, 状态字典, 加载状态字典, 默认配置

__init__(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer') None
Overview:

初始化缓冲区

Arguments:
  • cfg (dict): 配置字典。

  • tb_logger (Optional['SummaryWriter']): 外部tb日志记录器。通常在串行模式下获取此参数。

  • exp_name (Optional[str]): 此实验的名称。

  • instance_name (Optional[str]): 此实例的名称。

clear() None
Overview:

清除所有数据并重置相关变量。

close() None
Overview:

清除缓冲区;如果启用了track_used_data,则加入缓冲区的used_data_remover线程。

count() int
Overview:

计算缓冲区中有多少有效数据。

Returns:
  • count (int): 有效数据的数量。

classmethod default_config() EasyDict
Overview:

此缓冲类的默认配置。

Returns:
  • 默认配置 (EasyDict)

load_state_dict(_state_dict: dict) None
Overview:

加载状态字典以重现缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。

push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

将数据推入缓冲区。

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (在Any类型中),或多个(int List[Any]类型)。

  • cur_collector_envstep (int): Collector’s current env step.

    在简单缓冲区中未使用,但为了兼容性而保留。

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None
Overview:

样本数据长度为 size

Arguments:
  • size (int): 将要采样的数据数量。

  • cur_learner_iter (int): 学习者的当前迭代。在简单缓冲区中未使用,但为了兼容性而保留。

  • sample_range (slice): 用于采样的缓冲区切片,例如 slice(-10, None),这意味着仅在最后10个数据中进行采样

  • replace (bool): 是否进行有放回的抽样

Returns:
  • sample_data (list): 一个长度为 size 的数据列表。

start() None
Overview:

如果启用了 track_used_data,则启动缓冲区的 used_data_remover 线程。

state_dict() dict
Overview:

提供一个状态字典来记录当前缓冲区。

Returns:
  • state_dict (Dict[str, Any]): 一个包含缓冲区中所有重要值的字典。 通过这个字典,可以轻松地重现缓冲区。

update(info: dict) None
Overview:

Naive Buffer 不需要更新任何信息,但此方法为了兼容性而保留。

create_buffer

Overview:

根据配置和其他参数创建一个缓冲区。

Arguments:
  • cfg (EasyDict): 缓冲区配置。

ArgumentsKeys:
  • 必要的:type

get_buffer_cls

Overview:

根据配置获取一个缓冲区类。

Arguments:
  • cfg (EasyDict): 缓冲区配置。

ArgumentsKeys:
  • 必要的:类型

工具

UsedDataRemover

class ding.worker.replay_buffer.utils.UsedDataRemover[源代码]
Overview:

UsedDataRemover 是一个用于移除不再使用的文件数据的工具。

Interface:

开始, 关闭, 添加已用数据

add_used_data(data: Any) None[source]
Overview:

删除self._used_data中的所有数据。然后加入delete_used_data线程。

Arguments:
  • data (Any): 将一个已使用的数据项添加到 self._used_data 中以便进一步移除。

close() None[source]
Overview:

删除self._used_data中的所有数据。然后加入delete_used_data线程。

start() None[source]
Overview:

启动delete_used_data线程。

采样数据属性监控

class ding.worker.replay_buffer.utils.SampledDataAttrMonitor(time_: BaseTime, expire: int | float)[source]
Overview:

SampledDataAttrMonitor 用于监控最近读取的 expire 次数的读取指标。 指标包括:读取时间;读取数据项使用的平均值和最大值;读取数据项优先级的平均值、最大值和最小值;陈旧度的平均值和最大值。

Interface:

__init__, fixed_time, current_time, freeze, unfreeze, register_attribute_value, __getattr__

Property:

时间, 过期

周期性吞吐量监控器

class ding.worker.replay_buffer.utils.PeriodicThruputMonitor(name, cfg, logger, tb_logger)[source]
Overview:

PeriodicThruputMonitor 是一个用于记录和打印日志(文本和tensorboard)的工具,记录在一段时间内有多少数据被推送/采样/移除/有效。对于tensorboard,您可以在‘buffer_{$NAME}_sec’中查看。

Interface:

关闭

Property:

推送数据计数, 样本数据计数, 移除数据计数, 有效计数

注意

thruput_log 线程在 __init__ 方法中被初始化和启动,因此 PeriodicThruputMonitor 只提供一个单一的接口 close