# 隐藏
!pip install -Uqq nixtla fugue[spark]Spark
from nixtla.utils import in_colabIN_COLAB = in_colab()if not IN_COLAB:
from nixtla.utils import colab_badge
from dotenv import load_dotenv在Spark上分布式运行TimeGPT
Spark 是一个开源的分布式计算框架,旨在大规模数据处理。 在本指南中,我们将解释如何在 Spark 上使用 TimeGPT。
大纲:
if not IN_COLAB:
load_dotenv()
colab_badge('docs/tutorials/17_computing_at_scale_spark_distributed')1. 安装
None通过Fugue安装Spark。Fugue提供了一种易于使用的分布式计算接口,让用户能够在多个分布式计算框架(包括Spark)上执行Python代码
您可以使用 pip 安装 fugue:
pip install fugue[spark]
如果在分布式 Spark 集群上执行,请确保在所有工作节点上安装 nixtla 库。
2. 加载数据
您可以将数据加载为 pandas DataFrame。在本教程中,我们将使用一个包含来自不同市场的每小时电价的数据集。
import pandas as pd df = pd.read_csv(
'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv',
parse_dates=['ds'],
)
df.head()| unique_id | ds | y | |
|---|---|---|---|
| 0 | BE | 2016-10-22 00:00:00 | 70.00 |
| 1 | BE | 2016-10-22 01:00:00 | 37.10 |
| 2 | BE | 2016-10-22 02:00:00 | 37.10 |
| 3 | BE | 2016-10-22 03:00:00 | 44.75 |
| 4 | BE | 2016-10-22 04:00:00 | 37.10 |
3. 初始化 Spark
初始化Spark并将pandas DataFrame转换为Spark DataFrame。
from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()spark_df = spark.createDataFrame(df)
spark_df.show(5)4. 在Spark上使用TimeGPT
在Spark之上使用TimeGPT几乎与非分布式情况相同。唯一的区别是你需要使用Spark DataFrame。
首先,实例化NixtlaClient类。
from nixtla import NixtlaClientnixtla_client = NixtlaClient(
# defaults to os.environ.get("NIXTLA_API_KEY")
api_key = 'my_api_key_provided_by_nixtla'
)👍 使用 Azure AI 终端
要使用 Azure AI 终端,请设置
base_url参数:
nixtla_client = NixtlaClient(base_url="你的 Azure AI 终端", api_key="你的 api_key")
if not IN_COLAB:
nixtla_client = NixtlaClient()请使用 NixtlaClient 类中的任何方法,例如 forecast 或 cross_validation。
fcst_df = nixtla_client.forecast(spark_df, h=12)
fcst_df.show(5)📘 Azure AI 中可用模型
如果您使用的是 Azure AI 端点,请确保设置
model="azureai":
nixtla_client.forecast(..., model="azureai")对于公共 API,我们支持两种模型:
timegpt-1和timegpt-1-long-horizon。默认情况下,使用
timegpt-1。有关如何以及何时使用timegpt-1-long-horizon的信息,请参阅 本教程。
cv_df = nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)
cv_df.show(5)您还可以在Spark之上使用TimeGPT与外生变量。要做到这一点,请参阅外生变量教程。请记住,您需要使用Spark DataFrame,而不是使用pandas DataFrame。
5. 停止 Spark
完成后,停止 Spark 会话。
spark.stop()Give us a ⭐ on Github