对象存储

本教程展示如何使用对象存储API来管理位于对象存储(如S3、GCS和Azure Blob存储)中的对象。该API是作为Airflow 2.8的一部分引入的。

本教程涵盖数据工程和数据科学工作流中常用的一个简单模式:访问Web API、保存并分析结果。

先决条件

要完成本教程,您需要准备以下事项:

  • DuckDB,一个进程内分析数据库,可以通过运行pip install duckdb来安装。

  • 一个S3存储桶,以及包含s3fs的Amazon提供商。您可以通过运行 pip install apache-airflow-providers-amazon[s3fs] 来安装该提供商包。或者,您可以通过更改 create_object_storage_path函数中的URL为适合您提供商的URL来使用不同的存储提供商, 例如将s3://替换为gs://以使用Google云存储,并安装不同的提供商。

  • pandas,你可以通过运行pip install pandas来安装它。

创建ObjectStoragePath

ObjectStoragePath是一个类似路径的对象,表示对象存储上的路径。它是对象存储API的基本构建模块。

airflow/example_dags/tutorial_objectstorage.py[source]

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

提供给ObjectStoragePath的URL中的用户名部分应为连接ID。 指定的连接将用于获取访问后端的正确凭证。如果省略,则将使用该后端的默认连接。

连接ID也可以通过关键字参数传入:

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

这在重用为其他目的(例如Dataset)定义的URL时非常有用,这类URL通常不包含用户名部分。如果同时指定了显式关键字参数和URL中的用户名值,显式关键字参数将优先于URL中的用户名值。

在DAG的根目录实例化ObjectStoragePath是安全的。连接只有在路径被使用时才会创建。这意味着你可以在DAG的全局作用域中创建路径,并在多个任务中使用它。

将数据保存到对象存储

ObjectStoragePath的行为大多类似于pathlib.Path对象。您可以使用它直接在对象存储中保存和加载数据。因此,典型流程可能如下所示:

airflow/example_dags/tutorial_objectstorage.py[source]

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

get_air_quality_data 调用芬兰气象研究所的API获取赫尔辛基地区的空气质量数据。它会将返回的json数据转换为Pandas DataFrame格式。然后将数据保存到对象存储中,并即时转换为parquet格式。

对象的键值会根据任务的逻辑日期自动生成, 这样我们每天运行它时,都会为当天创建一个新对象。我们 将这个键值与基础路径拼接,形成对象的完整路径。最后, 在将对象写入存储后,我们返回该对象的路径。这使得 我们可以在下一个任务中使用这个路径。

分析数据

在理解数据时,通常需要对其进行分析。Duck DB 是一个非常适合的工具。它是一个进程内分析数据库,允许您在内存中对数据运行 SQL 查询。

由于数据已经是parquet格式,我们可以使用read_parquet函数, 并且因为Duck DB和ObjectStoragePath都使用fsspec,我们可以将 ObjectStoragePath的后端注册到Duck DB中。ObjectStoragePath为此提供了fs 属性。然后我们可以使用Duck DB中的register_filesystem函数 将这个后端注册到Duck DB中。

在Duck DB中,我们可以从数据创建表并对其运行查询。查询结果以数据框形式返回,可用于进一步分析或保存到对象存储中。

airflow/example_dags/tutorial_objectstorage.py[source]

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

您可能会注意到analyze函数并不知道对象的原始路径,而是通过参数传入并通过XCom获取。您无需重新实例化Path对象。此外,连接细节的处理也是透明的。

整合所有内容

最终的DAG看起来是这样的,它封装了所有内容以便我们可以运行它:

airflow/example_dags/tutorial_objectstorage.py[source]


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

这篇内容对您有帮助吗?