DAG序列化¶
为了使Airflow Webserver实现无状态化,Airflow >=1.10.7版本支持DAG序列化与数据库持久化。从Airflow 2.0.0开始,调度器也使用序列化DAG来保持一致性并做出调度决策。
在没有DAG序列化和数据库持久化的情况下,Webserver和Scheduler都需要访问DAG文件。Scheduler和Webserver都会解析这些DAG文件。
通过DAG序列化,我们的目标是实现Web服务器与DAG解析的解耦,这将使Web服务器变得非常轻量。
如上图所示,使用此功能时,
调度器中的DagFileProcessorProcess
会解析DAG文件,将其序列化为JSON格式并作为SerializedDagModel模型
保存在元数据数据库中。
Web服务器现在不再需要重新解析DAG文件,而是读取JSON格式的序列化DAG,将其反序列化后创建DagBag并用于界面展示。调度器在做出调度决策时也不再需要实际的DAG文件,自Airflow 2.0.0起(这是Scheduler HA的一部分实现),我们改用包含所有调度所需信息的序列化DAG来替代原始DAG文件。
作为DAG序列化实现的关键特性之一,当WebServer启动时,我们不再加载整个DagBag,而是按需从序列化Dag表中加载每个DAG。这有助于减少Webserver的启动时间和内存占用。当您拥有大量DAG时,这种优化效果尤为显著。
您可以将源代码存储在数据库中,使Webserver完全独立于DAG文件。
如果您的文件已嵌入Docker镜像或以其他方式提供给Webserver,则无需此操作。数据存储在DagCode模型中。
最后一个元素是渲染模板字段。当启用序列化时,模板不会被渲染到请求中,但在工作节点上执行任务前会保存字段内容的副本。这些数据存储在RenderedTaskInstanceFields模型中。为了限制数据库的过度增长,只保留最近的条目,较旧的条目会被清除。
注意
DAG序列化是严格要求且无法在Airflow 2.0+版本中关闭的。
DAG序列化设置¶
在airflow.cfg中添加以下设置:
[core]
# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
min_serialized_dag_update_interval: 该参数设置序列化DAG在数据库中更新的最小间隔时间(以秒为单位),有助于降低数据库写入频率。min_serialized_dag_fetch_interval: 该选项控制在Webserver中DagBag已加载序列化DAG的情况下,从数据库重新获取DAG的频率。将此值设置得更高会减少数据库负载,但代价是可能显示过时的DAG缓存版本。max_num_rendered_ti_fields_per_task: 此选项控制每个任务存储在数据库中的渲染任务实例字段(模板字段)的最大数量。compress_serialized_dags: 此选项控制是否压缩序列化DAG到数据库。当集群中存在非常大的DAG时很有用。当True时,这将禁用DAG依赖视图。
如果您正在从Airflow <1.10.7版本升级,请不要忘记运行airflow db migrate。
限制¶
当使用用户自定义的过滤器和宏时,Web服务器中的"渲染视图"可能显示尚未执行的任务实例(TIs)的错误结果,因为它可能使用了Web服务器无法访问的外部模块。在这种情况下,请使用
airflow tasks renderCLI命令来调试或测试template_fields的渲染效果。一旦任务开始执行,渲染后的模板字段将被存储在数据库的单独表中,之后Web服务器("渲染视图"标签页)将显示正确的值。
注意
You need Airflow >= 1.10.10 for completely stateless Webserver. Airflow 1.10.7 to 1.10.9 needed access to DAG files in some cases. More Information: https://airflow.apache.org/docs/1.10.9/dag-serialization.html#limitations
使用不同的JSON库¶
要使用不同的JSON库(如ujson)替代标准json库,您需要在本地Airflow设置文件(airflow_local_settings.py)中定义一个json变量,如下所示:
import ujson
json = ujson
有关如何配置本地设置的详细信息,请参阅配置本地设置。