Spark

# 隐藏
!pip install -Uqq nixtla fugue[spark]
from nixtla.utils import in_colab
IN_COLAB = in_colab()
if not IN_COLAB:
    from nixtla.utils import colab_badge
    from dotenv import load_dotenv

在Spark上分布式运行TimeGPT

Spark 是一个开源的分布式计算框架,旨在大规模数据处理。 在本指南中,我们将解释如何在 Spark 上使用 TimeGPT

大纲:

  1. 安装

  2. 加载数据

  3. 初始化 Spark

  4. 在 Spark 上使用 TimeGPT

  5. 停止 Spark

if not IN_COLAB:
    load_dotenv()
    colab_badge('docs/tutorials/17_computing_at_scale_spark_distributed')

1. 安装

None通过Fugue安装Spark。Fugue提供了一种易于使用的分布式计算接口,让用户能够在多个分布式计算框架(包括Spark)上执行Python代码

Note

您可以使用 pip 安装 fugue

pip install fugue[spark]

如果在分布式 Spark 集群上执行,请确保在所有工作节点上安装 nixtla 库。

2. 加载数据

您可以将数据加载为 pandas DataFrame。在本教程中,我们将使用一个包含来自不同市场的每小时电价的数据集。

import pandas as pd 
df = pd.read_csv(
    'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv',
    parse_dates=['ds'],
) 
df.head()
unique_id ds y
0 BE 2016-10-22 00:00:00 70.00
1 BE 2016-10-22 01:00:00 37.10
2 BE 2016-10-22 02:00:00 37.10
3 BE 2016-10-22 03:00:00 44.75
4 BE 2016-10-22 04:00:00 37.10

3. 初始化 Spark

初始化Spark并将pandas DataFrame转换为Spark DataFrame。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df)
spark_df.show(5)

4. 在Spark上使用TimeGPT

Spark之上使用TimeGPT几乎与非分布式情况相同。唯一的区别是你需要使用Spark DataFrame。

首先,实例化NixtlaClient类。

from nixtla import NixtlaClient
nixtla_client = NixtlaClient(
    # defaults to os.environ.get("NIXTLA_API_KEY")
    api_key = 'my_api_key_provided_by_nixtla'
)

👍 使用 Azure AI 终端

要使用 Azure AI 终端,请设置 base_url 参数:

nixtla_client = NixtlaClient(base_url="你的 Azure AI 终端", api_key="你的 api_key")

if not IN_COLAB:
    nixtla_client = NixtlaClient()

请使用 NixtlaClient 类中的任何方法,例如 forecastcross_validation

fcst_df = nixtla_client.forecast(spark_df, h=12)
fcst_df.show(5)

📘 Azure AI 中可用模型

如果您使用的是 Azure AI 端点,请确保设置 model="azureai"

nixtla_client.forecast(..., model="azureai")

对于公共 API,我们支持两种模型:timegpt-1timegpt-1-long-horizon

默认情况下,使用 timegpt-1。有关如何以及何时使用 timegpt-1-long-horizon 的信息,请参阅 本教程

cv_df = nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)
cv_df.show(5)

您还可以在Spark之上使用TimeGPT与外生变量。要做到这一点,请参阅外生变量教程。请记住,您需要使用Spark DataFrame,而不是使用pandas DataFrame。

5. 停止 Spark

完成后,停止 Spark 会话。

spark.stop()

Give us a ⭐ on Github