序列化¶
为了实现任务间的数据交换(例如参数传递),Airflow需要对交换的数据进行序列化,并在下游任务需要时进行反序列化。序列化过程还确保网络服务器和调度器(与DAG处理器不同)无需读取DAG文件,这既出于安全考虑也提高了运行效率。
序列化是一项出人意料困难的工作。Python默认只支持基本类型的序列化,比如str和int,并且会遍历可迭代对象。当情况变得更加复杂时,就需要自定义序列化。
Airflow 开箱即用支持三种自定义序列化方式。基本类型会原样返回,不进行额外编码,例如 str 保持为 str。当对象不是基本类型(或其可迭代对象)时,Airflow 会在 airflow.serialization.serializers 命名空间中查找已注册的序列化器和反序列化器。如果未找到,则会检查类中是否存在 serialize() 方法,或在反序列化时查找 deserialize(data, version: int) 方法。最后,如果类被 @dataclass 或 @attr.define 装饰,则会使用这些装饰器提供的公共方法。
如果您想通过新的序列化器扩展Airflow,了解何时选择何种序列化方式非常重要。
由Airflow控制的对象,即位于airflow.*命名空间下的对象,例如
airflow.model.dag.DAG,或由开发者控制的对象如my.company.Foo,应首先检查它们是否可以用
@attr.define或@dataclass装饰。如果不可行,则应实现serialize
和deserialize方法。serialize方法应返回基本类型或字典。
它不需要序列化字典中的值,这部分会由系统处理,但键应为基本类型。
不受Airflow控制的对象,例如numpy.int16需要注册序列化器和反序列化器。
版本控制是必需的。除bytes外的原始类型可以直接返回,字典也可以。再次强调,dict的值不需要序列化,
但其键必须是原始类型。在实现注册的序列化器时,要特别注意
避免循环导入。通常可以通过使用str来填充序列化器列表来避免这种情况。
像这样:serializers = ["my.company.Foo"]而不是serializers = [Foo]。
注意
序列化和反序列化的性能取决于速度。尽可能使用内置函数如dict,避免使用类和其他复杂结构。
Airflow 对象¶
from typing import Any, ClassVar
class Foo:
__version__: ClassVar[int] = 1
def __init__(self, a, v) -> None:
self.a = a
self.b = {"x": v}
def serialize(self) -> dict[str, Any]:
return {
"a": self.a,
"b": self.b,
}
@staticmethod
def deserialize(data: dict[str, Any], version: int):
f = Foo(a=data["a"])
f.b = data["b"]
return f
已注册¶
from __future__ import annotations
from decimal import Decimal
from typing import TYPE_CHECKING
from airflow.utils.module_loading import qualname
if TYPE_CHECKING:
from airflow.serialization.serde import U
serializers = [
Decimal
] # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers # in some cases you might not have a deserializer (e.g. k8s pod)
__version__ = 1 # required
# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
if isinstance(o, Decimal):
name = qualname(o)
_, _, exponent = o.as_tuple()
if exponent >= 0: # No digits after the decimal point.
return int(o), name, __version__, True
# Technically lossy due to floating point errors, but the best we
# can do without implementing a custom encode function.
return float(o), name, __version__, True
return "", "", 0, False
# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
# always check version compatibility
if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")
if classname != qualname(Decimal):
raise TypeError(f"{classname} != {qualname(Decimal)}")
return Decimal(str(data))