Semantic Querying with Airflow and Astronomer
| Time: 45 min | Level: Intermediate |
|---|---|
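In this tutorial, you will use Qdrant as a provider in Apache Airflow, an open-source tool that lets you set up data-engineering workflows.
You will write your pipelines in Python as DAGs (Directed Acyclic Graphs). This way, you can leverage Python's powerful features and libraries to cover almost any data pipeline requirement.
Astronomer is a managed platform that simplifies developing and deploying Airflow projects through its easy-to-use CLI and extensive automation capabilities.
Airflow is useful for running operations in Qdrant in response to data events, or for building parallel tasks that generate vector embeddings. With Airflow, you can also set up monitoring and alerting for your pipelines for full observability.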
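Prerequisites
Please make sure you have the following ready:
- A running Qdrant instance. We'll be using a free instance from https://cloud.qdrant.io.
- The Astronomer CLI. Installation instructions are available in the Astronomer documentation.
- A HuggingFace access token, used to generate embeddings.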
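Implementation
We'll build a DAG that generates embeddings for our data corpus in parallel and performs semantic retrieval based on user input.
Set up the project
The Astronomer CLI makes setting up an Airflow project very straightforward: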
mkdir qdrant-airflow-tutorial && cd qdrant-airflow-tutorial
astro dev init
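This command generates all the project files you need to run Airflow locally. You'll find a directory called dags, which is where we place our Python DAG files.
To use Qdrant within Airflow, install the Qdrant Airflow provider by adding the following to the requirements.txt file: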
apache-airflow-providers-qdrant
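Configure credentials
We can set up provider connections using the Airflow UI, environment variables, or the airflow_settings.yml file.
Add the following to the .env file in your project, replacing the values with your own credentials.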
HUGGINGFACE_TOKEN="<YOUR_HUGGINGFACE_ACCESS_TOKEN>"
AIRFLOW_CONN_QDRANT_DEFAULT='{
"conn_type": "qdrant",
"host": "xyz-example.eu-central.aws.cloud.qdrant.io:6333",
"password": "<YOUR_QDRANT_API_KEY>"
}'
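A malformed JSON value in .env is an easy mistake to make, so you may want to sanity-check it before starting Airflow. The sketch below is a hypothetical helper (check_qdrant_conn is not part of Airflow or the Qdrant provider); it uses only the standard json module and checks just the fields used in this tutorial.

```python
import json

# Hypothetical helper for this tutorial; not part of Airflow or the Qdrant provider.
REQUIRED_FIELDS = ("host", "password")


def check_qdrant_conn(raw: str) -> dict:
    """Parse an AIRFLOW_CONN_QDRANT_DEFAULT-style JSON value and check the fields used here."""
    conn = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on malformed JSON
    if conn.get("conn_type") != "qdrant":
        raise ValueError(f"expected conn_type 'qdrant', got {conn.get('conn_type')!r}")
    missing = [field for field in REQUIRED_FIELDS if not conn.get(field)]
    if missing:
        raise ValueError(f"missing or empty fields: {', '.join(missing)}")
    return conn


# Same shape as the value in .env; newlines inside the JSON are fine for json.loads.
sample = """{
"conn_type": "qdrant",
"host": "xyz-example.eu-central.aws.cloud.qdrant.io:6333",
"password": "<YOUR_QDRANT_API_KEY>"
}"""
conn = check_qdrant_conn(sample)
print(conn["host"])
```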
Add the data corpus
Let's add some sample data to work with. Paste the following into a file named books.txt in the include directory.
1 | To Kill a Mockingbird (1960) | fiction | Harper Lee's Pulitzer Prize-winning novel explores racial injustice and moral growth through the eyes of young Scout Finch in the Deep South.
2 | Harry Potter and the Sorcerer's Stone (1997) | fantasy | J.K. Rowling's magical tale follows Harry Potter as he discovers his wizarding heritage and attends Hogwarts School of Witchcraft and Wizardry.
3 | The Great Gatsby (1925) | fiction | F. Scott Fitzgerald's classic novel delves into the glitz, glamour, and moral decay of the Jazz Age through the eyes of narrator Nick Carraway and his enigmatic neighbour, Jay Gatsby.
4 | 1984 (1949) | dystopian | George Orwell's dystopian masterpiece paints a chilling picture of a totalitarian society where individuality is suppressed and the truth is manipulated by a powerful regime.
5 | The Catcher in the Rye (1951) | fiction | J.D. Salinger's iconic novel follows disillusioned teenager Holden Caulfield as he navigates the complexities of adulthood and society's expectations in post-World War II America.
6 | Pride and Prejudice (1813) | romance | Jane Austen's beloved novel revolves around the lively and independent Elizabeth Bennet as she navigates love, class, and societal expectations in Regency-era England.
7 | The Hobbit (1937) | fantasy | J.R.R. Tolkien's adventure follows Bilbo Baggins, a hobbit who embarks on a quest with a group of dwarves to reclaim their homeland from the dragon Smaug.
8 | The Lord of the Rings (1954-1955) | fantasy | J.R.R. Tolkien's epic fantasy trilogy follows the journey of Frodo Baggins to destroy the One Ring and defeat the Dark Lord Sauron in the land of Middle-earth.
9 | The Alchemist (1988) | fiction | Paulo Coelho's philosophical novel follows Santiago, an Andalusian shepherd boy, on a journey of self-discovery and spiritual awakening as he searches for a hidden treasure.
10 | The Da Vinci Code (2003) | mystery/thriller | Dan Brown's gripping thriller follows symbologist Robert Langdon as he unravels clues hidden in art and history while trying to solve a murder mystery with far-reaching implications.
Now for the hacking part: writing our Airflow DAG!
Write the DAG
We'll add the following to a file named books_recommend.py in the dags directory. Let's go over what it does for each task.
import os

import requests
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.qdrant.hooks.qdrant import QdrantHook
from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator
from pendulum import datetime
from qdrant_client import models

QDRANT_CONNECTION_ID = "qdrant_default"
DATA_FILE_PATH = "include/books.txt"
COLLECTION_NAME = "airflow_tutorial_collection"

EMBEDDING_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_DIMENSION = 384
SIMILARITY_METRIC = models.Distance.COSINE


def embed(text: str) -> list:
    """Return the embedding vector for `text` from the HuggingFace Inference API."""
    HUGGINGFACE_URL = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{EMBEDDING_MODEL_ID}"
    response = requests.post(
        HUGGINGFACE_URL,
        headers={"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"},
        json={"inputs": [text], "options": {"wait_for_model": True}},
    )
    # Fail the task on HTTP errors instead of trying to parse an error body as a vector.
    response.raise_for_status()
    # One vector is returned per input; we sent a single input.
    return response.json()[0]


@dag(
    dag_id="books_recommend",
    start_date=datetime(2023, 10, 18),
    schedule=None,
    catchup=False,
    params={"preference": Param("Something suspenseful and thrilling.", type="string")},
)
def recommend_book():
    @task
    def import_books(text_file_path: str) -> list:
        """Read books.txt and return one dict per book."""
        data = []
        with open(text_file_path, "r") as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines, e.g. a trailing empty line
                _, title, genre, description = line.split("|")
                data.append(
                    {
                        "title": title.strip(),
                        "genre": genre.strip(),
                        "description": description.strip(),
                    }
                )
        return data

    @task
    def init_collection():
        """Create the collection only if it does not exist yet, so reruns are safe."""
        hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)
        if not hook.conn.collection_exists(COLLECTION_NAME):
            hook.conn.create_collection(
                COLLECTION_NAME,
                vectors_config=models.VectorParams(
                    size=EMBEDDING_DIMENSION, distance=SIMILARITY_METRIC
                ),
            )

    @task
    def embed_description(data: dict) -> list:
        return embed(data["description"])

    books = import_books(text_file_path=DATA_FILE_PATH)
    # Dynamic task mapping: one embed_description instance per book.
    embeddings = embed_description.expand(data=books)

    qdrant_vector_ingest = QdrantIngestOperator(
        conn_id=QDRANT_CONNECTION_ID,
        task_id="qdrant_vector_ingest",
        collection_name=COLLECTION_NAME,
        payload=books,
        vectors=embeddings,
    )

    @task
    def embed_preference(**context) -> list:
        # The user's preference comes from the DAG's "preference" param.
        user_mood = context["params"]["preference"]
        response = embed(text=user_mood)
        return response

    @task
    def search_qdrant(
        preference_embedding: list,
    ) -> None:
        hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)
        result = hook.conn.query_points(
            collection_name=COLLECTION_NAME,
            query=preference_embedding,
            limit=1,
            with_payload=True,
        ).points
        print("Book recommendation: " + result[0].payload["title"])
        print("Description: " + result[0].payload["description"])

    chain(
        init_collection(),
        qdrant_vector_ingest,
        search_qdrant(embed_preference()),
    )


recommend_book()
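The chain(...) call makes init_collection run before qdrant_vector_ingest, which runs before search_qdrant, and passing one task's output into another (books, embeddings, embed_preference()) adds data dependencies on top of that. As a sketch independent of Airflow, Python's standard graphlib can compute one valid execution order for these edges as we read them from the code. Airflow's scheduler does its own scheduling and may run independent tasks, such as embed_preference and import_books, in parallel.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks), as read from the DAG code.
# "embed_description" stands in for all of its mapped instances.
DEPENDENCIES = {
    "import_books": set(),
    "init_collection": set(),
    "embed_description": {"import_books"},
    "qdrant_vector_ingest": {"init_collection", "import_books", "embed_description"},
    "embed_preference": set(),
    "search_qdrant": {"qdrant_vector_ingest", "embed_preference"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one topological order: every task appears after all of its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(DEPENDENCIES)
print(order)
```

search_qdrant always comes last here because every other task is one of its ancestors.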
import_books: This task reads the text file containing the book information (title, genre, and description) and returns the data as a list of dictionaries.
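The parsing logic inside import_books does not depend on Airflow, so you can try it on its own. This sketch moves it into a plain function (parse_books is a name chosen here for illustration) and feeds it an in-memory file via io.StringIO. It skips blank lines, since a blank line cannot be unpacked into four fields.

```python
import io
from typing import Iterable


def parse_books(lines: Iterable[str]) -> list[dict]:
    """Parse 'id | title | genre | description' lines into dicts, dropping the id."""
    data = []
    for line in lines:
        if not line.strip():
            continue  # skip blank lines, e.g. a trailing empty line at the end of the file
        _, title, genre, description = line.split("|")
        data.append(
            {
                "title": title.strip(),
                "genre": genre.strip(),
                "description": description.strip(),
            }
        )
    return data


sample_file = io.StringIO(
    "7 | The Hobbit (1937) | fantasy | J.R.R. Tolkien's adventure follows Bilbo Baggins.\n"
    "\n"
    "10 | The Da Vinci Code (2003) | mystery/thriller | Dan Brown's gripping thriller.\n"
)
books = parse_books(sample_file)
print(books[1]["genre"])
```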
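init_collection: This task initializes the collection in Qdrant where we will store the vector representations of the book descriptions, creating it only if it does not already exist.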
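The collection_exists check is what makes re-running the DAG safe: the collection is created on the first run and left alone afterwards. The sketch below shows only that control flow, using a hypothetical in-memory stand-in class (InMemoryCollections is not the qdrant_client API) that refuses to create the same collection twice.

```python
class InMemoryCollections:
    """Hypothetical stand-in for a vector-store client; NOT the qdrant_client API."""

    def __init__(self) -> None:
        self.collections: dict[str, dict] = {}
        self.create_calls = 0

    def collection_exists(self, name: str) -> bool:
        return name in self.collections

    def create_collection(self, name: str, size: int, distance: str) -> None:
        # Mimic a store that rejects creating an existing collection.
        if name in self.collections:
            raise ValueError(f"collection {name!r} already exists")
        self.collections[name] = {"size": size, "distance": distance}
        self.create_calls += 1


def init_collection(client: InMemoryCollections, name: str, size: int, distance: str) -> None:
    # Same check-then-create pattern as the init_collection task.
    if not client.collection_exists(name):
        client.create_collection(name, size, distance)


client = InMemoryCollections()
for _ in range(2):  # simulate two DAG runs
    init_collection(client, "airflow_tutorial_collection", 384, "Cosine")
print(client.create_calls)  # 1
```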
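embed_description: This is a dynamic task that creates one mapped task instance for each book in the list. Each instance uses the embed function to generate a vector embedding for one description. To use a different embedding model, adjust the EMBEDDING_MODEL_ID and EMBEDDING_DIMENSION values together, since the collection's vector size must match the model's output size.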
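Conceptually, embed_description.expand(data=books) calls the task once per book, and Airflow hands the collected results downstream in map-index order, which matches the order of books. The sketch below mimics that with a list comprehension and adds a dimension check that catches a model/EMBEDDING_DIMENSION mismatch. fake_embed and map_embed are hypothetical helpers: fake_embed is a deterministic stand-in with no network access, not a real embedding model.

```python
EMBEDDING_DIMENSION = 384


def fake_embed(text: str) -> list[float]:
    """Hypothetical stand-in for embed(): deterministic, offline, NOT a real model."""
    vector = [0.0] * EMBEDDING_DIMENSION
    for i, ch in enumerate(text):
        vector[i % EMBEDDING_DIMENSION] += ord(ch) / 1000.0
    return vector


def map_embed(books: list[dict], embed_fn) -> list[list[float]]:
    """Mimic embed_description.expand(data=books): one call per book, results in input order."""
    vectors = [embed_fn(book["description"]) for book in books]
    for index, vector in enumerate(vectors):
        # A mismatch usually means EMBEDDING_MODEL_ID and EMBEDDING_DIMENSION are out of sync.
        if len(vector) != EMBEDDING_DIMENSION:
            raise ValueError(
                f"book {index}: got {len(vector)} dimensions, expected {EMBEDDING_DIMENSION}"
            )
    return vectors


books = [
    {"description": "A hobbit embarks on a quest with a group of dwarves."},
    {"description": "A symbologist unravels clues hidden in art and history."},
]
vectors = map_embed(books, fake_embed)
print(len(vectors), len(vectors[0]))
```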
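embed_preference: Here, we take the user's input and convert it into a vector using the same pre-trained model as the book descriptions, so the query and the stored vectors live in the same embedding space.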
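Because descriptions and the user's preference both go through embed(), they are sent to the same endpoint with the same request shape. To inspect that request without calling the API, here is a sketch that rebuilds it with the standard urllib module instead of requests. build_embed_request is a hypothetical name, the token is a placeholder, and the URL is the one used in this tutorial; if HuggingFace has since changed its inference endpoints, adjust it accordingly.

```python
import json
import urllib.request

EMBEDDING_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
HUGGINGFACE_URL = (
    "https://api-inference.huggingface.co/pipeline/feature-extraction/" + EMBEDDING_MODEL_ID
)


def build_embed_request(text: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the same POST request that embed() sends."""
    body = json.dumps({"inputs": [text], "options": {"wait_for_model": True}}).encode("utf-8")
    return urllib.request.Request(
        HUGGINGFACE_URL,
        data=body,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


# A book description and the user's preference produce identically shaped requests.
for text in ["A hobbit embarks on a quest.", "Something suspenseful and thrilling."]:
    req = build_embed_request(text, token="hf_example_token")
    print(req.get_method(), req.full_url, json.loads(req.data)["inputs"])
```

Sending it would be urllib.request.urlopen(req); the response body is JSON with one vector per input.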
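qdrant_vector_ingest: This task uses the QdrantIngestOperator to ingest the book data into the Qdrant collection, storing each book's title, genre, and description as the payload of the point that holds its vector embedding.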
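The operator receives two parallel lists, payload=books and vectors=embeddings, so the i-th book belongs with the i-th vector. The sketch below only illustrates that pairing; it is not how QdrantIngestOperator is implemented. to_points is a hypothetical helper, and the sequential integer ids are an assumption made for this example.

```python
def to_points(payloads: list[dict], vectors: list[list[float]]) -> list[dict]:
    """Hypothetical helper: pair the i-th payload with the i-th vector."""
    if len(payloads) != len(vectors):
        raise ValueError(f"{len(payloads)} payloads but {len(vectors)} vectors")
    # Sequential integer ids are an assumption for this sketch only.
    return [
        {"id": i, "vector": vector, "payload": payload}
        for i, (payload, vector) in enumerate(zip(payloads, vectors))
    ]


payloads = [
    {"title": "The Hobbit (1937)", "genre": "fantasy"},
    {"title": "1984 (1949)", "genre": "dystopian"},
]
vectors = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
points = to_points(payloads, vectors)
print(points[1]["payload"]["title"])
```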
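search_qdrant: Finally, this task searches the Qdrant collection with the vectorized user preference. It returns the single most similar book (limit=1) by cosine similarity and prints its title and description to the task log.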
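To see what "most similar" means here, the sketch below ranks points by cosine similarity with a brute-force scan. Qdrant uses an HNSW index rather than scanning every point, so treat this only as an illustration of the ranking that limit=1 with COSINE distance asks for. The 3-dimensional vectors are made up; real ones have EMBEDDING_DIMENSION entries.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat zero vectors as dissimilar instead of dividing by zero
    return dot / (norm_a * norm_b)


def search(query: list[float], points: list[dict], limit: int = 1) -> list[dict]:
    """Brute-force stand-in for query_points(..., limit=limit): rank all points by similarity."""
    ranked = sorted(points, key=lambda p: cosine_similarity(query, p["vector"]), reverse=True)
    return ranked[:limit]


# Toy vectors, made up for illustration.
points = [
    {"vector": [0.9, 0.1, 0.0], "payload": {"title": "The Da Vinci Code (2003)"}},
    {"vector": [0.0, 1.0, 0.0], "payload": {"title": "Pride and Prejudice (1813)"}},
    {"vector": [0.1, 0.0, 0.9], "payload": {"title": "The Hobbit (1937)"}},
]
best = search([1.0, 0.0, 0.0], points, limit=1)[0]
print("Book recommendation: " + best["payload"]["title"])
```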
Run the DAG
Head over to your terminal and run:
astro dev start
The local Airflow containers should now be up and running. You can access the Airflow UI at http://localhost:8080. Open our DAG by clicking on books_recommend.

Click the PLAY button on the right to run the DAG. You'll be prompted for your preference input, with the default value already filled in.
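Instead of the PLAY button, you could also trigger a run programmatically and pass your own preference. This is a hedged sketch that only builds the request, under several assumptions you should verify against your setup: Airflow 2's stable REST API (/api/v1) is reachable at localhost:8080 with basic auth enabled, the local credentials are admin/admin, and the run's conf overrides the matching "preference" param (Airflow 2's default behaviour). Airflow 3 uses a different API version and authentication. build_trigger_request is a hypothetical name.

```python
import base64
import json
import urllib.request

# Assumptions: local Astro dev environment, Airflow 2 stable REST API, basic auth, admin/admin.
AIRFLOW_URL = "http://localhost:8080"


def build_trigger_request(
    dag_id: str, preference: str, user: str = "admin", password: str = "admin"
) -> urllib.request.Request:
    """Build (but do not send) a request that starts a DAG run with a custom preference."""
    body = json.dumps({"conf": {"preference": preference}}).encode("utf-8")
    credentials = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        headers={"Content-Type": "application/json", "Authorization": f"Basic {credentials}"},
        method="POST",
    )


req = build_trigger_request("books_recommend", "A cozy fantasy adventure.")
print(req.get_method(), req.full_url)
# To actually send it against a running local Airflow: urllib.request.urlopen(req)
```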

Once your DAG run completes, you should see the output of the search in the logs of the search_qdrant task.

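And there you have it: an Airflow pipeline that interfaces with Qdrant! Feel free to tweak it and explore Airflow further. There are some references below that may come in handy.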
