调度器¶
Airflow调度器监控所有任务和DAG,一旦它们的依赖项完成就会触发任务实例。在后台,调度器会启动一个子进程,该子进程监控并与指定DAG目录中的所有DAG保持同步。默认情况下,调度器每分钟收集一次DAG解析结果,并检查是否有任何活动任务可以被触发。
Airflow调度器设计为在Airflow生产环境中作为持久服务运行。要启动它,您只需执行airflow scheduler命令。它使用airflow.cfg中指定的配置。
调度器使用配置的Executor来运行准备就绪的任务。
要启动调度程序,只需运行以下命令:
airflow scheduler
一旦调度程序成功运行,您的DAG将开始执行。
注意
第一个DAG运行是根据您DAG中任务的最小start_date创建的。
后续的DAG运行将根据您DAG的timetable生成。
对于使用cron或timedelta调度的DAG,调度器在覆盖的时间段结束之前不会触发您的任务。例如,将schedule设置为@daily的任务会在当天结束后运行。这种技术确保在执行DAG之前,该时间段所需的任何数据都已完全可用。
在UI中,看起来像是Airflow延迟一天运行您的任务。
注意
如果你以一天的schedule运行一个DAG,数据间隔从2019-11-21开始的运行将在2019-11-21T23:59之后触发。
让我们再重复一遍,调度器会在开始日期后的一个schedule运行您的作业,在间隔结束时。
关于如何调度DAG的详细信息,请参考DAG Runs。
注意
调度器设计用于实现高吞吐量。这是一个经过深思熟虑的设计决策,旨在尽可能快地调度任务。调度器会检查池中有多少可用空闲槽位,并在每次迭代中最多调度相应数量的任务实例。 这意味着只有当等待调度的任务数量超过队列槽位时,任务优先级才会发挥作用。因此可能会出现这样的情况:如果低优先级任务与高优先级任务处于同一批次,低优先级任务可能会先被调度。 想了解更多相关信息,可以参考这个GitHub讨论。
DAG文件处理¶
您可以让Airflow调度器负责启动将DAGs文件夹中的Python文件转换为包含待调度任务的DAG对象的过程。
有关如何实现此功能的详细信息,请参阅DAG文件处理
触发未来日期的DAG¶
如果想使用'外部触发器'来运行未来日期的数据间隔,请在airflow.cfg文件的scheduler部分设置allow_trigger_in_future = True。
此设置仅在您的DAG定义为schedule=None时生效。
当设置为False(默认值)时,如果您手动触发一个未来日期的数据间隔运行,
调度程序将不会执行它,直到其data_interval_start成为过去时间。
运行多个调度器¶
Airflow支持同时运行多个调度器——既出于性能考虑,也为了提高系统弹性。
概述¶
HA调度器的设计旨在利用现有的元数据数据库。这样做主要是为了操作简便性:每个组件已经需要与该数据库通信,通过不使用调度器之间的直接通信或共识算法(如Raft、Paxos等),也不使用其他共识工具(例如Apache Zookeeper或Consul),我们将“操作表面区域”保持在最小。
调度器现在使用序列化的DAG表示来进行调度决策,其调度循环的大致流程如下:
检查是否有任何DAG需要新的DagRun,并创建它们
检查一批DagRuns以查找可调度的TaskInstances或已完成的DagRuns
选择可调度的TaskInstances,在遵守Pool限制和其他并发限制的同时,将它们加入执行队列
然而,这确实对数据库提出了一些要求。
数据库要求¶
简而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的用户已经准备就绪——你可以根据需要运行任意多个调度器副本——无需进行额外的设置或配置。如果你使用的是其他数据库,请继续阅读。
为了保持性能和吞吐量,调度循环中有一个部分会在内存中进行大量计算(因为每次为TaskInstance往返数据库会太慢),因此我们需要确保一次只有一个调度器处于这个关键部分——否则限制条件将无法被正确遵守。为此我们使用数据库行级锁(使用SELECT ... FOR UPDATE)。
这个关键环节是将TaskInstances从调度状态转入执行队列的过程,同时确保遵守各种并发和资源池限制。通过请求对Pool表中每一行数据施加行级写锁(大致相当于SELECT * FROM slot_pool FOR
UPDATE NOWAIT,但实际查询语句略有不同)来获取该关键区。
以下数据库得到全面支持并提供"最优"体验:
PostgreSQL 12+
MySQL 8.0及以上版本
警告
MariaDB在10.6.0版本之前没有实现SKIP LOCKED或NOWAIT SQL子句。
由于缺乏这些功能,不支持运行多个调度器,并且已有死锁错误的报告。MariaDB
10.6.0及后续版本可能可以正常使用多个调度器,但尚未经过测试。
注意
Microsoft SQL Server尚未经过高可用性(HA)测试。
优化您的Scheduler性能¶
影响调度程序性能的因素¶
调度器负责两项操作:
持续解析DAG文件并与数据库中的DAG保持同步
持续调度任务以执行
这两个任务由调度程序并行执行,并在不同的进程中独立运行。为了优化调度程序,需要考虑以下因素:
- The kind of deployment you have
需要共享DAG文件的文件系统类型(会影响持续读取DAG的性能)
文件系统的速度有多快(在许多分布式云文件系统的案例中,您可以通过额外付费来获得更高的吞吐量/更快的文件系统)
你的处理内存有多少
你有多少可用的CPU
您可用的网络吞吐量有多少
- The logic and definition of your DAG structure:
你有多少个DAG文件
你的文件中有多少个DAG
DAG文件的大小(请记住DAG解析器需要每隔n秒读取并解析该文件)
它们的复杂程度(即解析速度有多快、包含多少任务和依赖关系)
解析您的DAG文件是否涉及导入大量库或在顶层进行繁重处理 (提示!不应该这样做。参见Top level Python Code)
- The scheduler configuration
你有多少个调度器
你的调度器中有多少个解析进程
调度器在重新解析同一个DAG之间等待多长时间(这个过程会持续发生)
调度器单次循环处理的任务实例数量
每次循环应创建/调度多少个新的DAG运行
调度器应多久执行一次清理并检查孤立任务/接管它们
为了进行精细调优,了解Scheduler的底层工作原理很有帮助。 您可以参考Airflow Summit 2021的演讲 Deep Dive into the Airflow Scheduler talk来进行调优。
如何优化调度器的精细调优¶
Airflow提供了许多可调节的"旋钮"来微调性能,但具体需要调整哪些旋钮以获得最佳效果,这取决于您的特定部署环境、DAG结构、硬件资源和使用预期。在管理部署时,部分工作就是确定您要优化的目标。有些用户可以接受新DAG解析有30秒延迟,以换取较低的CPU使用率;而另一些用户则希望DAG在出现在DAGs文件夹时能近乎即时完成解析,即使这意味着更高的CPU使用率。
Airflow为您提供了灵活的决定权,但您应该找出对您来说最重要的性能方面,并决定要朝哪个方向调整哪些旋钮。
通常对于微调来说,您的方法应该与任何性能改进和优化相同(我们不会推荐任何特定工具——只需使用您通常用来观察和监控系统的工具):
使用您通常用于监控系统的正确工具集来监控系统极其重要。本文档不会详细介绍您可以使用的具体指标和工具,它只是描述了您应该监控哪些类型的资源,但您应该遵循监控的最佳实践来获取正确的数据。
决定哪方面的性能对你最重要(你想要改进什么)
观察您的系统以识别瓶颈所在:CPU、内存、I/O通常是主要限制因素
基于您的期望和观察结果 - 决定下一步改进方向,并重新审视您的性能表现和瓶颈。性能优化是一个迭代的过程。
哪些资源可能限制Scheduler的性能¶
有几个资源使用领域需要您特别注意:
文件系统性能。Airflow Scheduler 高度依赖解析(有时是大量)Python文件,这些文件通常位于共享文件系统上。Airflow Scheduler 会持续读取并重新解析这些文件。这些相同的文件需要能被工作节点访问,因此它们通常存储在分布式文件系统中。为此您可以使用各种文件系统(NFS、CIFS、EFS、GCS fuse、Azure 文件系统都是很好的例子)。您可以控制这些文件系统的各种参数并微调其性能,但这超出了本文档的范围。您应该观察文件系统的统计信息和使用情况,以确定问题是否来自文件系统性能。例如,有传闻证据表明,当使用 EFS 时,提高 EFS 性能的 IOPS(并支付更多费用)可以显著提高解析 Airflow DAG 的稳定性和速度。
如果文件系统性能成为瓶颈,另一个解决方案是转向替代性的DAG分发机制。将DAG嵌入镜像和使用GitSync分发都具有以下特性:文件在调度器本地可用,调度器无需使用分布式文件系统读取文件,这些文件在调度器本地即可获取,通常能达到最快速度,特别是当您的机器使用高速SSD磁盘作为本地存储时。这些分发机制具有其他特性,可能使它们不是您的最佳选择,但如果您的性能问题源自分布式文件系统的性能,它们可能是最佳解决方案。
随着您希望提高性能并并行处理更多任务,数据库连接和数据库使用可能会成为问题。Airflow以"极度消耗数据库连接"而闻名 - 您拥有的DAG越多,想要并行处理的任务越多,打开的数据库连接就越多。对于MySQL来说这通常不是问题,因为它的连接处理模型是基于线程的,但对于Postgres来说可能是个问题,因为它的连接处理是基于进程的。业界普遍认为,即使您的中等规模基于Postgres的Airflow部署,最佳解决方案是使用PGBouncer作为数据库代理。Helm Chart for Apache Airflow原生支持PGBouncer。
CPU使用率对文件处理器最为关键——这些是解析和执行Python DAG文件的进程。由于调度器会持续触发此类解析,当您拥有大量DAG时,处理过程可能会消耗大量CPU资源。您可以通过增加min_file_process_interval来缓解此问题,但这正是前文提到的权衡之一,其结果是文件变更的感知速度会变慢,您将观察到从提交文件到这些文件在Airflow UI中可见并被调度器执行之间存在延迟。优化DAG构建方式、避免使用外部数据源是改善CPU使用率的最佳途径。如果有更多可用CPU核心,您可以增加处理线程数parsing_processes。此外,Airflow调度器在多实例情况下几乎呈线性扩展,因此如果调度器性能受限于CPU,您还可以增加更多调度器实例。
当您试图从Airflow获取更高性能时,它可能会消耗相当多的内存。通常通过增加处理负载的进程数量来提升Airflow性能,每个进程都需要加载完整的Python解释器、导入大量类以及临时内存存储。Airflow通过使用fork和写时复制内存优化了许多内存使用,但如果fork后导入新类,可能会导致额外的内存压力。您需要观察系统是否使用了超过其物理内存的内存——这会导致使用交换磁盘,从而显著降低性能。请注意,在
2.1.4版本之前的Airflow调度程序会生成大量由日志文件使用的Page Cache内存(当日志文件未被删除时)。这通常是无害的,因为该内存只是缓存,系统可以随时回收,但在2.1.4及更高版本中,写入日志不会产生过多的Page Cache内存。无论如何——当您查看内存使用情况时,请确保注意所观察的内存类型。通常您应该关注working memory``(名称可能因部署环境而异) 而不是 ``total memory used。
如何优化Scheduler的性能¶
当您了解资源使用情况后,可以考虑的改进措施可能包括:
改进解析逻辑和效率,降低顶层DAG Python代码的复杂度。由于代码会被持续解析,优化这部分代码可能带来显著性能提升,特别是在解析DAG时尝试连接外部数据库等操作的情况下(应不惜一切代价避免这种情况)。Top level Python Code阐述了编写顶层Python代码的最佳实践。Reducing DAG complexity文档提供了若干可降低代码复杂度的优化方向。
提高资源利用率。当系统中有空闲容量似乎未被充分利用时(CPU、内存I/O、网络是主要考量因素)——您可以采取诸如增加调度程序数量、解析进程或缩短间隔以执行更频繁操作等措施,这可能会以更高资源利用率为代价带来性能提升。
increase hardware capacity (for example if you see that CPU is limiting you or that I/O you use for DAG filesystem is at its limits). Often the problem with scheduler performance is simply because your system is not “capable” enough and this might be the only way. For example if you see that you are using all CPU you have on machine, you might want to add another scheduler on a new machine - in most cases, when you add 2nd or 3rd scheduler, the capacity of scheduling grows linearly (unless the shared database or filesystem is a bottleneck).
尝试调整“调度器可调参数”的不同取值。通常,通过牺牲某一性能指标来换取另一指标的提升可能会获得更好的效果。例如,若想降低CPU使用率,可以增加文件处理间隔时间(但代价是新DAG的生成会出现更长的延迟)。性能调优本质上是一门平衡各项指标的技艺。
有时您会稍微调整调度器行为(例如更改解析排序顺序),以便为您的特定部署获得更精细优化的结果。
调度器配置选项¶
以下配置设置可用于控制调度器的各个方面。
不过,你也可以参考配置参考中[scheduler]部分列出的其他与性能无关的调度器配置参数。
max_dagruns_to_create_per_loop
这个参数控制每次调度循环中锁定的DAG数量。设置较低值的一个可能原因是:如果您有大型DAG(每个DAG包含1万+任务)且运行多个调度器时,您不会希望单个调度器承担所有工作。
max_dagruns_per_loop_to_schedule
调度程序在安排和排队任务时应检查(并锁定)多少个DagRuns。增加此限制将允许较小的DAG有更高的吞吐量,但可能会减慢较大(例如>500个任务)DAG的吞吐量。在使用多个调度程序时将此值设置过高,还可能导致一个调度程序占用所有DAG运行,而其他调度程序没有工作可做。
-
调度器是否应该在相关查询中使用
SELECT ... FOR UPDATE。 如果设置为False,则不应同时运行多个调度器。 -
池使用统计信息应多久(以秒为单位)发送到StatsD(如果启用了statsd_on)。计算此统计信息是一个相对昂贵的查询,因此应将其设置为与StatsD汇总周期相同的周期。
-
调度器检查孤儿任务或死亡SchedulerJobs的频率(以秒为单位)。
此设置控制如何检测到死亡的调度器,并将其"监督"的任务由另一个调度器接管。任务会继续运行,因此短时间内未检测到这种情况不会造成危害。
当检测到SchedulerJob"死亡"时(由scheduler_health_check_threshold确定),由死亡进程启动的任何正在运行或排队的任务将被此调度器"收养"并监控。
dag_dir_list_interval 扫描DAGs目录以查找新文件的频率(以秒为单位)。
file_parsing_sort_mode 调度器将列出并对DAG文件进行排序,以决定解析顺序。
max_tis_per_query 调度主循环中的查询批次大小。该值不应超过
core.parallelism。如果设置过高,SQL查询性能可能会受到查询谓词复杂度和/或过度锁定的影响。此外,可能会达到数据库允许的最大查询长度限制。设置为0表示使用
core.parallelism的值。min_file_process_interval DAG文件重新解析的时间间隔(秒)。DAG文件每隔min_file_process_interval秒会被解析一次。对DAG的更新会在这个间隔后生效。保持这个数值较低会增加CPU使用率。
parsing_processes 调度器可以并行运行多个进程来解析DAG文件。这个参数定义了将运行多少个进程。
scheduler_idle_sleep_time 控制调度器在循环之间休眠的时间,但前提是循环中没有任务需要处理。也就是说,如果调度器安排了任务,它将立即开始下一次循环迭代。这个参数命名不当(历史原因),未来将通过弃用当前名称进行重命名。
schedule_after_task_execution 任务监督进程是否应执行"迷你调度器"来尝试调度同一DAG中的更多任务。保持开启状态意味着同一DAG中的任务执行速度更快,但在某些情况下可能会使其他DAG处于饥饿状态。