分布式部署
自版本v0.8.0起,Qdrant支持分布式部署模式。 在此模式下,多个Qdrant服务相互通信,以在节点之间分发数据,从而扩展存储能力并提高稳定性。
我应该运行多少个Qdrant节点?
Qdrant节点的理想数量取决于您在成本节约、弹性和性能/可扩展性之间的相对重视程度。
优先考虑成本节约:如果成本对您来说最重要,可以运行单个Qdrant节点。这不推荐用于生产环境。缺点:
- 弹性:用户将在节点重启期间遇到停机时间,除非您有备份或快照,否则无法恢复。
- 性能:仅限于单个服务器的资源。
优先考虑弹性:如果弹性对您来说最重要,请运行一个具有三个或更多节点和两个或更多分片副本的Qdrant集群。具有三个或更多节点和复制的集群即使在一个节点宕机的情况下也能执行所有操作。此外,它们从负载均衡中获得性能优势,并且可以从一个节点的永久丢失中恢复,而无需备份或快照(但仍强烈建议备份)。这是生产环境中最推荐的配置。缺点:
- 成本:较大的集群比较小的集群成本更高,这是此配置的唯一缺点。
平衡成本、弹性和性能:运行一个具有复制分片的两节点Qdrant集群,即使在维护事件期间一个节点宕机,集群也能响应大多数读/写请求。拥有两个节点也意味着比单节点集群更高的性能,同时仍然比三节点集群更便宜。缺点:
- 弹性(正常运行时间):当一个节点宕机时,集群无法对集合执行操作。这些操作需要超过50%的节点在运行,因此只有在3+节点集群中才可能实现。由于创建、编辑和删除集合通常是罕见的操作,许多用户认为这个缺点可以忽略不计。
- 弹性(数据完整性):如果两个节点中的一个节点的数据永久丢失或损坏,除了快照或备份外,无法恢复。只有3+节点集群可以从单个节点的永久丢失中恢复,因为恢复操作需要超过50%的集群健康。
- 成本:复制分片需要存储两份数据副本。
- 性能:随着添加更多节点,Qdrant集群的最大性能会增加。
总之,单节点集群最适合非生产工作负载,复制的3+节点集群是黄金标准,而复制的2节点集群则达到了良好的平衡。
在自托管的Qdrant中启用分布式模式
要启用分布式部署 - 在配置中启用集群模式或使用环境变量:QDRANT__CLUSTER__ENABLED=true。
cluster:
# Use `enabled: true` to run Qdrant in distributed deployment mode
enabled: true
# Configuration of the inter-cluster communication
p2p:
# Port for internal communication between peers
port: 6335
# Configuration related to distributed consensus algorithm
consensus:
# How frequently peers should ping each other.
# Setting this parameter to lower value will allow consensus
# to detect disconnected node earlier, but too frequent
# tick period may create significant network and CPU overhead.
# We encourage you NOT to change this parameter unless you know what you are doing.
tick_period_ms: 100
默认情况下,Qdrant 将使用端口 6335 进行内部通信。
集群内的所有节点都应在此端口上可访问,但请确保将此端口与外部访问隔离,因为它可能用于执行写操作。
此外,您必须为第一个对等节点提供--uri标志,以便它可以告诉其他节点如何到达它:
./qdrant --uri 'http://qdrant_node_1:6335'
集群中的后续节点必须知道现有集群的至少一个节点,以便通过它与集群的其余部分同步。
为此,需要为他们提供一个引导URL:
./qdrant --bootstrap 'http://qdrant_node_1:6335'
新对等节点自身的URL将根据其请求的IP地址自动计算。
但也可以使用--uri参数单独提供它们。
USAGE:
qdrant [OPTIONS]
OPTIONS:
--bootstrap <URI>
Uri of the peer to bootstrap from in case of multi-peer deployment. If not specified -
this peer will be considered as a first in a new deployment
--uri <URI>
Uri of this peer. Other peers should be able to reach it by this uri.
This value has to be supplied if this is the first peer in a new deployment.
In case this is not the first peer and it bootstraps the value is optional. If not
supplied then qdrant will take internal grpc port from config and derive the IP address
of this peer on bootstrap peer (receiving side)
成功同步后,您可以通过REST API观察集群的状态:
GET /cluster
示例结果:
{
"result": {
"status": "enabled",
"peer_id": 11532566549086892000,
"peers": {
"9834046559507417430": {
"uri": "http://172.18.0.3:6335/"
},
"11532566549086892528": {
"uri": "http://qdrant_node_1:6335/"
}
},
"raft_info": {
"term": 1,
"commit": 4,
"pending_operations": 1,
"leader": 11532566549086892000,
"role": "Leader"
}
},
"status": "ok",
"time": 5.731e-06
}
请注意,启用分布式模式不会自动复制您的数据。有关下一步操作,请参阅使用新的分布式Qdrant集群部分。
在Qdrant Cloud中启用分布式模式
为了获得最佳效果,首先确保您的集群运行的是Qdrant v1.7.4或更高版本。旧版本的Qdrant确实支持分布式模式,但v1.7.4中的改进使得分布式集群在中断期间更具弹性。
在Qdrant Cloud控制台中,点击“Scale Up”以将您的集群大小增加到>1。Qdrant Cloud会自动配置分布式模式设置。
在扩展过程完成后,您将拥有一个新的空节点与现有节点一起运行。要将数据复制到这个新的空节点中,请参阅下一节。
利用新的分布式Qdrant集群
当您启用分布式模式并扩展到两个或更多节点时,您的数据不会自动移动到新节点;新节点最初是空的。要利用您的新空节点,请执行以下操作之一:
- 通过将复制因子设置为2或更多,并将分片数量设置为节点数的倍数来创建一个新的复制集合。
- 如果您现有的集合不包含每个节点的足够分片,您必须按照前一个要点中描述的方式创建一个新集合。
- 如果您已经为每个节点准备了足够的分片,并且只需要复制您的数据,请按照创建新分片副本的指示操作。
- 如果您已经为每个节点准备了足够的分片,并且您的数据已经复制,您可以通过移动分片将数据(无需复制)移动到新节点上。
Raft
Qdrant 使用 Raft 共识协议来维护集群拓扑和集合结构的一致性。
另一方面,对点的操作不通过共识基础设施进行。 Qdrant 不打算提供强事务保证,这使得它能够以低开销执行点操作。 实际上,这意味着 Qdrant 不保证原子分布式更新,但允许您等待直到操作完成以查看写入结果。
集合操作,相反,是共识的一部分,它保证了所有操作都是持久的,并最终由所有节点执行。 实际上,这意味着大多数节点在服务执行操作之前就哪些操作应该应用达成一致。
实际上,这意味着如果集群处于过渡状态——无论是故障后选举新领导者还是启动,集合更新操作都将被拒绝。
您可以使用集群REST API来检查共识的状态。
分片
Qdrant 中的集合由一个或多个分片组成。 分片是点的独立存储,能够执行集合提供的所有操作。 有两种方法可以在分片之间分配点:
自动分片:通过使用一致性哈希算法将点分布在分片之间,使得分片管理不相交的点子集。这是默认行为。
用户自定义分片: 自v1.7.0起可用 - 每个点都被上传到特定的分片,这样操作可以只命中它们需要的分片。即使在这种分布下,分片仍然确保拥有不重叠的点子集。查看更多…
每个节点通过共识协议知道集合的所有部分存储在哪里,因此当你向一个Qdrant节点发送搜索请求时,它会自动查询所有其他节点以获取完整的搜索结果。
选择正确的分片数量
当你创建一个集合时,Qdrant会将集合分割成shard_number个分片。如果未设置,shard_number会在集合创建时设置为集群中的节点数量。shard_number在不重新创建集合的情况下无法更改。
PUT /collections/{collection_name}
{
"vectors": {
"size": 300,
"distance": "Cosine"
},
"shard_number": 6
}
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
collection_name="{collection_name}",
vectors_config=models.VectorParams(size=300, distance=models.Distance.COSINE),
shard_number=6,
)
import { QdrantClient } from "@qdrant/js-client-rest";
const client = new QdrantClient({ host: "localhost", port: 6333 });
client.createCollection("{collection_name}", {
vectors: {
size: 300,
distance: "Cosine",
},
shard_number: 6,
});
use qdrant_client::qdrant::{CreateCollectionBuilder, Distance, VectorParamsBuilder};
use qdrant_client::Qdrant;
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.create_collection(
CreateCollectionBuilder::new("{collection_name}")
.vectors_config(VectorParamsBuilder::new(300, Distance::Cosine))
.shard_number(6),
)
.await?;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Collections.CreateCollection;
import io.qdrant.client.grpc.Collections.Distance;
import io.qdrant.client.grpc.Collections.VectorParams;
import io.qdrant.client.grpc.Collections.VectorsConfig;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client
.createCollectionAsync(
CreateCollection.newBuilder()
.setCollectionName("{collection_name}")
.setVectorsConfig(
VectorsConfig.newBuilder()
.setParams(
VectorParams.newBuilder()
.setSize(300)
.setDistance(Distance.Cosine)
.build())
.build())
.setShardNumber(6)
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.CreateCollectionAsync(
collectionName: "{collection_name}",
vectorsConfig: new VectorParams { Size = 300, Distance = Distance.Cosine },
shardNumber: 6
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: "{collection_name}",
VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
Size: 300,
Distance: qdrant.Distance_Cosine,
}),
ShardNumber: qdrant.PtrOf(uint32(6)),
})
为了确保集群中的所有节点都能均匀利用,分片的数量必须是你当前在集群中运行的节点数的倍数。
旁注:多租户等高级用例可能需要不均匀的分片分布。参见多租户。
我们建议每个节点至少创建2个分片,以便未来扩展时无需重新分片。应避免重新分片,因为这需要创建一个新的集合。Qdrant的未来版本计划支持原地重新分片。
如果您预计会有大量增长,我们建议使用12个分片,因为您可以从1个节点扩展到2、3、6和12个节点,而无需重新分片。在小型集群中拥有超过12个分片可能不值得性能开销。
当集合首次创建时,分片会均匀分布在所有现有节点上,但如果集群大小或复制因子发生变化,Qdrant不会自动重新平衡分片(因为这在大型集群上是一项昂贵的操作)。有关在扩展操作后如何移动分片的信息,请参阅下一节。
移动分片
自 v0.9.0 起可用
Qdrant 允许在集群中的节点之间移动分片,并从集群中移除节点。此功能解锁了在不中断服务的情况下动态调整集群大小的能力。它还允许您在不中断服务的情况下升级或迁移节点。
Qdrant 提供了关于集群中当前分片分布的信息,通过 集合集群信息API。
使用更新集合集群设置API来启动分片转移:
POST /collections/{collection_name}/cluster
{
"move_shard": {
"shard_id": 0,
"from_peer_id": 381894127,
"to_peer_id": 467122995
}
}
在转移启动后,服务将根据使用的转移方法进行处理,保持两个分片同步。一旦转移完成,旧的分片将从源节点中删除。
如果你想缩小集群规模,你可以将所有分片从对等节点移开,然后使用移除对等体 API移除对等节点。
DELETE /cluster/peer/{peer_id}
之后,Qdrant 将从共识中排除该节点,实例将准备好关闭。
用户自定义分片
自 v1.7.0 版本起可用
Qdrant 允许您为每个点单独指定分片。如果您想控制数据的分片放置,以便操作仅命中实际需要的分片子集,此功能非常有用。在大集群中,这可以显著提高不需要扫描整个集合的操作的性能。
此功能的一个明确用例是管理多租户集合,其中每个租户(无论是用户还是组织)都被假定为隔离的,因此他们可以将数据存储在单独的分片中。
要启用用户定义的分片,请在集合创建期间将sharding_method设置为custom:
PUT /collections/{collection_name}
{
"shard_number": 1,
"sharding_method": "custom"
// ... other collection parameters
}
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
collection_name="{collection_name}",
shard_number=1,
sharding_method=models.ShardingMethod.CUSTOM,
# ... other collection parameters
)
client.create_shard_key("{collection_name}", "{shard_key}")
import { QdrantClient } from "@qdrant/js-client-rest";
const client = new QdrantClient({ host: "localhost", port: 6333 });
client.createCollection("{collection_name}", {
shard_number: 1,
sharding_method: "custom",
// ... other collection parameters
});
client.createShardKey("{collection_name}", {
shard_key: "{shard_key}"
});
use qdrant_client::qdrant::{
CreateCollectionBuilder, CreateShardKeyBuilder, CreateShardKeyRequestBuilder, Distance,
ShardingMethod, VectorParamsBuilder,
};
use qdrant_client::Qdrant;
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.create_collection(
CreateCollectionBuilder::new("{collection_name}")
.vectors_config(VectorParamsBuilder::new(300, Distance::Cosine))
.shard_number(1)
.sharding_method(ShardingMethod::Custom.into()),
)
.await?;
client
.create_shard_key(
CreateShardKeyRequestBuilder::new("{collection_name}")
.request(CreateShardKeyBuilder::default().shard_key("{shard_key".to_string())),
)
.await?;
import static io.qdrant.client.ShardKeyFactory.shardKey;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Collections.CreateCollection;
import io.qdrant.client.grpc.Collections.ShardingMethod;
import io.qdrant.client.grpc.Collections.CreateShardKey;
import io.qdrant.client.grpc.Collections.CreateShardKeyRequest;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client
.createCollectionAsync(
CreateCollection.newBuilder()
.setCollectionName("{collection_name}")
// ... other collection parameters
.setShardNumber(1)
.setShardingMethod(ShardingMethod.Custom)
.build())
.get();
client.createShardKeyAsync(CreateShardKeyRequest.newBuilder()
.setCollectionName("{collection_name}")
.setRequest(CreateShardKey.newBuilder()
.setShardKey(shardKey("{shard_key}"))
.build())
.build()).get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.CreateCollectionAsync(
collectionName: "{collection_name}",
// ... other collection parameters
shardNumber: 1,
shardingMethod: ShardingMethod.Custom
);
await client.CreateShardKeyAsync(
"{collection_name}",
new CreateShardKey { ShardKey = new ShardKey { Keyword = "{shard_key}", } }
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: "{collection_name}",
// ... other collection parameters
ShardNumber: qdrant.PtrOf(uint32(1)),
ShardingMethod: qdrant.ShardingMethod_Custom.Enum(),
})
client.CreateShardKey(context.Background(), "{collection_name}", &qdrant.CreateShardKey{
ShardKey: qdrant.NewShardKey("{shard_key}"),
})
在这种模式下,shard_number 表示每个分片键的分片数量,其中点将均匀分布。例如,如果您有10个分片键和具有这些设置的集合配置:
{
"shard_number": 1,
"sharding_method": "custom",
"replication_factor": 2
}
那么你将在集合中拥有1 * 10 * 2 = 20个物理分片。
物理分片需要大量资源,因此请确保您的自定义分片键具有低基数。
对于高基数键,建议使用根据负载分区代替。
要为每个点指定分片,您需要在upsert请求中提供shard_key字段:
PUT /collections/{collection_name}/points
{
"points": [
{
"id": 1111,
"vector": [0.1, 0.2, 0.3]
},
]
"shard_key": "user_1"
}
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
client.upsert(
collection_name="{collection_name}",
points=[
models.PointStruct(
id=1111,
vector=[0.1, 0.2, 0.3],
),
],
shard_key_selector="user_1",
)
client.upsert("{collection_name}", {
points: [
{
id: 1111,
vector: [0.1, 0.2, 0.3],
},
],
shard_key: "user_1",
});
use qdrant_client::qdrant::{PointStruct, UpsertPointsBuilder};
use qdrant_client::Payload;
client
.upsert_points(
UpsertPointsBuilder::new(
"{collection_name}",
vec![PointStruct::new(
111,
vec![0.1, 0.2, 0.3],
Payload::default(),
)],
)
.shard_key_selector("user_1".to_string()),
)
.await?;
import java.util.List;
import static io.qdrant.client.PointIdFactory.id;
import static io.qdrant.client.ShardKeySelectorFactory.shardKeySelector;
import static io.qdrant.client.VectorsFactory.vectors;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Points.PointStruct;
import io.qdrant.client.grpc.Points.UpsertPoints;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client
.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName("{collection_name}")
.addAllPoints(
List.of(
PointStruct.newBuilder()
.setId(id(111))
.setVectors(vectors(0.1f, 0.2f, 0.3f))
.build()))
.setShardKeySelector(shardKeySelector("user_1"))
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.UpsertAsync(
collectionName: "{collection_name}",
points: new List<PointStruct>
{
new() { Id = 111, Vectors = new[] { 0.1f, 0.2f, 0.3f } }
},
shardKeySelector: new ShardKeySelector { ShardKeys = { new List<ShardKey> { "user_1" } } }
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: "{collection_name}",
Points: []*qdrant.PointStruct{
{
Id: qdrant.NewIDNum(111),
Vectors: qdrant.NewVectors(0.1, 0.2, 0.3),
},
},
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{
qdrant.NewShardKey("user_1"),
},
},
})
现在,您可以通过在任何操作上指定shard_key来将操作定位到特定的分片。未指定分片键的操作将在所有分片上执行。
另一个用例是按时间顺序跟踪数据的分片,这样你就可以执行更复杂的操作,比如在一个分片中上传实时数据,并在数据达到一定年龄后将其归档。

