跳转到主要内容
Version: latest

数据

NautilusTrader平台提供了一套专门设计用于表示交易领域的内置数据类型。 这些数据类型包括:

  • OrderBookDelta (L1/L2/L3): 表示最细粒度的订单簿更新。
  • OrderBookDeltas (L1/L2/L3): 批量处理多个订单簿增量数据以提高处理效率。
  • OrderBookDepth10: 聚合订单簿快照(每边买卖盘最多10档)。
  • QuoteTick: 表示订单簿顶部的最佳买卖价格及其数量。
  • TradeTick: 交易对手之间的一次交易/撮合事件。
  • Bar: OHLCV (开盘价、最高价、最低价、收盘价、成交量) 柱状图/蜡烛图,使用指定的聚合方法进行汇总。
  • InstrumentStatus: 一个工具级别的状态事件。
  • InstrumentClose: 交易品种的收盘价。

NautilusTrader主要设计用于处理精细的订单簿数据,为回测中的执行模拟提供最高真实度。不过,根据所需的模拟精度,也可以在支持的任何市场数据类型上进行回测。

订单簿

一个用Rust实现的高性能订单簿可用于根据提供的数据维护订单簿状态。

OrderBook 实例会为每个交易品种在回测和实盘交易中维护,提供以下订单簿类型:

  • L3_MBO: 订单簿数据(MBO) 或L3数据,使用每个价格层级上的所有订单簿事件,以订单ID为键。
  • L2_MBP: 按价格汇总市场数据(MBP) 或 L2 数据,按价格级别聚合订单簿事件。
  • L1_MBP: 价格行情(MBP) 或 L1 数据,也称为最优买卖报价(BBO),仅捕获顶级更新。
note

盘口数据,例如QuoteTickTradeTickBar,也可以用于回测,市场运行在L1_MBP订单簿类型上。

交易品种

以下工具定义可用:

  • Betting: 表示博彩市场中的一种交易标的。
  • BinaryOption: 表示一种通用的二元期权工具。
  • Cfd: 代表差价合约(CFD)金融工具。
  • Commodity: 代表现货/现金市场中的大宗商品工具。
  • CryptoFuture: 代表一种可交割的期货合约工具,以加密资产作为标的物并进行结算。
  • CryptoPerpetual: 代表加密货币永续期货合约工具(也称为永续互换)。
  • CurrencyPair: 表示现货/现金市场中的通用货币对工具。
  • Equity: 表示通用的股票类金融工具。
  • FuturesContract: 代表一种通用的可交割期货合约工具。
  • FuturesSpread: 表示一种通用的可交割期货价差工具。
  • Index: 表示一个通用的指数工具。
  • OptionContract: 表示一个通用的期权合约工具。
  • OptionSpread: 表示一个通用的期权价差工具。
  • Synthetic: 表示一种合成工具,其价格通过公式从组成工具中衍生而来。

K线数据与聚合

K线数据简介

一个柱状图(也称为蜡烛图、K线图)是一种数据结构,表示特定时间段内的价格和成交量信息,包括:

  • 开盘价
  • 最高价
  • 最低价格
  • 收盘价
  • 交易量(或使用tick作为交易量的代理指标)

这些柱状图是通过聚合方法生成的,该方法根据特定标准对数据进行分组。

数据聚合的目的

在NautilusTrader中,数据聚合将细粒度的市场数据转换为结构化的柱状图或蜡烛图,主要出于以下几个原因:

  • 为技术指标和策略开发提供数据。
  • 因为时间聚合数据(如分钟柱)通常足以满足许多策略的需求。
  • 相比高频L1/L2/L3市场数据,可降低成本。

聚合方法

该平台实现了多种聚合方法:

名称描述分类
TICKAggregation of a number of ticks.Threshold
TICK_IMBALANCEAggregation of the buy/sell imbalance of ticks.Threshold
TICK_RUNSAggregation of sequential buy/sell runs of ticks.Information
VOLUMEAggregation of traded volume.Threshold
VOLUME_IMBALANCEAggregation of the buy/sell imbalance of traded volume.Threshold
VOLUME_RUNSAggregation of sequential runs of buy/sell traded volume.Information
VALUEAggregation of the notional value of trades (also known as "Dollar bars").Threshold
VALUE_IMBALANCEAggregation of the buy/sell imbalance of trading by notional value.Information
VALUE_RUNSAggregation of sequential buy/sell runs of trading by notional value.Threshold
MILLISECONDAggregation of time intervals with millisecond granularity.Time
SECONDAggregation of time intervals with second granularity.Time
MINUTEAggregation of time intervals with minute granularity.Time
HOURAggregation of time intervals with hour granularity.Time
DAYAggregation of time intervals with day granularity.Time
WEEKAggregation of time intervals with week granularity.Time
MONTHAggregation of time intervals with month granularity.Time

