TaskFlow¶
在2.0版本中新增。
如果您主要使用纯Python代码而非Operators来编写DAG,那么TaskFlow API将让编写简洁的DAG变得更加容易,无需额外的样板代码,全部通过@task装饰器实现。
TaskFlow负责使用XComs在您的任务之间传递输入和输出,并自动计算依赖关系 - 当您在DAG文件中调用TaskFlow函数时,它不会立即执行,而是返回一个表示结果XCom的对象(一个XComArg),您可以将其用作下游任务或操作符的输入。例如:
from airflow.decorators import task
from airflow.operators.email import EmailOperator
@task
def get_ip():
return my_ip_service.get_main_ip()
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject':f'Server connected from {external_ip}',
'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email_notification',
to='example@example.com',
subject=email_info['subject'],
html_content=email_info['body']
)
这里有三个任务 - get_ip, compose_email, 和 send_email_notification。
前两个使用TaskFlow声明,并自动将get_ip的返回值传递给compose_email,不仅跨任务传递了XCom,还自动声明compose_email是get_ip的下游任务。
send_email_notification 是一个更传统的操作符,但它也可以使用 compose_email 的返回值来设置其参数,并再次自动确定它必须位于 compose_email 的下游。
你也可以使用普通值或变量来调用TaskFlow函数 - 例如,这将按预期工作(当然,在DAG执行之前不会运行任务内部的代码 - name值在此之前会作为任务参数持久化):
@task
def hello_name(name: str):
print(f'Hello {name}!')
hello_name('Airflow users')
如果你想了解更多关于使用TaskFlow的信息,请参考TaskFlow教程。
上下文¶
你可以通过将Airflow的上下文变量添加为关键字参数来访问它们,如下例所示:
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
或者,您可以在任务签名中添加**kwargs,这样所有Airflow上下文变量都可以通过kwargs字典访问:
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(**kwargs): ti: TaskInstance = kwargs["task_instance"] print(f"Run ID: {ti.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {ti.duration}") # Duration: 0.972019 dr: DagRun = kwargs["dag_run"] print(f"DAG Run queued at: {dr.queued_at}") # 2023-08-10 00:00:01+02:20
如需查看完整的上下文变量列表,请参阅context variables。
日志记录¶
要从您的任务函数中使用日志记录,只需导入并使用Python的日志系统:
logger = logging.getLogger("airflow.task")
通过这种方式创建的每一条日志记录都将被记录在任务日志中。
传递任意对象作为参数¶
于版本2.5.0中新增。
如前所述,TaskFlow使用XCom在任务间传递变量。这就要求作为参数使用的变量必须能够被序列化。Airflow开箱即支持所有内置类型(如int或str),同时也支持用@dataclass或@attr.define装饰的对象。以下示例展示了将经过@attr.define装饰的Dataset与TaskFlow结合使用的场景。
注意
使用Dataset的另一个好处是,当它作为输入参数时,会自动注册为inlet。如果任务的返回值是dataset或list[Dataset]],它也会自动注册为outlet。
import json
import pendulum
import requests
from airflow import Dataset
from airflow.decorators import dag, task
SRC = Dataset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
@task()
def retrieve(src: Dataset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, celsius in temps.items():
ret[year] = float(celsius) * 1.8 + 32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Dataset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Dataset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
etl()
自定义对象¶
您可能希望传递自定义对象。通常您会用@dataclass或
@attr.define装饰您的类,Airflow会自动处理所需操作。有时您可能想自行控制序列化过程。
为此,请在您的类中添加serialize()方法以及静态方法
deserialize(data: dict, version: int)。示例如下:
from typing import ClassVar
class MyCustom:
__version__: ClassVar[int] = 1
def __init__(self, x):
self.x = x
def serialize(self) -> dict:
return dict({"x": self.x})
@staticmethod
def deserialize(data: dict, version: int):
if version > 1:
raise TypeError(f"version > {MyCustom.version}")
return MyCustom(data["x"])
对象版本控制¶
最佳实践是对将用于序列化的对象进行版本控制。为此,请在您的类中添加
__version__: ClassVar[int] = 。Airflow假定您的类具有向后兼容性,
因此版本2能够反序列化版本1。如果您需要自定义反序列化逻辑,
请确保指定了deserialize(data: dict, version: int)。
注意
需要输入__version__的类型,且必须为ClassVar[int]
传感器与TaskFlow API¶
于版本2.5.0中新增。
有关使用TaskFlow API编写Sensor的示例,请参阅 在Sensor操作器中使用TaskFlow API。
历史记录¶
TaskFlow API是Airflow 2.0新增的功能,你很可能会遇到为旧版Airflow编写的DAG,它们使用PythonOperator来实现类似的目标,尽管需要编写更多的代码。
关于TaskFlow API的添加和设计的更多背景信息,可以参考其Airflow改进提案的一部分 AIP-31: "TaskFlow API"用于更清晰/更简单的DAG定义