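Define an operator extra link
If you want to add more links to an operator, you can define them through a plugin or a provider package. Extra links are displayed on the task details page in the Grid view.
The following code shows how to add an extra link to an operator through a plugin: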
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin


class GoogleLink(BaseOperatorLink):
    name = "Google"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        return "https://www.google.com"


class MyFirstOperator(BaseOperator):
    operator_extra_links = (GoogleLink(),)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, context):
        self.log.info("Hello World!")


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        GoogleLink(),
    ]
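GoogleLink above always returns the same URL. Because get_link also receives ti_key, a link can instead point to something specific to the task instance. The sketch below keeps the URL-building logic in a plain function so that it can be tested without a running Airflow. FakeTaskInstanceKey is a hypothetical stand-in that models only the dag_id, task_id and run_id fields used here, and google_search_url is a hypothetical helper name.

```python
from typing import NamedTuple
from urllib.parse import urlencode


class FakeTaskInstanceKey(NamedTuple):
    """Hypothetical stand-in for TaskInstanceKey (only the fields used here)."""

    dag_id: str
    task_id: str
    run_id: str


def google_search_url(ti_key: FakeTaskInstanceKey) -> str:
    """Build a Google search URL for the task, with the query safely encoded."""
    query = f"{ti_key.dag_id} {ti_key.task_id}"
    # urlencode percent-encodes reserved characters and turns spaces into "+".
    return "https://www.google.com/search?" + urlencode({"q": query})


key = FakeTaskInstanceKey("example_dag", "hello_task", "manual__2024-01-01")
print(google_search_url(key))
# https://www.google.com/search?q=example_dag+hello_task
```

In a real link class, get_link could simply return google_search_url(ti_key), because Airflow's TaskInstanceKey uses the same attribute names for these fields.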
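Note
Operator extra links must be registered through an Airflow plugin or a custom Airflow provider to take effect.
You can also add a global operator extra link, which is available to all operators, through an Airflow plugin or an Airflow provider. See the plugin interface and Provider packages documentation for details.
You can see all the extra links available through community-managed providers in Extra Links.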
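Add or override links to existing operators
You can also add (or override) extra links on existing operators through an Airflow plugin or a custom provider.
For example, the following Airflow plugin adds an operator link to every task that uses the GCSToS3Operator operator.
Adding operator links to existing operators
plugins/extra_link.py: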
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


class S3LogLink(BaseOperatorLink):
    name = "S3"

    # List all the operators to which you want to add this operator link
    # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
    operators = [GCSToS3Operator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # Invalid bucket name because upper case letters and underscores are used
        # This will not be a valid bucket in any region
        bucket_name = "Invalid_Bucket_Name"
        return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
            bucket_name=bucket_name,
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            run_id=ti_key.run_id,
        )


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        S3LogLink(),
    ]
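The S3LogLink example above inserts run_id into the URL path unchanged. Run IDs created by the scheduler contain characters such as ":" and "+" (for example scheduled__2024-01-01T00:00:00+00:00). If you prefer not to rely on those characters passing through unchanged, one defensive option is to percent-encode each path segment before filling the template. This is a sketch under that assumption, and s3_log_url is a hypothetical helper name.

```python
from urllib.parse import quote

S3_LOG_URL_FMT = "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}"


def s3_log_url(bucket_name: str, dag_id: str, task_id: str, run_id: str) -> str:
    """Fill the S3LogLink URL template, percent-encoding every path segment."""
    segments = {
        "bucket_name": bucket_name,
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,
    }
    # safe="" also encodes "/", so no value can introduce extra path segments.
    return S3_LOG_URL_FMT.format(**{k: quote(v, safe="") for k, v in segments.items()})


print(s3_log_url("my-bucket", "example_dag", "copy_task", "scheduled__2024-01-01T00:00:00+00:00"))
# https://s3.amazonaws.com/airflow-logs/my-bucket/example_dag/copy_task/scheduled__2024-01-01T00%3A00%3A00%2B00%3A00
```

In get_link you would call s3_log_url(bucket_name, operator.dag_id, operator.task_id, ti_key.run_id) in place of the direct format call.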
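Overriding operator links of existing operators:
You can also replace a built-in link on an operator through a plugin. For example, BigQueryExecuteQueryOperator includes a link to the Google Cloud console. If you want to change that link, you can define: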
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"


class BigQueryConsoleLink(BaseOperatorLink):
    """
    Helper class for constructing BigQuery link.
    """

    name = "BigQuery Console"
    operators = [BigQueryExecuteQueryOperator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        job_id = XCom.get_one(ti_key=ti_key, key="job_id")
        # Return an empty string (no link) if the task has not pushed a job_id
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        BigQueryConsoleLink(),
    ]
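In the example above, the plugin's link reuses the name "BigQuery Console", which is meant to match the operator's built-in link. The sketch below is a simplified, hypothetical model of why the name matters. It is not Airflow's actual implementation. It assumes three things: links are merged into a dictionary keyed by name, plugin links are applied after the operator's own links, and a plugin link applies only to the classes listed in its operators attribute. Link, FakeBigQueryOperator and resolve_links are illustrative stand-ins.

```python
from dataclasses import dataclass, field


@dataclass
class Link:
    """Stand-in for an operator link: a button label plus a URL."""

    name: str
    url: str
    operators: list = field(default_factory=list)  # classes a plugin link targets


class FakeBigQueryOperator:
    """Stand-in operator class with one built-in link."""

    operator_extra_links = [Link("BigQuery Console", "https://console.cloud.google.com/bigquery")]


def resolve_links(operator_cls: type, plugin_links: list) -> dict:
    """Merge built-in and plugin links, keyed by name (simplified model)."""
    links = {link.name: link for link in operator_cls.operator_extra_links}
    for link in plugin_links:
        # Only links that list this operator class apply; a matching name replaces.
        if operator_cls in link.operators:
            links[link.name] = link
    return links


override = Link("BigQuery Console", "http://console.cloud.google.com/bigquery", [FakeBigQueryOperator])
resolved = resolve_links(FakeBigQueryOperator, [override])
print(resolved["BigQuery Console"].url)  # http://console.cloud.google.com/bigquery
```

Under these assumptions, a plugin link with a different name is added next to the built-in link rather than replacing it.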
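Add operator links via providers
As explained in Provider packages, when you create your own Airflow provider you can declare the extra links it provides. You do this by listing the fully qualified class names of the link classes under extra-links in the provider-info stored in your provider package's metadata.
Example metadata for your provider-info dictionary (this is part of the metadata currently returned by the apache-airflow-providers-google provider):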
extra-links:
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
  - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
You can include as many extra link classes as you need.
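The provider-info dictionary is typically returned by a get_provider_info() function, which the provider registers in its packaging metadata under the apache_airflow_provider entry point group. Below is a minimal sketch for a hypothetical provider. The package name, module paths and link class names are placeholders. package-name, name and description are included on the assumption that the provider-info schema expects them, so check the schema for your Airflow version.

```python
def get_provider_info() -> dict:
    """Return provider metadata for a hypothetical provider package."""
    return {
        "package-name": "my-company-airflow-provider",
        "name": "My Company",
        "description": "Hypothetical provider used to illustrate extra-links registration.",
        # Fully qualified names of BaseOperatorLink subclasses (placeholders).
        "extra-links": [
            "my_company_provider.links.DashboardLink",
            "my_company_provider.links.RunLogsLink",
        ],
    }


for link_path in get_provider_info()["extra-links"]:
    print(link_path)
```

Each entry must be an importable dotted path to a link class, just like the google provider entries above.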