聚合类型

NautilusTrader实现了三种不同的数据聚合方法:

  1. 交易到柱状聚合: 从TradeTick对象(已执行的交易)创建柱状图

    • 使用场景:适用于分析执行价格或直接处理交易数据的策略。
    • 在柱状图规格中始终使用LAST价格类型。
  2. 报价到柱状聚合: 从QuoteTick对象(买价/卖价)创建柱状图

    • 使用场景:适用于专注于买卖价差或市场深度分析的策略。
    • 在柱状图规格中使用 BIDASKMID 价格类型。
  3. 柱状图聚合: 通过较小时间周期的Bar对象创建更大时间周期的Bar对象

    • 使用场景:用于将现有的较小时间周期柱线(1分钟)重新采样为较大时间周期(5分钟、每小时)。
    • 在规范中始终需要@符号。

柱状图类型与组件

NautilusTrader 定义了一个独特的柱类型(BarType 类),基于以下组件:

  • Instrument ID (InstrumentId): 指定该柱状图对应的具体交易品种。
  • Bar Specification (BarSpecification):
    • step: 定义每个柱状图的间隔或频率。
    • aggregation: 指定用于数据聚合的方法(参见上表)。
    • price_type: 表示K线柱的价格基准(例如:买价、卖价、中间价、最新价)。
  • 聚合源 (AggregationSource): 表示该柱状数据是在内部(在Nautilus内)聚合的
  • 或外部(由交易场所或数据提供商)。

K线类型也可以分为标准型复合型

  • 标准: 由细粒度市场数据生成,例如报价tick或交易tick。
  • 复合: 通过子采样从更高粒度的柱状类型派生而来(例如5分钟柱状由1分钟柱状聚合而成)。

聚合数据源

Bar数据的聚合可以是内部外部的:

  • INTERNAL: 该数据条是在本地Nautilus系统边界内聚合生成的。
  • EXTERNAL: 该数据条是在本地Nautilus系统边界之外聚合的(通常由交易场所或数据提供商完成)。

对于逐根K线的聚合操作,目标K线类型始终是INTERNAL(因为聚合是在NautilusTrader内部完成的),但源K线可以是INTERNALEXTERNAL,也就是说,您可以聚合外部提供的K线数据,也可以聚合已经聚合过的内部K线数据。

使用字符串语法定义Bar类型

标准柱线

您可以使用以下约定从字符串定义标准柱线类型:

{instrument_id}-{step}-{aggregation}-{price_type}-{INTERNAL | EXTERNAL}

例如,要定义一个BarType用于纳斯达克(XNAS)上苹果公司(AAPL)的交易(最后价格),使用由Nautilus本地聚合的5分钟间隔:

bar_type = BarType.from_str("AAPL.XNAS-5-MINUTE-LAST-INTERNAL")

复合K线

复合K线是通过将更高粒度的K线聚合为所需的K线类型而生成的。要定义复合K线,请使用以下约定:

{instrument_id}-{step}-{aggregation}-{price_type}-INTERNAL@{step}-{aggregation}-{INTERNAL | EXTERNAL}

注意事项:

  • 派生的柱状类型必须使用INTERNAL聚合源(因为这是该柱状数据的聚合方式)。
  • 采样柱类型必须比衍生柱类型具有更高的粒度。
  • 采样工具ID被推断为与衍生柱类型的ID相匹配。
  • 复合K线可以从INTERNALEXTERNAL聚合源进行聚合。

例如,要定义一个BarType用于纳斯达克(XNAS)上AAPL股票交易(最后成交价),使用由Nautilus本地聚合的5分钟间隔,从外部聚合的1分钟间隔柱状图:

bar_type = BarType.from_str("AAPL.XNAS-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL")

聚合语法示例

BarType字符串格式既编码了目标柱状类型,也可选地编码了源数据类型:

{instrument_id}-{step}-{aggregation}-{price_type}-{source}@{step}-{aggregation}-{source}

@符号后的部分是可选的,仅用于柱状图到柱状图的聚合:

  • 不使用@: 从TradeTick对象(当price_type为LAST时)或QuoteTick对象(当price_type为BIDASKMID时)聚合数据。
  • 使用 @: 从现有的 Bar 对象聚合数据(需指定源柱状图类型)。

