插件

Airflow内置了一个简单的插件管理器,只需将文件放入$AIRFLOW_HOME/plugins文件夹,即可将外部功能集成到其核心中。

plugins文件夹中的python模块会被导入,和网页视图会被集成到Airflow的主集合中并可供使用。

要排查插件问题,您可以使用airflow plugins命令。 该命令会输出已加载插件的相关信息。

版本2.0变更:不再支持通过airflow.{operators,sensors,hooks}.导入插件中的操作器、传感器和钩子,这些扩展现在应该作为常规Python模块导入。更多信息请参阅:模块管理创建自定义操作器

用途是什么?

Airflow提供了一个通用的数据工作工具箱。不同的组织拥有不同的技术栈和需求。使用Airflow插件可以让企业定制他们的Airflow安装,以适配其生态系统。

插件可以作为一种简便的方式来编写、共享和激活新的功能集。

还需要一套更复杂的应用程序来与不同类型的数据和元数据进行交互。

示例:

  • 一组用于解析Hive日志并暴露Hive元数据(CPU/IO/阶段/倾斜/...)的工具

  • 一个异常检测框架,允许人们收集指标、设置阈值和警报

  • 审计工具,帮助了解谁访问了什么

  • 一个基于配置的SLA监控工具,允许您设置监控表及其预期到达时间,提醒相关人员,并提供中断情况的可视化展示

为什么基于Airflow构建?

Airflow 包含许多在构建应用程序时可重复使用的组件:

  • 一个可用于呈现视图的网页服务器

  • 用于存储模型的元数据库

  • 访问您的数据库,并了解如何连接到它们

  • 一组可供您的应用程序推送工作负载的工作节点

  • Airflow已部署,您可以直接利用其部署物流

  • 基础图表功能、底层库和抽象层

插件何时(重新)加载?

插件默认是懒加载的,一旦加载后就不会重新加载(除了在Web服务器中会自动加载UI插件)。如果要在每个Airflow进程启动时加载它们,请在airflow.cfg中设置[core] lazy_load_plugins = False

这意味着如果您对插件进行了任何更改,并且希望webserver或scheduler使用新代码,您需要重启这些进程。不过,在scheduler重新启动之前,这些更改不会反映到新运行的任务中。

默认情况下,任务执行采用fork模式。这种方式避免了创建新Python解释器时产生的性能损耗,也无需重新解析Airflow的所有代码和启动例程。对于短时任务而言,这种方案能带来显著优势。需要注意的是:如果在任务中使用插件并希望更新插件,您需要重启工作节点(使用CeleryExecutor时)或调度器(使用Local/Sequential执行器时)。另一种方案是接受启动时的性能损耗,将配置项core.execute_tasks_new_python_interpreter设为True,这样会为每个任务启动全新的Python解释器。

(另一方面,仅由DAG文件导入的模块不会遇到此问题,因为DAG文件不会在任何长期运行的Airflow进程中被加载/解析。)

界面

要创建一个插件,你需要继承airflow.plugins_manager.AirflowPlugin类,并将你想要接入Airflow的对象进行引用。以下是需要继承的类的示例:

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以通过继承来派生它(请参考下面的示例)。在示例中,所有选项都已定义为类属性,但如果您需要执行额外的初始化,也可以将它们定义为属性。请注意,必须在此类中指定name

修改插件后请确保重启webserver和scheduler以使更改生效。

示例

以下代码定义了一个插件,该插件在Airflow中注入了一组示例对象定义。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://www.apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]

从CSRF保护中排除视图

我们强烈建议您应该对所有视图启用CSRF保护。但如果需要,您可以使用装饰器来排除某些视图。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

将插件作为Python包

可以通过setuptools入口点机制加载插件。为此,请使用包中的入口点链接您的插件。如果安装了该包,Airflow将自动从入口点列表中加载已注册的插件。

注意

入口点名称(例如my_plugin)和插件类的名称都不会影响插件本身的模块和类名。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]

然后在 pyproject.toml 文件中:

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

自动重新加载的Web服务器

要启用当检测到插件目录变更时自动重新加载网页服务器的功能,您应将[webserver]部分中的reload_on_plugin_change选项设置为True

注意

有关配置设置的更多信息,请参阅设置配置选项

注意

有关Python和Airflow如何管理模块的详细信息,请参阅模块管理

故障排除

你可以使用Flask CLI来排查问题。运行前需要将变量FLASK_APP设置为airflow.www.app:create_app

例如,要打印所有路由,请运行:

FLASK_APP=airflow.www.app:create_app flask routes

这篇内容对您有帮助吗?