操作符

Operator(操作符)在概念上是预定义Task的模板,您只需在DAG中以声明式方式定义即可:

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflow 提供了一套非常丰富的操作器,其中一些内置于核心或预安装的提供程序中。一些来自核心的流行操作器包括:

  • BashOperator - 执行bash命令

  • PythonOperator - 调用任意Python函数

  • EmailOperator - 发送电子邮件

  • 使用@task装饰器来执行任意Python函数。它不支持渲染作为参数传递的jinja模板。

注意

推荐使用@task装饰器而非传统的PythonOperator来执行参数中无需模板渲染的Python可调用对象。

有关所有核心操作符的列表,请参阅:Core Operators and Hooks Reference

如果默认情况下Airflow没有安装您需要的操作器,您很可能可以在我们庞大的社区provider packages集合中找到它。这里包含一些流行的操作器:

但还有更多、更多的内容 - 您可以在我们的providers packages文档中查看所有社区管理的operators、hooks、sensors和transfers的完整列表。

注意

在Airflow的代码中,我们经常将Tasks和Operators的概念混用,它们大多可以互换。然而,当我们谈论Task时,指的是DAG中通用的"执行单元";当我们谈论Operator时,指的是一个可重复使用的预制任务模板,其逻辑已全部完成,只需提供一些参数即可。

Jinja模板

Airflow 利用了 Jinja 模板引擎的强大功能,结合使用 可以成为一个强大的工具。

例如,假设你想使用BashOperator将数据间隔的开始时间作为环境变量传递给Bash脚本:

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

这里,{{ ds }}是一个模板变量,由于BashOperatorenv参数通过Jinja进行了模板化,数据间隔的开始日期将作为一个名为DATA_INTERVAL_START的环境变量在你的Bash脚本中可用。

当Python代码比Jinja模板更易读时,您也可以传入一个可调用对象。该可调用对象必须接受两个命名参数contextjinja_env

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        return do_complex_things(f)


t = BashOperator(
    task_id="complex_templated_echo",
    bash_command=build_complex_command,
    dag=dag,
)

由于每个模板字段仅渲染一次,可调用对象的返回值不会再次经过渲染。因此,可调用对象必须手动渲染任何模板。这可以通过在当前任务上调用render_template()来实现,如下所示:

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        data = do_complex_things(f)
    return context["task"].render_template(data, context, jinja_env)

您可以在文档中标记为"可模板化"的每个参数上使用模板功能。模板替换操作会在调用操作符的pre_execute函数之前执行。

您还可以在嵌套字段中使用模板功能,只要这些嵌套字段在其所属结构中已被标记为可模板化:注册在template_fields属性中的字段将被提交进行模板替换,如下例中的path字段所示:

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 属性是一个类变量,保证是 Sequence[str] 类型(即字符串的列表或元组)。

深层嵌套字段也可以被替换,只要所有中间字段都被标记为模板字段:

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

在创建DAG时,您可以向Jinja的Environment传递自定义选项。一个常见的用法是防止Jinja从模板字符串中删除尾随换行符:

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

查看 Jinja文档 以获取所有可用选项。

某些操作符在渲染字段时,也会将特定后缀(定义在template_ext中)结尾的字符串视为文件引用。这对于直接从文件加载脚本或查询(而非将它们包含在DAG代码中)非常有用。

例如,考虑一个运行多行bash脚本的BashOperator,这将加载script.sh文件并使用其内容作为bash_command的值:

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

默认情况下,以这种方式提供的路径应相对于DAG的文件夹(因为这是默认的Jinja模板搜索路径),但可以通过在DAG上设置template_searchpath参数来添加其他路径。

在某些情况下,您可能希望从模板中排除某个字符串并直接使用它。考虑以下任务:

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

这将导致TemplateNotFound: cat script.sh错误,因为Airflow会将该字符串视为文件路径而非命令。 我们可以通过将其包裹在literal()中来阻止Airflow将此值视为文件引用。 这种方法会禁用宏和文件的渲染,可以应用于选定的嵌套字段,同时保留其余内容的默认模板规则。

from airflow.utils.template import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

在版本2.8中新增:literal() 被添加。

或者,如果您想防止Airflow将某个值视为文件引用,可以覆盖template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

将字段渲染为原生Python对象

默认情况下,template_fields中的所有Jinja模板都会被渲染为字符串。但这并不总是我们想要的。例如,假设一个extract任务将一个字典{"1001": 301.27, "1002": 433.21, "1003": 502.22}推送到XCom

@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

如果一个任务依赖于 extractorder_data 参数会被传递一个字符串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"

def transform(order_data):
    total_order_value = sum(order_data.values())  # Fails because order_data is a str :(
    return {"total_order_value": total_order_value}


transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract() >> transform

如果我们想要获取实际的字典,有两种解决方案。第一种是使用可调用对象:

def render_transform_op_kwargs(context, jinja_env):
    order_data = context["ti"].xcom_pull("extract")
    return {"order_data": order_data}


transform = PythonOperator(
    task_id="transform",
    op_kwargs=render_transform_op_kwargs,
    python_callable=transform,
)

或者,也可以指示Jinja渲染原生Python对象。这需要通过向DAG传递render_template_as_native_obj=True来实现。这将使Airflow使用NativeEnvironment而非默认的SandboxedEnvironment

with DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
):
    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
        python_callable=transform,
    )

保留参数关键字

在Apache Airflow 2.2.0中,params变量用于DAG序列化过程。请不要在第三方操作符中使用该名称。 如果您升级环境后遇到以下错误:

AttributeError: 'str' object has no attribute '__module__'

在您的操作器中将名称从params更改。

这篇内容对您有帮助吗?