通过Confluent设置Kafka数据流
作者: M K Pavan Kumar,印度工艺与设计学院,库尔诺尔的研究学者。专注于幻觉缓解技术和RAG方法论的专家。 • GitHub • 中等
介绍
本指南将引导您完成安装和设置Qdrant Sink Connector的详细步骤,构建必要的基础设施,并创建一个实用的游乐场应用程序。通过本文,您将深入了解如何利用这一强大的集成来简化您的数据工作流程,最终提升您的数据驱动的实时语义搜索和RAG应用程序的性能和功能。
在这个例子中,原始数据将来自Azure Blob Storage和MongoDB。

图1:使用Kafka和Qdrant的实时变更数据捕获(CDC)。
架构:
源系统
架构始于源系统,由MongoDB和Azure Blob Storage表示。这些系统对于存储和管理原始数据至关重要。MongoDB是一种流行的NoSQL数据库,以其处理各种数据格式的灵活性和水平扩展能力而闻名。它广泛用于需要高性能和可扩展性的应用程序。另一方面,Azure Blob Storage是微软的云对象存储解决方案。它设计用于存储大量非结构化数据,如文本或二进制数据。使用源连接器从这些源中提取数据,这些连接器负责实时捕获变化并将其流式传输到Kafka中。
Kafka
在这个架构的核心是Kafka,一个能够每天处理数万亿事件的分布式事件流平台。Kafka作为一个中心枢纽,可以接收来自各种来源的数据,进行处理,并分发到各种下游系统。其容错和可扩展的设计确保了数据可以可靠地实时传输和处理。Kafka处理高吞吐量、低延迟数据流的能力使其成为实时数据处理和分析的理想选择。使用Confluent增强了Kafka的功能,提供了额外的工具和服务来管理Kafka集群和流处理。
Qdrant
处理后的数据随后被路由到Qdrant,这是一个高度可扩展的向量搜索引擎,专为相似性搜索设计。Qdrant在管理和搜索高维向量数据方面表现出色,这对于涉及机器学习和AI的应用至关重要,例如推荐系统、图像识别和自然语言处理。Kafka的Qdrant Sink Connector在这里起着关键作用,实现了Kafka和Qdrant之间的无缝集成。该连接器允许将向量数据实时摄取到Qdrant中,确保数据始终是最新的,并准备好进行高性能的相似性搜索。
集成和管道的重要性
这些组件的集成形成了一个强大且高效的数据流管道。Qdrant Sink Connector确保通过Kafka流动的数据能够持续地自动摄入Qdrant,无需人工干预。这种实时集成对于依赖最新数据进行决策和分析的应用程序至关重要。通过结合MongoDB和Azure Blob Storage的数据存储优势、Kafka的数据流处理能力以及Qdrant的向量搜索功能,该管道为实时管理和处理大量数据提供了一个强大的解决方案。该架构的可扩展性、容错性和实时处理能力是其有效性的关键,使其成为现代数据驱动应用程序的多功能解决方案。
Confluent Kafka 平台的安装
要安装Confluent Kafka平台(本地自管理),请按照以下3个简单步骤操作:
下载并解压分发文件:
- 访问 Confluent 安装页面。
- 下载分发文件(tar、zip等)。
- 使用以下命令解压下载的文件:
tar -xvf confluent-<version>.tar.gz
或
unzip confluent-<version>.zip
配置环境变量:
# Set CONFLUENT_HOME to the installation directory:
export CONFLUENT_HOME=/path/to/confluent-<version>
# Add Confluent binaries to your PATH
export PATH=$CONFLUENT_HOME/bin:$PATH
本地运行Confluent平台:
# Start the Confluent Platform services:
confluent local start
# Stop the Confluent Platform services:
confluent local stop
安装 Qdrant:
要安装并运行Qdrant(本地自托管),您可以使用Docker,这简化了过程。首先,确保您的系统上已安装Docker。然后,您可以从Docker Hub拉取Qdrant镜像,并使用以下命令运行它:
docker pull qdrant/qdrant
docker run -p 6334:6334 -p 6333:6333 qdrant/qdrant
这将下载Qdrant镜像并启动一个可通过http://localhost:6333访问的Qdrant实例。有关更详细的说明和替代安装方法,请参阅Qdrant安装文档。
安装Qdrant-Kafka Sink连接器:
要使用Confluent Hub安装Qdrant Kafka连接器,您可以使用简单的confluent-hub install命令。此命令通过消除手动配置文件操作的需要来简化过程。要安装Qdrant Kafka连接器版本1.1.0,请在终端中执行以下命令:
confluent-hub install qdrant/qdrant-kafka:1.1.0
此命令直接从Confluent Hub下载并安装指定的连接器到您的Confluent Platform或Kafka Connect环境中。安装过程确保所有必要的依赖项自动处理,从而实现Qdrant Kafka连接器与现有设置的无缝集成。安装完成后,可以使用Confluent Control Center或Kafka Connect REST API配置和管理连接器,从而在Kafka和Qdrant之间实现高效的数据流,无需复杂的手动设置。

