参数

参数(Params)允许您为任务提供运行时配置。您可以在DAG代码中配置默认参数,并在触发DAG时提供额外参数或覆盖参数值。Param值会通过JSON Schema进行验证。对于计划执行的DAG运行,将使用默认的Param值。

已定义的参数用于在手动触发时呈现良好的用户界面。 当您手动触发DAG时,可以在dagrun开始前修改其参数。 如果用户提供的值未通过验证,Airflow会显示警告而非创建dagrun。

DAG级别参数

要向DAG添加参数,请使用params关键字参数初始化它。 使用一个将参数名称映射到Param或指示参数默认值的对象的字典。

 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.param import Param

 with DAG(
     "the_dag",
     params={
         "x": Param(5, type="integer", minimum=3),
         "my_int_param": 6
     },
 ) as dag:

     @task.python
     def example_task(params: dict):
         # This will print the default value, 6:
         dag.log.info(dag.params['my_int_param'])

         # This will print the manually-provided value, 42:
         dag.log.info(params['my_int_param'])

         # This will print the default value, 5, since it wasn't provided manually:
         dag.log.info(params['x'])

     example_task()

 if __name__ == "__main__":
     dag.test(
         run_conf={"my_int_param": 42}
     )

注意

DAG级别的参数是传递给任务的默认值。这些不应与通过UI表单或CLI手动提供的值混淆,后者仅存在于DagRunTaskInstance的上下文中。这一区别对于TaskFlow DAG至关重要,因为其中可能包含with DAG(...) as dag:代码块内的逻辑。在这种情况下,用户可能尝试使用dag对象访问手动提供的参数值,但该对象始终只包含默认值。为确保访问手动提供的值,请在任务中使用模板变量如paramsti

任务级参数

你也可以为单个任务添加参数。

def print_my_int_param(params):
  print(params.my_int_param)

PythonOperator(
    task_id="print_my_int_param",
    params={"my_int_param": 10},
    python_callable=print_my_int_param,
)

任务级参数优先于DAG级参数,而用户提供的参数(在触发DAG时)优先于任务级参数。

在任务中引用参数

参数可以在模板字符串中通过params引用。例如:

 PythonOperator(
     task_id="from_template",
     op_args=[
         "{{ params.my_int_param + 10 }}",
     ],
     python_callable=(
         lambda my_int_param: print(my_int_param)
     ),
 )

尽管Params可以使用多种类型,但模板的默认行为是为您的任务提供字符串。 您可以通过在初始化DAG时设置render_template_as_native_obj=True来更改此行为。

 with DAG(
     "the_dag",
     params={"my_int_param": Param(5, type="integer", minimum=3)},
     render_template_as_native_obj=True
 ):

这样,当Param被提供给您的任务时,其类型会得到遵循:

# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
    task_id="template_type",
    op_args=[
        "{{ params.my_int_param }}",
    ],
    python_callable=(
        lambda my_int_param: print(type(my_int_param))
    ),
)

另一种访问参数的方式是通过任务的context关键字参数。

 def print_my_int_param(**context):
     print(context["params"]["my_int_param"])

 PythonOperator(
     task_id="print_my_int_param",
     python_callable=print_my_int_param,
     params={"my_int_param": 12345},
 )

JSON 模式验证

Param makes use of JSON Schema, so you can use the full JSON Schema specifications mentioned at https://json-schema.org/draft/2020-12/json-schema-validation.html to define Param objects.

with DAG(
    "my_dag",
    params={
        # an int with a default value
        "my_int_param": Param(10, type="integer", minimum=0, maximum=20),

        # a required param which can be of multiple types
        # a param must have a default value
        "multi_type_param": Param(5, type=["null", "number", "string"]),

        # an enum param, must be one of three values
        "enum_param": Param("foo", enum=["foo", "bar", 42]),

        # a param which uses json-schema formatting
        "email": Param(
            default="example@example.com",
            type="string",
            format="idn-email",
            minLength=5,
            maxLength=255,
        ),
    },
):

注意

如果为DAG定义了schedule,则带有默认值的参数必须有效。这会在DAG解析期间进行验证。 如果schedule=None,则参数不会在DAG解析期间验证,而是在触发DAG之前验证。 这在DAG作者不想提供默认值但希望强制用户在触发时提供有效参数的情况下非常有用。

注意

