Apache Airflow

Apache Airflow 是一个开源平台,用于编写、调度和监控数据和计算工作流。Airflow 使用 Python 创建工作流,这些工作流可以轻松地进行调度和监控。

Qdrant 可作为 Airflow 中的 provider 来与数据库进行交互。

先决条件

在配置Airflow之前,您需要:

  1. 要连接的Qdrant实例。您可以在我们的安装指南中设置一个。

  2. 一个正在运行的Airflow实例。您可以使用他们的快速入门指南

安装

你可以通过在Airflow shell中运行pip install apache-airflow-providers-qdrant来安装Qdrant提供程序。

注意:您需要重新启动您的Airflow会话以使提供程序可用。

设置连接

打开Airflow UI中的Admin-> Connections部分。点击Create链接以创建一个新的Qdrant 连接

Qdrant connection

您还可以使用环境变量外部秘密后端来设置连接。

Qdrant 钩子

Airflow钩子是一个特定API的抽象,它允许Airflow与外部系统进行交互。

from airflow.providers.qdrant.hooks.qdrant import QdrantHook

hook = QdrantHook(conn_id="qdrant_connection")

hook.verify_connection()

一个qdrant_client#QdrantClient实例可以通过QdrantHook实例的@property conn在你的Airflow工作流中使用。

from qdrant_client import models

hook.conn.count("<COLLECTION_NAME>")

hook.conn.upsert(
    "<COLLECTION_NAME>",
    points=[
        models.PointStruct(id=32, vector=[0.32, 0.12, 0.123], payload={"color": "red"})
    ],
)

Qdrant 数据摄取操作符

Qdrant 提供程序还提供了一个便捷的操作符,用于将数据上传到 Qdrant 集合,该操作符内部使用了 Qdrant 钩子。

from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator

vectors = [
    [0.11, 0.22, 0.33, 0.44],
    [0.55, 0.66, 0.77, 0.88],
    [0.88, 0.11, 0.12, 0.13],
]
ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]

QdrantIngestOperator(
    conn_id="qdrant_connection",
    task_id="qdrant_ingest",
    collection_name="<COLLECTION_NAME>",
    vectors=vectors,
    ids=ids,
    payload=payload,
)

参考

这个页面有用吗?

感谢您的反馈!🙏

我们很抱歉听到这个消息。😔 你可以在GitHub上编辑这个页面,或者创建一个GitHub问题。