# 隐藏
!pip install -Uqq nixtla fugue[spark]
Spark
from nixtla.utils import in_colab
= in_colab() IN_COLAB
if not IN_COLAB:
from nixtla.utils import colab_badge
from dotenv import load_dotenv
在Spark上分布式运行TimeGPT
Spark 是一个开源的分布式计算框架,旨在大规模数据处理。 在本指南中,我们将解释如何在 Spark 上使用 TimeGPT
。
大纲:
if not IN_COLAB:
load_dotenv()'docs/tutorials/17_computing_at_scale_spark_distributed') colab_badge(
1. 安装
None通过Fugue安装Spark。Fugue提供了一种易于使用的分布式计算接口,让用户能够在多个分布式计算框架(包括Spark)上执行Python代码
您可以使用 pip
安装 fugue
:
pip install fugue[spark]
如果在分布式 Spark
集群上执行,请确保在所有工作节点上安装 nixtla
库。
2. 加载数据
您可以将数据加载为 pandas
DataFrame。在本教程中,我们将使用一个包含来自不同市场的每小时电价的数据集。
import pandas as pd
= pd.read_csv(
df 'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv',
=['ds'],
parse_dates
) df.head()
unique_id | ds | y | |
---|---|---|---|
0 | BE | 2016-10-22 00:00:00 | 70.00 |
1 | BE | 2016-10-22 01:00:00 | 37.10 |
2 | BE | 2016-10-22 02:00:00 | 37.10 |
3 | BE | 2016-10-22 03:00:00 | 44.75 |
4 | BE | 2016-10-22 04:00:00 | 37.10 |
3. 初始化 Spark
初始化Spark
并将pandas DataFrame转换为Spark
DataFrame。
from pyspark.sql import SparkSession
= SparkSession.builder.getOrCreate() spark
= spark.createDataFrame(df)
spark_df 5) spark_df.show(
4. 在Spark上使用TimeGPT
在Spark
之上使用TimeGPT
几乎与非分布式情况相同。唯一的区别是你需要使用Spark
DataFrame。
首先,实例化NixtlaClient
类。
from nixtla import NixtlaClient
= NixtlaClient(
nixtla_client # defaults to os.environ.get("NIXTLA_API_KEY")
= 'my_api_key_provided_by_nixtla'
api_key )
👍 使用 Azure AI 终端
要使用 Azure AI 终端,请设置
base_url
参数:
nixtla_client = NixtlaClient(base_url="你的 Azure AI 终端", api_key="你的 api_key")
if not IN_COLAB:
= NixtlaClient() nixtla_client
请使用 NixtlaClient
类中的任何方法,例如 forecast
或 cross_validation
。
= nixtla_client.forecast(spark_df, h=12)
fcst_df 5) fcst_df.show(
📘 Azure AI 中可用模型
如果您使用的是 Azure AI 端点,请确保设置
model="azureai"
:
nixtla_client.forecast(..., model="azureai")
对于公共 API,我们支持两种模型:
timegpt-1
和timegpt-1-long-horizon
。默认情况下,使用
timegpt-1
。有关如何以及何时使用timegpt-1-long-horizon
的信息,请参阅 本教程。
= nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)
cv_df 5) cv_df.show(
您还可以在Spark
之上使用TimeGPT
与外生变量。要做到这一点,请参阅外生变量教程。请记住,您需要使用Spark
DataFrame,而不是使用pandas DataFrame。
5. 停止 Spark
完成后,停止 Spark
会话。
spark.stop()
Give us a ⭐ on Github