可延迟操作符与触发器¶
标准的Operators和Sensors在运行时将占用整个工作槽位,即使它们处于空闲状态。例如,如果您只有100个工作槽位可用于运行任务,而您有100个DAG正在等待当前运行但空闲的传感器,那么您无法运行任何其他任务——即使整个Airflow集群实际上处于空闲状态。传感器的reschedule模式通过允许传感器仅在固定间隔运行来解决部分问题,但它不够灵活,仅允许使用时间作为恢复条件,而不支持其他标准。
这里就是使用可延迟操作符的场景。当操作符无事可做只能等待时,它可以通过延迟来暂停自身并释放工作线程给其他流程使用。当操作符延迟时,执行会转移到触发器,操作符指定的触发器将开始运行。触发器可以执行操作符所需的轮询或等待操作。当触发器完成轮询或等待后,它会发送信号让操作符恢复执行。在执行延迟阶段,由于工作已转移给触发器,任务不再占用工作线程槽位,从而释放更多工作负载容量。默认情况下,处于延迟状态的任务不会占用资源池槽位。如需改变此行为,可通过编辑对应资源池进行调整。
触发器是小型、异步的Python代码片段,设计用于在单个Python进程中运行。由于它们是异步的,可以高效地共存于Airflow的触发器组件中。
该流程的工作原理概述:
任务实例(运行中的算子)到达一个需要等待其他操作或条件的节点时,会将自己与触发器绑定并推迟执行,等待事件触发后恢复运行。这样释放了工作节点以运行其他任务。
新的触发器实例由Airflow注册,并由触发器进程拾取。
触发器会持续运行直到触发,此时调度器会重新安排其源任务。
调度器将任务重新排队到工作节点上执行。
作为DAG作者,您既可以使用预先编写的可延迟操作符,也可以自行编写。然而编写这些操作符需要满足特定的设计标准。
使用可延迟操作符¶
如果你想使用Airflow内置的预编写可延迟操作符,例如TimeSensorAsync,那么你只需要完成两个步骤:
确保您的Airflow安装至少运行一个
triggerer进程,以及正常的scheduler在你的DAG中使用可延迟的操作器/传感器
Airflow 自动为您处理并实现延迟流程。
如果您正在升级现有的DAG以使用可延迟操作符,Airflow包含了API兼容的传感器变体,例如TimeSensorAsync对应TimeSensor。将这些变体添加到您的DAG中即可使用可延迟操作符,无需其他更改。
请注意,您不能在自定义PythonOperator或TaskFlow Python函数内部使用延迟功能。延迟仅适用于传统的基于类的操作器。
编写可延迟操作符¶
编写可延迟操作符时,需要考虑以下要点:
您的算子必须使用触发器来延迟自身。您可以使用Airflow核心中包含的触发器,也可以编写自定义触发器。
当操作符被延迟时,它将被停止并从工作节点中移除,且不会自动保持状态。您可以通过指示Airflow在特定方法处恢复操作符,或通过传递特定的kwargs参数来保持状态。
你可以多次延迟执行,也可以在操作符完成重要工作之前或之后进行延迟。或者,当满足特定条件时你也可以选择延迟。例如,当系统无法立即给出响应时。延迟机制完全由你掌控。
任何操作符都可以延迟;不需要在类上进行特殊标记,也不仅限于传感器。
如果您想添加一个同时支持可延迟和不可延迟模式的算子或传感器,建议在算子的
__init__方法中添加deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),并用它来决定是否以可延迟模式运行该算子。您可以通过operator部分中的default_deferrable来配置所有支持在可延迟和不可延迟模式之间切换的算子和传感器的deferrable默认值。以下是一个支持两种模式的传感器示例。
import time
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def __init__(
self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
) -> None:
super().__init__(**kwargs)
self.deferrable = deferrable
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="execute_complete",
)
else:
time.sleep(3600)
def execute_complete(
self,
context: Context,
event: dict[str, Any] | None = None,
) -> None:
# We have no more work to do here. Mark as complete.
return
编写触发器¶
一个触发器是通过继承BaseTrigger类来实现的,需要实现三个方法:
__init__: 一个用于接收操作符实例化时传入参数的方法。从2.10.0版本开始,我们可以直接从预定义的触发器启动任务执行。要使用此功能,__init__中的所有参数都必须是可序列化的。run: 一个异步方法,运行其逻辑并作为异步生成器产生一个或多个TriggerEvent实例。serialize: 返回重建此触发器所需的信息,作为类路径的元组,以及传递给__init__的关键字参数。
这个示例展示了一个基础触发器的结构,它是Airflow中DateTimeTrigger的极简版本:
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
super().__init__()
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
await asyncio.sleep(1)
yield TriggerEvent(self.moment)
代码示例展示了以下几点:
__init__和serialize是成对编写的。触发器在由操作员作为其延迟请求的一部分提交时实例化一次,然后在运行触发器的任何触发器进程上序列化并重新实例化。run方法被声明为async def,因为它必须是异步的,并且使用asyncio.sleep而不是常规的time.sleep(因为后者会阻塞进程)。当它发出事件时,会将
self.moment打包进去,因此如果这个触发器在多个主机上冗余运行,事件可以被去重。
触发器可以根据您的需求设计得复杂或简单,只要它们满足设计约束条件即可。它们可以以高可用方式运行,并自动分布在运行触发器的主机之间。我们建议您避免在触发器中使用任何类型的持久状态。触发器应该从它们的__init__中获取所需的一切,以便它们可以被序列化并自由移动。
如果你是异步Python编程的新手,在编写run()方法时需要格外小心。Python的异步模型意味着,如果在执行阻塞操作时没有正确使用await,代码可能会阻塞整个进程。Airflow会尝试检测进程阻塞代码,并在触发器日志中发出警告。你可以在编写触发器时设置变量PYTHONASYNCIODEBUG=1来启用Python的额外检查,确保你编写的是非阻塞代码。在进行文件系统调用时要特别小心,因为如果底层文件系统是基于网络的,它可能会造成阻塞。
在编写自己的触发器时需要注意一些设计约束:
run方法 必须是异步的(使用 Python 的 asyncio),并且在执行阻塞操作时正确地await。run必须yield其 TriggerEvents,而不是返回它们。如果在至少产生一个事件之前返回,Airflow 将认为这是一个错误,并使任何等待它的任务实例失败。如果抛出异常,Airflow 也会使任何依赖的任务实例失败。你应该假设一个触发器实例可以运行多次。如果发生网络分区且Airflow在另一台机器上重新启动触发器,就可能出现这种情况。因此,你必须注意副作用。例如,你可能不希望使用触发器来插入数据库行。
如果你的触发器设计为发出多个事件(目前不支持),那么每个发出的事件必须包含一个有效载荷,以便在触发器在多个位置运行时可用于去重事件。如果你只触发一个事件且不需要将信息传递回操作器,可以将有效载荷设置为
None。触发器可能会突然从一个触发器服务中移除,并在新的服务上启动。例如,如果子网变更导致网络分区,或者发生部署操作。如果需要,您可以实现
cleanup方法,该方法总是在run之后被调用,无论触发器是正常退出还是异常退出。为了使对触发器的任何更改生效,每当修改触发器时,都需要重新启动triggerer。
注意
目前触发器仅在首次事件触发前有效,因为它们仅用于恢复延迟任务,而任务会在首次事件触发后继续执行。不过,Airflow计划未来支持通过触发器启动DAG,届时多事件触发器将发挥更大作用。
触发器中的敏感信息¶
自Airflow 2.9.0版本起,触发器kwargs在存入数据库前会进行序列化和加密处理。这意味着您传递给触发器的任何敏感信息都将以加密形式存储在数据库中,并在从数据库读取时解密。
触发延迟¶
如果你想触发延迟执行,可以在操作符的任何位置调用self.defer(trigger, method_name, kwargs, timeout)。这会为Airflow抛出一个特殊异常。参数包括:
trigger: 你想要延迟的触发器实例。它将被序列化到数据库中。method_name: 当Airflow恢复时,您希望调用的操作器上的方法名称。kwargs: (可选) 调用方法时传递的额外关键字参数。默认为{}。timeout: (可选) 指定超时时间的timedelta,超过该时间后此延迟将失败,并导致任务实例失败。默认为None,表示没有超时限制。
这是一个关于传感器如何触发延迟的基本示例:
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
当你选择延迟执行时,你的操作符将在该点停止执行,并从当前工作节点移除。所有状态都不会保留,例如局部变量或self上设置的属性。当操作符恢复时,它将作为一个新实例重新开始。唯一能将状态从旧操作符实例传递到新实例的方式是通过method_name和kwargs。
当你的操作符恢复时,Airflow会向传递给method_name方法的kwargs中添加一个context对象和一个event对象。这个event对象包含来自触发操作符恢复的触发事件的有效载荷。根据触发器的不同,这可能对你的操作符很有用,比如它是一个状态码或用于获取结果的URL。或者,它可能是不重要的信息,比如一个日期时间。然而,你的method_name方法必须接受context和event作为关键字参数。
如果您的操作符从首次execute()方法(当它是新的时候)或由method_name指定的后续方法返回,它将被视为已完成并结束执行。
如果希望您的操作符有一个入口点,可以将method_name设置为execute,但它还必须接受event作为可选关键字参数。
让我们更深入地看看上面的WaitOneHourSensor示例。这个传感器只是触发器的一个简单封装。它依赖于触发器,并指定了触发器触发时回调的不同方法。当它立即返回时,会将传感器标记为成功。
self.defer 调用会引发 TaskDeferred 异常,因此它可以在操作符代码中的任何位置工作,即使是在 execute() 内部多层嵌套调用中。你也可以手动抛出 TaskDeferred,它使用与 self.defer 相同的参数。
execution_timeout 在操作器上的设定是基于总运行时间,而非每次延期间隔的执行时间。这意味着如果设置了execution_timeout,操作器可能在延迟期间或延迟后恢复运行的几秒钟内就失败。
从任务开始触发延迟¶
在2.10.0版本中新增。
如果您希望直接将任务递交给触发器而不经过工作节点,可以将类级别属性start_from_trigger设置为True,并在您的可延迟操作器中添加一个类级别属性start_trigger_args,该属性需包含一个具有以下4个属性的StartTriggerArgs对象:
trigger_cls: 可导入的触发器类路径。trigger_kwargs: 传递给trigger_cls初始化时的关键字参数。注意所有参数都必须是可序列化的。这是该功能的主要限制。next_method: 当Airflow恢复时,您希望它调用的操作器上的方法名称。next_kwargs: 调用next_method时传递给它的额外关键字参数。timeout: (可选) 指定超时时间的时间增量,超过此时间后此延迟将失败,并导致任务实例失败。默认为None,表示没有超时限制。
当延迟是execute方法唯一要做的事情时,这特别有用。这是对前一个例子的基本改进。在前一个例子中,我们使用了DateTimeTrigger,它接受一个类型为datetime.timedelta的参数delta,该参数不可序列化。因此,我们需要创建一个具有可序列化参数的新触发器。
from __future__ import annotations
import datetime
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
class HourDeltaTrigger(DateTimeTrigger):
def __init__(self, hours: int):
moment = timezone.utcnow() + datetime.timedelta(hours=hours)
super().__init__(moment=moment)
在传感器部分,我们需要将HourDeltaTrigger的路径作为trigger_cls提供。
from __future__ import annotations
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
start_from_trigger 和 trigger_kwargs 也可以在实例级别进行修改,以实现更灵活的配置。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitTwoHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = {"hours": 2}
self.start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
映射任务的初始化阶段发生在调度程序将它们提交给执行器之后。因此,该功能提供的动态任务映射支持有限,其使用方式与标准实践有所不同。要启用动态任务映射支持,您需要在__init__方法中定义start_from_trigger和trigger_kwargs。请注意,您不需要同时定义这两个参数来使用此功能,但需要使用完全相同的参数名称。例如,如果您将参数定义为t_kwargs并将此值赋给self.start_trigger_args.trigger_kwargs,它将不会产生任何效果。当映射一个start_from_trigger设置为True的任务时,整个__init__方法将被跳过。调度程序将使用从partial和expand提供的start_from_trigger和trigger_kwargs(如果未提供,则回退到类属性中的值)来确定是否以及如何将任务提交给执行器或触发器。请注意,在此阶段不会解析XCom值。
触发器执行完成后,任务可能会被发送回工作节点以执行next_method,或者任务实例可能直接结束。(参见Exiting deferred task from Triggers)如果任务被发送回工作节点,在next_method执行前,__init__方法中的参数仍然会生效,但它们不会影响触发器的执行。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitHoursSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(
self,
*args: list[Any],
trigger_kwargs: dict[str, Any] | None,
start_from_trigger: bool,
**kwargs: dict[str, Any],
) -> None:
# This whole method will be skipped during dynamic task mapping.
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
这将扩展为2个任务,其“hours”参数分别设置为1和2。
WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)
从触发器退出延迟任务¶
在2.10.0版本中新增。
如果您希望直接从触发器(triggerer)退出任务而不进入工作节点(worker),可以按照上述讨论,在您的可延迟操作符(deferrable operator)属性中指定实例级属性end_from_trigger。这样可以节省启动新工作节点所需的资源。
触发器可以有两种选项:它们可以将执行发送回工作器,或者直接结束任务实例。如果触发器自行结束任务实例,则method_name无关紧要,可以是None。否则,请提供在任务中恢复执行时应使用的method_name。
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
TaskSuccessEvent 和 TaskFailureEvent 是两种可以直接结束任务实例的事件。这会将任务标记为 task_instance_state 状态,并在适用时可选地推送xcom。以下是使用这些事件的示例:
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
在上面的例子中,如果触发器通过生成TaskSuccessEvent将end_from_trigger设置为True,则会直接结束任务实例。否则,它将使用操作符中指定的方法恢复任务实例。
注意
仅当可延迟操作器未集成监听器时,从触发器退出功能才有效。目前,当可延迟操作器的end_from_trigger属性设置为True且已集成监听器时,解析过程中会抛出异常以提示此限制。编写自定义触发器时,如果插件中添加了监听器,请确保触发器未设置为直接终止任务实例。若触发器作者将end_from_trigger属性更改为其他属性,DAG解析将不会抛出任何异常,依赖此任务的监听器也将失效。此限制将在未来版本中解决。
高可用性¶
触发器设计用于高可用性(HA)架构。如果您想运行高可用性设置,可以在多台主机上运行多个triggerer实例。与scheduler类似,它们通过正确的锁定机制和HA技术自动实现共存。
根据触发器执行的工作量大小,单个triggerer主机可承载数百至数万个触发器。默认情况下,每个triggerer的并发容量为1000个触发器。您可以通过--capacity参数调整可同时运行的触发器数量。如果尝试运行的触发器总数超过所有triggerer进程的总容量,部分触发器将延迟执行直至其他触发器完成。
Airflow 尝试一次只在一个地方运行触发器,并维护与所有当前运行的triggerers的心跳连接。如果某个triggerer宕机,或与运行Airflow数据库的网络断开连接,Airflow会自动将该主机上的触发器重新调度到其他地方运行。Airflow会等待(2.1 * triggerer.job_heartbeat_sec)秒,等待机器重新出现,然后再重新调度触发器。
这意味着触发器有可能(尽管不太可能)同时在多个位置运行。这种行为是触发器契约的设计预期,属于正常现象。Airflow会对同时运行在多个位置的触发器所触发的事件进行去重处理,因此这一过程对您的操作器来说是透明的。
请注意,每运行一个额外的triggerer都会导致与数据库建立一个额外的持久连接。
传感器中Mode=’reschedule’与Deferrable=True的区别¶
在Airflow中,传感器会等待特定条件满足后才继续执行下游任务。传感器有两种管理空闲期的方式:mode='reschedule'和deferrable=True。由于mode='reschedule'是Airflow中BaseSensorOperator特有的参数,它允许传感器在条件未满足时自行重新调度。而'deferrable=True'是某些操作器使用的约定,表示任务可以稍后重试(或延迟),但它并非Airflow内置的参数或模式。实际的重试行为会因具体操作器实现而异。
mode=’reschedule’ |
deferrable=True |
|---|---|
持续重新调度自身直到条件满足 |
空闲时暂停执行,条件变化时恢复 |
资源使用率较高(重复执行) |
资源使用率较低(空闲时暂停,释放工作槽位) |
预期会随时间变化的条件 (例如文件创建) |
等待外部事件或资源 (例如API响应) |
内置的重新调度功能 |
需要自定义逻辑来延迟任务并处理外部变更 |