出于安全考虑,目前无法使用从自定义类派生的Param对象。我们计划为自定义Param类建立一个注册系统,就像我们为Operator ExtraLinks所做的那样。

使用参数提供触发器UI表单

在2.6.0版本中新增。

DAG 级别的参数用于渲染用户友好的触发表单。 当用户点击“Trigger DAG”按钮时,会显示此表单。

触发器UI表单是基于预定义的DAG参数渲染的。如果DAG没有定义参数,则跳过触发表单。 表单元素可以通过Param类来定义,其属性决定了表单字段的显示方式。

Trigger UI 表单支持以下功能:

  • 来自顶层DAG参数的直接标量值(布尔值、整数、字符串、列表、字典)会自动封装到Param对象中。系统会根据原生Python数据类型自动检测type属性。因此这些简单类型会渲染为对应的字段类型。参数名称会被用作标签,且不进行额外验证,所有值都被视为可选。

  • 如果使用Param类作为参数值的定义,可以添加以下属性:

    • Param 属性 title 用于渲染输入框的表单字段标签。 如果未定义 title,则使用参数名称/键代替。

    • Param 属性 description 会在输入框下方以灰色帮助文本的形式呈现。 如需提供特殊格式或链接,需使用 Param 属性 description_md。具体示例可参考教程 DAG Params UI example DAG

    • Param 属性的 type 会影响字段的渲染方式。支持以下类型:

      参数类型

      表单元素类型

      额外支持的属性

      示例

      string

      生成一个单行文本框或文本区域用于编辑文本。

      • minLength: 最小文本长度

      • maxLength: 最大文本长度

      • format="date": 生成日期选择器
        带日历弹窗
      • format="date-time": 生成一个带有日历弹窗的日期和时间选择器
      • format="time": 生成一个时间选择器

      • format="multiline": 生成一个多行文本框

      • enum=["a", "b", "c"]: 生成一个
        用于标量值的下拉选择列表。
        根据JSON验证规则,必须选择
        一个值或者将该字段明确标记为
        可选。更多详情请参阅
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于通过
        enum生成的下拉选择框,您可以添加属性
        values_display并使用字典将数据值映射到显示标签。
      • examples=["One", "Two", "Three"]: 如果
        你想提供可选值建议
        (不像上面的enum那样限制用户
        只能选择固定值),可以使用examples
        这是一个项目列表。
      另请参阅
      这些是在后端DAG触发之前检查的。

      Param("default", type="string", maxLength=10)

      Param(f"{datetime.date.today()}", type="string", format="date")

      number

      integer

      生成一个限制添加的字段
      numeric values only. The HTML browser
      通常还会在
      上添加一个加载动画
      右侧可增加或减少
      value. integer only permits int
      numbers, number allows also
      分数值。
      • minimum: 最小值

      • maximum: 最大值

      另请参阅
      这些在后端DAG触发前会被检查。

      Param(42, type="integer", minimum=14, multipleOf=7)

      boolean

      生成一个用于切换的按钮
      as True or False.

      无。

      Param(True, type="boolean")

      array

      Generates a HTML multi line text field,
      每行编辑的内容都将被转换为
      字符串数组作为值。
      • 如果添加属性 examples
        并指定一个列表,将会生成多选下拉框
        而不是自由文本输入框。
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于多值选择examples,您可以添加
        属性values_display,使用字典将
        数据值映射到显示标签。
      • 如果添加属性items并附带一个
        包含type字段的字典,
        且该字段值不是"string"时,将会生成
        JSON条目字段以支持更多数组类型和
        额外的类型验证,具体描述请参考

      Param(["a", "b", "c"], type="array")

      Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])

      Param(["one@example.com", "two@example.com"], type="array", items={"type": "string", "format": "idn-email"})

      object

      生成一个包含JSON的输入字段
      文本验证。
      The HTML form does only validate the syntax of the
      JSON输入。用于验证以下内容
      具体结构请查看

      Param({"key": "value"}, type=["object", "null"])

      null

      指定不期望有任何内容。
      单独使用这个功能意义不大
      但对于类型组合很有用
      like type=["null", "string"] as the
      type属性也接受一个列表
      类型。
      默认情况下,如果您指定一个类型,则
      该字段将被设为必填项
      输入 - 由于JSON验证。
      如果您希望某个字段的值
      添加了可选参数,您必须允许
      允许空值的JSON模式验证
      值。

      Param(None, type=["null", "string"])

  • 如果表单字段留空,它将以None值传递给params字典。

  • 表单字段按照DAG中params定义的顺序呈现。

  • 如果想在表单中添加分区,请为每个字段添加section属性。该文本将用作分区标签。 没有section的字段将在默认区域呈现。 其他分区默认会折叠显示。

  • 如果希望某些参数不显示,可以使用const属性。这些参数会被提交但在表单中隐藏。 const值必须与默认值匹配才能通过JSON Schema验证

  • 在表单底部可以展开生成的JSON配置。如需手动修改值,可以调整JSON配置。当表单字段变更时,手动修改会被覆盖。

  • 要在发布触发器表单链接时预填充表单中的值,您可以调用触发器URL /dags//trigger,并在URL中添加查询参数,格式为name=value,例如/dags/example_params_ui_tutorial/trigger?required_field=some%20text。要预定义DAG运行的运行ID,请使用URL参数run_id

  • 字段可以是必填或选填。默认情况下,类型化字段是必填的,以确保它们通过JSON模式验证。要使类型化字段变为选填,必须允许"null"类型。

  • 没有“部分”的字段将在默认区域呈现。其他部分默认会折叠显示。