分片传输方法
自 v1.7.0 版本起可用
有多种方法可以将分片转移到另一个节点,例如移动或复制。根据您希望的性能和保证以及您希望如何管理集群,您可能希望选择特定的方法。每种方法都有其优缺点。哪种方法最快取决于分片的大小和状态。
可用的分片传输方法有:
stream_records: (默认) 通过流式传输仅将其记录分批传输到目标节点。snapshot: 通过自动利用快照传输包括其索引和量化数据。wal_delta: (自动恢复默认) 通过解析WAL差异进行传输;遗漏的操作。
每种方法都有其优点、缺点和特定要求,其中一些包括:
| 方法: | 流记录 | 快照 | WAL增量 |
|---|---|---|---|
| 版本 | v0.8.0+ | v1.7.0+ | v1.8.0+ |
| 目标 | 新/现有分片 | 新/现有分片 | 现有分片 |
| 连接性 | 内部 gRPC API (6335) | REST API (6333) 内部 gRPC API (6335) | 内部 gRPC API (6335) |
| HNSW 索引 | 不传输,将在目标上重新索引。 | 传输,目标上立即准备就绪。 | 不传输,可能在目标上索引。 |
| 量化 | 不传输,将在目标上重新量化。 | 传输,立即在目标上准备就绪。 | 不传输,可能在目标上量化。 |
| 排序 | 目标上的无序更新1 | 目标上的有序更新2 | 目标上的有序更新2 |
| 磁盘空间 | 无需额外空间 | 两个节点上的快照需要额外空间 | 无需额外空间 |
要选择分片传输方法,请指定method如下:
POST /collections/{collection_name}/cluster
{
"move_shard": {
"shard_id": 0,
"from_peer_id": 381894127,
"to_peer_id": 467122995,
"method": "snapshot"
}
}
stream_records 传输方法是最简单的可用方法。它简单地将所有分片记录批量传输到目标节点,直到传输完所有记录,保持两个分片同步。它还会确保在执行最终切换之前,传输的分片索引过程能够跟上。该方法有两个常见的缺点:1. 它不传输索引或量化数据,这意味着分片必须在新节点上再次优化,这可能会非常昂贵。2. 顺序保证是weak1,这不适合某些应用。因为它非常简单,所以也非常健壮,如果在您的使用场景中可以接受上述缺点,那么它是一个可靠的选择。如果您的集群不稳定且资源不足,最好使用stream_records传输方法,因为它不太可能失败。
snapshot 传输方法利用 快照 来传输一个分片。快照会自动创建。然后它被传输并在目标节点上恢复。完成后,快照会从两个节点中删除。在快照/传输/恢复操作进行时,源节点会排队所有新操作。所有排队的更新随后按顺序发送到目标分片,使其与源分片处于相同状态。有两个重要的好处:1. 它传输索引和量化数据,因此分片不需要在目标节点上再次优化,使其立即可用。这样,Qdrant 确保在传输结束时不会出现性能下降。特别是在大型分片上,这可以带来巨大的性能提升。2. 顺序保证可以是 strong2,这是某些应用程序所必需的。
wal_delta 传输方法仅传输两个分片之间的差异。更具体地说,它传输所有目标分片错过的操作。使用两个分片的WAL来解决这个问题。有两个好处:1. 它会非常快,因为它只传输差异而不是所有数据。2. 顺序保证可以是strong2,这对某些应用程序是必需的。两个缺点是:1. 它只能用于传输到另一个节点上已经存在的分片。2. 适用性有限,因为WAL通常不会保存超过64MB的最近操作。但对于快速重启的节点来说,这应该足够了,例如升级。如果无法解析差异,此方法会自动回退到stream_records,这相当于传输整个分片。
目前,stream_records 方法被用作默认方法。这在未来可能会改变。从 Qdrant 1.9.0 开始,wal_delta 用于自动分片复制以恢复失效的分片。
复制
Qdrant 允许您在集群中的节点之间复制分片。
分片复制通过将分片的多个副本分布在集群中,提高了集群的可靠性。这确保了在节点故障时数据的可用性,除非所有副本都丢失。
复制因子
当你创建一个集合时,你可以通过更改replication_factor来控制你想要存储多少个分片副本。默认情况下,replication_factor设置为“1”,这意味着不会自动维护额外的副本。默认值可以在Qdrant配置中更改。你可以在创建集合时通过设置replication_factor来更改它。
replication_factor 可以更新现有集合,但其效果取决于您如何运行 Qdrant。如果您自己托管 Qdrant 的开源版本,创建集合后更改复制因子不会产生任何效果。您可以手动 创建 或删除分片副本以实现所需的复制因子。在 Qdrant Cloud(包括混合云、私有云)中,您的分片将自动复制或删除以匹配您配置的复制因子。
PUT /collections/{collection_name}
{
"vectors": {
"size": 300,
"distance": "Cosine"
},
"shard_number": 6,
"replication_factor": 2,
}
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
collection_name="{collection_name}",
vectors_config=models.VectorParams(size=300, distance=models.Distance.COSINE),
shard_number=6,
replication_factor=2,
)
import { QdrantClient } from "@qdrant/js-client-rest";
const client = new QdrantClient({ host: "localhost", port: 6333 });
client.createCollection("{collection_name}", {
vectors: {
size: 300,
distance: "Cosine",
},
shard_number: 6,
replication_factor: 2,
});
use qdrant_client::qdrant::{CreateCollectionBuilder, Distance, VectorParamsBuilder};
use qdrant_client::Qdrant;
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.create_collection(
CreateCollectionBuilder::new("{collection_name}")
.vectors_config(VectorParamsBuilder::new(300, Distance::Cosine))
.shard_number(6)
.replication_factor(2),
)
.await?;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Collections.CreateCollection;
import io.qdrant.client.grpc.Collections.Distance;
import io.qdrant.client.grpc.Collections.VectorParams;
import io.qdrant.client.grpc.Collections.VectorsConfig;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client
.createCollectionAsync(
CreateCollection.newBuilder()
.setCollectionName("{collection_name}")
.setVectorsConfig(
VectorsConfig.newBuilder()
.setParams(
VectorParams.newBuilder()
.setSize(300)
.setDistance(Distance.Cosine)
.build())
.build())
.setShardNumber(6)
.setReplicationFactor(2)
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.CreateCollectionAsync(
collectionName: "{collection_name}",
vectorsConfig: new VectorParams { Size = 300, Distance = Distance.Cosine },
shardNumber: 6,
replicationFactor: 2
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: "{collection_name}",
VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
Size: 300,
Distance: qdrant.Distance_Cosine,
}),
ShardNumber: qdrant.PtrOf(uint32(6)),
ReplicationFactor: qdrant.PtrOf(uint32(2)),
})
此代码示例创建了一个集合,总共有6个逻辑分片,由12个物理分片支持。
由于复制因子为“2”将需要两倍的存储空间,建议事先确保硬件能够承载额外的分片副本。
创建新的分片副本
可以使用更新集合集群设置API在现有集合上手动创建或删除副本。这通常只有在运行Qdrant开源版本时才需要。在Qdrant Cloud中,分片复制是自动处理和更新的,以匹配配置的replication_factor。
可以通过指定要复制的对等体,在特定对等体上添加副本。
POST /collections/{collection_name}/cluster
{
"replicate_shard": {
"shard_id": 0,
"from_peer_id": 381894127,
"to_peer_id": 467122995
}
}
并且可以在特定的对等体上移除一个副本。
POST /collections/{collection_name}/cluster
{
"drop_replica": {
"shard_id": 0,
"peer_id": 381894127
}
}
请记住,一个集合必须包含至少一个分片的活跃副本。
错误处理
副本可以处于不同的状态:
- 活跃:健康并准备处理流量
- Dead: 不健康且未准备好处理流量
- 部分:当前在激活前正在进行重新同步
如果副本没有响应内部健康检查或无法处理流量,则将其标记为死亡。
一个失效的副本将不会从其他对等节点接收流量,如果它不能自动恢复,可能需要手动干预。
这种机制确保在更新操作期间如果一部分副本失败,数据的一致性和可用性仍然得到保证。
节点故障恢复
有时硬件故障可能会导致Qdrant集群的某些节点无法恢复。 没有系统能完全避免这种情况。
但是有几种恢复场景可以让qdrant保持可用,甚至避免性能下降。让我们从最好到最坏的情况来逐一了解。
使用复制的集合进行恢复
如果失败的节点数量少于集合的复制因子,那么您的集群应该仍然能够执行读取、搜索和更新查询。
现在,如果故障节点重新启动,共识机制将触发复制过程,以使用最新的更新来更新恢复节点,这些更新是它错过的。
如果故障节点永远不会重新启动,您可以在拥有3个或更多节点的集群中恢复丢失的分片。在较小的集群中,您无法恢复丢失的分片,因为恢复操作需要通过raft,这要求超过50%的节点是健康的。
使用复制的集合重新创建节点
如果一个节点失败且无法恢复,您应该将该失效节点从共识中排除,并创建一个空节点。
要从共识中排除失败的节点,请使用移除对等体 API。
如有必要,应用force标志。
当你创建一个新节点时,确保通过指定--bootstrap CLI参数并将其设置为任何正在运行的集群节点的URL,将其附加到现有集群。
一旦新节点准备就绪并与集群同步,您可能希望确保集合分片有足够的复制。请记住,Qdrant不会自动平衡分片,因为这是一项昂贵的操作。 使用复制分片操作在新连接的节点上创建分片的另一个副本。
值得一提的是,Qdrant 仅提供了创建自动故障恢复所需的必要构建模块。 构建一个完全自动的集合扩展过程将需要控制集群机器本身。 查看我们的 云解决方案,我们在其中实现了这一点。
从快照恢复
如果集群中没有数据的副本,仍然可以从快照中恢复。
按照相同的步骤分离失败的节点并在集群中创建一个新节点:
- 要从共识中排除失败的节点,请使用移除节点 API。如有必要,应用
force标志。 - 创建一个新节点,确保通过指定
--bootstrapCLI参数并附上任何正在运行的集群节点的URL,将其附加到现有集群。
快照恢复,用于单节点部署,与集群恢复不同。 共识管理所有集合的所有元数据,不需要快照来恢复它。 但你可以使用快照来恢复集合中缺失的分片。
使用集合快照恢复 API来完成此操作。 该服务将下载指定的集合快照,并使用其中的数据恢复分片。
一旦集合的所有分片恢复,集合将再次变得可操作。
临时节点故障
如果配置得当,在分布式模式下运行Qdrant可以使您的集群在一个节点暂时故障时抵抗中断。
以下是不同配置的Qdrant集群的响应方式:
- 1节点集群:所有操作都会超时或失败,最长可达几分钟。这取决于从磁盘重启和加载数据所需的时间。
- 2节点集群,其中分片未被复制:所有操作将超时或失败,最长可达几分钟。这取决于从磁盘重启和加载数据所需的时间。
- 2节点集群,其中所有分片都复制到两个节点:在中断期间,除了对集合的操作外,所有请求都继续工作。
- 3+-节点集群,其中所有分片都至少复制到2个节点:在中断期间,所有请求继续工作。
一致性保证
默认情况下,Qdrant 专注于搜索操作的可用性和最大吞吐量。对于大多数用例来说,这是一个更可取的选择。
在正常运行状态下,可以从集群中的任何节点搜索和修改数据。
在响应客户端之前,处理请求的对等节点会根据当前拓扑结构调度所有操作,以保持集群中数据的同步。
- 读取操作采用部分扇出策略以优化延迟和可用性
- 写入在所有活动的分片副本上并行执行

