调试Airflow DAGs

使用dag.test()测试DAG

要在IDE中调试DAG,您可以在DAG文件中设置dag.test命令,并通过单个序列化的Python进程运行您的DAG。

该方法可用于任何支持的数据库(包括本地SQLite数据库),并且会快速失败,因为所有任务都在单个进程中运行。

要设置dag.test,请将以下两行添加到您的DAG文件底部:

if __name__ == "__main__":
    dag.test()

就这样!你可以添加可选参数来微调测试,除此之外你可以根据需要运行或调试DAG。以下是一些参数示例:

  • execution_date 如果你想测试特定参数的DAG运行

  • use_executor 如果你想使用执行器测试DAG。默认情况下 dag.test 运行DAG时不使用执行器,它只是在本地运行所有任务。 通过提供此参数,DAG将使用Airflow环境中配置的执行器来执行。

有条件跳过任务

如果您不希望在本机环境中执行某些任务子集(例如依赖检查传感器或清理步骤),可以通过在mark_success_pattern参数中提供匹配其task_id的模式来自动将其标记为成功。

在以下示例中,测试dag不会等待上游任何一个dag完成。相反,测试数据是手动导入的。清理步骤也被跳过,使得中间csv文件可供检查。

with DAG("example_dag", default_args=default_args) as dag:
    sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
    sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
    collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
    # ... run other tasks
    cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])

    [sensor, sensor2] >> collect_stats >> cleanup

if __name__ == "__main__":
    ingest_testing_data()
    run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
    print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")

与DebugExecutor的对比

与现已弃用的DebugExecutor类相比,dag.test命令具有以下优势:

  1. 它完全不需要运行执行器。任务一次运行一个,没有执行器或调度器日志。

  2. 它比使用DebugExecutor运行代码要快得多,因为它不需要经过调度器循环。

  3. 它不会执行回填操作。

在命令行调试Airflow DAGs

通过上述部分提到的相同两行代码添加,您现在也可以轻松使用pdb调试DAG。 在命令行运行python -m pdb to dag file>.py即可获得交互式调试体验。

root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>

Debug Executor (已弃用)

DebugExecutor 是一个调试工具,可以从IDE中使用。它是一个单进程执行器,用于排队TaskInstance并通过运行_run_raw_task方法来执行它们。

由于其特性,执行器可以与SQLite数据库一起使用。当与传感器一起使用时,执行器会将传感器模式更改为reschedule以避免阻塞DAG的执行。

此外,DebugExecutor 可以在快速失败模式下使用,该模式将使所有其他正在运行或计划的任务立即失败。要启用此选项,请设置 AIRFLOW__DEBUG__FAIL_FAST=True 或在您的 airflow.cfg 中调整 fail_fast 选项。有关设置配置的更多信息,请参阅 Setting Configuration Options

IDE设置步骤:

  1. 在DAG文件末尾添加main代码块使其可运行。

它将运行一个回填作业:

if __name__ == "__main__":
    from airflow.utils.state import State

    dag.clear()
    dag.run()
  1. 在IDE的运行配置中设置AIRFLOW__CORE__EXECUTOR=DebugExecutor。在此步骤中,您还应设置DAG所需的所有环境变量。

  2. 运行/调试DAG文件。

这篇内容对您有帮助吗?