创建自定义Operator¶
Airflow允许您创建新的操作符来满足您或团队的需求。 这种可扩展性是使Apache Airflow强大的众多特性之一。
您可以通过扩展airflow.models.baseoperator.BaseOperator来创建任何您想要的算子
在派生类中需要重写以下两种方法:
构造函数 - 定义该算子所需的参数。您只需指定该算子特有的参数。 您可以在DAG文件中指定
default_args。更多详情请参阅Default args。执行 - 当运行器调用操作符时要执行的代码。该方法包含Airflow上下文作为参数,可用于读取配置值。
注意
在实现自定义算子时,不要在__init__方法中进行任何耗时操作。使用这些算子的任务每个调度周期都会实例化一次算子,如果进行数据库调用会显著降低调度速度并浪费资源。
让我们在一个新文件hello_operator.py中实现一个示例HelloOperator:
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
注意
为了使导入生效,您需要将文件放置在PYTHONPATH环境变量包含的目录中。Airflow默认会将Airflow主目录下的dags/、plugins/和config/目录添加到PYTHONPATH。例如,在我们的示例中,文件被放置在custom_operator/目录下。有关Python和Airflow如何管理模块的详细信息,请参阅Modules Management。
现在可以按如下方式使用派生的自定义操作符:
from custom_operator.hello_operator import HelloOperator
with dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
您也可以继续使用插件文件夹来存储自定义操作符。如果插件文件夹中包含文件hello_operator.py,您可以按如下方式导入操作符:
from hello_operator import HelloOperator
如果一个操作符需要与外部服务(如API、数据库等)通信,建议使用Hooks来实现通信层。这种方式可以让其他用户在不同的操作符中复用已实现的逻辑。相比为每个外部服务使用CustomServiceBaseOperator,这种方法能提供更好的解耦效果,并更充分地利用新增的集成功能。
另一个考虑因素是临时状态。如果一个操作需要内存中的状态(例如在on_kill方法中用于取消请求的作业ID),那么该状态应该保存在操作符中而不是钩子中。这样服务钩子可以完全无状态,整个操作的逻辑都集中在一个地方——操作符中。
钩子(Hooks)¶
Hooks(钩子)充当与DAG中外部共享资源通信的接口。 例如,DAG中的多个任务可能需要访问MySQL数据库。您无需为每个任务创建连接, 而是可以从Hook获取连接并使用它。Hook还有助于避免在DAG中存储连接认证参数。 有关如何创建和管理连接的信息,请参阅Managing Connections; 关于如何通过providers添加自定义连接类型的详细信息,请参见Provider packages。
让我们扩展之前的示例,从MySQL获取名称:
class HelloDBOperator(BaseOperator):
def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.mysql_conn_id = mysql_conn_id
self.database = database
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
sql = "select name from user"
result = hook.get_first(sql)
message = f"Hello {result['name']}"
print(message)
return message
当操作员在hook对象上调用查询时,如果连接不存在则会创建一个新连接。
hook会从Airflow后端获取用户名和密码等认证参数,并将这些参数传递给airflow.hooks.base.BaseHook.get_connection()。
您应该只在execute方法或从execute调用的任何方法中创建hook。
每当Airflow解析DAG时都会调用构造函数(这种情况经常发生),此时实例化hook会导致许多不必要的数据库连接。
而execute方法仅在DAG运行时才会被调用。
用户界面¶
Airflow还允许开发者控制操作符在DAG界面中的显示方式。
重写ui_color可以更改操作符在界面中的背景颜色。
重写ui_fgcolor可以更改标签的颜色。
重写custom_operator_name可以将显示名称更改为类名以外的其他名称。
class HelloOperator(BaseOperator):
ui_color = "#ff0000"
ui_fgcolor = "#000000"
custom_operator_name = "Howdy"
# ...
模板化¶
您可以使用Jinja模板来参数化您的操作器。
Airflow在渲染操作器时会将template_fields中的字段名称视为模板字段。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
您可以按如下方式使用该模板:
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
在这个示例中,Jinja会查找name参数,并将{{ task_instance.task_id }}替换为
task_id_1。
该参数也可以包含一个文件名,例如bash脚本或SQL文件。您需要在template_ext中添加文件的扩展名。如果template_field包含一个以template_ext中提到的扩展名结尾的字符串,Jinja会读取文件内容并用实际值替换模板。请注意,Jinja替换的是操作符属性而非参数。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("guest_name",)
template_ext = ".sql"
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.guest_name = name
在示例中,template_fields 应为 ['guest_name'] 而非 ['name']
此外,您还可以提供template_fields_renderers字典,用于定义模板字段值在Web界面中的渲染样式。例如:
class MyRequestOperator(BaseOperator):
template_fields: Sequence[str] = ("request_body",)
template_fields_renderers = {"request_body": "json"}
def __init__(self, request_body: str, **kwargs) -> None:
super().__init__(**kwargs)
self.request_body = request_body
在template_field本身是字典的情况下,还可以指定点分隔的键路径来适当地提取和渲染单个元素。例如:
class MyConfigOperator(BaseOperator):
template_fields: Sequence[str] = ("configuration",)
template_fields_renderers = {
"configuration": "json",
"configuration.query.sql": "sql",
}
def __init__(self, configuration: dict, **kwargs) -> None:
super().__init__(**kwargs)
self.configuration = configuration
然后按如下方式使用此模板:
with dag:
config_task = MyConfigOperator(
task_id="task_id_1",
configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
)
这将导致UI除了渲染query.sql中包含的使用SQL语法分析器处理的值外,还会将configuration以json格式渲染。
当前可用的词法分析器:
bash
bash_command
文档
文档JSON
doc_md
文档_rst
文档_yaml
文档标记
hql
html
jinja
json
md
mysql
postgresql
PowerShell
py
python_callable
rst
SQL
tsql
yaml
如果使用不存在的词法分析器,模板字段的值将被渲染为一个格式化打印的对象。
限制¶
为防止滥用,在操作符构造函数中定义和分配模板字段时(如果存在,否则请参见下文),必须遵守以下限制:
1. 模板化字段对应的构造函数传入参数必须与字段名称完全相同。以下示例无效,因为传入构造函数的参数与模板化字段不一致:
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a_id) -> None: # <- should be def __init__(field_a)-> None
self.field_a = field_a_id # <- should be self.field_a = field_a
2. 模板字段的实例成员必须通过构造函数中的对应参数进行赋值,可以通过直接赋值或调用父类构造函数(其中这些字段被定义为template_fields)并显式分配参数来实现。以下示例是无效的,因为尽管self.field_a是一个模板字段,但该实例成员完全没有被赋值:
class HelloOperator(BaseOperator):
template_fields = ("field_a", "field_b")
def __init__(field_a, field_b) -> None:
self.field_b = field_b
以下示例同样无效,因为MyHelloOperator的实例成员self.field_a是作为传递给其父类构造函数的kwargs的一部分被隐式初始化的:
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = ("field_a", "field_b")
def __init__(field_b, **kwargs) -> None: # <- should be def __init__(field_a, field_b, **kwargs)
super().__init__(**kwargs) # <- should be super().__init__(field_a=field_a, **kwargs)
self.field_b = field_b
3. 在构造函数的参数赋值过程中不允许对参数执行任何操作。
所有对值的操作应在 execute() 方法中完成。
因此,以下示例是无效的:
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a.lower() # <- assignment should be only self.field_a = field_a
当一个操作符继承自基础操作符且自身未定义构造函数时,上述限制条件不适用。但模板化字段必须按照这些限制在父类中正确设置。
因此,以下示例是有效的:
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = "field_a"
上述限制由一个名为'validate-operators-init'的预提交钩子强制执行。
通过子类化添加模板字段¶
创建自定义操作符的一个常见用例是简单地扩展现有的template_fields。
可能会出现这种情况:您希望使用的操作符没有将某些参数定义为模板化参数,但您希望能够将参数作为Jinja表达式动态传递。通过快速子类化现有操作符,可以轻松实现这一点。
假设你想使用之前定义的HelloOperator:
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
然而,您希望动态参数化world参数。由于template_fields属性保证是Sequence[str]类型(即字符串列表或元组),您可以子类化HelloOperator来根据需要轻松修改template_fields。
class MyHelloOperator(HelloOperator):
template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")
现在你可以这样使用 MyHelloOperator:
with dag:
hello_task = MyHelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="{{ var.value.my_world }}",
)
在这个示例中,world参数将通过Jinja表达式动态设置为名为"my_world"的Airflow变量值。
定义操作符额外链接¶
对于您的操作符,您可以定义一个额外链接,该链接可以将用户重定向到外部系统。例如,您可以添加一个将用户重定向到操作符手册的链接。
传感器¶
Airflow提供了一种特殊操作符的基元,其目的是定期轮询某些状态(例如文件是否存在),直到满足成功条件。
你可以通过扩展airflow.sensors.base.BaseSensorOperator来创建任意所需的传感器,
定义一个poke方法来轮询外部状态并评估成功条件。
传感器有一个强大的功能叫做'reschedule'模式,该模式允许传感器任务被重新调度,而不是在轮询间隔期间占用一个工作槽。当您可以容忍较长的轮询间隔并预期需要长时间轮询时,这个功能非常有用。
重新调度模式有一个注意事项,即您的传感器无法在重新调度执行之间保持内部状态。在这种情况下,您应该用airflow.sensors.base.poke_mode_only()装饰您的传感器。这将让用户知道您的传感器不适合用于重新调度模式。
一个无法与重新调度模式一起使用的、需要保持内部状态的传感器示例是airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor。它会轮询前缀下的对象数量(这个数量就是传感器的内部状态),当对象数量在一段时间内没有变化时,传感器就会判定为成功。