然而,在某些情况下,有必要在可能的硬件不稳定、大量并发更新相同文档等情况下确保额外的保证。
Qdrant 提供了几种选项来控制一致性保证:
write_consistency_factor- 定义在响应客户端之前必须确认写入操作的副本数量。增加此值将使写入操作能够容忍集群中的网络分区,但需要更多的副本处于活动状态才能执行写入操作。- 读取
consistency参数,可用于搜索和检索操作,以确保从所有副本获得的结果相同。如果使用此选项,Qdrant将在多个副本上执行读取操作,并根据所选策略解析结果。此选项有助于避免在并发更新相同文档时出现数据不一致的情况。如果更新操作频繁且副本数量较少,建议使用此选项。 - 编写
ordering参数,可用于更新和删除操作,以确保操作在所有副本上以相同的顺序执行。如果使用此选项,Qdrant 会将操作路由到分片的主副本,并在响应客户端之前等待响应。此选项有助于避免在并发更新相同文档时出现数据不一致的情况。如果读取操作比更新更频繁且搜索性能至关重要,则首选此选项。
写入一致性因子
write_consistency_factor 表示在响应客户端之前必须确认写入操作的副本数量。默认情况下,它被设置为1。
它可以在集合创建时进行配置。
PUT /collections/{collection_name}
{
"vectors": {
"size": 300,
"distance": "Cosine"
},
"shard_number": 6,
"replication_factor": 2,
"write_consistency_factor": 2,
}
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
collection_name="{collection_name}",
vectors_config=models.VectorParams(size=300, distance=models.Distance.COSINE),
shard_number=6,
replication_factor=2,
write_consistency_factor=2,
)
import { QdrantClient } from "@qdrant/js-client-rest";
const client = new QdrantClient({ host: "localhost", port: 6333 });
client.createCollection("{collection_name}", {
vectors: {
size: 300,
distance: "Cosine",
},
shard_number: 6,
replication_factor: 2,
write_consistency_factor: 2,
});
use qdrant_client::qdrant::{CreateCollectionBuilder, Distance, VectorParamsBuilder};
use qdrant_client::Qdrant;
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.create_collection(
CreateCollectionBuilder::new("{collection_name}")
.vectors_config(VectorParamsBuilder::new(300, Distance::Cosine))
.shard_number(6)
.replication_factor(2)
.write_consistency_factor(2),
)
.await?;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Collections.CreateCollection;
import io.qdrant.client.grpc.Collections.Distance;
import io.qdrant.client.grpc.Collections.VectorParams;
import io.qdrant.client.grpc.Collections.VectorsConfig;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client
.createCollectionAsync(
CreateCollection.newBuilder()
.setCollectionName("{collection_name}")
.setVectorsConfig(
VectorsConfig.newBuilder()
.setParams(
VectorParams.newBuilder()
.setSize(300)
.setDistance(Distance.Cosine)
.build())
.build())
.setShardNumber(6)
.setReplicationFactor(2)
.setWriteConsistencyFactor(2)
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.CreateCollectionAsync(
collectionName: "{collection_name}",
vectorsConfig: new VectorParams { Size = 300, Distance = Distance.Cosine },
shardNumber: 6,
replicationFactor: 2,
writeConsistencyFactor: 2
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: "{collection_name}",
VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
Size: 300,
Distance: qdrant.Distance_Cosine,
}),
ShardNumber: qdrant.PtrOf(uint32(6)),
ReplicationFactor: qdrant.PtrOf(uint32(2)),
WriteConsistencyFactor: qdrant.PtrOf(uint32(2)),
})
如果活动副本的数量少于write_consistency_factor,写操作将失败。
write_consistency_factor 的配置对于调整集群在某些节点因重启、升级或故障而离线时的行为非常重要。
默认情况下,只要每个分片的至少一个副本在线,集群就会继续接受更新。然而,这种行为意味着一旦离线副本恢复,它将需要与集群的其余部分进行额外的同步。在某些情况下,这种同步可能会消耗大量资源并且是不希望的。
将write_consistency_factor设置为与复制因子匹配,可以修改集群的行为,使得未复制的更新被拒绝,从而避免了额外的同步需求。
如果更新应用到足够多的副本——根据write_consistency_factor——更新将返回成功状态。任何未能应用更新的副本将被暂时禁用,并自动恢复以保持数据一致性。如果更新无法应用到足够多的副本,它将返回错误,并且可能部分应用。用户必须再次提交操作以确保数据一致性。
对于能够处理错误和重试的异步更新和注入管道,这种策略可能更可取。
读取一致性
可以为大多数读取请求指定consistency,以确保返回的结果在集群节点之间保持一致。
all将查询所有节点并返回存在于所有节点上的点majority将查询所有节点并返回在大多数节点上存在的点quorum将查询随机选择的大多数节点,并返回存在于所有这些节点上的点1/2/3/等 - 将查询指定数量的随机选择的节点,并返回存在于所有这些节点上的点- 默认
consistency是1
POST /collections/{collection_name}/points/query?consistency=majority
{
"query": [0.2, 0.1, 0.9, 0.7],
"filter": {
"must": [
{
"key": "city",
"match": {
"value": "London"
}
}
]
},
"params": {
"hnsw_ef": 128,
"exact": false
},
"limit": 3
}
client.query_points(
collection_name="{collection_name}",
query=[0.2, 0.1, 0.9, 0.7],
query_filter=models.Filter(
must=[
models.FieldCondition(
key="city",
match=models.MatchValue(
value="London",
),
)
]
),
search_params=models.SearchParams(hnsw_ef=128, exact=False),
limit=3,
consistency="majority",
)
client.query("{collection_name}", {
query: [0.2, 0.1, 0.9, 0.7],
filter: {
must: [{ key: "city", match: { value: "London" } }],
},
params: {
hnsw_ef: 128,
exact: false,
},
limit: 3,
consistency: "majority",
});
use qdrant_client::qdrant::{
read_consistency::Value, Condition, Filter, QueryPointsBuilder, ReadConsistencyType,
SearchParamsBuilder,
};
use qdrant_client::{Qdrant, QdrantError};
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.query(
QueryPointsBuilder::new("{collection_name}")
.query(vec![0.2, 0.1, 0.9, 0.7])
.limit(3)
.filter(Filter::must([Condition::matches(
"city",
"London".to_string(),
)]))
.params(SearchParamsBuilder::default().hnsw_ef(128).exact(false))
.read_consistency(Value::Type(ReadConsistencyType::Majority.into())),
)
.await?;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Points.Filter;
import io.qdrant.client.grpc.Points.QueryPoints;
import io.qdrant.client.grpc.Points.ReadConsistency;
import io.qdrant.client.grpc.Points.ReadConsistencyType;
import io.qdrant.client.grpc.Points.SearchParams;
import static io.qdrant.client.QueryFactory.nearest;
import static io.qdrant.client.ConditionFactory.matchKeyword;
QdrantClient client =
new QdrantClient(QdrantGrpcClient.newBuilder("localhost", 6334, false).build());
client.queryAsync(
QueryPoints.newBuilder()
.setCollectionName("{collection_name}")
.setFilter(Filter.newBuilder().addMust(matchKeyword("city", "London")).build())
.setQuery(nearest(.2f, 0.1f, 0.9f, 0.7f))
.setParams(SearchParams.newBuilder().setHnswEf(128).setExact(false).build())
.setLimit(3)
.setReadConsistency(
ReadConsistency.newBuilder().setType(ReadConsistencyType.Majority).build())
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
using static Qdrant.Client.Grpc.Conditions;
var client = new QdrantClient("localhost", 6334);
await client.QueryAsync(
collectionName: "{collection_name}",
query: new float[] { 0.2f, 0.1f, 0.9f, 0.7f },
filter: MatchKeyword("city", "London"),
searchParams: new SearchParams { HnswEf = 128, Exact = false },
limit: 3,
readConsistency: new ReadConsistency { Type = ReadConsistencyType.Majority }
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: "{collection_name}",
Query: qdrant.NewQuery(0.2, 0.1, 0.9, 0.7),
Filter: &qdrant.Filter{
Must: []*qdrant.Condition{
qdrant.NewMatch("city", "London"),
},
},
Params: &qdrant.SearchParams{
HnswEf: qdrant.PtrOf(uint64(128)),
},
Limit: qdrant.PtrOf(uint64(3)),
ReadConsistency: qdrant.NewReadConsistencyType(qdrant.ReadConsistencyType_Majority),
})
写入顺序
可以为任何写请求指定ordering,通过单个“领导者”节点进行序列化,确保所有写操作(使用相同的ordering发出)按顺序执行和观察。
weak(默认) 排序不提供任何额外的保证,因此写操作可以自由地重新排序。medium排序通过动态选举的领导者序列化所有写操作,这可能在领导者变更时导致轻微的不一致性。strong排序通过永久领导者序列化所有写操作,这提供了强一致性,但如果领导者宕机,写操作可能不可用。
PUT /collections/{collection_name}/points?ordering=strong
{
"batch": {
"ids": [1, 2, 3],
"payloads": [
{"color": "red"},
{"color": "green"},
{"color": "blue"}
],
"vectors": [
[0.9, 0.1, 0.1],
[0.1, 0.9, 0.1],
[0.1, 0.1, 0.9]
]
}
}
client.upsert(
collection_name="{collection_name}",
points=models.Batch(
ids=[1, 2, 3],
payloads=[
{"color": "red"},
{"color": "green"},
{"color": "blue"},
],
vectors=[
[0.9, 0.1, 0.1],
[0.1, 0.9, 0.1],
[0.1, 0.1, 0.9],
],
),
ordering=models.WriteOrdering.STRONG,
)
client.upsert("{collection_name}", {
batch: {
ids: [1, 2, 3],
payloads: [{ color: "red" }, { color: "green" }, { color: "blue" }],
vectors: [
[0.9, 0.1, 0.1],
[0.1, 0.9, 0.1],
[0.1, 0.1, 0.9],
],
},
ordering: "strong",
});
use qdrant_client::qdrant::{
PointStruct, UpsertPointsBuilder, WriteOrdering, WriteOrderingType
};
use qdrant_client::Qdrant;
let client = Qdrant::from_url("http://localhost:6334").build()?;
client
.upsert_points(
UpsertPointsBuilder::new(
"{collection_name}",
vec![
PointStruct::new(1, vec![0.9, 0.1, 0.1], [("color", "red".into())]),
PointStruct::new(2, vec![0.1, 0.9, 0.1], [("color", "green".into())]),
PointStruct::new(3, vec![0.1, 0.1, 0.9], [("color", "blue".into())]),
],
)
.ordering(WriteOrdering {
r#type: WriteOrderingType::Strong.into(),
}),
)
.await?;
import java.util.List;
import java.util.Map;
import static io.qdrant.client.PointIdFactory.id;
import static io.qdrant.client.ValueFactory.value;
import static io.qdrant.client.VectorsFactory.vectors;
import io.qdrant.client.grpc.Points.PointStruct;
import io.qdrant.client.grpc.Points.UpsertPoints;
import io.qdrant.client.grpc.Points.WriteOrdering;
import io.qdrant.client.grpc.Points.WriteOrderingType;
client
.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName("{collection_name}")
.addAllPoints(
List.of(
PointStruct.newBuilder()
.setId(id(1))
.setVectors(vectors(0.9f, 0.1f, 0.1f))
.putAllPayload(Map.of("color", value("red")))
.build(),
PointStruct.newBuilder()
.setId(id(2))
.setVectors(vectors(0.1f, 0.9f, 0.1f))
.putAllPayload(Map.of("color", value("green")))
.build(),
PointStruct.newBuilder()
.setId(id(3))
.setVectors(vectors(0.1f, 0.1f, 0.94f))
.putAllPayload(Map.of("color", value("blue")))
.build()))
.setOrdering(WriteOrdering.newBuilder().setType(WriteOrderingType.Strong).build())
.build())
.get();
using Qdrant.Client;
using Qdrant.Client.Grpc;
var client = new QdrantClient("localhost", 6334);
await client.UpsertAsync(
collectionName: "{collection_name}",
points: new List<PointStruct>
{
new()
{
Id = 1,
Vectors = new[] { 0.9f, 0.1f, 0.1f },
Payload = { ["color"] = "red" }
},
new()
{
Id = 2,
Vectors = new[] { 0.1f, 0.9f, 0.1f },
Payload = { ["color"] = "green" }
},
new()
{
Id = 3,
Vectors = new[] { 0.1f, 0.1f, 0.9f },
Payload = { ["color"] = "blue" }
}
},
ordering: WriteOrderingType.Strong
);
import (
"context"
"github.com/qdrant/go-client/qdrant"
)
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334,
})
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: "{collection_name}",
Points: []*qdrant.PointStruct{
{
Id: qdrant.NewIDNum(1),
Vectors: qdrant.NewVectors(0.9, 0.1, 0.1),
Payload: qdrant.NewValueMap(map[string]any{"color": "red"}),
},
{
Id: qdrant.NewIDNum(2),
Vectors: qdrant.NewVectors(0.1, 0.9, 0.1),
Payload: qdrant.NewValueMap(map[string]any{"color": "green"}),
},
{
Id: qdrant.NewIDNum(3),
Vectors: qdrant.NewVectors(0.1, 0.1, 0.9),
Payload: qdrant.NewValueMap(map[string]any{"color": "blue"}),
},
},
Ordering: &qdrant.WriteOrdering{
Type: qdrant.WriteOrderingType_Strong,
},
})
监听器模式
在某些情况下,拥有一个仅积累数据而不参与搜索操作的Qdrant节点可能很有用。 有几种情况下这可能很有用:
- 监听器选项可用于将数据存储在单独的节点中,该节点可用于备份目的或长期存储数据。
- 监听器节点可用于将数据同步到另一个区域,同时仍在本地区域执行搜索操作。
要启用监听器模式,请在配置文件中将node_type设置为Listener:
storage:
node_type: "Listener"
监听节点不会参与搜索操作,但仍会接受写操作,并将数据存储在本地存储中。
所有存储在监听器节点上的分片将被转换为Listener状态。
此外,发送到监听器节点的所有写请求都将使用wait=false选项进行处理,这意味着一旦写入WAL,写操作将被视为成功。
这种机制应能在并行快照的情况下最小化upsert延迟。
共识检查点
共识检查点是一种在Raft中用于提高性能和简化日志管理的技术,通过定期创建系统状态的一致快照来实现。 这个快照代表了集群中所有节点在某个时间点上对状态达成一致的点,它可以用来截断日志,减少需要在节点之间存储和传输的数据量。
例如,如果您向集群附加一个新节点,它应该重放所有日志条目以赶上当前状态。 在长时间运行的集群中,这可能需要很长时间,并且日志可能会变得非常大。
为了防止这种情况,可以使用一种特殊的检查点机制,该机制将截断日志并创建当前状态的快照。
要使用此功能,只需在所需节点上调用 /cluster/recover API:
POST /cluster/recover
此API可以在任何非领导节点上触发,它将向当前的共识领导节点发送请求以创建快照。领导节点随后会将快照发送回请求节点以供应用。
在某些情况下,此API可用于通过强制创建快照来从不一致的集群状态中恢复。