交易到柱状图示例

def on_start(self) -> None:
# Define a bar type for aggregating from TradeTick objects
# Uses price_type=LAST which indicates TradeTick data as source
bar_type = BarType.from_str("6EH4.XCME-50-VOLUME-LAST-INTERNAL")

# Request historical data (will receive bars in on_historical_data handler)
self.request_bars(bar_type)

# Subscribe to live data (will receive bars in on_bar handler)
self.subscribe_bars(bar_type)

报价转K线示例

def on_start(self) -> None:
# Create 1-minute bars from ASK prices (in QuoteTick objects)
bar_type_ask = BarType.from_str("6EH4.XCME-1-MINUTE-ASK-INTERNAL")

# Create 1-minute bars from BID prices (in QuoteTick objects)
bar_type_bid = BarType.from_str("6EH4.XCME-1-MINUTE-BID-INTERNAL")

# Create 1-minute bars from MID prices (middle between ASK and BID prices in QuoteTick objects)
bar_type_mid = BarType.from_str("6EH4.XCME-1-MINUTE-MID-INTERNAL")

# Request historical data and subscribe to live data
self.request_bars(bar_type_ask) # Historical bars processed in on_historical_data
self.subscribe_bars(bar_type_ask) # Live bars processed in on_bar

逐K线示例

def on_start(self) -> None:
# Create 5-minute bars from 1-minute bars (Bar objects)
# Format: target_bar_type@source_bar_type
# Note: price type (LAST) is only needed on the left target side, not on the source side
bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL")

# Request historical data (processed in on_historical_data(...) handler)
self.request_bars(bar_type)

# Subscribe to live updates (processed in on_bar(...) handler)
self.subscribe_bars(bar_type)

高级的逐K线示例

您可以创建复杂的聚合链,从已经聚合的柱状数据中进行进一步聚合:

# First create 1-minute bars from TradeTick objects (LAST indicates TradeTick source)
primary_bar_type = BarType.from_str("6EH4.XCME-1-MINUTE-LAST-INTERNAL")

# Then create 5-minute bars from 1-minute bars
# Note the @1-MINUTE-INTERNAL part identifying the source bars
intermediate_bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-INTERNAL")

# Then create hourly bars from 5-minute bars
# Note the @5-MINUTE-INTERNAL part identifying the source bars
hourly_bar_type = BarType.from_str("6EH4.XCME-1-HOUR-LAST-INTERNAL@5-MINUTE-INTERNAL")

处理Bar数据:请求(Request)与订阅(Subscribe)的区别

NautilusTrader提供了两种不同的操作来处理K线数据:

  • request_bars(): 获取由on_historical_data()处理器处理的历史数据。
  • subscribe_bars(): 建立由on_bar()处理器处理的实时数据流。

这些方法在一个典型的工作流程中协同工作:

  1. 首先,request_bars()加载历史数据,用过去市场行为初始化指标或策略状态。
  2. 然后,subscribe_bars() 确保策略能够持续接收实时形成的新行情数据。

on_start()中的使用示例:

def on_start(self) -> None:
# Define bar type
bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL")

# Request historical data to initialize indicators
# These bars will be delivered to the on_historical_data(...) handler in strategy
self.request_bars(bar_type)

# Subscribe to real-time updates
# New bars will be delivered to the on_bar(...) handler in strategy
self.subscribe_bars(bar_type)

# Register indicators to receive bar updates (they will be automatically updated)
self.register_indicator_for_bars(bar_type, self.my_indicator)

您的策略中需要包含以下处理器来接收数据:

def on_historical_data(self, data):
# Processes batches of historical bars from request_bars()
# Note: indicators registered with register_indicator_for_bars
# are updated automatically with historical data
pass

def on_bar(self, bar):
# Processes individual bars in real-time from subscribe_bars()
# Indicators registered with this bar type will update automatically and they will be updated before this handler is called
pass

带聚合的历史数据请求

在请求历史数据条进行回测或初始化指标时,您可以使用request_bars()方法,该方法同时支持直接请求和聚合:

# Request raw 1-minute bars (aggregated from TradeTick objects as indicated by LAST price type)
self.request_bars(BarType.from_str("6EH4.XCME-1-MINUTE-LAST-EXTERNAL"))

