序列化

为了实现任务间的数据交换(例如参数传递),Airflow需要对交换的数据进行序列化,并在下游任务需要时进行反序列化。序列化过程还确保网络服务器和调度器(与DAG处理器不同)无需读取DAG文件,这既出于安全考虑也提高了运行效率。

序列化是一项出人意料困难的工作。Python默认只支持基本类型的序列化,比如strint,并且会遍历可迭代对象。当情况变得更加复杂时,就需要自定义序列化。

Airflow 开箱即用支持三种自定义序列化方式。基本类型会原样返回,不进行额外编码,例如 str 保持为 str。当对象不是基本类型(或其可迭代对象)时,Airflow 会在 airflow.serialization.serializers 命名空间中查找已注册的序列化器和反序列化器。如果未找到,则会检查类中是否存在 serialize() 方法,或在反序列化时查找 deserialize(data, version: int) 方法。最后,如果类被 @dataclass@attr.define 装饰,则会使用这些装饰器提供的公共方法。

如果您想通过新的序列化器扩展Airflow,了解何时选择何种序列化方式非常重要。 由Airflow控制的对象,即位于airflow.*命名空间下的对象,例如 airflow.model.dag.DAG,或由开发者控制的对象如my.company.Foo,应首先检查它们是否可以用 @attr.define@dataclass装饰。如果不可行,则应实现serializedeserialize方法。serialize方法应返回基本类型或字典。 它不需要序列化字典中的值,这部分会由系统处理,但键应为基本类型。

不受Airflow控制的对象,例如numpy.int16需要注册序列化器和反序列化器。 版本控制是必需的。除bytes外的原始类型可以直接返回,字典也可以。再次强调,dict的值不需要序列化, 但其键必须是原始类型。在实现注册的序列化器时,要特别注意 避免循环导入。通常可以通过使用str来填充序列化器列表来避免这种情况。 像这样:serializers = ["my.company.Foo"]而不是serializers = [Foo]

注意

序列化和反序列化的性能取决于速度。尽可能使用内置函数如dict,避免使用类和其他复杂结构。

Airflow 对象

from typing import Any, ClassVar


class Foo:
    __version__: ClassVar[int] = 1

    def __init__(self, a, v) -> None:
        self.a = a
        self.b = {"x": v}

    def serialize(self) -> dict[str, Any]:
        return {
            "a": self.a,
            "b": self.b,
        }

    @staticmethod
    def deserialize(data: dict[str, Any], version: int):
        f = Foo(a=data["a"])
        f.b = data["b"]
        return f

已注册

from __future__ import annotations

from decimal import Decimal
from typing import TYPE_CHECKING

from airflow.utils.module_loading import qualname

if TYPE_CHECKING:
    from airflow.serialization.serde import U


serializers = [
    Decimal
]  # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers  # in some cases you might not have a deserializer (e.g. k8s pod)

__version__ = 1  # required


# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
    if isinstance(o, Decimal):
        name = qualname(o)
        _, _, exponent = o.as_tuple()
        if exponent >= 0:  # No digits after the decimal point.
            return int(o), name, __version__, True
            # Technically lossy due to floating point errors, but the best we
            # can do without implementing a custom encode function.
        return float(o), name, __version__, True

    return "", "", 0, False


# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
    # always check version compatibility
    if version > __version__:
        raise TypeError(f"serialized {version} of {classname} > {__version__}")

    if classname != qualname(Decimal):
        raise TypeError(f"{classname} != {qualname(Decimal)}")

    return Decimal(str(data))

这篇内容对您有帮助吗?