Airflow监听器插件¶
Airflow 具备一项功能,允许通过插件添加监听器来监控和跟踪任务状态。
这是一个简单的Airflow监听器插件示例,用于跟踪任务状态并收集有关任务、DAG运行和DAG的有用元数据信息。
这是一个Airflow示例插件,用于创建Airflow的监听器插件。 该插件通过使用SQLAlchemy的事件机制工作。它监控 任务实例在表级别的状态变化并触发事件。 这将通知所有DAG中的所有任务。
在这个插件中,对象引用是从基类airflow.plugins_manager.AirflowPlugin派生的。
Listener插件在底层使用了pluggy应用。Pluggy是一个专为Pytest设计的插件管理和钩子调用应用。Pluggy实现了函数钩子功能,因此允许您通过自定义钩子来构建"可插拔"系统。
- Using this plugin, following events can be listened:
任务实例处于运行状态。
任务实例处于成功状态。
任务实例处于失败状态。
DAG运行处于运行状态。
DAG运行处于成功状态。
DAG运行处于失败状态。
在Airflow作业、调度器或回填作业等事件开始前
在停止类似airflow作业、调度器或回填作业的事件之前
监听器注册¶
一个包含监听器对象引用的监听器插件被注册为airflow插件的一部分。以下是实现新监听器的基本框架:
from airflow.plugins_manager import AirflowPlugin
# This is the listener file created where custom code to monitor is added over hookimpl
import listener
class MetadataCollectionPlugin(AirflowPlugin):
name = "MetadataCollectionPlugin"
listeners = [listener]
接下来,我们可以检查添加到listener中的代码,并查看每个监听器的实现方法。实现完成后,监听器部分会在所有DAG的任务执行期间被执行。
作为参考,以下是listener.py类中显示数据库表列表的插件代码:
这个示例监听任务实例处于运行状态时的情况
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to RUNNING.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that is running its dag_run,
task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
state: TaskInstanceState = task_instance.state
name: str = task_instance.task_id
start_date = task_instance.start_date
dagrun = task_instance.dag_run
dagrun_status = dagrun.state
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name} state:{state} start_date:{start_date}")
print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
同样地,可以实现在task_instance成功和失败后监听的代码。
这个示例监听当DAG运行状态变为失败时的情况
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")
同样地,可以实现在dag_run成功后和运行状态期间进行监听的代码。
实现监听器所需的插件文件作为Airflow插件的一部分被添加到$AIRFLOW_HOME/plugins/文件夹中,并在Airflow启动时加载。