血缘关系

注意

血缘关系支持功能目前处于高度实验性阶段,后续可能会发生变化。

Airflow可以帮助追踪数据的来源、处理过程以及随时间的流转情况。这有助于建立审计跟踪和数据治理,同时也能辅助数据流的调试。

Airflow通过任务的输入和输出来追踪数据。让我们通过一个示例来了解其工作原理。

import datetime
import pendulum

from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

dag = DAG(
    dag_id="example_lineage",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="0 0 * * *",
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)

f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)

f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)

入口(inlets)可以是上游任务ID的(列表),也可以静态定义为属性注解对象,例如File对象。出口(outlets)只能是属性注解对象。两者都会在运行时被渲染。然而,当某个任务的出口作为另一个任务的入口时,这些出口不会为下游任务重新渲染。

注意

如果操作器支持,操作器可以自动添加入口和出口。

在示例DAG任务中,run_this (task_id=run_me_first)是一个BashOperator,它接收3个输入参数:CAT1CAT2CAT3,这些参数是从列表中生成的。请注意data_interval_start是一个模板字段,将在任务运行时被渲染。

注意

在后台,Airflow 将血缘元数据准备作为任务 pre_execute 方法的一部分。当任务执行完成后,会调用 post_execute 并将血缘元数据推送到 XCOM 中。因此,如果您创建的自定义算子需要重写此方法,请确保分别用 prepare_lineageapply_lineage 装饰您的方法。

简写符号

也可以使用简写符号,其工作原理几乎与Unix命令行管道、输入和输出相同。 请注意,运算符优先级仍然适用。此外,|运算符仅在左侧定义了输出时才会工作(例如通过使用add_outlets(..)或原生支持血缘关系operator.supports_lineage == True)。

f_in > run_this | (run_this_last > outlets)

Hook 血缘关系

Airflow提供了一个强大的功能,不仅可以追踪任务之间的数据血缘关系,还能追踪这些任务中所用钩子的数据来源。 这一功能帮助您理解数据如何在Airflow管道中流动。

一个全局的HookLineageCollector实例作为收集血缘信息的中心枢纽。 Hook可以将它们交互的数据集详细信息发送给这个收集器。 收集器随后使用这些数据构建符合AIP-60标准的Datasets,这是一种描述数据集的标准格式。

from airflow.lineage.hook_lineage import get_hook_lineage_collector


class CustomHook(BaseHook):
    def run(self):
        # run actual code
        collector = get_hook_lineage_collector()
        collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
        collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})

HookLineageCollector收集的血缘数据可以通过HookLineageReader实例访问, 该实例注册在Airflow插件中。

from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin


class CustomHookLineageReader(HookLineageReader):
    def get_inputs(self):
        return self.lineage_collector.collected_datasets.inputs


class HookLineageCollectionPlugin(AirflowPlugin):
    name = "HookLineageCollectionPlugin"
    hook_lineage_readers = [CustomHookLineageReader]

如果在Airflow中没有注册HookLineageReader,则会使用默认的NoOpCollector。 该收集器不会创建符合AIP-60标准的数据集,也不会收集血缘信息。

血缘关系后端

可以通过在配置中提供LineageBackend的实例,将血缘指标推送到自定义后端:

[lineage]
backend = my.lineage.CustomBackend

后端应该继承自 airflow.lineage.LineageBackend

from airflow.lineage.backend import LineageBackend


class CustomBackend(LineageBackend):
    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
        ...
        # Send the info to some external service

这篇内容对您有帮助吗?