调试Airflow DAGs¶
使用dag.test()测试DAG¶
要在IDE中调试DAG,您可以在DAG文件中设置dag.test命令,并通过单个序列化的Python进程运行您的DAG。
该方法可用于任何支持的数据库(包括本地SQLite数据库),并且会快速失败,因为所有任务都在单个进程中运行。
要设置dag.test,请将以下两行添加到您的DAG文件底部:
if __name__ == "__main__":
dag.test()
就这样!你可以添加可选参数来微调测试,除此之外你可以根据需要运行或调试DAG。以下是一些参数示例:
execution_date如果你想测试特定参数的DAG运行use_executor如果你想使用执行器测试DAG。默认情况下dag.test运行DAG时不使用执行器,它只是在本地运行所有任务。 通过提供此参数,DAG将使用Airflow环境中配置的执行器来执行。
有条件跳过任务¶
如果您不希望在本机环境中执行某些任务子集(例如依赖检查传感器或清理步骤),可以通过在mark_success_pattern参数中提供匹配其task_id的模式来自动将其标记为成功。
在以下示例中,测试dag不会等待上游任何一个dag完成。相反,测试数据是手动导入的。清理步骤也被跳过,使得中间csv文件可供检查。
with DAG("example_dag", default_args=default_args) as dag:
sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
# ... run other tasks
cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])
[sensor, sensor2] >> collect_stats >> cleanup
if __name__ == "__main__":
ingest_testing_data()
run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
与DebugExecutor的对比¶
与现已弃用的DebugExecutor类相比,dag.test命令具有以下优势:
它完全不需要运行执行器。任务一次运行一个,没有执行器或调度器日志。
它比使用DebugExecutor运行代码要快得多,因为它不需要经过调度器循环。
它不会执行回填操作。
在命令行调试Airflow DAGs¶
通过上述部分提到的相同两行代码添加,您现在也可以轻松使用pdb调试DAG。
在命令行运行python -m pdb 即可获得交互式调试体验。
root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>
Debug Executor (已弃用)¶
DebugExecutor 是一个调试工具,可以从IDE中使用。它是一个单进程执行器,用于排队TaskInstance并通过运行_run_raw_task方法来执行它们。
由于其特性,执行器可以与SQLite数据库一起使用。当与传感器一起使用时,执行器会将传感器模式更改为reschedule以避免阻塞DAG的执行。
此外,DebugExecutor 可以在快速失败模式下使用,该模式将使所有其他正在运行或计划的任务立即失败。要启用此选项,请设置 AIRFLOW__DEBUG__FAIL_FAST=True 或在您的 airflow.cfg 中调整 fail_fast 选项。有关设置配置的更多信息,请参阅 Setting Configuration Options。
IDE设置步骤:
在DAG文件末尾添加
main代码块使其可运行。
它将运行一个回填作业:
if __name__ == "__main__":
from airflow.utils.state import State
dag.clear()
dag.run()
在IDE的运行配置中设置
AIRFLOW__CORE__EXECUTOR=DebugExecutor。在此步骤中,您还应设置DAG所需的所有环境变量。运行/调试DAG文件。