注意

如果该字段是必填项,默认值也必须符合模式要求。如果DAG定义为schedule=None,参数值验证将在触发时进行。

例如,请查看提供的两个示例DAG:参数触发示例DAG参数界面示例DAG

airflow/example_dags/example_params_trigger_ui.py[source]

with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)

airflow/example_dags/example_params_ui_tutorial.py[source]

    params={
        # Let's start simple: Standard dict values are detected from type and offered as entry form fields.
        # Detected types are numbers, text, boolean, lists and dicts.
        # Note that such auto-detected parameters are treated as optional (not required to contain a value)
        "x": 3,
        "text": "Hello World!",
        "flag": False,
        "a_simple_list": ["one", "two", "three", "actually one value is made per line"],
        # But of course you might want to have it nicer! Let's add some description to parameters.
        # Note if you can add any Markdown formatting to the description, you need to use the description_md
        # attribute.
        "most_loved_number": Param(
            42,
            type="integer",
            title="Your favorite number",
            description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
            "If you can not think of any at the moment please think of the 42 which is very famous because"
            "of the book [The Hitchhiker's Guide to the Galaxy]"
            "(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
            "The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
        ),
        # If you want to have a selection list box then you can use the enum feature of JSON schema
        "pick_one": Param(
            "value 42",
            type="string",
            title="Select one Value",
            description="You can use JSON schema enum's to generate drop down selection boxes.",
            enum=[f"value {i}" for i in range(16, 64)],
        ),

airflow/example_dags/example_params_ui_tutorial.py[source]

        "required_field": Param(
            # In this example we have no default value
            # Form will enforce a value supplied by users to be able to trigger
            type="string",
            title="Required text field",
            description="This field is required. You can not submit without having text in here.",
        ),
        "optional_field": Param(
            "optional text, you can trigger also w/o text",
            type=["null", "string"],
            title="Optional text field",
            description_md="This field is optional. As field content is JSON schema validated you must "
            "allow the `null` type.",
        ),

airflow/example_dags/example_params_ui_tutorial.py[source]

    @task(task_display_name="Show used parameters")
    def show_params(**kwargs) -> None:
        params: ParamsDict = kwargs["params"]
        print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")

    show_params()
../_images/trigger-dag-tutorial-form.png

在版本2.7.0中新增:即使没有定义参数,也可以通过配置开关webserver.show_trigger_form_if_no_params强制显示触发器表单。

Changed in version 2.8.0: By default custom HTML is not allowed to prevent injection of scripts or other malicious HTML code. If you trust your DAG authors you can change the trust level of parameter descriptions to allow raw HTML by setting the configuration entry webserver.allow_raw_html_descriptions to True. With the default setting all HTML will be displayed as plain text. This relates to the previous feature to enable rich formatting with the attribute description_html which is now super-seeded with the attribute description_md. Custom form elements using the attribute custom_html_form allow a DAG author to specify raw HTML form templates. These custom HTML form elements are deprecated as of version 2.8.0.

禁用运行时参数修改

在触发DAG时更新参数的能力取决于标志core.dag_run_conf_overrides_params。将此配置设置为False将有效地将您的默认参数变为常量。

这篇内容对您有帮助吗?