血缘关系¶
注意
血缘关系支持功能目前处于高度实验性阶段,后续可能会发生变化。
Airflow可以帮助追踪数据的来源、处理过程以及随时间的流转情况。这有助于建立审计跟踪和数据治理,同时也能辅助数据流的调试。
Airflow通过任务的输入和输出来追踪数据。让我们通过一个示例来了解其工作原理。
import datetime
import pendulum
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)
入口(inlets)可以是上游任务ID的(列表),也可以静态定义为属性注解对象,例如File对象。出口(outlets)只能是属性注解对象。两者都会在运行时被渲染。然而,当某个任务的出口作为另一个任务的入口时,这些出口不会为下游任务重新渲染。
注意
如果操作器支持,操作器可以自动添加入口和出口。
在示例DAG任务中,run_this (task_id=run_me_first)是一个BashOperator,它接收3个输入参数:CAT1、CAT2、CAT3,这些参数是从列表中生成的。请注意data_interval_start是一个模板字段,将在任务运行时被渲染。
注意
在后台,Airflow 将血缘元数据准备作为任务 pre_execute 方法的一部分。当任务执行完成后,会调用 post_execute 并将血缘元数据推送到 XCOM 中。因此,如果您创建的自定义算子需要重写此方法,请确保分别用 prepare_lineage 和 apply_lineage 装饰您的方法。
简写符号¶
也可以使用简写符号,其工作原理几乎与Unix命令行管道、输入和输出相同。
请注意,运算符优先级仍然适用。此外,|运算符仅在左侧定义了输出时才会工作(例如通过使用add_outlets(..)或原生支持血缘关系operator.supports_lineage == True)。
f_in > run_this | (run_this_last > outlets)
Hook 血缘关系¶
Airflow提供了一个强大的功能,不仅可以追踪任务之间的数据血缘关系,还能追踪这些任务中所用钩子的数据来源。 这一功能帮助您理解数据如何在Airflow管道中流动。
一个全局的HookLineageCollector实例作为收集血缘信息的中心枢纽。
Hook可以将它们交互的数据集详细信息发送给这个收集器。
收集器随后使用这些数据构建符合AIP-60标准的Datasets,这是一种描述数据集的标准格式。
from airflow.lineage.hook_lineage import get_hook_lineage_collector
class CustomHook(BaseHook):
def run(self):
# run actual code
collector = get_hook_lineage_collector()
collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})
由HookLineageCollector收集的血缘数据可以通过HookLineageReader实例访问,
该实例注册在Airflow插件中。
from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin
class CustomHookLineageReader(HookLineageReader):
def get_inputs(self):
return self.lineage_collector.collected_datasets.inputs
class HookLineageCollectionPlugin(AirflowPlugin):
name = "HookLineageCollectionPlugin"
hook_lineage_readers = [CustomHookLineageReader]
如果在Airflow中没有注册HookLineageReader,则会使用默认的NoOpCollector。
该收集器不会创建符合AIP-60标准的数据集,也不会收集血缘信息。
血缘关系后端¶
可以通过在配置中提供LineageBackend的实例,将血缘指标推送到自定义后端:
[lineage]
backend = my.lineage.CustomBackend
后端应该继承自 airflow.lineage.LineageBackend。
from airflow.lineage.backend import LineageBackend
class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service