常见问题解答

调度 / DAG文件解析

为什么任务没有被调度?

您的任务可能未被调度的原因有很多。以下是一些常见原因:

  • 你的脚本能否"编译",Airflow引擎能否解析它并找到你的DAG对象?要测试这一点,你可以运行airflow dags list并确认你的DAG出现在列表中。你也可以运行airflow tasks list foo_dag_id --tree来确认你的任务按预期出现在列表中。如果使用CeleryExecutor,你可能需要确认这在调度程序运行的地方和工作节点运行的地方都能正常工作。

  • 包含您DAG的文件内容中是否包含"airflow"和"DAG"字符串?在搜索DAG目录时,Airflow会忽略不包含"airflow"和"DAG"的文件,以防止DagBag解析导入与用户DAG位于同一位置的所有python文件。

  • 你的start_date设置正确吗?对于基于时间的DAG,任务在开始日期之后的第一个调度间隔过去之前不会被触发。

  • 你的schedule参数设置正确了吗?默认值是一天(datetime.timedelta(1))。你必须直接为你实例化的DAG对象指定一个不同的schedule,而不是作为default_param,因为任务实例不会覆盖其父DAG的schedule

  • 您的start_date是否超出了UI界面中可见的范围?如果您将start_date设置为比如3个月前的时间,您将无法在主界面中看到它,但您应该能在菜单 -> 浏览 ->任务 实例中找到。

  • 任务的所有依赖条件是否满足?该任务直接上游的任务实例必须处于success状态。此外,如果设置了depends_on_past=True,则前一个任务实例必须已成功或已被跳过(除非是该任务的首次运行)。另外,如果设置了wait_for_downstream=True,请确保理解其含义——前一个任务实例的所有直接下游任务必须已成功或已被跳过。您可以通过任务的Task Instance Details页面查看这些属性的设置情况。

  • 你需要的DagRun是否已创建并处于活动状态?DagRun代表整个DAG的一次具体执行,并具有状态(运行中、成功、失败等)。调度器在向前推进时会创建新的DagRun,但不会回溯时间创建新的。调度器仅评估running状态的DagRun,以查看可以触发哪些任务实例。请注意,清除任务实例(通过UI或CLI)会将DagRun的状态重置为运行中。你可以通过点击DAG的调度标签批量查看DagRun列表并更改状态。

  • 是否达到了DAG的concurrency参数限制?concurrency定义了 一个DAG允许同时运行的running任务实例数量,超过这个数量后任务将会进入队列。

  • 是否达到了DAG的max_active_runs参数限制?max_active_runs定义了允许同时运行的DAG实例数量。

您可能还想阅读关于Scheduler的内容,并确保完全理解调度器的工作周期。

如何提升DAG性能?

有一些Airflow配置可以支持更大的调度容量和频率:

DAGs 具有提高效率的配置:

操作器或任务还具有提高效率和调度优先级的配置:

  • max_active_tis_per_dag: 该参数控制每个任务在dag_runs中并发运行的任务实例数量。

  • pool: 参见 Pools

  • priority_weight: 参见 Priority Weights

  • queue: 仅适用于CeleryExecutor部署,详情参见Queues

如何降低DAG调度延迟/任务延迟?

Airflow 2.0 开箱即用具有较低的DAG调度延迟(特别是与Airflow 1.10.x相比),但如果您需要更高的吞吐量,可以启动多个调度器

如何基于另一个任务的失败触发任务?

你可以通过触发规则来实现这一点。

如何为不同的DAG文件控制DAG文件解析超时?

(仅适用于 Airflow >= 2.3.0)

您可以在airflow_local_settings.py中添加一个get_dagbag_import_timeout函数,该函数会在解析DAG文件前被调用。您可以根据不同的DAG文件返回不同的超时值。当返回值小于或等于0时,表示DAG解析过程中不会超时。

airflow_local_settings.py
 def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
     """
     This setting allows to dynamically control the DAG file parsing timeout.

     It is useful when there are a few DAG files requiring longer parsing times, while others do not.
     You can control them separately instead of having one value for all DAG files.

     If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
     """
     if "slow" in dag_file_path:
         return 90
     if "no-timeout" in dag_file_path:
         return 0
     return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

有关如何配置本地设置的详细信息,请参阅配置本地设置

当存在大量(>1000个)DAG文件时,如何加速新文件的解析?

(仅适用于 Airflow >= 2.1.1)

file_parsing_sort_mode更改为modified_time,将 min_file_process_interval提高到600(10分钟)、6000(100分钟) 或更高的值。

如果文件最近被修改过,DAG解析器将跳过min_file_process_interval检查。

如果DAG是从单独的文件导入/创建的,这种方法可能不适用。例如: dag_file.py 导入 dag_loader.py,其中DAG文件的实际逻辑如下所示。 在这种情况下,如果 dag_loader.py 被更新但 dag_file.py 未被更新,这些更改将不会生效, 直到达到 min_file_process_interval 时间间隔,因为DAG解析器会检查 dag_file.py 文件的修改时间。

