对象存储¶
本教程展示如何使用对象存储API来管理位于对象存储(如S3、GCS和Azure Blob存储)中的对象。该API是作为Airflow 2.8的一部分引入的。
本教程涵盖数据工程和数据科学工作流中常用的一个简单模式:访问Web API、保存并分析结果。
先决条件¶
要完成本教程,您需要准备以下事项:
DuckDB,一个进程内分析数据库,可以通过运行
pip install duckdb来安装。一个S3存储桶,以及包含
s3fs的Amazon提供商。您可以通过运行pip install apache-airflow-providers-amazon[s3fs]来安装该提供商包。或者,您可以通过更改create_object_storage_path函数中的URL为适合您提供商的URL来使用不同的存储提供商, 例如将s3://替换为gs://以使用Google云存储,并安装不同的提供商。pandas,你可以通过运行pip install pandas来安装它。
创建ObjectStoragePath¶
ObjectStoragePath是一个类似路径的对象,表示对象存储上的路径。它是对象存储API的基本构建模块。
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
提供给ObjectStoragePath的URL中的用户名部分应为连接ID。 指定的连接将用于获取访问后端的正确凭证。如果省略,则将使用该后端的默认连接。
连接ID也可以通过关键字参数传入:
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
这在重用为其他目的(例如Dataset)定义的URL时非常有用,这类URL通常不包含用户名部分。如果同时指定了显式关键字参数和URL中的用户名值,显式关键字参数将优先于URL中的用户名值。
在DAG的根目录实例化ObjectStoragePath是安全的。连接只有在路径被使用时才会创建。这意味着你可以在DAG的全局作用域中创建路径,并在多个任务中使用它。
将数据保存到对象存储¶
ObjectStoragePath的行为大多类似于pathlib.Path对象。您可以使用它直接在对象存储中保存和加载数据。因此,典型流程可能如下所示:
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
import pandas as pd
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]
params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}
response = requests.get(API, params=params)
response.raise_for_status()
# ensure the bucket exists
base.mkdir(exist_ok=True)
formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"
df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)
return path
get_air_quality_data 调用芬兰气象研究所的API获取赫尔辛基地区的空气质量数据。它会将返回的json数据转换为Pandas DataFrame格式。然后将数据保存到对象存储中,并即时转换为parquet格式。
对象的键值会根据任务的逻辑日期自动生成, 这样我们每天运行它时,都会为当天创建一个新对象。我们 将这个键值与基础路径拼接,形成对象的完整路径。最后, 在将对象写入存储后,我们返回该对象的路径。这使得 我们可以在下一个任务中使用这个路径。
分析数据¶
在理解数据时,通常需要对其进行分析。Duck DB 是一个非常适合的工具。它是一个进程内分析数据库,允许您在内存中对数据运行 SQL 查询。
由于数据已经是parquet格式,我们可以使用read_parquet函数,
并且因为Duck DB和ObjectStoragePath都使用fsspec,我们可以将
ObjectStoragePath的后端注册到Duck DB中。ObjectStoragePath为此提供了fs
属性。然后我们可以使用Duck DB中的register_filesystem函数
将这个后端注册到Duck DB中。
在Duck DB中,我们可以从数据创建表并对其运行查询。查询结果以数据框形式返回,可用于进一步分析或保存到对象存储中。
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
import duckdb
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")
df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()
print(df2.head())
您可能会注意到analyze函数并不知道对象的原始路径,而是通过参数传入并通过XCom获取。您无需重新实例化Path对象。此外,连接细节的处理也是透明的。
整合所有内容¶
最终的DAG看起来是这样的,它封装了所有内容以便我们可以运行它:
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath
API = "https://opendata.fmi.fi/timeseries"
aq_fields = {
"fmisid": "int32",
"time": "datetime64[ns]",
"AQINDEX_PT1H_avg": "float64",
"PM10_PT1H_avg": "float64",
"PM25_PT1H_avg": "float64",
"O3_PT1H_avg": "float64",
"CO_PT1H_avg": "float64",
"SO2_PT1H_avg": "float64",
"NO2_PT1H_avg": "float64",
"TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_objectstorage():
"""
### Object Storage Tutorial Documentation
This is a tutorial DAG to showcase the usage of the Object Storage API.
Documentation that goes along with the Airflow Object Storage tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
"""
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
import pandas as pd
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]
params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}
response = requests.get(API, params=params)
response.raise_for_status()
# ensure the bucket exists
base.mkdir(exist_ok=True)
formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"
df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)
return path
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
import duckdb
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")
df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()
print(df2.head())
obj_path = get_air_quality_data()
analyze(obj_path)
tutorial_objectstorage()