创建通知器

BaseNotifier是一个抽象类,为在Airflow中使用各种on_*__callback发送通知提供了基础框架。它旨在供提供商根据其特定需求进行扩展和定制。

要扩展BaseNotifier类,你需要创建一个继承它的新类。在这个新类中,你应该重写notify方法,实现你自己的通知发送逻辑。notify方法接收一个参数——Airflow上下文,其中包含当前任务和执行的相关信息。

你也可以设置template_fields属性来指定哪些属性应被渲染为模板。

以下是一个创建Notifier类的示例:

from airflow.notifications.basenotifier import BaseNotifier
from my_provider import send_message


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message):
        self.message = message

    def notify(self, context):
        # Send notification here, below is an example
        title = f"Task {context['task_instance'].task_id} failed"
        send_message(title, self.message)

使用通知器

一旦你有了通知器的实现,就可以在DAG定义中使用它,通过将其作为参数传递给on_*_callbacks。例如,你可以将其与on_success_callbackon_failure_callback一起使用,根据任务或DAG运行的状态发送通知。

以下是使用上述通知器的示例:

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
    dag_id="example_notifier",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
):
    task = BashOperator(
        task_id="example_task",
        bash_command="exit 1",
        on_success_callback=MyNotifier(message="Task Succeeded!"),
    )

有关社区管理的通知列表,请参阅 Notifications

这篇内容对您有帮助吗?