dag_file.py
 from dag_loader import create_dag

 globals()[dag.dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
dag_loader.py
 from airflow import DAG
 from airflow.decorators import task

 import pendulum


 def create_dag(dag_id, schedule, dag_number, default_args):
     dag = DAG(
         dag_id,
         schedule=schedule,
         default_args=default_args,
         pendulum.datetime(2021, 9, 13, tz="UTC"),
     )

     with dag:

         @task()
         def hello_world():
             print("Hello World")
             print(f"This is DAG: {dag_number}")

         hello_world()

     return dag

DAG构建

关于start_date有什么讲究?

start_date部分源自DagRun之前时代,但在许多方面仍然相关。创建新DAG时,您可能希望为任务设置全局start_date。这可以通过直接在DAG()对象中声明start_date来实现。DAG的第一个DagRun将基于start_date之后的第一个完整data_interval创建。例如,对于start_date=datetime(2024, 1, 1)schedule="0 0 3 * *"的DAG,第一个DAG运行将在2024-02-03午夜触发,其data_interval_start=datetime(2024, 1, 3)data_interval_end=datetime(2024, 2, 3)。从那时起,调度程序根据您的schedule创建新的DagRuns,并在满足依赖关系时运行相应的任务实例。当向DAG引入新任务时,您需要特别注意start_date,并可能希望重新激活非活动的DagRuns以使新任务正确加入。

我们建议不要使用动态值作为start_date,特别是 datetime.now(),因为这可能会造成混淆。任务会在 周期结束后触发,理论上一个@hourly DAG永远不会达到 当前时间之后的一小时,因为now()会不断变化。

之前,我们还建议使用与DAG的schedule相关的圆整start_date。这意味着@hourly会在00:00分:秒执行,@daily任务在午夜执行,@monthly任务在每月第一天执行。现在不再需要这样做。Airflow现在会自动对齐start_dateschedule,使用start_date作为开始查找的时刻。

你可以使用任何传感器或TimeDeltaSensor来延迟 在调度间隔内任务的执行。 虽然schedule确实允许指定一个datetime.timedelta 对象,但我们建议使用宏或cron表达式替代, 因为它强化了这种四舍五入调度的理念。

当使用depends_on_past=True时,需要特别注意start_date,因为过去依赖不仅针对任务指定的start_date的具体调度时间。在引入新的depends_on_past=True时,及时监控DagRun活动状态也很重要,除非您计划对新任务进行回填操作。

还需要注意的是,任务的start_date在使用回填CLI命令时会被回填操作的start_date命令覆盖。这使得那些设置了depends_on_past=True的任务能够真正开始回填。如果不是这样,回填操作根本无法启动。

使用时区

创建一个带时区感知的datetime对象(例如DAG的start_date)非常简单。只需确保使用pendulum提供带时区信息的日期即可。不要尝试使用标准库的timezone,因为已知它们存在限制,我们特意禁止在DAG中使用它们。

execution_date 是什么意思?

执行日期execution_date是一个历史名称,指的是所谓的逻辑日期,通常也代表DAG运行所对应数据区间的开始时间。

Airflow是作为ETL需求的解决方案而开发的。在ETL领域,通常需要对数据进行汇总。因此,如果你想汇总2016-02-19的数据,你会在UTC时间2016-02-20午夜进行,这时2016-02-19的所有数据都已可用。2016-02-192016-02-20午夜之间的这段时间被称为数据间隔,由于它代表的是2016-02-19日期的数据,这个日期也被称为运行的逻辑日期,或者这个DAG运行所执行的日期,因此也称为执行日期

为了向后兼容,日期时间值 execution_date 仍然作为 Template variables 以多种格式存在于 Jinja 模板字段中,以及 Airflow 的 Python API 中。它也被包含在传递给操作器执行函数的上下文字典中。

class MyOperator(BaseOperator):
    def execute(self, context):
        logging.info(context["execution_date"])

然而,您应该尽可能使用data_interval_startdata_interval_end,因为这些名称在语义上更准确且不易引起误解。

请注意ds(即data_interval_start的YYYY-MM-DD格式)指的是日期*字符串*,而非可能令人困惑的日期*开始*

提示

有关logical date的更多信息,请参阅Data IntervalRunning DAGs

如何动态创建DAG?

Airflow会在您的DAGS_FOLDER目录中查找包含全局命名空间内DAG对象的模块,并将找到的对象添加到DagBag中。了解这一点后,我们只需要一种方法来动态分配全局命名空间中的变量。在Python中,使用标准库中的globals()函数可以轻松实现这一点,该函数的行为类似于一个简单的字典。

def create_dag(dag_id):
    """
    A function returning a DAG object.
    """

    return DAG(dag_id)


for i in range(10):
    dag_id = f"foo_{i}"
    globals()[dag_id] = DAG(dag_id)

    # or better, call a function that returns a DAG object!
    other_dag_id = f"bar_{i}"
    globals()[other_dag_id] = create_dag(other_dag_id)

尽管Airflow支持在单个Python文件中定义多个DAG(无论是动态生成还是其他方式),但不建议这样做。因为从故障处理和部署的角度来看,Airflow更希望DAG之间保持更好的隔离性,而在同一文件中定义多个DAG违背了这一原则。

是否允许使用顶层Python代码?

虽然不建议在定义Airflow构造之外编写任何代码,但只要不破坏DAG文件处理器或将文件处理时间延长超过dagbag_import_timeout值,Airflow确实支持任意Python代码。

一个常见的例子是在构建动态DAG时违反时间限制,这通常需要从数据库等其他服务查询数据。与此同时,被请求的服务可能正因DAG文件处理器为处理文件而发起的大量数据请求而不堪重负。这些意外的交互可能导致服务性能下降,并最终导致DAG文件处理失败。

更多信息请参考DAG编写最佳实践

宏是否在另一个Jinja模板中解析?

无法在另一个Jinja模板中渲染Macros或任何Jinja模板。这种情况通常出现在user_defined_macros中。

dag = DAG(
    # ...
    user_defined_macros={"my_custom_macro": "day={{ ds }}"}
)

bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)

