任务池¶
当过多进程同时访问时,某些系统可能会不堪重负。Airflow池可用于限制任意任务集的执行并行度。池列表在用户界面(Menu -> Admin -> Pools)中通过为池命名并分配工作槽数量进行管理。您还可以在此决定该池是否应在计算占用槽位时包含deferred tasks。
任务可以通过在创建任务时使用pool参数与现有资源池中的一个相关联:
aggregate_db_message_job = BashOperator(
task_id="aggregate_db_message_job",
execution_timeout=timedelta(hours=3),
pool="ep_data_pipeline_db_msg_agg",
bash_command=aggregate_db_message_job_cmd,
dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
任务会像往常一样被调度,直到槽位被占满。一个任务占用的槽位数量可以通过pool_slots进行配置(参见下方章节)。一旦达到容量上限,可运行的任务将进入队列,其状态会在用户界面中显示为排队状态。当槽位释放时,排队的任务会根据任务及其子任务的优先级权重开始运行。
请注意,如果任务未指定资源池,它们将被分配到默认池 default_pool。该默认池初始包含128个槽位,可通过用户界面或命令行界面修改(但无法删除)。
使用多个池槽位¶
默认情况下,Airflow任务每个都会占用一个池槽位,但如有需要可以通过pool_slots参数配置占用更多槽位。当属于同一池的多个任务具有不同的"计算权重"时,这个功能特别有用。
例如,考虑一个包含2个插槽的池,Pool(pool='maintenance', slots=2),以及以下任务:
BashOperator(
task_id="heavy_task",
bash_command="bash backup_data.sh",
pool_slots=2,
pool="maintenance",
)
BashOperator(
task_id="light_task1",
bash_command="bash check_files.sh",
pool_slots=1,
pool="maintenance",
)
BashOperator(
task_id="light_task2",
bash_command="bash remove_files.sh",
pool_slots=1,
pool="maintenance",
)
由于重型任务配置为使用2个池槽位,运行时将耗尽池资源。因此,任何轻型任务都必须排队等待重型任务完成后才能执行。从资源使用角度来看,这里的重型任务相当于两个轻型任务并发运行。
该实现可以防止系统资源过载,在本例中,当一个重量级任务和一个轻量级任务同时运行时可能会发生这种情况。 另一方面,两个轻量级任务可以并发运行,因为它们各自只占用一个资源池槽位,而重量级任务需要等待两个资源池槽位可用后才能执行。
警告
池(Pools)和子DAG(SubDAGs)的交互方式可能与您最初的预期不同。子DAG不会继承您在顶层为它们设置的任何池;必须直接在子DAG内部的任务上设置池。