PythonOperator¶
使用PythonOperator来执行Python可调用对象。
提示
推荐使用@task装饰器而非传统的PythonOperator来执行Python可调用对象。
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return "Whatever you return gets printed in the logs"
run_this = print_context()
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print("::group::All kwargs")
pprint(kwargs)
print("::endgroup::")
print("::group::Context variable ds")
print(ds)
print("::endgroup::")
return "Whatever you return gets printed in the logs"
run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
传入参数¶
像普通Python函数一样,将额外参数传递给@task装饰的函数。
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)
run_this >> log_the_sql >> sleeping_task
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = PythonOperator(
task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
)
run_this >> log_the_sql >> sleeping_task
模板化¶
Airflow 传入了一组额外的关键字参数:一个用于每个
Jinja模板变量和一个templates_dict
参数。
templates_dict 参数支持模板化,因此字典中的每个值都会被当作 Jinja模板 进行解析。
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = log_sql()
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = PythonOperator(
task_id="log_sql_query",
python_callable=log_sql,
templates_dict={"query": "sql/sample.sql"},
templates_exts=[".sql"],
)
PythonVirtualenvOperator¶
使用PythonVirtualenvOperator装饰器在新创建的Python虚拟环境中执行Python可调用对象。运行Airflow的环境中需要安装virtualenv包(作为可选依赖项pip install apache-airflow[virtualenv] --constraint ...)。
此外,需要使用命令pip install [cloudpickle] --constraint ...将cloudpickle包作为可选依赖项安装。该包是当前使用的dill包的替代品。
Cloudpickle的主要优势在于其专注于标准pickling协议,确保更广泛的兼容性和更顺畅的数据交换,同时仍能有效处理常见的Python对象和函数内的全局变量。
提示
推荐使用@task.virtualenv装饰器而非传统的PythonVirtualenvOperator来在新的Python虚拟环境中执行Python可调用对象。
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = callable_virtualenv()
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
传入参数¶
向使用@task.virtualenv装饰的函数传递额外参数时,可以像普通Python函数一样操作。
遗憾的是,由于底层库的兼容性问题,Airflow不支持序列化var、ti和task_instance。对于Airflow上下文变量,
请确保通过将system_site_packages设置为True或向requirements参数添加apache-airflow来获得对Airflow的访问权限。
否则,您将无法在op_kwargs中访问大多数Airflow上下文变量。
如果您需要与日期时间对象(如data_interval_start)相关的上下文,可以添加pendulum和
lazy_object_proxy。
重要
为执行而定义的Python函数体会从DAG中被提取出来,放入一个临时文件中,不包含周围的代码。 如示例所示,您需要重新添加所有导入,并且不能依赖全局Python上下文中的变量。
如果想将变量传递到经典的PythonVirtualenvOperator中,请使用
op_args和op_kwargs。
如果包安装需要额外参数,可以通过pip_install_options参数传入,或如下例所示使用requirements.txt文件:
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
所有支持的选项都列在requirements文件格式中。
虚拟环境设置选项¶
虚拟环境是基于工作节点上的全局Python pip配置创建的。您可以在环境中使用额外的环境变量,或按照pip config中所述调整通用pip配置。
如果您想使用额外的任务特定私有Python仓库来设置虚拟环境,可以传递index_urls参数来调整pip安装配置。传入的索引URL将替换标准系统配置的索引URL设置。
为避免在DAG代码中将密钥添加到私有仓库,您可以使用Airflow的Connections & Hooks。为此目的,可以使用连接类型Package Index (Python)。
在特殊情况下,如果你想阻止远程调用来设置虚拟环境,可以将index_urls作为空列表传递,即index_urls=[],这会强制pip安装程序使用--no-index选项。
缓存与复用¶
虚拟环境的设置会在临时目录中为每个任务执行创建。执行完成后,虚拟环境会被再次删除。请确保工作节点上的$tmp文件夹
有足够的磁盘空间。通常情况下(如果未另行配置),将使用本地pip缓存以避免每次执行都重新下载软件包。
但每次执行时设置虚拟环境仍需要一定时间。对于重复执行的情况,您可以将选项venv_cache_path设置为工作节点上的文件系统文件夹。这样虚拟环境只需设置一次即可重复使用。如果使用虚拟环境缓存,系统会根据不同的需求集在缓存路径中创建不同的虚拟环境子文件夹。因此根据您系统中DAG的多样性,需要确保有足够的磁盘空间。
请注意,在缓存模式下不会进行自动清理。所有工作槽共享相同的虚拟环境,但如果任务在不同工作器上反复调度,可能会出现虚拟环境在多个工作器上单独创建的情况。此外,如果工作器在Kubernetes POD中启动,工作器的重启将清除缓存(假设venv_cache_path不在持久卷上)。
如果在运行时遇到缓存的虚拟环境损坏问题,您可以通过设置Airflow变量PythonVirtualenvOperator.cache_key为任意文本来影响缓存目录哈希值。该变量的内容会被用于计算缓存目录密钥的向量中。
请注意,对缓存的虚拟环境进行的任何修改(如二进制路径中的临时文件、后续安装额外依赖项)可能会污染缓存的虚拟环境,且该操作员不会维护或清理缓存路径。
ExternalPythonOperator¶
ExternalPythonOperator 可以帮助您使用与其他任务(以及主Airflow环境)不同的Python库集来运行某些任务。这可以是一个虚拟环境,或者是Airflow任务运行环境中预先安装并可用的任何Python安装。该操作器将Python二进制文件作为python参数。请注意,即使在虚拟环境的情况下,python路径也应指向虚拟环境内的Python二进制文件(通常在虚拟环境的bin子目录中)。与常规使用虚拟环境不同,不需要激活环境。仅使用python二进制文件即可自动激活它。在下面的两个示例中,PATH_TO_PYTHON_BINARY就是这样一个路径,指向可执行的Python二进制文件。
使用ExternalPythonOperator在预定义环境中执行Python可调用对象。virtualenv包应预先安装在运行Python的环境中。如果使用dill,则必须预先安装该环境(与主Airflow环境中安装的版本相同)。
提示
推荐使用@task.external_python装饰器而非传统的ExternalPythonOperator来在预定义的Python环境中执行Python代码。
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = callable_external_python()
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
传入参数¶
向@task.external_python装饰的函数传递额外参数时,可以像普通Python函数那样操作。
遗憾的是,由于底层库的兼容性问题,Airflow不支持序列化var和ti/task_instance。
对于Airflow上下文变量,请确保虚拟环境中的Airflow安装版本与任务运行的Airflow版本一致。
否则您将无法在op_kwargs中访问大多数Airflow上下文变量。
如果您需要获取与时间相关的上下文对象(如data_interval_start),可以在虚拟环境中添加pendulum和
lazy_object_proxy。
重要
为执行而定义的Python函数体会从DAG中被提取出来,放入一个临时文件中,不包含周围的代码。 如示例所示,您需要重新添加所有导入,并且不能依赖全局Python上下文中的变量。
如果想将变量传递到经典的ExternalPythonOperator中,请使用
op_args和op_kwargs。
Python分支操作符¶
使用PythonBranchOperator来执行Python分支任务。
提示
推荐使用@task.branch装饰器而非传统的PythonBranchOperator来执行Python代码。
@task.branch()
def branching(choices: list[str]) -> str:
return f"branch_{random.choice(choices)}"
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
参数传递和模板选项与PythonOperator相同。
BranchPythonVirtualenvOperator¶
使用BranchPythonVirtualenvOperator装饰器来执行Python分支任务,它是PythonBranchOperator在虚拟环境中执行的混合体。
提示
推荐使用@task.branch_virtualenv装饰器而非传统的
BranchPythonVirtualenvOperator来执行Python代码。
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()
@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
def branch_with_venv(choices):
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
参数传递和模板选项与PythonVirtualenvOperator相同。
BranchExternalPythonOperator¶
使用BranchExternalPythonOperator来执行Python分支任务,它是PythonBranchOperator与外部Python环境执行的混合体。
提示
推荐使用@task.branch_external_python装饰器而非传统的BranchExternalPythonOperator来执行Python代码。
@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
import random
return f"ext_py_{random.choice(choices)}"
def branch_with_external_python(choices):
import random
return f"ext_py_{random.choice(choices)}"
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
参数传递和模板选项与ExternalPythonOperator相同。
ShortCircuitOperator¶
使用ShortCircuitOperator来控制当满足某个条件或获得真值时,流水线是否继续执行。
该条件的评估和真值是通过可调用对象的输出来完成的。如果可调用对象返回True或真值,则允许管道继续执行,并将输出的XCom推送出去。如果输出为False或假值,则管道将根据配置的短路逻辑被短路(后续会详细介绍)。在下面的示例中,跟随"condition_is_true"任务的任务将会执行,而"condition_is_false"任务下游的任务将被跳过。
提示
推荐使用@task.short_circuit装饰器而非传统的ShortCircuitOperator来通过Python可调用对象实现工作流短路。
@task.short_circuit()
def check_condition(condition):
return condition
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)
cond_true = ShortCircuitOperator(
task_id="condition_is_True",
python_callable=lambda: True,
)
cond_false = ShortCircuitOperator(
task_id="condition_is_False",
python_callable=lambda: False,
)
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
可以配置“短路”行为以遵循或忽略为下游任务定义的触发规则。如果将ignore_downstream_trigger_rules设为True(默认配置),则会跳过所有下游任务而不考虑为任务定义的trigger_rule。若将此参数设为False,则跳过直接下游任务,但仍遵循为其他后续下游任务指定的trigger_rule。在这种短路配置中,操作符假定直接下游任务是特意要被跳过的,但其他后续任务可能不需要跳过。当只需要短路管道中的部分而非短路任务后的所有任务时,这种配置特别有用。
在下面的示例中,请注意“short_circuit”任务被配置为遵循下游触发规则。这意味着虽然“short_circuit”任务之后的任务将被跳过(因为装饰函数返回False),但“task_7”仍会执行,因为它被设置为在上游任务完成运行后执行(无论状态如何,即TriggerRule.ALL_DONE触发规则)。
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
condition=False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = ShortCircuitOperator(
task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
传入参数¶
像普通Python函数一样,向@task.short_circuit装饰的函数传递额外参数。
模板化¶
Jinja模板可以按照PythonOperator中描述的方式使用。
PythonSensor¶
PythonSensor执行一个可调用对象,并等待其返回值变为True。
提示
推荐使用@task.sensor装饰器而非传统的PythonSensor来执行Python可调用对象以检查True条件。
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)
t10 = PythonSensor(
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)