图2:安装后显示源连接器和接收器连接器的本地Confluent平台。
确保连接器安装后按以下方式配置。请记住,您的key.converter和value.converter对于kafka安全地将消息从主题传递到qdrant非常重要。
{
"name": "QdrantSinkConnectorConnector_0",
"config": {
"value.converter.schemas.enable": "false",
"name": "QdrantSinkConnectorConnector_0",
"connector.class": "io.qdrant.kafka.QdrantSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "topic_62,qdrant_kafka.docs",
"errors.deadletterqueue.topic.name": "dead_queue",
"errors.deadletterqueue.topic.replication.factor": "1",
"qdrant.grpc.url": "http://localhost:6334",
"qdrant.api.key": "************"
}
}
MongoDB的安装
为了让Kafka连接MongoDB作为源,您的MongoDB实例应该在replicaSet模式下运行。以下是docker compose文件,它将启动一个单节点的replicaSet实例的MongoDB。
version: "3.8"
services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'host.docker.internal:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
volumes:
- "mongo1_data:/data/db"
- "mongo1_config:/data/configdb"
volumes:
mongo1_data:
mongo1_config:
同样地,按照以下方式安装和配置源连接器。
confluent-hub install mongodb/kafka-connect-mongodb:latest
安装MongoDB连接器后,连接器配置应如下所示:
{
"name": "MongoSourceConnectorConnector_0",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true",
"database": "qdrant_kafka",
"collection": "docs",
"publish.full.document.only": "true",
"topic.namespace.map": "{\"*\":\"qdrant_kafka.docs\"}",
"copy.existing": "true"
}
}
游乐场应用程序
随着基础设施的完全设置完成,现在是我们创建一个简单应用程序并检查我们的设置的时候了。我们应用程序的目标是将数据插入到Mongodb中,并最终通过变更数据捕获(CDC)将其摄取到Qdrant中。
requirements.txt
fastembed==0.3.1
pymongo==4.8.0
qdrant_client==1.10.1
project_root_folder/main.py
这只是一个示例代码。然而,根据您的使用情况,它可以扩展到数百万次操作。
from pymongo import MongoClient
from utils.app_utils import create_qdrant_collection
from fastembed import TextEmbedding
collection_name: str = 'test'
embed_model_name: str = 'snowflake/snowflake-arctic-embed-s'
# Step 0: create qdrant_collection
create_qdrant_collection(collection_name=collection_name, embed_model=embed_model_name)
# Step 1: Connect to MongoDB
client = MongoClient('mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true')
# Step 2: Select Database
db = client['qdrant_kafka']
# Step 3: Select Collection
collection = db['docs']
# Step 4: Create a Document to Insert
description = "qdrant is a high available vector search engine"
embedding_model = TextEmbedding(model_name=embed_model_name)
vector = next(embedding_model.embed(documents=description)).tolist()
document = {
"collection_name": collection_name,
"id": 1,
"vector": vector,
"payload": {
"name": "qdrant",
"description": description,
"url": "https://qdrant.tech/documentation"
}
}
# Step 5: Insert the Document into the Collection
result = collection.insert_one(document)
# Step 6: Print the Inserted Document's ID
print("Inserted document ID:", result.inserted_id)
project_root_folder/utils/app_utils.py
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333", api_key="<YOUR_KEY>")
dimension_dict = {"snowflake/snowflake-arctic-embed-s": 384}
def create_qdrant_collection(collection_name: str, embed_model: str):
if not client.collection_exists(collection_name=collection_name):
client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(size=dimension_dict.get(embed_model), distance=models.Distance.COSINE)
)
在我们运行应用程序之前,以下是MongoDB和Qdrant数据库的状态。

图3:初始状态:MongodDB的docs集合中没有名为test的集合和no data。
一旦你运行代码,数据就会进入Mongodb,CDC会被触发,最终Qdrant会接收到这些数据。

图4:测试Qdrant集合是自动创建的。

图5:数据被插入到MongoDB和Qdrant中。
结论:
总之,使用Qdrant Sink Connector将Kafka与Qdrant集成,为实时数据流和处理提供了一个无缝且高效的解决方案。这种设置不仅增强了数据管道的能力,还确保了高维向量数据被持续索引并随时可用于相似性搜索。通过遵循安装和设置指南,您可以轻松建立一个从源系统(如MongoDB和Azure Blob Storage)通过Kafka到Qdrant的稳健数据流。这种架构使现代应用程序能够利用实时数据洞察和高级搜索能力,为创新的数据驱动解决方案铺平道路。