对于数据间隔开始时间为2020-01-01 00:00:00的DAG运行,这将输出"day={{ ds }}"而非"day=2020-01-01"。

bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)

直接在template_field中使用ds宏时,渲染后的值会显示为"day=2020-01-01"。

为什么next_dsprev_ds可能不包含预期值?

  • 当调度DAG时,next_ds next_ds_nodash prev_ds prev_ds_nodash 这些值是通过 logical_date 和DAG的调度计划(如果适用)计算得出的。如果将 schedule 设置为 None@once, 那么 next_ds, next_ds_nodash, prev_ds, prev_ds_nodash 的值将被设为 None

  • 当手动触发DAG时,调度将被忽略,且prev_ds == next_ds == ds

任务执行交互

TemplateNotFound 是什么意思?

TemplateNotFound 错误通常是由于在向触发Jinja模板化的操作符传递路径时与用户预期不符导致的。这种情况常见于BashOperators

另一个常被忽视的事实是,文件的解析是相对于管道文件所在位置的。你可以向DAG对象的template_searchpath添加其他目录,以支持其他非相对位置。

如何基于另一个任务的失败触发任务?

对于通过依赖关系关联的任务,如果任务执行依赖于其所有上游任务的失败,可以将trigger_rule设置为TriggerRule.ALL_FAILED;如果仅依赖于其中一个上游任务的失败,则可以设置为TriggerRule.ONE_FAILED

import pendulum

from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task()
def a_func():
    raise AirflowException


@task(
    trigger_rule=TriggerRule.ALL_FAILED,
)
def b_func():
    pass


@dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
def my_dag():
    a = a_func()
    b = b_func()

    a >> b


dag = my_dag()

更多信息请参阅触发规则

如果任务之间没有依赖关系,您需要构建自定义Operator

Airflow 用户界面

为什么我的任务失败了但在UI中没有日志?

日志通常在任务达到终止状态时提供。有时,任务的正常生命周期会被中断,任务的工作进程无法写入任务日志。这通常由以下两种原因之一导致:

  1. 僵尸任务.

  2. 任务在队列中卡住后失败(Airflow 2.6.0+版本)。在队列中停留时间超过scheduler.task_queued_timeout的任务将被标记为失败,且Airflow UI中不会显示任务日志。

为每个任务设置重试次数可以大幅降低这些问题对工作流造成影响的概率。

如何防止每个Web服务器多次同步权限?

[fab] update_fab_perms配置在airflow.cfg中的值设置为False

如何减少Airflow UI页面加载时间?

如果您的DAG加载时间过长,可以将airflow.cfg中的default_dag_run_display_number配置值减小。该参数控制UI中默认显示的DAG运行次数,默认值为25

为什么暂停DAG的开关变红了?

如果由于任何原因暂停或取消暂停DAG失败,DAG切换开关将恢复为之前的状态并变为红色。如果观察到这种行为,请尝试再次暂停DAG,或者如果问题重现,请检查控制台或服务器日志。

MySQL及MySQL变种数据库

“MySQL服务器已断开连接”是什么意思?

您可能会偶尔遇到OperationalError错误,提示"MySQL服务器已断开连接"。这是由于连接池保持连接开启时间过长,导致您获得了一个已过期的旧连接。为确保使用有效连接,您可以设置sql_alchemy_pool_recycle (Deprecated)参数来指定连接在多少秒后失效并创建新连接。

Airflow是否支持扩展ASCII或Unicode字符?

如果您打算在Airflow中使用扩展ASCII或Unicode字符,必须向MySQL数据库提供正确的连接字符串,因为它们会明确定义字符集。

sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8

您会遇到由WTForms模板和其他Airflow模块抛出的UnicodeDecodeError错误,如下所示。

'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)

如何修复异常:全局变量explicit_defaults_for_timestamp需要设置为开启(1)?

这意味着您的MySQL服务器中explicit_defaults_for_timestamp被禁用,您需要通过以下方式启用它:

  1. my.cnf文件的mysqld部分下设置explicit_defaults_for_timestamp = 1

  2. 重启Mysql服务器。

这篇内容对您有帮助吗?