# Request 5-minute bars aggregated from 1-minute bars
self.request_bars(BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL"))

如果需要历史聚合K线数据,可以使用专门的请求方法 request_aggregated_bars()

# Request bars that are aggregated from historical trade ticks
self.request_aggregated_bars(BarType.from_str("6EH4.XCME-100-VOLUME-LAST-INTERNAL"))

# Request bars that are aggregated from other bars
self.request_aggregated_bars(BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL"))

常见陷阱

在请求数据前注册指标:确保在请求历史数据前注册指标,以便它们能正确更新。

# Correct order
self.register_indicator_for_bars(bar_type, self.ema)
self.request_bars(bar_type)

# Incorrect order
self.request_bars(bar_type) # Indicator won't receive historical data
self.register_indicator_for_bars(bar_type, self.ema)

时间戳

该平台使用两个基本的时间戳字段,这些字段出现在许多对象中,包括市场数据、订单和事件。 这些时间戳具有不同的用途,有助于在整个系统中维护精确的时间信息:

  • ts_event: 表示事件实际发生时间的UNIX时间戳(纳秒)。
  • ts_init: UNIX时间戳(纳秒),表示nautilustrader创建代表该事件的内部对象的时间。

示例

Event Typets_eventts_init
TradeTickTime when trade occurred at the exchange.Time when Nautilus received the trade data.
QuoteTickTime when quote occurred at the exchange.Time when Nautilus received the quote data.
OrderBookDeltaTime when order book update occurred at the exchange.Time when Nautilus received the order book update.
BarTime of the bar's closing (exact minute/hour).Time when Nautilus generated (for internal bars) or received the bar data (for external bars).
OrderFilledTime when order was filled at the exchange.Time when Nautilus received and processed the fill confirmation.
OrderCanceledTime when cancellation was processed at the exchange.Time when Nautilus received and processed the cancellation confirmation.
NewsEventTime when the news was published.Time when the event object was created (if internal event) or received (if external event) in Nautilus.
自定义事件事件条件实际发生的时间事件对象在Nautilus中被创建(内部事件)或接收(外部事件)的时间
note

ts_init字段代表的不仅仅是一个事件的"接收时间",而是一个更广泛的概念。 它表示在Nautilus内部初始化某个对象(如数据点或命令)时的时间戳。 这个区别很重要,因为ts_init不仅适用于"接收的事件"——它适用于任何内部初始化过程。

例如,ts_init字段也用于命令,其中接收的概念并不适用。 这种更广泛的定义确保了系统中各种对象类型的初始化时间戳处理的一致性。

延迟分析

双重时间戳系统支持平台内的延迟分析:

  • 延迟可以通过ts_init - ts_event计算得出。
  • 这一差异代表整个系统的延迟时间,包括网络传输时间、处理开销以及任何排队延迟。
  • 需要记住的是,产生这些时间戳的时钟很可能不同步。

特定环境行为

回测环境

  • 数据通过ts_init使用稳定排序进行排序。
  • 这种行为确保了确定性处理顺序,并模拟了包括延迟在内的真实系统行为。

实盘交易环境

  • Data is processed as it arrives, ensuring minimal latency and allowing for real-time decision-making.
    • ts_init 字段记录了Nautilus实时接收数据的确切时刻。
    • ts_event 反映了事件在外部发生的时间,使得可以准确比较外部事件时间与系统接收时间。
  • 我们可以利用ts_initts_event之间的差异来检测网络或处理延迟。

其他注意事项和考虑因素

  • 对于来自外部数据源的数据,ts_init始终与ts_event相同或在其之后。
  • 对于在Nautilus内部创建的数据,ts_initts_event可以相同,因为对象初始化与事件发生是同时进行的。
  • Not every type with a ts_init field necessarily has a ts_event field. This reflects cases where:
    • 对象的初始化与事件本身同时发生。
    • 外部事件时间的概念不适用。

持久化数据

ts_init字段表示消息最初被接收的时间。

数据流

该平台通过在所有系统环境上下文(例如backtestsandboxlive)中使用相同的数据流路径来确保一致性。数据主要通过MessageBus传输到DataEngine,然后分发给订阅或注册的处理程序。

对于需要更高灵活性的用户,该平台还支持创建自定义数据类型。 有关如何实现用户定义数据类型的详细信息,请参阅高级自定义数据指南

加载数据

NautilusTrader 为三种主要使用场景提供了数据加载和转换功能:

  • BacktestEngine提供数据以运行回测。
  • 将Nautilus特有的Parquet格式持久化到数据目录中,通过ParquetDataCatalog.write_data(...)以便后续与BacktestNode一起使用。
  • 用于研究目的(确保研究和回测之间的数据一致性)。

无论目标如何,流程保持不变:将各种外部数据格式转换为Nautilus数据结构。

要实现这一点,需要两个主要组件:

  • 一种数据加载器(通常针对特定原始数据源/格式),能够读取数据并返回符合所需Nautilus对象正确模式的pd.DataFrame
  • 一种特定数据类型的DataWrangler,接收pd.DataFrame并返回Nautilus对象的list[Data]

数据加载器

数据加载器组件通常针对原始数据源/格式和每个集成而特定。例如,Binance订单簿数据以其原始的CSV文件形式存储,其格式与Databento二进制编码(DBN)文件完全不同。

数据整理工具

数据整理器针对特定的Nautilus数据类型实现,可以在nautilus_trader.persistence.wranglers模块中找到。 目前已有的类型包括:

  • OrderBookDeltaDataWrangler
  • QuoteTickDataWrangler
  • TradeTickDataWrangler
  • BarDataWrangler
warning

尽管可能引起混淆,但越来越多的DataWrangler v2组件正在出现,这些组件通常会接收一个pd.DataFrame(通常采用不同的固定宽度Nautilus Arrow v2模式),并输出仅与正在开发的新版本Nautilus核心兼容的PyO3 Nautilus对象。

这些PyO3提供的数据对象与当前使用传统Cython对象的地方不兼容(例如,直接添加到BacktestEngine)。

转换管道

处理流程:

  1. 原始数据(例如CSV)被输入到管道中。
  2. DataLoader 处理原始数据并将其转换为 pd.DataFrame
  3. DataWrangler进一步处理pd.DataFrame以生成Nautilus对象列表。
  4. Nautilus list[Data] 是数据加载过程的输出。

下图展示了原始数据如何转换为Nautilus数据结构的过程:

  ┌──────────┐    ┌──────────────────────┐                  ┌──────────────────────┐
│ │ │ │ │ │
│ │ │ │ │ │
│ Raw data │ │ │ `pd.DataFrame` │ │
│ (CSV) ├───►│ DataLoader ├─────────────────►│ DataWrangler ├───► Nautilus `list[Data]`
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
└──────────┘ └──────────────────────┘ └──────────────────────┘

具体来说,这将涉及:

  • BinanceOrderBookDeltaDataLoader.load(...) 该函数读取由币安提供的CSV文件,并返回一个pd.DataFrame
  • OrderBookDeltaDataWrangler.process(...) 接收 pd.DataFrame 并返回 list[OrderBookDelta]

以下示例展示了如何在Python中完成上述操作:

from nautilus_trader import TEST_DATA_DIR
from nautilus_trader.adapters.binance.loaders import BinanceOrderBookDeltaDataLoader
from nautilus_trader.persistence.wranglers import OrderBookDeltaDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider


# Load raw data
data_path = TEST_DATA_DIR / "binance" / "btcusdt-depth-snap.csv"
df = BinanceOrderBookDeltaDataLoader.load(data_path)

# Set up a wrangler
instrument = TestInstrumentProvider.btcusdt_binance()
wrangler = OrderBookDeltaDataWrangler(instrument)

# Process to a list `OrderBookDelta` Nautilus objects
deltas = wrangler.process(df)

数据目录

数据目录是Nautilus数据的中央存储库,以Parquet文件格式持久化存储。

我们选择Parquet作为存储格式的原因如下:

  • 在压缩比(存储大小)和读取性能方面,它的表现远优于CSV/JSON/HDF5等格式。
  • 它不需要任何单独运行的组件(例如数据库)。
  • 上手和运行都非常快速简单。

用于Parquet格式的Arrow模式要么统一来源于核心persistence Rust包,要么可从/serialization/arrow/schema.py模块获取。

note

2023-10-14: 当前计划是最终淘汰Python schemas模块,使所有模式都统一源自Rust核心。

初始化

数据目录可以从NAUTILUS_PATH环境变量初始化,也可以通过显式传入类似路径的对象来初始化。

以下示例展示了如何初始化一个数据目录,其中已有预先写入磁盘的数据存在于给定路径。

from pathlib import Path
from nautilus_trader.persistence.catalog import ParquetDataCatalog


CATALOG_PATH = Path.cwd() / "catalog"

# Create a new catalog instance
catalog = ParquetDataCatalog(CATALOG_PATH)

写入数据

新数据可以存储在目录中,这实际上是将给定数据以Nautilus特定的Parquet格式写入磁盘。 所有Nautilus内置的Data对象都受支持,任何继承自Data的数据都可以被写入。

以下示例展示了上述Binance OrderBookDelta对象列表的写入过程:

catalog.write_data(deltas)

基础名称模板

Nautilus不对特定数据类型和工具ID的数据如何在文件之间分区做出任何假设。

basename_template 关键字参数是输出文件的额外可选命名组件。 该模板应包含占位符,这些占位符将在运行时填充实际值。 这些值可以自动从数据中派生或作为额外的关键字参数提供。

例如,使用类似"{date}"这样的基础名称模板来处理AUD/USD.SIM的报价tick数据, 假设"date"是一个已提供或可推导的字段,可能会生成类似 "2023-01-01.parquet"这样的文件名,存放在"quote_tick/audusd.sim/"目录分类下。 如果未提供该参数,将应用默认的命名方案。此参数应作为关键字参数指定, 例如write_data(data, basename_template="{date}")

warning

任何已存在于文件名下的数据将被覆盖。 如果未提供basename_template,则很可能会覆盖该数据类型和工具ID的现有数据。为防止数据丢失,请确保basename_template(或默认命名方案) 为不同的数据集生成唯一的文件名。

Rust Arrow 模式实现可用于以下数据类型(性能增强):

  • OrderBookDelta
  • QuoteTick
  • TradeTick
  • Bar

读取数据

任何存储的数据都可以重新读取到内存中:

from nautilus_trader.core.datetime import dt_to_unix_nanos
import pandas as pd


start = dt_to_unix_nanos(pd.Timestamp("2020-01-03", tz=pytz.utc))
end = dt_to_unix_nanos(pd.Timestamp("2020-01-04", tz=pytz.utc))

deltas = catalog.order_book_deltas(instrument_ids=[instrument.id.value], start=start, end=end)

流式数据

当使用BacktestNode以流式模式运行回测时,数据目录可用于批量流式传输数据。

以下示例展示了如何通过初始化一个BacktestDataConfig配置对象来实现这一点:

from nautilus_trader.config import BacktestDataConfig
from nautilus_trader.model import OrderBookDelta


data_config = BacktestDataConfig(
catalog_path=str(catalog.path),
data_cls=OrderBookDelta,
instrument_id=instrument.id,
start_time=start,
end_time=end,
)

这个配置对象随后可以传入BacktestRunConfig,进而作为运行的一部分传递给BacktestNode。 更多详情请参阅Backtest (high-level API)教程。

数据迁移

NautilusTrader定义了一种内部数据格式,具体规范位于nautilus_model crate中。 这些模型会被序列化为Arrow记录批次,并写入Parquet文件。 当使用这些Nautilus格式的Parquet文件时,Nautilus的回测效率最高。

然而,在精度模式之间迁移数据模型以及处理模式变更可能会面临挑战。 本指南将介绍如何使用我们的实用工具来处理数据迁移。

迁移工具

nautilus_persistence 工具包提供了两个关键实用工具:

to_json

将Parquet文件转换为JSON同时保留元数据:

  • 创建两个文件:

    • .json: 包含反序列化后的数据
    • .metadata.json: 包含模式元数据和行组配置
  • 自动从文件名检测数据类型:

    • OrderBookDelta (包含 "deltas" 或 "order_book_delta")
    • QuoteTick (包含"报价"或"报价点")
    • TradeTick (包含 "trades" 或 "trade_tick")
    • Bar (包含"bars")

to_parquet

将JSON转换回Parquet格式:

  • 读取数据JSON和元数据JSON文件
  • 保留原始元数据中的行组大小
  • 使用ZSTD压缩
  • 创建 .parquet

迁移流程

以下迁移示例都使用交易数据(您也可以用同样的方式迁移其他数据类型)。 所有命令都应该从nautilus_core/persistence/ crate目录的根目录运行。

从标准精度(64位)迁移到高精度(128位)

本示例描述了一个场景,您希望从标准精度模式迁移到高精度模式。

note

如果您正在从使用Int64UInt64 Arrow数据类型表示价格和大小的目录迁移,请务必在编译写入初始JSON的代码之前查看提交e284162

1. 将标准精度Parquet转换为JSON:

cargo run --bin to_json trades.parquet

这将创建trades.jsontrades.metadata.json文件。

2. 从JSON转换为高精度Parquet格式:

添加 --features high-precision 标志以将数据写入为高精度(128位)模式的Parquet文件。

cargo run --features high-precision --bin to_parquet trades.json

这将创建一个包含高精度模式数据的trades.parquet文件。

迁移模式变更

这个示例描述了一个场景,您希望从一个模式版本迁移到另一个版本。

1. 从旧模式Parquet转换为JSON:

如果源数据使用高精度(128位)模式,请添加--features high-precision标志。

cargo run --bin to_json trades.parquet

这将创建trades.jsontrades.metadata.json文件。

2. 切换到新的架构版本:

git checkout <new-version>

3. 将JSON转换回新的Parquet模式:

cargo run --features high-precision --bin to_parquet trades.json

这将创建一个具有新模式的trades.parquet文件。

最佳实践

  • 始终先用小数据集测试迁移。
  • 保留原始文件的备份。
  • 迁移后验证数据完整性。
  • 在应用到生产数据之前,先在暂存环境中执行迁移。

自定义数据

得益于Nautilus的模块化设计,可以构建具有非常灵活数据流的系统,包括用户自定义的数据类型。本指南介绍了该功能的一些可能用例。

可以在Nautilus系统中创建自定义数据类型。首先需要通过继承Data类来定义您的数据。

info

由于Data不持有状态,严格来说不需要调用super().__init__()

from nautilus_trader.core import Data


class MyDataPoint(Data):
"""
This is an example of a user-defined data class, inheriting from the base class `Data`.

The fields `label`, `x`, `y`, and `z` in this class are examples of arbitrary user data.
"""

def __init__(
self,
label: str,
x: int,
y: int,
z: int,
ts_event: int,
ts_init: int,
) -> None:
self.label = label
self.x = x
self.y = y
self.z = z
self._ts_event = ts_event
self._ts_init = ts_init

@property
def ts_event(self) -> int:
"""
UNIX timestamp (nanoseconds) when the data event occurred.

Returns
-------
int

"""
return self._ts_event

@property
def ts_init(self) -> int:
"""
UNIX timestamp (nanoseconds) when the object was initialized.

Returns
-------
int

"""
return self._ts_init

Data抽象基类在系统中充当一种契约,要求所有数据类型都必须具备两个属性:ts_eventts_init。这两个属性分别表示事件发生时的UNIX纳秒时间戳和对象初始化时的时间戳。

推荐满足合约的方式是将ts_eventts_init赋值给支持字段,然后如上所示实现每个字段的@property装饰器(为保持完整性,文档字符串是从Data基类复制而来)。

info

这些时间戳使Nautilus能够使用单调递增的ts_init UNIX纳秒时间正确排序数据流以进行回测。

我们现在可以使用这种数据类型进行回测和实盘交易。例如, 我们可以创建一个适配器,能够解析并生成这种类型的对象, 然后将它们发送回DataEngine供订阅者使用。

你可以通过消息总线在你的智能体/策略中发布自定义数据类型,方法如下:

self.publish_data(
DataType(MyDataPoint, metadata={"some_optional_category": 1}),
MyDataPoint(...),
)

metadata字典可选地添加更细粒度的信息,这些信息用于在消息总线发布数据时的主题名称中。

额外的元数据信息也可以传递给BacktestDataConfig配置对象,以便在回测场景中丰富和描述自定义数据对象:

from nautilus_trader.config import BacktestDataConfig

data_config = BacktestDataConfig(
catalog_path=str(catalog.path),
data_cls=MyDataPoint,
metadata={"some_optional_category": 1},
)

您可以通过以下方式在您的智能体/策略中订阅自定义数据类型:

self.subscribe_data(
data_type=DataType(MyDataPoint,
metadata={"some_optional_category": 1}),
client_id=ClientId("MY_ADAPTER"),
)

client_id 提供了一个标识符,用于将数据订阅路由到特定的客户端。

这将导致您的actor/strategy将这些接收到的MyDataPoint对象传递给您的on_data方法。您需要检查类型,因为该方法作为所有自定义数据的灵活处理程序。

def on_data(self, data: Data) -> None:
# First check the type of data
if isinstance(data, MyDataPoint):
# Do something with the data

发布与接收信号数据

这里展示了一个使用MessageBus从智能体或策略发布和接收信号数据的示例。 信号是一种自动生成的定制数据,通过仅包含一个基本类型值(str, float, int, bool或bytes)的名称来标识。

self.publish_signal("signal_name", value, ts_event)
self.subscribe_signal("signal_name")

def on_signal(self, signal):
print("Signal", data)

期权希腊值示例

本示例演示如何为期权希腊值(特别是delta)创建自定义数据类型。 通过以下步骤,您可以创建自定义数据类型、订阅这些类型、发布这些类型,并将它们存储在CacheParquetDataCatalog中以实现高效检索。

import msgspec
from nautilus_trader.core import Data
from nautilus_trader.core.datetime import unix_nanos_to_iso8601
from nautilus_trader.model import DataType
from nautilus_trader.serialization.base import register_serializable_type
from nautilus_trader.serialization.arrow.serializer import register_arrow
import pyarrow as pa

from nautilus_trader.model import InstrumentId
from nautilus_trader.core.datetime import dt_to_unix_nanos, unix_nanos_to_dt, format_iso8601


class GreeksData(Data):
def __init__(
self, instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
ts_event: int = 0,
ts_init: int = 0,
delta: float = 0.0,
) -> None:
self.instrument_id = instrument_id
self._ts_event = ts_event
self._ts_init = ts_init
self.delta = delta

def __repr__(self):
return (f"GreeksData(ts_init={unix_nanos_to_iso8601(self._ts_init)}, instrument_id={self.instrument_id}, delta={self.delta:.2f})")

@property
def ts_event(self):
return self._ts_event

@property
def ts_init(self):
return self._ts_init

def to_dict(self):
return {
"instrument_id": self.instrument_id.value,
"ts_event": self._ts_event,
"ts_init": self._ts_init,
"delta": self.delta,
}

@classmethod
def from_dict(cls, data: dict):
return GreeksData(InstrumentId.from_str(data["instrument_id"]), data["ts_event"], data["ts_init"], data["delta"])

def to_bytes(self):
return msgspec.msgpack.encode(self.to_dict())

@classmethod
def from_bytes(cls, data: bytes):
return cls.from_dict(msgspec.msgpack.decode(data))

def to_catalog(self):
return pa.RecordBatch.from_pylist([self.to_dict()], schema=GreeksData.schema())

@classmethod
def from_catalog(cls, table: pa.Table):
return [GreeksData.from_dict(d) for d in table.to_pylist()]

@classmethod
def schema(cls):
return pa.schema(
{
"instrument_id": pa.string(),
"ts_event": pa.int64(),
"ts_init": pa.int64(),
"delta": pa.float64(),
}
)

发布与接收数据

以下是通过MessageBus从智能体或策略发布和接收数据的示例:

register_serializable_type(GreeksData, GreeksData.to_dict, GreeksData.from_dict)

def publish_greeks(self, greeks_data: GreeksData):
self.publish_data(DataType(GreeksData), greeks_data)

def subscribe_to_greeks(self):
self.subscribe_data(DataType(GreeksData))

def on_data(self, data):
if isinstance(GreeksData):
print("Data", data)

使用缓存读写数据

以下是一个使用Cache从智能体或策略中写入和读取数据的示例:

def greeks_key(instrument_id: InstrumentId):
return f"{instrument_id}_GREEKS"

def cache_greeks(self, greeks_data: GreeksData):
self.cache.add(greeks_key(greeks_data.instrument_id), greeks_data.to_bytes())

def greeks_from_cache(self, instrument_id: InstrumentId):
return GreeksData.from_bytes(self.cache.get(greeks_key(instrument_id)))

使用目录编写和读取数据

用于将自定义数据流式传输到feather文件或将其写入目录中的parquet文件 (需要使用register_arrow):

register_arrow(GreeksData, GreeksData.schema(), GreeksData.to_catalog, GreeksData.from_catalog)

from nautilus_trader.persistence.catalog import ParquetDataCatalog
catalog = ParquetDataCatalog('.')

catalog.write_data([GreeksData()])

自动创建自定义数据类

@customdataclass装饰器支持创建一个自定义数据类,默认实现了上述所有描述的功能。

如果需要,每个方法也可以被重写。以下是其使用示例:

from nautilus_trader.model.custom import customdataclass


@customdataclass
class GreeksTestData(Data):
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX")
delta: float = 0.0


GreeksTestData(
instrument_id=InstrumentId.from_str("CL.GLBX"),
delta=1000.0,
ts_event=1,
ts_init=2,
)

自定义数据类型存根

为了提高开发便利性并改善IDE中的代码建议,您可以为自定义数据类型创建一个.pyi存根文件,包含正确的构造函数签名和属性类型提示。这在构造函数是运行时动态生成时特别有用,因为它能让IDE识别并提供该类的属性和方法建议。

例如,如果您在greeks.py中定义了一个自定义数据类,您可以使用以下构造函数签名创建对应的greeks.pyi文件:

from nautilus_trader.core import Data
from nautilus_trader.model import InstrumentId


class GreeksData(Data):
instrument_id: InstrumentId
delta: float

def __init__(
self,
ts_event: int = 0,
ts_init: int = 0,
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
delta: float = 0.0,
) -> GreeksData: ...