1 基础概念
1.1 Qdrant介绍
01.产品定位
a.核心价值
a.功能说明
Qdrant是一款高性能的向量数据库,专为语义搜索和AI应用设计。采用Rust语言开发,性能优异且内存安全。支持丰富的过滤条件和Payload管理。提供简洁的RESTful API和gRPC接口。支持分布式部署和水平扩展。开源免费,社区活跃。适合RAG、推荐系统、图像检索等场景。
b.代码示例
---
# Qdrant核心特性
features = {
"性能": {
"语言": "Rust开发,性能优异",
"内存": "高效内存管理",
"并发": "支持高并发查询",
"延迟": "毫秒级响应"
},
"功能": {
"过滤": "丰富的过滤条件",
"Payload": "灵活的元数据管理",
"多向量": "支持多向量存储",
"量化": "支持向量量化"
},
"部署": {
"单机": "Docker快速部署",
"集群": "分布式集群",
"云服务": "Qdrant Cloud",
"嵌入式": "嵌入式模式"
},
"接口": {
"REST": "RESTful API",
"gRPC": "高性能gRPC",
"SDK": "Python/Rust/Go/JS",
"Web UI": "管理界面"
}
}
print("Qdrant核心特性:")
for category, items in features.items():
print(f"\n{category}:")
for key, value in items.items():
print(f" {key}: {value}")
---
b.应用场景
a.功能说明
Qdrant适合多种AI应用场景。语义搜索和问答系统是核心应用。支持推荐系统和个性化推荐。图像检索和相似图片搜索。异常检测和欺诈识别。文本分类和聚类分析。知识图谱和实体链接。RAG(检索增强生成)应用。
b.代码示例
---
# Qdrant应用场景
use_cases = {
"语义搜索": {
"描述": "基于语义的文档检索",
"示例": "智能问答、知识库搜索",
"优势": "理解语义,超越关键词匹配"
},
"推荐系统": {
"描述": "基于向量相似度的推荐",
"示例": "商品推荐、内容推荐",
"优势": "实时推荐,个性化精准"
},
"图像检索": {
"描述": "以图搜图功能",
"示例": "相似图片搜索、人脸识别",
"优势": "快速检索,高召回率"
},
"RAG应用": {
"描述": "检索增强生成",
"示例": "智能客服、文档问答",
"优势": "结合检索和生成,准确性高"
},
"异常检测": {
"描述": "基于向量距离的异常识别",
"示例": "欺诈检测、设备故障预警",
"优势": "实时检测,准确率高"
}
}
print("Qdrant应用场景:")
for scenario, info in use_cases.items():
print(f"\n{scenario}:")
for key, value in info.items():
print(f" {key}: {value}")
# 技术优势
advantages = [
"Rust语言开发,性能优异",
"丰富的过滤条件,灵活查询",
"支持Payload管理,元数据丰富",
"分布式部署,水平扩展",
"开源免费,社区活跃",
"简洁API,易于集成",
"支持多种距离度量",
"提供Web UI管理界面"
]
print("\n技术优势:")
for i, adv in enumerate(advantages, 1):
print(f" {i}. {adv}")
---
02.技术架构
a.系统组件
a.功能说明
Qdrant采用模块化架构设计。核心组件包括存储引擎、索引管理、查询引擎。存储引擎负责数据持久化和内存管理。索引管理支持HNSW等高效索引算法。查询引擎处理向量检索和过滤。支持WAL(Write-Ahead Log)保证数据安全。提供快照和备份机制。
b.代码示例
---
# Qdrant系统架构
architecture = {
"存储层": {
"向量存储": "高效向量数据存储",
"Payload存储": "元数据存储",
"WAL": "Write-Ahead Log",
"快照": "数据快照和备份"
},
"索引层": {
"HNSW": "分层可导航小世界图",
"量化": "向量量化压缩",
"过滤索引": "Payload过滤索引",
"多向量": "多向量索引"
},
"查询层": {
"向量搜索": "ANN近似最近邻搜索",
"过滤器": "丰富的过滤条件",
"评分": "自定义评分函数",
"批量查询": "批量检索优化"
},
"接口层": {
"REST API": "RESTful接口",
"gRPC": "高性能gRPC",
"Web UI": "管理界面",
"SDK": "多语言SDK"
}
}
print("Qdrant系统架构:")
for layer, components in architecture.items():
print(f"\n{layer}:")
for comp, desc in components.items():
print(f" {comp}: {desc}")
# 数据流程
data_flow = [
"1. 客户端发送请求(REST/gRPC)",
"2. 接口层解析请求参数",
"3. 查询层执行向量检索",
"4. 索引层快速定位候选集",
"5. 过滤器筛选满足条件的结果",
"6. 评分排序返回Top-K",
"7. 返回结果给客户端"
]
print("\n数据流程:")
for step in data_flow:
print(f" {step}")
---
b.存储引擎
a.功能说明
Qdrant的存储引擎采用分段存储设计。数据分为多个Segment独立管理。每个Segment包含向量数据和Payload。支持内存和磁盘混合存储。使用mmap技术优化内存使用。支持数据压缩和量化。提供WAL保证数据安全。支持快照和增量备份。
b.代码示例
---
# Qdrant存储引擎配置
storage_config = {
"on_disk": False, # 是否使用磁盘存储
"optimizers": {
"deleted_threshold": 0.2, # 删除阈值
"vacuum_min_vector_number": 1000, # 最小清理向量数
"default_segment_number": 0, # 默认段数量
"max_segment_size": 200000, # 最大段大小
"memmap_threshold": 50000, # mmap阈值
"indexing_threshold": 20000, # 索引阈值
"flush_interval_sec": 5, # 刷盘间隔
"max_optimization_threads": 1 # 优化线程数
},
"wal": {
"wal_capacity_mb": 32, # WAL容量
"wal_segments_ahead": 0 # WAL段数
},
"performance": {
"max_search_threads": 0, # 搜索线程数(0=自动)
"max_optimization_threads": 1 # 优化线程数
}
}
print("存储引擎配置:")
import json
print(json.dumps(storage_config, indent=2))
# Segment管理
segment_lifecycle = {
"创建": "达到indexing_threshold时创建新Segment",
"索引": "构建HNSW索引",
"优化": "合并小Segment,清理删除数据",
"压缩": "向量量化和压缩",
"持久化": "刷盘到磁盘",
"备份": "快照和增量备份"
}
print("\nSegment生命周期:")
for stage, desc in segment_lifecycle.items():
print(f" {stage}: {desc}")
# 存储优化建议
optimization_tips = [
"小数据集(<100万)使用内存存储",
"大数据集使用on_disk模式",
"合理设置max_segment_size",
"调整memmap_threshold优化内存",
"定期执行优化操作",
"使用量化减少存储空间",
"配置WAL保证数据安全",
"定期备份重要数据"
]
print("\n存储优化建议:")
for i, tip in enumerate(optimization_tips, 1):
print(f" {i}. {tip}")
---
1.2 核心特性
01.向量管理
a.多向量支持
a.功能说明
Qdrant支持在单个Point中存储多个向量。适合多模态数据场景,如文本+图像。每个向量可以有不同的维度和距离度量。支持对不同向量分别检索。可以组合多个向量的检索结果。灵活的向量配置和管理。提升检索准确性和灵活性。
b.代码示例
---
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
client = QdrantClient(host="localhost", port=6333)
# 创建支持多向量的Collection
client.create_collection(
collection_name="multimodal_data",
vectors_config={
"text": VectorParams(size=768, distance=Distance.COSINE),
"image": VectorParams(size=512, distance=Distance.COSINE),
"audio": VectorParams(size=256, distance=Distance.COSINE)
}
)
# 插入多向量数据
import numpy as np
points = [
PointStruct(
id=1,
vector={
"text": np.random.random(768).tolist(),
"image": np.random.random(512).tolist(),
"audio": np.random.random(256).tolist()
},
payload={"title": "Sample 1", "category": "video"}
)
]
client.upsert(
collection_name="multimodal_data",
points=points
)
# 按文本向量检索
results = client.search(
collection_name="multimodal_data",
query_vector=("text", np.random.random(768).tolist()),
limit=10
)
# 按图像向量检索
results = client.search(
collection_name="multimodal_data",
query_vector=("image", np.random.random(512).tolist()),
limit=10
)
print("多向量支持特点:")
print(" - 单个Point存储多个向量")
print(" - 不同向量可有不同维度")
print(" - 支持分别检索各向量")
print(" - 适合多模态数据场景")
---
b.Payload管理
a.功能说明
Payload是Qdrant的元数据管理机制。支持任意JSON格式的元数据。可以存储文本、数字、布尔、数组等类型。支持嵌套结构和复杂对象。提供丰富的过滤条件。支持Payload索引加速查询。可以动态更新Payload。灵活的元数据管理能力。
b.代码示例
---
# Payload数据结构
payload_example = {
"title": "Product Name",
"description": "Product description text",
"price": 99.99,
"rating": 4.5,
"in_stock": True,
"tags": ["electronics", "smartphone"],
"metadata": {
"brand": "Apple",
"model": "iPhone 15",
"color": "Black"
},
"created_at": "2024-01-01T00:00:00Z"
}
# 插入带Payload的Point
point = PointStruct(
id=1,
vector=np.random.random(768).tolist(),
payload=payload_example
)
client.upsert(
collection_name="products",
points=[point]
)
# 更新Payload
client.set_payload(
collection_name="products",
payload={"price": 89.99, "in_stock": False},
points=[1]
)
# 删除Payload字段
client.delete_payload(
collection_name="products",
keys=["tags"],
points=[1]
)
# 创建Payload索引
client.create_payload_index(
collection_name="products",
field_name="price",
field_schema="float"
)
print("Payload管理功能:")
print(" - 支持任意JSON格式")
print(" - 丰富的数据类型")
print(" - 嵌套结构支持")
print(" - 动态更新Payload")
print(" - Payload索引加速")
---
02.查询能力
a.过滤器Filter
a.功能说明
Qdrant提供强大的过滤器功能。支持等值、范围、匹配等条件。可以组合多个过滤条件。支持嵌套过滤和复杂逻辑。过滤器在向量检索前执行。大幅减少检索范围,提升性能。支持Payload索引加速过滤。灵活的过滤条件组合。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, Range, MatchValue
# 1. 等值过滤
equal_filter = Filter(
must=[
FieldCondition(
key="category",
match=MatchValue(value="electronics")
)
]
)
results = client.search(
collection_name="products",
query_vector=np.random.random(768).tolist(),
query_filter=equal_filter,
limit=10
)
# 2. 范围过滤
range_filter = Filter(
must=[
FieldCondition(
key="price",
range=Range(gte=50.0, lte=200.0)
)
]
)
# 3. 组合过滤
combined_filter = Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="electronics")),
FieldCondition(key="price", range=Range(gte=50.0, lte=200.0)),
FieldCondition(key="in_stock", match=MatchValue(value=True))
]
)
# 4. 复杂逻辑过滤
complex_filter = Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="electronics"))
],
should=[
FieldCondition(key="brand", match=MatchValue(value="Apple")),
FieldCondition(key="brand", match=MatchValue(value="Samsung"))
],
must_not=[
FieldCondition(key="rating", range=Range(lt=3.0))
]
)
results = client.search(
collection_name="products",
query_vector=np.random.random(768).tolist(),
query_filter=complex_filter,
limit=10
)
print("过滤器功能:")
print(" - 等值过滤: match")
print(" - 范围过滤: range")
print(" - 组合过滤: must/should/must_not")
print(" - 嵌套过滤: 支持复杂逻辑")
print(" - Payload索引: 加速过滤")
---
b.评分函数
a.功能说明
Qdrant支持自定义评分函数。可以调整向量距离的计算方式。支持多种距离度量:余弦、欧氏、点积。可以结合Payload进行混合评分。支持自定义评分逻辑。提供评分修改器Modifier。灵活的评分机制满足不同需求。
b.代码示例
---
from qdrant_client.models import ScoredPoint, SearchRequest
# 1. 基础向量搜索(余弦距离)
results = client.search(
collection_name="products",
query_vector=np.random.random(768).tolist(),
limit=10
)
# 2. 使用评分修改器
from qdrant_client.models import ScoreModifier
results = client.search(
collection_name="products",
query_vector=np.random.random(768).tolist(),
score_threshold=0.7, # 最低分数阈值
limit=10
)
# 3. 混合评分示例(伪代码)
def custom_scoring(vector_score, payload):
"""自定义评分函数"""
# 向量相似度得分
base_score = vector_score
# 价格因子(价格越低,得分越高)
price_factor = 1.0 / (1.0 + payload.get("price", 100) / 100)
# 评分因子
rating_factor = payload.get("rating", 0) / 5.0
# 库存因子
stock_factor = 1.2 if payload.get("in_stock") else 0.8
# 综合得分
final_score = base_score * (0.6 + 0.2 * price_factor + 0.1 * rating_factor + 0.1 * stock_factor)
return final_score
# 4. 距离度量对比
distance_metrics = {
"Cosine": "余弦距离,适合文本向量",
"Euclid": "欧氏距离,适合图像向量",
"Dot": "点积距离,适合归一化向量"
}
print("距离度量:")
for metric, desc in distance_metrics.items():
print(f" {metric}: {desc}")
# 5. 评分阈值过滤
results = client.search(
collection_name="products",
query_vector=np.random.random(768).tolist(),
score_threshold=0.8, # 只返回分数>0.8的结果
limit=10
)
print("\n评分功能:")
print(" - 多种距离度量")
print(" - 评分阈值过滤")
print(" - 自定义评分逻辑")
print(" - 混合评分支持")
---
1.3 架构设计
01.分布式架构
a.集群模式
a.功能说明
Qdrant支持分布式集群部署。数据自动分片到多个节点。支持副本机制保证高可用。节点间自动数据同步。支持水平扩展,动态添加节点。提供一致性哈希分片策略。支持读写分离提升性能。适合大规模数据和高并发场景。
b.代码示例
---
# Qdrant集群配置
cluster_config = {
"cluster": {
"enabled": True,
"p2p": {
"port": 6335
},
"consensus": {
"tick_period_ms": 100
}
},
"storage": {
"storage_path": "/qdrant/storage",
"snapshots_path": "/qdrant/snapshots",
"on_disk_payload": True
},
"service": {
"http_port": 6333,
"grpc_port": 6334
}
}
print("集群配置:")
import json
print(json.dumps(cluster_config, indent=2))
# 集群特性
cluster_features = {
"数据分片": "自动分片到多个节点",
"副本机制": "支持多副本保证高可用",
"自动同步": "节点间数据自动同步",
"水平扩展": "动态添加节点",
"一致性哈希": "数据均匀分布",
"故障转移": "节点故障自动切换"
}
print("\n集群特性:")
for feature, desc in cluster_features.items():
print(f" {feature}: {desc}")
# 集群部署架构
deployment_architecture = [
"1. 多个Qdrant节点组成集群",
"2. 使用一致性哈希分片数据",
"3. 每个分片有多个副本",
"4. 客户端连接任意节点",
"5. 节点间自动路由请求",
"6. 支持动态扩缩容"
]
print("\n部署架构:")
for step in deployment_architecture:
print(f" {step}")
---
b.数据分片
a.功能说明
Qdrant使用Shard机制管理数据。Collection自动分为多个Shard。每个Shard独立存储和索引。支持自定义Shard数量。Shard可以分布在不同节点。支持Shard副本提升可用性。提供Shard迁移和平衡机制。灵活的分片策略适应不同规模。
b.代码示例
---
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
client = QdrantClient(host="localhost", port=6333)
# 创建Collection时指定Shard数量
client.create_collection(
collection_name="large_dataset",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
shard_number=4, # 4个Shard
replication_factor=2 # 每个Shard 2个副本
)
# 获取Collection信息
collection_info = client.get_collection(collection_name="large_dataset")
print("Collection信息:")
print(f" Shard数量: {collection_info.config.params.shard_number}")
print(f" 副本因子: {collection_info.config.params.replication_factor}")
# Shard分布策略
shard_strategies = {
"轮询": "数据轮流分配到各Shard",
"哈希": "根据ID哈希值分配",
"范围": "根据ID范围分配",
"自定义": "自定义分片逻辑"
}
print("\nShard分布策略:")
for strategy, desc in shard_strategies.items():
print(f" {strategy}: {desc}")
# Shard管理操作
shard_operations = [
"创建Shard: 初始化时指定数量",
"迁移Shard: 节点间迁移数据",
"平衡Shard: 自动负载均衡",
"副本管理: 增加或删除副本",
"故障恢复: 副本自动接管"
]
print("\nShard管理:")
for op in shard_operations:
print(f" - {op}")
# Shard数量建议
shard_recommendations = {
"< 100万向量": "1-2个Shard",
"100万-1000万": "2-4个Shard",
"1000万-1亿": "4-8个Shard",
"> 1亿向量": "8-16个Shard"
}
print("\nShard数量建议:")
for scale, recommendation in shard_recommendations.items():
print(f" {scale}: {recommendation}")
---
02.性能优化
a.索引算法
a.功能说明
Qdrant使用HNSW(分层可导航小世界图)索引算法。HNSW提供高效的近似最近邻搜索。支持动态插入和删除。查询性能优异,毫秒级响应。支持参数调优优化性能。内存占用可控。适合大规模向量检索。是业界主流的ANN算法。
b.代码示例
---
from qdrant_client.models import HnswConfigDiff
# HNSW索引配置
hnsw_config = HnswConfigDiff(
m=16, # 每个节点的连接数
ef_construct=100, # 构建时的搜索宽度
full_scan_threshold=10000 # 全扫描阈值
)
# 创建Collection时配置HNSW
client.create_collection(
collection_name="optimized_collection",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE,
hnsw_config=hnsw_config
)
)
# HNSW参数说明
hnsw_params = {
"m": {
"描述": "每个节点的连接数",
"默认值": 16,
"范围": "4-64",
"影响": "越大索引越精确,内存占用越高"
},
"ef_construct": {
"描述": "构建时的搜索宽度",
"默认值": 100,
"范围": "100-500",
"影响": "越大构建越慢,索引越精确"
},
"ef": {
"描述": "查询时的搜索宽度",
"默认值": "动态",
"范围": "100-500",
"影响": "越大查询越慢,召回率越高"
}
}
print("HNSW参数:")
for param, info in hnsw_params.items():
print(f"\n{param}:")
for key, value in info.items():
print(f" {key}: {value}")
# 查询时指定ef参数
from qdrant_client.models import SearchParams
results = client.search(
collection_name="optimized_collection",
query_vector=np.random.random(768).tolist(),
search_params=SearchParams(
hnsw_ef=128, # 查询时的ef值
exact=False # 是否精确搜索
),
limit=10
)
# HNSW优化建议
optimization_tips = [
"小数据集(<10万): m=16, ef_construct=100",
"中数据集(10万-100万): m=32, ef_construct=200",
"大数据集(>100万): m=48, ef_construct=300",
"高召回率要求: 增大ef值",
"低延迟要求: 减小ef值",
"内存受限: 减小m值",
"定期优化索引: 提升性能"
]
print("\nHNSW优化建议:")
for i, tip in enumerate(optimization_tips, 1):
print(f" {i}. {tip}")
---
b.量化压缩
a.功能说明
Qdrant支持向量量化压缩。可以大幅减少内存占用。支持标量量化(Scalar Quantization)。将float32压缩为uint8。内存占用减少75%。查询性能略有下降。适合内存受限场景。平衡内存和性能。
b.代码示例
---
from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, QuantizationSearchParams
# 启用标量量化
client.create_collection(
collection_name="quantized_collection",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE
),
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(
type="int8", # 量化类型
quantile=0.99, # 分位数
always_ram=True # 始终保持在内存
)
)
)
# 量化配置说明
quantization_config = {
"type": {
"int8": "8位整数量化",
"描述": "将float32压缩为int8",
"压缩比": "4:1",
"精度损失": "轻微"
},
"quantile": {
"描述": "量化分位数",
"默认值": 0.99,
"范围": "0.9-1.0",
"影响": "越大精度越高"
},
"always_ram": {
"描述": "是否始终保持在内存",
"默认值": True,
"影响": "True时性能更好"
}
}
print("量化配置:")
import json
print(json.dumps(quantization_config, indent=2, ensure_ascii=False))
# 查询时使用量化
results = client.search(
collection_name="quantized_collection",
query_vector=np.random.random(768).tolist(),
search_params=QuantizationSearchParams(
ignore=False, # 是否忽略量化
rescore=True, # 是否重新评分
oversampling=2.0 # 过采样倍数
),
limit=10
)
# 量化效果对比
quantization_comparison = {
"内存占用": {
"原始": "100%",
"int8量化": "25%",
"节省": "75%"
},
"查询性能": {
"原始": "100%",
"int8量化": "90-95%",
"损失": "5-10%"
},
"召回率": {
"原始": "100%",
"int8量化": "98-99%",
"损失": "1-2%"
}
}
print("\n量化效果对比:")
for metric, values in quantization_comparison.items():
print(f"\n{metric}:")
for key, value in values.items():
print(f" {key}: {value}")
# 量化使用建议
quantization_tips = [
"内存受限时启用量化",
"大规模数据集推荐使用",
"对召回率要求不极致时使用",
"启用rescore提升精度",
"调整oversampling平衡性能",
"定期评估量化效果",
"小数据集不建议量化"
]
print("\n量化使用建议:")
for i, tip in enumerate(quantization_tips, 1):
print(f" {i}. {tip}")
---
2 快速开始
2.1 安装部署
01.Docker部署
a.单机部署
a.功能说明
Docker是最简单的Qdrant部署方式。拉取官方镜像快速启动。支持数据持久化和端口映射。适合开发测试和小规模应用。配置简单,维护方便。支持环境变量配置。可以快速升级和回滚。是推荐的部署方式。
b.代码示例
---
# 1. 拉取Qdrant镜像
# docker pull qdrant/qdrant
# 2. 启动Qdrant容器
docker_run_command = """
docker run -d \\
--name qdrant \\
-p 6333:6333 \\
-p 6334:6334 \\
-v $(pwd)/qdrant_storage:/qdrant/storage \\
-v $(pwd)/qdrant_snapshots:/qdrant/snapshots \\
qdrant/qdrant
"""
print("Docker启动命令:")
print(docker_run_command)
# 3. 使用环境变量配置
docker_run_with_env = """
docker run -d \\
--name qdrant \\
-p 6333:6333 \\
-p 6334:6334 \\
-e QDRANT__SERVICE__HTTP_PORT=6333 \\
-e QDRANT__SERVICE__GRPC_PORT=6334 \\
-e QDRANT__LOG_LEVEL=INFO \\
-v $(pwd)/qdrant_storage:/qdrant/storage \\
qdrant/qdrant
"""
print("\n带环境变量的启动:")
print(docker_run_with_env)
# 4. Docker Compose配置
docker_compose_config = """
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:latest
container_name: qdrant
ports:
- "6333:6333"
- "6334:6334"
volumes:
- ./qdrant_storage:/qdrant/storage
- ./qdrant_snapshots:/qdrant/snapshots
environment:
- QDRANT__SERVICE__HTTP_PORT=6333
- QDRANT__SERVICE__GRPC_PORT=6334
- QDRANT__LOG_LEVEL=INFO
restart: unless-stopped
"""
print("\nDocker Compose配置:")
print(docker_compose_config)
# 5. 验证部署
verification_steps = [
"1. 检查容器状态: docker ps",
"2. 查看日志: docker logs qdrant",
"3. 访问Web UI: http://localhost:6333/dashboard",
"4. 测试API: curl http://localhost:6333/",
"5. 检查健康状态: curl http://localhost:6333/health"
]
print("\n验证部署:")
for step in verification_steps:
print(f" {step}")
# 6. 常用Docker命令
docker_commands = {
"启动": "docker start qdrant",
"停止": "docker stop qdrant",
"重启": "docker restart qdrant",
"查看日志": "docker logs -f qdrant",
"进入容器": "docker exec -it qdrant /bin/bash",
"删除容器": "docker rm -f qdrant"
}
print("\n常用命令:")
for action, cmd in docker_commands.items():
print(f" {action}: {cmd}")
---
b.配置文件
a.功能说明
Qdrant支持通过配置文件自定义设置。配置文件为YAML格式。可以配置服务端口、存储路径、日志级别等。支持集群配置和性能优化。配置文件优先级高于环境变量。提供默认配置和自定义配置。灵活的配置机制满足不同需求。
b.代码示例
---
# Qdrant配置文件 (config.yaml)
qdrant_config = """
service:
http_port: 6333
grpc_port: 6334
enable_tls: false
storage:
storage_path: /qdrant/storage
snapshots_path: /qdrant/snapshots
on_disk_payload: true
wal_capacity_mb: 32
wal_segments_ahead: 0
# 性能优化
optimizers:
deleted_threshold: 0.2
vacuum_min_vector_number: 1000
default_segment_number: 0
max_segment_size: 200000
memmap_threshold: 50000
indexing_threshold: 20000
flush_interval_sec: 5
max_optimization_threads: 1
# HNSW索引配置
hnsw_index:
m: 16
ef_construct: 100
full_scan_threshold: 10000
log_level: INFO
cluster:
enabled: false
# p2p:
# port: 6335
# consensus:
# tick_period_ms: 100
"""
print("Qdrant配置文件:")
print(qdrant_config)
# 使用自定义配置启动
docker_with_config = """
docker run -d \\
--name qdrant \\
-p 6333:6333 \\
-p 6334:6334 \\
-v $(pwd)/config.yaml:/qdrant/config/production.yaml \\
-v $(pwd)/qdrant_storage:/qdrant/storage \\
qdrant/qdrant
"""
print("\n使用自定义配置:")
print(docker_with_config)
# 配置项说明
config_items = {
"service": "服务端口和TLS配置",
"storage": "存储路径和优化器配置",
"log_level": "日志级别(DEBUG/INFO/WARN/ERROR)",
"cluster": "集群模式配置",
"hnsw_index": "HNSW索引默认参数"
}
print("\n配置项说明:")
for item, desc in config_items.items():
print(f" {item}: {desc}")
# 配置优先级
config_priority = [
"1. 命令行参数(最高优先级)",
"2. 环境变量",
"3. 配置文件",
"4. 默认值(最低优先级)"
]
print("\n配置优先级:")
for priority in config_priority:
print(f" {priority}")
---
02.本地安装
a.二进制安装
a.功能说明
Qdrant提供预编译的二进制文件。支持Linux、macOS、Windows平台。下载后直接运行,无需依赖。适合生产环境部署。性能优于Docker方式。支持systemd服务管理。提供自动更新机制。
b.代码示例
---
# 1. 下载Qdrant二进制文件
download_commands = {
"Linux": "wget https://github.com/qdrant/qdrant/releases/download/v1.7.0/qdrant-x86_64-unknown-linux-gnu.tar.gz",
"macOS": "wget https://github.com/qdrant/qdrant/releases/download/v1.7.0/qdrant-x86_64-apple-darwin.tar.gz",
"Windows": "下载 qdrant-x86_64-pc-windows-msvc.zip"
}
print("下载命令:")
for platform, cmd in download_commands.items():
print(f" {platform}: {cmd}")
# 2. 解压和安装
install_steps = """
# 解压
tar -xzf qdrant-x86_64-unknown-linux-gnu.tar.gz
# 移动到系统路径
sudo mv qdrant /usr/local/bin/
# 创建数据目录
sudo mkdir -p /var/lib/qdrant/storage
sudo mkdir -p /var/lib/qdrant/snapshots
# 创建配置目录
sudo mkdir -p /etc/qdrant
"""
print("\n安装步骤:")
print(install_steps)
# 3. 创建systemd服务
systemd_service = """
[Unit]
Description=Qdrant Vector Database
After=network.target
[Service]
Type=simple
User=qdrant
Group=qdrant
ExecStart=/usr/local/bin/qdrant --config-path /etc/qdrant/config.yaml
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
"""
print("\nsystemd服务配置 (/etc/systemd/system/qdrant.service):")
print(systemd_service)
# 4. 服务管理命令
service_commands = {
"启动服务": "sudo systemctl start qdrant",
"停止服务": "sudo systemctl stop qdrant",
"重启服务": "sudo systemctl restart qdrant",
"查看状态": "sudo systemctl status qdrant",
"开机启动": "sudo systemctl enable qdrant",
"查看日志": "sudo journalctl -u qdrant -f"
}
print("\n服务管理:")
for action, cmd in service_commands.items():
print(f" {action}: {cmd}")
# 5. 验证安装
verification = [
"1. 检查版本: qdrant --version",
"2. 测试启动: qdrant --config-path /etc/qdrant/config.yaml",
"3. 访问API: curl http://localhost:6333/",
"4. 查看日志: tail -f /var/log/qdrant/qdrant.log"
]
print("\n验证安装:")
for step in verification:
print(f" {step}")
---
b.源码编译
a.功能说明
从源码编译Qdrant可以获得最佳性能。需要Rust开发环境。支持自定义编译选项。可以启用特定CPU优化。适合高性能要求场景。编译时间较长。需要一定的技术能力。
b.代码示例
---
# 1. 安装Rust环境
rust_install = """
# 安装Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# 配置环境变量
source $HOME/.cargo/env
# 验证安装
rustc --version
cargo --version
"""
print("安装Rust:")
print(rust_install)
# 2. 克隆Qdrant源码
clone_steps = """
# 克隆仓库
git clone https://github.com/qdrant/qdrant.git
cd qdrant
# 切换到稳定版本
git checkout v1.7.0
"""
print("\n克隆源码:")
print(clone_steps)
# 3. 编译Qdrant
build_commands = {
"开发版本": "cargo build",
"发布版本": "cargo build --release",
"优化编译": "RUSTFLAGS='-C target-cpu=native' cargo build --release",
"静态链接": "cargo build --release --target x86_64-unknown-linux-musl"
}
print("\n编译命令:")
for build_type, cmd in build_commands.items():
print(f" {build_type}: {cmd}")
# 4. 运行编译后的程序
run_command = """
# 运行发布版本
./target/release/qdrant --config-path ./config/config.yaml
# 或者安装到系统
sudo cp ./target/release/qdrant /usr/local/bin/
"""
print("\n运行程序:")
print(run_command)
# 5. 编译优化选项
optimization_flags = {
"target-cpu=native": "针对当前CPU优化",
"lto=true": "启用链接时优化",
"codegen-units=1": "单个代码生成单元",
"opt-level=3": "最高优化级别"
}
print("\n编译优化:")
for flag, desc in optimization_flags.items():
print(f" {flag}: {desc}")
# 6. 编译注意事项
build_notes = [
"编译时间较长(10-30分钟)",
"需要足够的内存(建议4GB+)",
"发布版本比开发版本快10-100倍",
"使用target-cpu=native获得最佳性能",
"编译前确保依赖库完整",
"定期更新Rust工具链"
]
print("\n注意事项:")
for i, note in enumerate(build_notes, 1):
print(f" {i}. {note}")
---
2.2 客户端连接
01.Python客户端
a.安装配置
a.功能说明
Qdrant提供官方Python客户端库。支持同步和异步操作。提供完整的API封装。安装简单,使用方便。支持类型提示和代码补全。与主流AI框架集成良好。是最常用的客户端。
b.代码示例
---
# 1. 安装qdrant-client
# pip install qdrant-client
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import numpy as np
# 2. 连接到Qdrant服务器
# 本地连接
client = QdrantClient(host="localhost", port=6333)
# 远程连接
client = QdrantClient(
url="https://your-cluster.qdrant.io",
api_key="your-api-key"
)
# 3. 使用内存模式(开发测试)
client = QdrantClient(":memory:")
# 4. 验证连接
try:
collections = client.get_collections()
print(f"连接成功!当前有 {len(collections.collections)} 个Collection")
except Exception as e:
print(f"连接失败: {e}")
# 5. 客户端配置选项
client_config = {
"host": "服务器地址",
"port": "端口号(默认6333)",
"grpc_port": "gRPC端口(默认6334)",
"prefer_grpc": "优先使用gRPC(默认False)",
"https": "是否使用HTTPS",
"api_key": "API密钥",
"timeout": "请求超时时间(秒)",
"prefix": "API路径前缀"
}
print("\n客户端配置:")
for key, desc in client_config.items():
print(f" {key}: {desc}")
# 6. 高级配置示例
from qdrant_client import QdrantClient
client = QdrantClient(
host="localhost",
port=6333,
grpc_port=6334,
prefer_grpc=True, # 使用gRPC提升性能
timeout=60, # 60秒超时
https=False,
api_key=None
)
print("\n客户端已配置完成")
---
b.异步客户端
a.功能说明
Qdrant支持异步Python客户端。基于asyncio实现。适合高并发场景。可以并行执行多个请求。提升整体吞吐量。与FastAPI等异步框架集成良好。API与同步版本基本一致。
b.代码示例
---
import asyncio
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import numpy as np
# 1. 创建异步客户端
async def main():
client = AsyncQdrantClient(host="localhost", port=6333)
# 2. 异步创建Collection
await client.create_collection(
collection_name="async_test",
vectors_config=VectorParams(size=128, distance=Distance.COSINE)
)
# 3. 异步插入数据
points = [
PointStruct(
id=i,
vector=np.random.random(128).tolist(),
payload={"index": i}
)
for i in range(100)
]
await client.upsert(
collection_name="async_test",
points=points
)
# 4. 异步搜索
results = await client.search(
collection_name="async_test",
query_vector=np.random.random(128).tolist(),
limit=10
)
print(f"找到 {len(results)} 个结果")
# 5. 并发执行多个搜索
search_tasks = [
client.search(
collection_name="async_test",
query_vector=np.random.random(128).tolist(),
limit=10
)
for _ in range(10)
]
# 并发执行
all_results = await asyncio.gather(*search_tasks)
print(f"并发执行了 {len(all_results)} 个搜索")
# 6. 关闭客户端
await client.close()
# 运行异步函数
# asyncio.run(main())
# 异步客户端优势
async_advantages = [
"高并发场景性能更好",
"可以并行执行多个请求",
"与异步框架集成良好",
"提升整体吞吐量",
"资源利用率更高"
]
print("\n异步客户端优势:")
for i, adv in enumerate(async_advantages, 1):
print(f" {i}. {adv}")
# 使用场景
use_cases = {
"Web应用": "FastAPI/Sanic等异步框架",
"批量处理": "并发处理大量数据",
"实时系统": "需要低延迟的实时应用",
"微服务": "异步微服务架构"
}
print("\n使用场景:")
for scenario, desc in use_cases.items():
print(f" {scenario}: {desc}")
---
02.其他客户端
a.REST API
a.功能说明
Qdrant提供完整的RESTful API。支持所有核心功能。使用标准HTTP协议。可以用任何语言调用。适合跨语言集成。提供OpenAPI文档。支持curl等命令行工具。是最通用的接口。
b.代码示例
---
import requests
import json
import numpy as np
# 1. 创建Collection
create_collection_url = "http://localhost:6333/collections/rest_test"
create_payload = {
"vectors": {
"size": 128,
"distance": "Cosine"
}
}
response = requests.put(create_collection_url, json=create_payload)
print(f"创建Collection: {response.status_code}")
# 2. 插入数据
upsert_url = "http://localhost:6333/collections/rest_test/points"
points_payload = {
"points": [
{
"id": 1,
"vector": np.random.random(128).tolist(),
"payload": {"name": "Item 1"}
},
{
"id": 2,
"vector": np.random.random(128).tolist(),
"payload": {"name": "Item 2"}
}
]
}
response = requests.put(upsert_url, json=points_payload)
print(f"插入数据: {response.status_code}")
# 3. 搜索
search_url = "http://localhost:6333/collections/rest_test/points/search"
search_payload = {
"vector": np.random.random(128).tolist(),
"limit": 10,
"with_payload": True,
"with_vector": False
}
response = requests.post(search_url, json=search_payload)
results = response.json()
print(f"搜索结果: {len(results['result'])} 个")
# 4. 获取Collection信息
info_url = "http://localhost:6333/collections/rest_test"
response = requests.get(info_url)
info = response.json()
print(f"\nCollection信息:")
print(json.dumps(info['result'], indent=2))
# 5. 常用REST API端点
api_endpoints = {
"GET /collections": "列出所有Collection",
"PUT /collections/{name}": "创建Collection",
"GET /collections/{name}": "获取Collection信息",
"DELETE /collections/{name}": "删除Collection",
"PUT /collections/{name}/points": "插入/更新点",
"POST /collections/{name}/points/search": "搜索",
"POST /collections/{name}/points/scroll": "滚动查询",
"DELETE /collections/{name}/points": "删除点"
}
print("\nREST API端点:")
for endpoint, desc in api_endpoints.items():
print(f" {endpoint}: {desc}")
# 6. curl示例
curl_examples = """
# 创建Collection
curl -X PUT 'http://localhost:6333/collections/test' \\
-H 'Content-Type: application/json' \\
-d '{
"vectors": {
"size": 128,
"distance": "Cosine"
}
}'
# 搜索
curl -X POST 'http://localhost:6333/collections/test/points/search' \\
-H 'Content-Type: application/json' \\
-d '{
"vector": [0.1, 0.2, ...],
"limit": 10
}'
"""
print("\ncurl示例:")
print(curl_examples)
---
b.gRPC接口
a.功能说明
Qdrant支持gRPC接口。性能优于REST API。适合高性能场景。使用Protocol Buffers序列化。支持流式传输。延迟更低,吞吐量更高。需要生成客户端代码。适合内部服务调用。
b.代码示例
---
from qdrant_client import QdrantClient
# 1. 使用gRPC连接
client = QdrantClient(
host="localhost",
port=6333,
grpc_port=6334,
prefer_grpc=True # 优先使用gRPC
)
# 2. gRPC性能对比
performance_comparison = {
"延迟": {
"REST": "5-10ms",
"gRPC": "1-3ms",
"提升": "50-70%"
},
"吞吐量": {
"REST": "1000 QPS",
"gRPC": "3000-5000 QPS",
"提升": "3-5倍"
},
"序列化": {
"REST": "JSON(文本)",
"gRPC": "Protobuf(二进制)",
"优势": "更紧凑高效"
}
}
print("gRPC性能对比:")
for metric, values in performance_comparison.items():
print(f"\n{metric}:")
for key, value in values.items():
print(f" {key}: {value}")
# 3. gRPC使用场景
grpc_use_cases = [
"高性能要求的生产环境",
"微服务间内部调用",
"大规模批量操作",
"实时搜索系统",
"低延迟要求的应用"
]
print("\ngRPC使用场景:")
for i, case in enumerate(grpc_use_cases, 1):
print(f" {i}. {case}")
# 4. REST vs gRPC选择建议
selection_guide = {
"REST": [
"开发测试环境",
"跨语言集成",
"简单的CRUD操作",
"需要人工调试",
"对性能要求不高"
],
"gRPC": [
"生产环境",
"高性能要求",
"大规模数据操作",
"内部服务调用",
"低延迟需求"
]
}
print("\n选择建议:")
for protocol, scenarios in selection_guide.items():
print(f"\n{protocol}:")
for scenario in scenarios:
print(f" - {scenario}")
# 5. gRPC配置示例
grpc_config = {
"prefer_grpc": True, # 优先使用gRPC
"grpc_port": 6334, # gRPC端口
"timeout": 60, # 超时时间
"max_message_size": 100 * 1024 * 1024 # 最大消息大小
}
print("\ngRPC配置:")
for key, value in grpc_config.items():
print(f" {key}: {value}")
---
2.3 基础操作
01.Collection操作
a.创建Collection
a.功能说明
Collection是Qdrant中存储向量的容器。创建时需指定向量维度和距离度量。支持多向量配置。可以配置HNSW索引参数。支持量化和优化器配置。创建后可以动态调整部分参数。是使用Qdrant的第一步。
b.代码示例
---
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, HnswConfigDiff
client = QdrantClient(host="localhost", port=6333)
# 1. 创建简单Collection
client.create_collection(
collection_name="simple_collection",
vectors_config=VectorParams(
size=768, # 向量维度
distance=Distance.COSINE # 距离度量
)
)
# 2. 创建带HNSW配置的Collection
client.create_collection(
collection_name="optimized_collection",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE,
hnsw_config=HnswConfigDiff(
m=32, # 连接数
ef_construct=200, # 构建时搜索宽度
full_scan_threshold=10000 # 全扫描阈值
)
)
)
# 3. 创建多向量Collection
client.create_collection(
collection_name="multimodal_collection",
vectors_config={
"text": VectorParams(size=768, distance=Distance.COSINE),
"image": VectorParams(size=512, distance=Distance.COSINE)
}
)
# 4. 创建分片Collection
client.create_collection(
collection_name="sharded_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
shard_number=4, # 4个分片
replication_factor=2 # 每个分片2个副本
)
print("Collection创建完成")
# 5. 距离度量说明
distance_metrics = {
"Cosine": "余弦距离,适合文本向量,范围[-1,1]",
"Euclid": "欧氏距离,适合图像向量,范围[0,∞]",
"Dot": "点积距离,适合归一化向量,范围[-∞,∞]"
}
print("\n距离度量:")
for metric, desc in distance_metrics.items():
print(f" {metric}: {desc}")
# 6. 创建参数说明
create_params = {
"collection_name": "Collection名称",
"vectors_config": "向量配置(单向量或多向量)",
"shard_number": "分片数量(默认1)",
"replication_factor": "副本因子(默认1)",
"write_consistency_factor": "写一致性因子",
"on_disk_payload": "是否将Payload存储在磁盘"
}
print("\n创建参数:")
for param, desc in create_params.items():
print(f" {param}: {desc}")
---
b.查看Collection
a.功能说明
可以查看Collection的详细信息。包括向量配置、索引参数、数据统计。查看分片和副本信息。监控Collection状态。获取优化器配置。了解存储使用情况。是运维和调试的重要工具。
b.代码示例
---
# 1. 列出所有Collection
collections = client.get_collections()
print("所有Collection:")
for collection in collections.collections:
print(f" - {collection.name}")
# 2. 获取Collection详细信息
collection_info = client.get_collection(collection_name="simple_collection")
print("\nCollection详细信息:")
print(f" 名称: {collection_info.config.params.vectors.size}")
print(f" 向量维度: {collection_info.config.params.vectors.size}")
print(f" 距离度量: {collection_info.config.params.vectors.distance}")
print(f" 数据量: {collection_info.points_count}")
print(f" 分片数: {collection_info.config.params.shard_number}")
print(f" 副本因子: {collection_info.config.params.replication_factor}")
# 3. 获取Collection统计信息
print("\n存储统计:")
print(f" 向量数量: {collection_info.points_count}")
print(f" 索引状态: {collection_info.status}")
print(f" 优化器状态: {collection_info.optimizer_status}")
# 4. 检查Collection是否存在
def collection_exists(collection_name):
try:
client.get_collection(collection_name)
return True
except Exception:
return False
if collection_exists("simple_collection"):
print("\nCollection存在")
else:
print("\nCollection不存在")
# 5. Collection状态说明
status_description = {
"green": "健康状态,所有分片正常",
"yellow": "部分分片不可用,但服务正常",
"red": "严重问题,部分数据不可访问"
}
print("\n状态说明:")
for status, desc in status_description.items():
print(f" {status}: {desc}")
---
02.数据操作
a.插入数据
a.功能说明
向Collection中插入向量数据。支持单条和批量插入。每个Point包含ID、向量和Payload。ID可以是整数或UUID。支持自动生成ID。批量插入性能更好。插入操作是幂等的。
b.代码示例
---
import numpy as np
from qdrant_client.models import PointStruct
import uuid
# 1. 插入单条数据
point = PointStruct(
id=1,
vector=np.random.random(768).tolist(),
payload={"name": "Item 1", "category": "A"}
)
client.upsert(
collection_name="simple_collection",
points=[point]
)
# 2. 批量插入数据
points = [
PointStruct(
id=i,
vector=np.random.random(768).tolist(),
payload={"name": f"Item {i}", "index": i}
)
for i in range(1000)
]
client.upsert(
collection_name="simple_collection",
points=points,
wait=True # 等待插入完成
)
print(f"插入了 {len(points)} 条数据")
# 3. 使用UUID作为ID
uuid_points = [
PointStruct(
id=str(uuid.uuid4()),
vector=np.random.random(768).tolist(),
payload={"name": f"UUID Item {i}"}
)
for i in range(100)
]
client.upsert(
collection_name="simple_collection",
points=uuid_points
)
# 4. 插入多向量数据
multimodal_point = PointStruct(
id=1,
vector={
"text": np.random.random(768).tolist(),
"image": np.random.random(512).tolist()
},
payload={"title": "Multimodal Item"}
)
client.upsert(
collection_name="multimodal_collection",
points=[multimodal_point]
)
# 5. 批量插入性能优化
batch_size = 100
total_points = 10000
for i in range(0, total_points, batch_size):
batch = [
PointStruct(
id=j,
vector=np.random.random(768).tolist(),
payload={"batch": i // batch_size, "index": j}
)
for j in range(i, min(i + batch_size, total_points))
]
client.upsert(
collection_name="simple_collection",
points=batch,
wait=False # 异步插入,提升性能
)
print(f"\n批量插入 {total_points} 条数据完成")
# 6. 插入最佳实践
best_practices = [
"使用批量插入提升性能",
"批量大小建议100-1000",
"wait=False异步插入更快",
"定期检查插入结果",
"使用UUID避免ID冲突",
"合理设置Payload大小"
]
print("\n插入最佳实践:")
for i, practice in enumerate(best_practices, 1):
print(f" {i}. {practice}")
---
b.查询数据
a.功能说明
Qdrant提供多种数据查询方式。向量搜索是核心功能。支持过滤条件组合。可以滚动查询所有数据。支持按ID获取数据。提供分页和限制功能。灵活的查询能力满足不同需求。
b.代码示例
---
# 1. 向量搜索
query_vector = np.random.random(768).tolist()
results = client.search(
collection_name="simple_collection",
query_vector=query_vector,
limit=10,
with_payload=True, # 返回Payload
with_vector=False # 不返回向量
)
print("搜索结果:")
for result in results:
print(f" ID: {result.id}, Score: {result.score:.4f}, Payload: {result.payload}")
# 2. 带过滤的搜索
from qdrant_client.models import Filter, FieldCondition, MatchValue
filtered_results = client.search(
collection_name="simple_collection",
query_vector=query_vector,
query_filter=Filter(
must=[
FieldCondition(
key="category",
match=MatchValue(value="A")
)
]
),
limit=10
)
print(f"\n过滤搜索找到 {len(filtered_results)} 个结果")
# 3. 按ID获取数据
point = client.retrieve(
collection_name="simple_collection",
ids=[1, 2, 3],
with_payload=True,
with_vector=True
)
print(f"\n获取了 {len(point)} 个Point")
# 4. 滚动查询(遍历所有数据)
offset = None
all_points = []
while True:
result = client.scroll(
collection_name="simple_collection",
limit=100,
offset=offset,
with_payload=True,
with_vector=False
)
points, offset = result
all_points.extend(points)
if offset is None:
break
print(f"\n滚动查询获取了 {len(all_points)} 个Point")
# 5. 计数查询
from qdrant_client.models import CountRequest
count = client.count(
collection_name="simple_collection",
count_filter=Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="A"))
]
),
exact=True
)
print(f"\n符合条件的数据量: {count.count}")
# 6. 查询参数说明
query_params = {
"limit": "返回结果数量",
"offset": "分页偏移量",
"with_payload": "是否返回Payload",
"with_vector": "是否返回向量",
"score_threshold": "最低分数阈值",
"query_filter": "过滤条件"
}
print("\n查询参数:")
for param, desc in query_params.items():
print(f" {param}: {desc}")
---
3 Collection管理
3.1 创建Collection
01.基础配置
a.向量参数
a.功能说明
创建Collection时需要配置向量参数。指定向量维度和距离度量。支持单向量和多向量配置。可以配置HNSW索引参数。支持量化和优化器配置。配置影响性能和存储。合理配置是性能优化的基础。
b.代码示例
---
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, HnswConfigDiff
client = QdrantClient(host="localhost", port=6333)
# 1. 基础向量配置
client.create_collection(
collection_name="basic_collection",
vectors_config=VectorParams(
size=768, # 向量维度
distance=Distance.COSINE # 距离度量
)
)
# 2. 带HNSW配置
client.create_collection(
collection_name="hnsw_collection",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE,
hnsw_config=HnswConfigDiff(
m=32,
ef_construct=200,
full_scan_threshold=10000
)
)
)
# 3. 多向量配置
client.create_collection(
collection_name="multi_vector",
vectors_config={
"text": VectorParams(size=768, distance=Distance.COSINE),
"image": VectorParams(size=512, distance=Distance.EUCLID)
}
)
print("Collection创建完成")
---
b.分片配置
a.功能说明
Qdrant支持数据分片和副本配置。分片提升并发性能和存储容量。副本提供高可用性和容错能力。可以配置分片数量和副本因子。支持动态调整分片和副本。合理配置分片和副本平衡性能和可用性。
b.代码示例
---
# 创建分片Collection
client.create_collection(
collection_name="sharded_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
shard_number=4, # 4个分片
replication_factor=2, # 每个分片2个副本
write_consistency_factor=1 # 写一致性因子
)
# 分片配置说明
shard_config = {
"shard_number": "分片数量,影响并发性能",
"replication_factor": "副本因子,影响可用性",
"write_consistency_factor": "写一致性,影响写入性能"
}
print("分片配置:")
for key, desc in shard_config.items():
print(f" {key}: {desc}")
---
02.高级配置
a.优化器配置
a.功能说明
Qdrant提供优化器配置选项。控制索引构建和数据压缩。配置内存和磁盘使用策略。调整刷盘频率和合并策略。优化器配置影响性能和资源使用。合理配置优化器提升系统效率。
b.代码示例
---
from qdrant_client.models import OptimizersConfigDiff
client.create_collection(
collection_name="optimized_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
optimizers_config=OptimizersConfigDiff(
deleted_threshold=0.2,
vacuum_min_vector_number=1000,
default_segment_number=0,
max_segment_size=200000,
memmap_threshold=50000,
indexing_threshold=20000,
flush_interval_sec=5,
max_optimization_threads=1
)
)
print("优化器配置完成")
---
b.存储配置
a.功能说明
Qdrant支持灵活的存储配置。可以选择内存或磁盘存储。支持Payload存储在磁盘。配置WAL容量和段数。调整mmap阈值优化内存使用。存储配置影响性能和成本。根据数据规模选择合适的存储策略。
b.代码示例
---
# 创建磁盘存储Collection
client.create_collection(
collection_name="disk_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
on_disk_payload=True # Payload存储在磁盘
)
# 存储策略
storage_strategies = {
"内存存储": "性能最好,成本高,适合小数据集",
"磁盘存储": "成本低,性能略降,适合大数据集",
"混合存储": "向量在内存,Payload在磁盘"
}
print("存储策略:")
for strategy, desc in storage_strategies.items():
print(f" {strategy}: {desc}")
---
3.2 向量配置
01.距离度量
a.余弦距离
a.功能说明
余弦距离衡量向量方向的相似度。范围[-1,1],1表示完全相同。适合文本向量和归一化向量。不受向量长度影响。是最常用的距离度量。语义搜索的首选。计算效率高。
b.代码示例
---
from qdrant_client.models import Distance, VectorParams
# 使用余弦距离
client.create_collection(
collection_name="cosine_collection",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE
)
)
# 余弦距离特点
cosine_features = {
"范围": "[-1, 1]",
"1": "完全相同",
"0": "正交",
"-1": "完全相反",
"适用": "文本向量、归一化向量",
"优势": "不受向量模长影响"
}
print("余弦距离特点:")
for key, value in cosine_features.items():
print(f" {key}: {value}")
# 余弦相似度计算示例
import numpy as np
def cosine_similarity(vec1, vec2):
"""计算余弦相似度"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return dot_product / (norm1 * norm2)
vec_a = np.array([1, 2, 3])
vec_b = np.array([4, 5, 6])
similarity = cosine_similarity(vec_a, vec_b)
print(f"\n余弦相似度: {similarity:.4f}")
---
b.欧氏距离
a.功能说明
欧氏距离衡量向量在空间中的直线距离。范围[0,∞],0表示完全相同。适合图像向量和未归一化向量。受向量长度影响。计算直观易理解。适合需要考虑向量模长的场景。
b.代码示例
---
# 使用欧氏距离
client.create_collection(
collection_name="euclid_collection",
vectors_config=VectorParams(
size=512,
distance=Distance.EUCLID
)
)
# 欧氏距离特点
euclid_features = {
"范围": "[0, ∞]",
"0": "完全相同",
"越大": "越不相似",
"适用": "图像向量、未归一化向量",
"优势": "计算直观,物理意义明确"
}
print("欧氏距离特点:")
for key, value in euclid_features.items():
print(f" {key}: {value}")
# 欧氏距离计算示例
def euclidean_distance(vec1, vec2):
"""计算欧氏距离"""
return np.linalg.norm(vec1 - vec2)
vec_a = np.array([1, 2, 3])
vec_b = np.array([4, 5, 6])
distance = euclidean_distance(vec_a, vec_b)
print(f"\n欧氏距离: {distance:.4f}")
# 使用场景对比
use_cases = {
"余弦距离": ["文本嵌入", "语义搜索", "推荐系统", "归一化向量"],
"欧氏距离": ["图像特征", "人脸识别", "异常检测", "未归一化向量"]
}
print("\n使用场景:")
for metric, cases in use_cases.items():
print(f" {metric}:")
for case in cases:
print(f" - {case}")
---
02.向量类型
a.密集向量
a.功能说明
密集向量是最常见的向量类型。每个维度都有值。适合大多数AI应用。支持各种距离度量。索引效率高。是Qdrant的默认向量类型。广泛应用于各种场景。
b.代码示例
---
import numpy as np
from qdrant_client.models import PointStruct
# 创建密集向量
dense_vector = np.random.random(768).tolist()
point = PointStruct(
id=1,
vector=dense_vector,
payload={"type": "dense", "model": "text-embedding-ada-002"}
)
client.upsert(
collection_name="cosine_collection",
points=[point]
)
print("密集向量插入完成")
# 密集向量特点
dense_features = [
"每个维度都有值",
"适合深度学习模型输出",
"支持HNSW等高效索引",
"查询性能优异",
"是最常用的向量类型"
]
print("\n密集向量特点:")
for i, feature in enumerate(dense_features, 1):
print(f" {i}. {feature}")
# 常见密集向量来源
vector_sources = {
"文本": "BERT、GPT、Sentence Transformers",
"图像": "ResNet、ViT、CLIP",
"音频": "Wav2Vec、HuBERT",
"多模态": "CLIP、ALIGN"
}
print("\n常见向量来源:")
for data_type, models in vector_sources.items():
print(f" {data_type}: {models}")
---
b.命名向量
a.功能说明
命名向量允许一个Point存储多个向量。每个向量有独立的名称。支持不同维度和距离度量。适合多模态数据。可以分别检索各向量。提供更灵活的数据组织方式。是多模态应用的关键特性。
b.代码示例
---
# 创建命名向量Collection
client.create_collection(
collection_name="named_vectors",
vectors_config={
"text": VectorParams(size=768, distance=Distance.COSINE),
"image": VectorParams(size=512, distance=Distance.EUCLID),
"audio": VectorParams(size=256, distance=Distance.DOT)
}
)
# 插入命名向量
point = PointStruct(
id=1,
vector={
"text": np.random.random(768).tolist(),
"image": np.random.random(512).tolist(),
"audio": np.random.random(256).tolist()
},
payload={"title": "Multimodal Data", "type": "video"}
)
client.upsert(collection_name="named_vectors", points=[point])
# 按文本向量搜索
text_results = client.search(
collection_name="named_vectors",
query_vector=("text", np.random.random(768).tolist()),
limit=10
)
# 按图像向量搜索
image_results = client.search(
collection_name="named_vectors",
query_vector=("image", np.random.random(512).tolist()),
limit=10
)
print(f"文本搜索找到 {len(text_results)} 个结果")
print(f"图像搜索找到 {len(image_results)} 个结果")
# 命名向量应用场景
named_vector_use_cases = [
"多模态搜索:文本+图像+音频",
"跨模态检索:以图搜文、以文搜图",
"混合推荐:结合多种特征",
"多语言支持:不同语言的向量",
"多粒度表示:词级+句级+文档级"
]
print("\n命名向量应用场景:")
for i, use_case in enumerate(named_vector_use_cases, 1):
print(f" {i}. {use_case}")
# 命名向量优势
advantages = {
"灵活性": "一个Point存储多种向量",
"独立性": "每个向量独立配置和检索",
"高效性": "避免数据冗余和同步问题",
"扩展性": "易于添加新的向量类型"
}
print("\n命名向量优势:")
for advantage, desc in advantages.items():
print(f" {advantage}: {desc}")
---
3.3 Payload Schema
01.Payload结构
a.数据类型
a.功能说明
Payload支持丰富的数据类型。包括字符串、数字、布尔、数组、对象。支持嵌套结构。可以存储复杂的元数据。类型灵活,易于扩展。是向量数据的重要补充。提供业务逻辑支持。
b.代码示例
---
# Payload示例
payload = {
"title": "Product Name", # 字符串
"price": 99.99, # 浮点数
"rating": 5, # 整数
"in_stock": True, # 布尔值
"tags": ["electronics", "smartphone"], # 数组
"metadata": { # 嵌套对象
"brand": "Apple",
"model": "iPhone 15",
"specs": {
"storage": "256GB",
"color": "Black",
"ram": "8GB"
}
},
"created_at": "2024-01-01T00:00:00Z" # 时间戳
}
point = PointStruct(
id=1,
vector=np.random.random(768).tolist(),
payload=payload
)
client.upsert(collection_name="products", points=[point])
print("Payload插入完成")
# Payload数据类型说明
data_types = {
"string": "字符串类型,适合文本字段",
"integer": "整数类型,适合计数、ID等",
"float": "浮点数类型,适合价格、评分等",
"bool": "布尔类型,适合状态标志",
"array": "数组类型,适合标签、类别等",
"object": "对象类型,适合嵌套结构",
"null": "空值类型"
}
print("\nPayload数据类型:")
for dtype, desc in data_types.items():
print(f" {dtype}: {desc}")
# Payload最佳实践
best_practices = [
"合理组织Payload结构",
"避免存储过大的数据",
"使用有意义的字段名",
"保持数据类型一致",
"为常用字段创建索引",
"定期清理无用字段"
]
print("\nPayload最佳实践:")
for i, practice in enumerate(best_practices, 1):
print(f" {i}. {practice}")
---
b.索引配置
a.功能说明
Payload可以创建索引加速查询。支持多种字段类型的索引。索引提升过滤性能。可以为嵌套字段创建索引。索引占用额外存储空间。合理创建索引平衡性能和存储。是优化查询性能的关键。
b.代码示例
---
# 创建Payload索引
client.create_payload_index(
collection_name="products",
field_name="price",
field_schema="float"
)
client.create_payload_index(
collection_name="products",
field_name="tags",
field_schema="keyword"
)
client.create_payload_index(
collection_name="products",
field_name="in_stock",
field_schema="bool"
)
# 为嵌套字段创建索引
client.create_payload_index(
collection_name="products",
field_name="metadata.brand",
field_schema="keyword"
)
print("Payload索引创建完成")
# 索引类型说明
index_types = {
"keyword": {
"描述": "关键词索引,适合字符串",
"适用": "品牌、类别、标签等",
"特点": "精确匹配,支持数组"
},
"integer": {
"描述": "整数索引",
"适用": "ID、数量、年份等",
"特点": "支持范围查询"
},
"float": {
"描述": "浮点数索引",
"适用": "价格、评分、距离等",
"特点": "支持范围查询"
},
"bool": {
"描述": "布尔值索引",
"适用": "状态标志",
"特点": "二值查询"
},
"geo": {
"描述": "地理位置索引",
"适用": "经纬度坐标",
"特点": "支持地理范围查询"
}
}
print("\n索引类型:")
for idx_type, info in index_types.items():
print(f"\n{idx_type}:")
for key, value in info.items():
print(f" {key}: {value}")
# 索引性能影响
index_impact = {
"查询性能": "提升10-100倍",
"存储空间": "增加10-30%",
"写入性能": "略微下降5-10%",
"内存占用": "增加索引大小"
}
print("\n索引性能影响:")
for aspect, impact in index_impact.items():
print(f" {aspect}: {impact}")
# 索引创建建议
index_recommendations = [
"为常用过滤字段创建索引",
"避免为所有字段创建索引",
"大数据集优先创建索引",
"定期评估索引使用情况",
"删除未使用的索引",
"索引创建后需要时间生效"
]
print("\n索引创建建议:")
for i, rec in enumerate(index_recommendations, 1):
print(f" {i}. {rec}")
---
02.Payload操作
a.更新Payload
a.功能说明
Qdrant支持动态更新Payload。可以添加、修改、删除字段。支持批量更新。更新操作不影响向量。提供灵活的元数据管理。适合动态变化的业务场景。是数据维护的重要工具。
b.代码示例
---
# 1. 设置Payload(添加或更新字段)
client.set_payload(
collection_name="products",
payload={"price": 89.99, "discount": True, "discount_rate": 0.1},
points=[1, 2, 3]
)
print("Payload更新完成")
# 2. 删除Payload字段
client.delete_payload(
collection_name="products",
keys=["discount", "discount_rate"],
points=[1, 2, 3]
)
print("Payload字段删除完成")
# 3. 清空Payload(保留向量)
client.clear_payload(
collection_name="products",
points=[1]
)
print("Payload已清空")
# 4. 批量更新Payload
for i in range(100):
client.set_payload(
collection_name="products",
payload={"last_updated": "2024-01-01", "version": 2},
points=[i],
wait=False # 异步更新
)
print("批量更新完成")
# 5. 条件更新Payload
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 先查询符合条件的Point
results = client.scroll(
collection_name="products",
scroll_filter=Filter(
must=[
FieldCondition(key="in_stock", match=MatchValue(value=True))
]
),
limit=1000
)
points, _ = results
point_ids = [p.id for p in points]
# 批量更新
client.set_payload(
collection_name="products",
payload={"available": True},
points=point_ids
)
print(f"更新了 {len(point_ids)} 个Point的Payload")
# Payload更新最佳实践
update_best_practices = [
"使用批量更新提升性能",
"wait=False异步更新更快",
"定期清理无用字段",
"更新前备份重要数据",
"监控更新操作的影响",
"避免频繁更新大量数据"
]
print("\nPayload更新最佳实践:")
for i, practice in enumerate(update_best_practices, 1):
print(f" {i}. {practice}")
---
b.Payload查询
a.功能说明
可以基于Payload进行查询和过滤。支持等值、范围、匹配等条件。可以组合多个过滤条件。Payload查询与向量搜索结合。提供强大的数据检索能力。是实现业务逻辑的关键。支持复杂的过滤表达式。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, Range, MatchValue, MatchAny
# 1. 等值查询
results = client.scroll(
collection_name="products",
scroll_filter=Filter(
must=[
FieldCondition(key="brand", match=MatchValue(value="Apple"))
]
),
limit=100
)
points, offset = results
print(f"等值查询找到 {len(points)} 个结果")
# 2. 范围查询
results = client.scroll(
collection_name="products",
scroll_filter=Filter(
must=[
FieldCondition(key="price", range=Range(gte=50, lte=200))
]
),
limit=100
)
points, offset = results
print(f"范围查询找到 {len(points)} 个结果")
# 3. 数组匹配查询
results = client.scroll(
collection_name="products",
scroll_filter=Filter(
must=[
FieldCondition(key="tags", match=MatchAny(any=["electronics", "smartphone"]))
]
),
limit=100
)
points, offset = results
print(f"数组匹配查询找到 {len(points)} 个结果")
# 4. 组合查询(AND/OR/NOT)
complex_filter = Filter(
must=[ # AND条件
FieldCondition(key="in_stock", match=MatchValue(value=True)),
FieldCondition(key="price", range=Range(gte=50, lte=200))
],
should=[ # OR条件
FieldCondition(key="brand", match=MatchValue(value="Apple")),
FieldCondition(key="brand", match=MatchValue(value="Samsung"))
],
must_not=[ # NOT条件
FieldCondition(key="rating", range=Range(lt=3.0))
]
)
results = client.scroll(
collection_name="products",
scroll_filter=complex_filter,
limit=100
)
points, offset = results
print(f"组合查询找到 {len(points)} 个结果")
# 5. 嵌套字段查询
results = client.scroll(
collection_name="products",
scroll_filter=Filter(
must=[
FieldCondition(key="metadata.brand", match=MatchValue(value="Apple")),
FieldCondition(key="metadata.specs.storage", match=MatchValue(value="256GB"))
]
),
limit=100
)
points, offset = results
print(f"嵌套字段查询找到 {len(points)} 个结果")
# Payload查询操作符
query_operators = {
"match": "精确匹配",
"match_any": "匹配任意一个",
"range": "范围查询(gte/gt/lte/lt)",
"geo_radius": "地理半径查询",
"geo_bounding_box": "地理边界框查询",
"values_count": "数组长度查询"
}
print("\nPayload查询操作符:")
for operator, desc in query_operators.items():
print(f" {operator}: {desc}")
# 查询性能优化建议
query_optimization = [
"为常用过滤字段创建索引",
"避免过于复杂的过滤条件",
"使用must而不是should提升性能",
"范围查询尽量缩小范围",
"避免查询嵌套层级过深的字段",
"定期分析慢查询"
]
print("\n查询性能优化:")
for i, tip in enumerate(query_optimization, 1):
print(f" {i}. {tip}")
---
4 数据操作
4.1 点操作Points
01.插入数据
a.单点插入
a.功能说明
Point是Qdrant中的基本数据单元。包含ID、向量和Payload三部分。ID可以是整数或UUID字符串。向量是核心数据。Payload存储元数据。插入操作是幂等的,相同ID会覆盖。支持同步和异步插入。
b.代码示例
---
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import numpy as np
client = QdrantClient(host="localhost", port=6333)
# 1. 创建单个Point
point = PointStruct(
id=1, # 整数ID
vector=np.random.random(768).tolist(),
payload={
"title": "Sample Document",
"category": "tech",
"author": "John Doe",
"created_at": "2024-01-01"
}
)
# 2. 插入Point
client.upsert(
collection_name="documents",
points=[point],
wait=True # 等待插入完成
)
print("Point插入完成")
# 3. 使用UUID作为ID
import uuid
uuid_point = PointStruct(
id=str(uuid.uuid4()), # UUID字符串
vector=np.random.random(768).tolist(),
payload={"title": "UUID Document"}
)
client.upsert(
collection_name="documents",
points=[uuid_point]
)
print("UUID Point插入完成")
# Point结构说明
point_structure = {
"id": "唯一标识符(整数或UUID字符串)",
"vector": "向量数据(列表或字典)",
"payload": "元数据(JSON对象,可选)"
}
print("\nPoint结构:")
for field, desc in point_structure.items():
print(f" {field}: {desc}")
# 插入参数说明
upsert_params = {
"collection_name": "目标Collection名称",
"points": "Point列表",
"wait": "是否等待完成(True/False)",
"ordering": "写入顺序保证(weak/medium/strong)"
}
print("\n插入参数:")
for param, desc in upsert_params.items():
print(f" {param}: {desc}")
---
b.批量插入
a.功能说明
批量插入可以显著提升性能。一次插入多个Point。减少网络往返次数。支持异步插入提升吞吐量。建议批量大小100-1000。过大的批量可能导致内存问题。合理的批量大小平衡性能和资源。
b.代码示例
---
# 1. 批量创建Points
batch_size = 100
points = [
PointStruct(
id=i,
vector=np.random.random(768).tolist(),
payload={
"index": i,
"batch": i // batch_size,
"category": f"cat_{i % 5}"
}
)
for i in range(1000)
]
# 2. 批量插入
client.upsert(
collection_name="documents",
points=points,
wait=False # 异步插入,提升性能
)
print(f"批量插入 {len(points)} 个Points")
# 3. 分批插入大量数据
total_points = 100000
batch_size = 100
for i in range(0, total_points, batch_size):
batch = [
PointStruct(
id=j,
vector=np.random.random(768).tolist(),
payload={"index": j}
)
for j in range(i, min(i + batch_size, total_points))
]
client.upsert(
collection_name="documents",
points=batch,
wait=False
)
if (i + batch_size) % 10000 == 0:
print(f"已插入 {i + batch_size} 个Points")
print(f"\n总共插入 {total_points} 个Points")
# 批量插入性能对比
performance_comparison = {
"单点插入": {
"速度": "100-500 points/s",
"适用": "少量数据、实时插入"
},
"批量插入(100)": {
"速度": "5,000-10,000 points/s",
"适用": "中等规模数据"
},
"批量插入(1000)": {
"速度": "10,000-50,000 points/s",
"适用": "大规模数据导入"
}
}
print("\n性能对比:")
for method, metrics in performance_comparison.items():
print(f"\n{method}:")
for key, value in metrics.items():
print(f" {key}: {value}")
# 批量插入最佳实践
best_practices = [
"批量大小建议100-1000",
"使用wait=False异步插入",
"监控内存使用情况",
"大数据集分批插入",
"定期检查插入进度",
"插入完成后等待索引构建"
]
print("\n批量插入最佳实践:")
for i, practice in enumerate(best_practices, 1):
print(f" {i}. {practice}")
---
02.查询数据
a.按ID查询
a.功能说明
可以通过ID直接获取Point数据。支持单个或多个ID查询。返回完整的Point信息。可以选择是否返回向量和Payload。查询速度快,适合精确查找。是最基础的查询方式。
b.代码示例
---
# 1. 查询单个Point
point = client.retrieve(
collection_name="documents",
ids=[1],
with_payload=True,
with_vector=True
)
print(f"查询到Point: ID={point[0].id}")
print(f"Payload: {point[0].payload}")
# 2. 查询多个Points
points = client.retrieve(
collection_name="documents",
ids=[1, 2, 3, 4, 5],
with_payload=True,
with_vector=False # 不返回向量,节省带宽
)
print(f"\n查询到 {len(points)} 个Points")
for p in points:
print(f" ID: {p.id}, Payload: {p.payload}")
# 3. 查询UUID Points
uuid_ids = [str(uuid.uuid4()) for _ in range(5)]
# 先插入
uuid_points = [
PointStruct(
id=uid,
vector=np.random.random(768).tolist(),
payload={"uuid": uid}
)
for uid in uuid_ids
]
client.upsert(collection_name="documents", points=uuid_points)
# 再查询
retrieved = client.retrieve(
collection_name="documents",
ids=uuid_ids,
with_payload=True
)
print(f"\n查询到 {len(retrieved)} 个UUID Points")
# 查询参数说明
retrieve_params = {
"collection_name": "Collection名称",
"ids": "ID列表(整数或字符串)",
"with_payload": "是否返回Payload(默认True)",
"with_vector": "是否返回向量(默认False)"
}
print("\n查询参数:")
for param, desc in retrieve_params.items():
print(f" {param}: {desc}")
# 查询性能特点
query_features = [
"O(1)时间复杂度,速度极快",
"支持批量查询",
"可选择返回内容,节省带宽",
"适合精确查找",
"不受Collection大小影响"
]
print("\n查询性能特点:")
for i, feature in enumerate(query_features, 1):
print(f" {i}. {feature}")
---
b.滚动查询
a.功能说明
滚动查询用于遍历Collection中的所有数据。支持分页和过滤。可以指定每页大小。返回数据和下一页偏移量。适合数据导出和全量扫描。支持Payload过滤。是数据迁移的重要工具。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 1. 基础滚动查询
offset = None
all_points = []
while True:
result = client.scroll(
collection_name="documents",
limit=100, # 每页100条
offset=offset,
with_payload=True,
with_vector=False
)
points, offset = result
all_points.extend(points)
print(f"已获取 {len(all_points)} 个Points")
if offset is None: # 没有更多数据
break
print(f"\n总共获取 {len(all_points)} 个Points")
# 2. 带过滤的滚动查询
offset = None
filtered_points = []
while True:
result = client.scroll(
collection_name="documents",
scroll_filter=Filter(
must=[
FieldCondition(
key="category",
match=MatchValue(value="tech")
)
]
),
limit=100,
offset=offset,
with_payload=True
)
points, offset = result
filtered_points.extend(points)
if offset is None:
break
print(f"\n过滤后获取 {len(filtered_points)} 个Points")
# 3. 数据导出示例
import json
offset = None
export_data = []
while True:
result = client.scroll(
collection_name="documents",
limit=1000,
offset=offset,
with_payload=True,
with_vector=True
)
points, offset = result
for p in points:
export_data.append({
"id": p.id,
"vector": p.vector,
"payload": p.payload
})
if offset is None:
break
# 保存到文件
# with open("export.json", "w") as f:
# json.dump(export_data, f)
print(f"\n导出 {len(export_data)} 个Points")
# 滚动查询参数
scroll_params = {
"collection_name": "Collection名称",
"limit": "每页数量(默认10)",
"offset": "分页偏移量",
"scroll_filter": "过滤条件",
"with_payload": "是否返回Payload",
"with_vector": "是否返回向量"
}
print("\n滚动查询参数:")
for param, desc in scroll_params.items():
print(f" {param}: {desc}")
# 滚动查询最佳实践
scroll_best_practices = [
"合理设置limit,建议100-1000",
"大数据集分批处理",
"使用过滤减少数据量",
"不需要向量时设置with_vector=False",
"定期保存进度,支持断点续传",
"监控内存使用"
]
print("\n滚动查询最佳实践:")
for i, practice in enumerate(scroll_best_practices, 1):
print(f" {i}. {practice}")
---
4.2 批量上传
01.批量策略
a.分批上传
a.功能说明
大规模数据需要分批上传。避免单次请求过大。减少内存压力。支持断点续传。可以并行上传提升速度。监控上传进度。合理的批量策略提升效率。
b.代码示例
---
import numpy as np
from qdrant_client.models import PointStruct
# 1. 分批上传配置
total_count = 1000000 # 100万条数据
batch_size = 1000 # 每批1000条
# 2. 分批上传
for i in range(0, total_count, batch_size):
batch = [
PointStruct(
id=j,
vector=np.random.random(768).tolist(),
payload={"index": j, "batch": i // batch_size}
)
for j in range(i, min(i + batch_size, total_count))
]
client.upsert(
collection_name="large_dataset",
points=batch,
wait=False # 异步上传
)
# 显示进度
if (i + batch_size) % 100000 == 0:
progress = (i + batch_size) / total_count * 100
print(f"上传进度: {progress:.1f}% ({i + batch_size}/{total_count})")
print("\n批量上传完成")
# 批量大小建议
batch_size_guide = {
"< 1万": "批量100-500",
"1万-10万": "批量500-1000",
"10万-100万": "批量1000-5000",
"> 100万": "批量5000-10000"
}
print("\n批量大小建议:")
for scale, recommendation in batch_size_guide.items():
print(f" {scale}: {recommendation}")
---
b.并行上传
a.功能说明
并行上传可以显著提升速度。使用多线程或多进程。充分利用网络带宽。注意控制并发数。避免过多并发导致服务器压力。合理的并行策略平衡速度和稳定性。
b.代码示例
---
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def upload_batch(batch_id, batch_data):
"""上传单个批次"""
try:
client.upsert(
collection_name="parallel_upload",
points=batch_data,
wait=False
)
return batch_id, True, len(batch_data)
except Exception as e:
return batch_id, False, str(e)
# 准备数据
total_count = 100000
batch_size = 1000
batches = []
for i in range(0, total_count, batch_size):
batch = [
PointStruct(
id=j,
vector=np.random.random(768).tolist(),
payload={"index": j}
)
for j in range(i, min(i + batch_size, total_count))
]
batches.append((i // batch_size, batch))
# 并行上传
max_workers = 4 # 4个并发线程
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(upload_batch, batch_id, batch_data)
for batch_id, batch_data in batches
]
completed = 0
for future in as_completed(futures):
batch_id, success, result = future.result()
completed += 1
if success:
print(f"批次 {batch_id} 上传成功 ({completed}/{len(batches)})")
else:
print(f"批次 {batch_id} 上传失败: {result}")
elapsed = time.time() - start_time
print(f"\n并行上传完成,耗时: {elapsed:.2f}秒")
print(f"平均速度: {total_count / elapsed:.0f} points/s")
# 并行上传建议
parallel_recommendations = [
"并发数建议2-8",
"根据网络带宽调整",
"监控服务器负载",
"实现重试机制",
"记录失败批次",
"上传完成后验证数据"
]
print("\n并行上传建议:")
for i, rec in enumerate(parallel_recommendations, 1):
print(f" {i}. {rec}")
---
02.数据导入
a.从文件导入
a.功能说明
支持从各种文件格式导入数据。常见格式包括JSON、CSV、Parquet。需要解析文件并转换为Point格式。支持流式读取大文件。可以边读边上传。提供数据验证和清洗。是数据迁移的常用方式。
b.代码示例
---
import json
# 1. 从JSON文件导入
def import_from_json(file_path, collection_name, batch_size=1000):
"""从JSON文件导入数据"""
with open(file_path, 'r') as f:
data = json.load(f)
batch = []
for item in data:
point = PointStruct(
id=item['id'],
vector=item['vector'],
payload=item.get('payload', {})
)
batch.append(point)
if len(batch) >= batch_size:
client.upsert(
collection_name=collection_name,
points=batch,
wait=False
)
print(f"已导入 {len(batch)} 条数据")
batch = []
# 上传剩余数据
if batch:
client.upsert(
collection_name=collection_name,
points=batch
)
print(f"已导入 {len(batch)} 条数据")
# 使用示例
# import_from_json("data.json", "imported_collection")
# 2. 从CSV文件导入
import csv
def import_from_csv(file_path, collection_name, vector_dim=768):
"""从CSV文件导入数据"""
batch = []
batch_size = 1000
with open(file_path, 'r') as f:
reader = csv.DictReader(f)
for idx, row in enumerate(reader):
# 假设CSV有id, vector, title等列
vector = [float(x) for x in row['vector'].split(',')]
point = PointStruct(
id=int(row['id']),
vector=vector,
payload={
'title': row.get('title', ''),
'category': row.get('category', '')
}
)
batch.append(point)
if len(batch) >= batch_size:
client.upsert(
collection_name=collection_name,
points=batch,
wait=False
)
batch = []
if batch:
client.upsert(collection_name=collection_name, points=batch)
print("文件导入功能已定义")
# 支持的文件格式
supported_formats = {
"JSON": "适合小到中等规模数据",
"JSONL": "适合大规模数据,流式读取",
"CSV": "适合表格数据",
"Parquet": "适合大规模数据,压缩高效",
"HDF5": "适合科学计算数据"
}
print("\n支持的文件格式:")
for fmt, desc in supported_formats.items():
print(f" {fmt}: {desc}")
---
b.从数据库导入
a.功能说明
支持从关系型数据库导入数据。常见数据库包括MySQL、PostgreSQL、MongoDB。需要建立数据库连接。查询数据并转换为向量。支持增量导入。可以定期同步数据。是企业级数据集成的常用方式。
b.代码示例
---
# 从PostgreSQL导入示例
# import psycopg2
def import_from_postgres(connection_string, query, collection_name):
"""从PostgreSQL导入数据"""
# conn = psycopg2.connect(connection_string)
# cursor = conn.cursor()
# cursor.execute(query)
batch = []
batch_size = 1000
# 模拟数据库查询结果
# for row in cursor:
# point = PointStruct(
# id=row[0], # id列
# vector=row[1], # vector列
# payload={
# 'title': row[2],
# 'content': row[3]
# }
# )
# batch.append(point)
#
# if len(batch) >= batch_size:
# client.upsert(
# collection_name=collection_name,
# points=batch,
# wait=False
# )
# batch = []
# if batch:
# client.upsert(collection_name=collection_name, points=batch)
# cursor.close()
# conn.close()
print("PostgreSQL导入功能已定义")
# 数据库导入配置
db_import_config = {
"connection_pool": "使用连接池提升性能",
"batch_query": "批量查询减少往返",
"cursor_streaming": "流式游标处理大数据",
"error_handling": "实现重试和错误处理",
"progress_tracking": "记录导入进度",
"incremental_sync": "支持增量同步"
}
print("\n数据库导入配置:")
for config, desc in db_import_config.items():
print(f" {config}: {desc}")
# 常见数据库连接示例
db_connections = {
"PostgreSQL": "psycopg2.connect(host='localhost', database='mydb')",
"MySQL": "pymysql.connect(host='localhost', database='mydb')",
"MongoDB": "pymongo.MongoClient('mongodb://localhost:27017')",
"SQLite": "sqlite3.connect('database.db')"
}
print("\n数据库连接示例:")
for db, conn_str in db_connections.items():
print(f" {db}: {conn_str}")
---
4.3 更新删除
01.更新操作
a.更新向量
a.功能说明
可以更新Point的向量数据。使用upsert操作覆盖原有向量。保持ID不变。Payload可以同时更新。更新操作是原子的。支持批量更新。是数据维护的重要功能。
b.代码示例
---
# 1. 更新单个Point的向量
updated_point = PointStruct(
id=1,
vector=np.random.random(768).tolist(), # 新向量
payload={"title": "Updated Document", "version": 2}
)
client.upsert(
collection_name="documents",
points=[updated_point]
)
print("向量更新完成")
# 2. 批量更新向量
updated_points = [
PointStruct(
id=i,
vector=np.random.random(768).tolist(),
payload={"updated": True}
)
for i in range(1, 101)
]
client.upsert(
collection_name="documents",
points=updated_points,
wait=True
)
print(f"批量更新 {len(updated_points)} 个向量")
# 更新注意事项
update_notes = [
"upsert操作会完全覆盖原Point",
"ID必须与原Point相同",
"向量维度必须匹配",
"Payload会被完全替换",
"更新后需要重建索引",
"大量更新影响查询性能"
]
print("\n更新注意事项:")
for i, note in enumerate(update_notes, 1):
print(f" {i}. {note}")
---
b.更新Payload
a.功能说明
可以单独更新Payload而不影响向量。支持部分字段更新。可以添加、修改、删除字段。支持批量更新。更新操作高效。不需要重建向量索引。是元数据管理的关键功能。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 1. 设置Payload(添加或更新字段)
client.set_payload(
collection_name="documents",
payload={
"status": "published",
"updated_at": "2024-01-15",
"views": 1000
},
points=[1, 2, 3]
)
print("Payload更新完成")
# 2. 删除Payload字段
client.delete_payload(
collection_name="documents",
keys=["views", "status"],
points=[1, 2, 3]
)
print("Payload字段删除完成")
# 3. 清空Payload
client.clear_payload(
collection_name="documents",
points=[1]
)
print("Payload已清空")
# 4. 条件更新Payload
# 先查询符合条件的Points
results = client.scroll(
collection_name="documents",
scroll_filter=Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="tech"))
]
),
limit=1000
)
points, _ = results
point_ids = [p.id for p in points]
# 批量更新
client.set_payload(
collection_name="documents",
payload={"featured": True},
points=point_ids
)
print(f"条件更新了 {len(point_ids)} 个Points")
# Payload更新操作
payload_operations = {
"set_payload": "设置或更新字段",
"delete_payload": "删除指定字段",
"clear_payload": "清空所有Payload",
"overwrite_payload": "完全覆盖Payload"
}
print("\nPayload更新操作:")
for op, desc in payload_operations.items():
print(f" {op}: {desc}")
---
02.删除操作
a.按ID删除
a.功能说明
可以通过ID删除Point。支持单个或批量删除。删除操作是永久的。删除后ID可以重用。支持异步删除。删除操作影响索引。需要定期优化清理删除的数据。
b.代码示例
---
# 1. 删除单个Point
client.delete(
collection_name="documents",
points_selector=[1]
)
print("Point删除完成")
# 2. 批量删除Points
client.delete(
collection_name="documents",
points_selector=[2, 3, 4, 5]
)
print("批量删除完成")
# 3. 删除大量Points
ids_to_delete = list(range(1000, 2000))
client.delete(
collection_name="documents",
points_selector=ids_to_delete,
wait=True
)
print(f"删除了 {len(ids_to_delete)} 个Points")
# 删除后优化
# Qdrant会自动优化,也可以手动触发
# client.update_collection(
# collection_name="documents",
# optimizer_config=OptimizersConfigDiff(
# deleted_threshold=0.2
# )
# )
print("\n删除操作完成")
---
b.条件删除
a.功能说明
可以根据Payload条件删除Points。支持复杂的过滤条件。批量删除符合条件的数据。删除操作是永久的。需要谨慎使用。适合数据清理和维护。是数据管理的重要工具。
b.代码示例
---
from qdrant_client.models import FilterSelector
# 1. 条件删除
client.delete(
collection_name="documents",
points_selector=FilterSelector(
filter=Filter(
must=[
FieldCondition(
key="status",
match=MatchValue(value="draft")
)
]
)
)
)
print("条件删除完成")
# 2. 删除过期数据
client.delete(
collection_name="documents",
points_selector=FilterSelector(
filter=Filter(
must=[
FieldCondition(
key="created_at",
range=Range(lt="2023-01-01")
)
]
)
)
)
print("过期数据删除完成")
# 3. 删除前统计数量
count_result = client.count(
collection_name="documents",
count_filter=Filter(
must=[
FieldCondition(key="status", match=MatchValue(value="draft"))
]
),
exact=True
)
print(f"\n将删除 {count_result.count} 个Points")
# 删除最佳实践
delete_best_practices = [
"删除前先统计数量",
"重要数据删除前备份",
"使用条件删除需谨慎",
"大量删除后优化Collection",
"定期清理无用数据",
"监控删除操作的影响"
]
print("\n删除最佳实践:")
for i, practice in enumerate(delete_best_practices, 1):
print(f" {i}. {practice}")
# 删除操作对比
delete_comparison = {
"按ID删除": {
"速度": "快",
"适用": "精确删除",
"风险": "低"
},
"条件删除": {
"速度": "较慢",
"适用": "批量清理",
"风险": "高,需谨慎"
}
}
print("\n删除操作对比:")
for method, metrics in delete_comparison.items():
print(f"\n{method}:")
for key, value in metrics.items():
print(f" {key}: {value}")
---
5 搜索查询
5.1 向量搜索
01.基础搜索
a.相似度搜索
a.功能说明
向量搜索是Qdrant的核心功能。通过查询向量找到最相似的Points。支持多种距离度量。返回Top-K最相似结果。可以指定返回数量。支持Payload过滤。是语义搜索的基础。
b.代码示例
---
import numpy as np
query_vector = np.random.random(768).tolist()
results = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10
)
print(f"找到 {len(results)} 个相似结果")
for i, result in enumerate(results, 1):
print(f"{i}. ID: {result.id}, Score: {result.score:.4f}")
---
b.命名向量搜索
a.功能说明
对于多向量Collection,可以指定搜索哪个向量。每个向量独立检索。支持跨向量组合搜索。适合多模态应用。提供更灵活的检索方式。
b.代码示例
---
text_query = np.random.random(768).tolist()
text_results = client.search(
collection_name="multimodal",
query_vector=("text", text_query),
limit=10
)
print(f"文本搜索找到 {len(text_results)} 个结果")
---
02.高级搜索
a.带过滤的搜索
a.功能说明
可以在向量搜索时添加Payload过滤。先过滤再搜索提升精度。支持复杂的过滤条件。过滤和向量搜索结合。是实现业务逻辑的关键。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, MatchValue
results = client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="tech"))
]
),
limit=10
)
print(f"过滤后找到 {len(results)} 个结果")
---
b.相似度阈值
a.功能说明
可以设置最小相似度阈值。只返回超过阈值的结果。过滤低质量匹配。提升结果相关性。适合高精度要求场景。
b.代码示例
---
results = client.search(
collection_name="documents",
query_vector=query_vector,
limit=100,
score_threshold=0.8
)
print(f"阈值过滤后找到 {len(results)} 个结果")
---
5.2 过滤器Filter
01.过滤条件
a.基础条件
a.功能说明
过滤器支持多种基础条件。包括等值匹配、范围查询、数组匹配。可以组合多个条件。支持嵌套字段过滤。是实现业务逻辑的基础。
b.代码示例
---
from qdrant_client.models import Filter, FieldCondition, Range, MatchValue
filter_exact = Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="tech"))
]
)
filter_range = Filter(
must=[
FieldCondition(key="price", range=Range(gte=100, lte=500))
]
)
print("过滤条件已定义")
---
b.组合条件
a.功能说明
支持AND、OR、NOT逻辑组合。可以构建复杂的过滤表达式。must表示AND,should表示OR,must_not表示NOT。提供强大的查询能力。
b.代码示例
---
complex_filter = Filter(
must=[
FieldCondition(key="in_stock", match=MatchValue(value=True))
],
should=[
FieldCondition(key="category", match=MatchValue(value="electronics"))
],
must_not=[
FieldCondition(key="discontinued", match=MatchValue(value=True))
]
)
print("组合过滤条件已定义")
---
02.高级过滤
a.地理位置过滤
a.功能说明
支持地理位置过滤。可以按半径或边界框查询。适合LBS应用。需要Payload包含地理坐标。提供位置相关的检索能力。
b.代码示例
---
from qdrant_client.models import GeoRadius, GeoPoint
geo_filter = Filter(
must=[
FieldCondition(
key="location",
geo_radius=GeoRadius(
center=GeoPoint(lon=116.4074, lat=39.9042),
radius=5000.0
)
)
]
)
print("地理过滤条件已定义")
---
b.数组长度过滤
a.功能说明
可以根据数组字段的长度过滤。适合标签数量等场景。支持范围查询。提供更灵活的过滤方式。
b.代码示例
---
from qdrant_client.models import ValuesCount
array_filter = Filter(
must=[
FieldCondition(
key="tags",
values_count=ValuesCount(gte=3, lte=10)
)
]
)
print("数组长度过滤已定义")
---
5.3 Payload查询
01.查询方式
a.滚动查询
a.功能说明
滚动查询用于遍历符合条件的所有数据。支持分页和过滤。可以指定每页大小。适合数据导出和全量扫描。是数据分析的重要工具。
b.代码示例
---
offset = None
all_results = []
while True:
result = client.scroll(
collection_name="documents",
scroll_filter=Filter(
must=[FieldCondition(key="category", match=MatchValue(value="tech"))]
),
limit=100,
offset=offset
)
points, offset = result
all_results.extend(points)
if offset is None:
break
print(f"获取 {len(all_results)} 个结果")
---
b.计数查询
a.功能说明
可以统计符合条件的Point数量。支持精确和近似计数。精确计数速度较慢。近似计数速度快。适合数据统计分析。
b.代码示例
---
count_result = client.count(
collection_name="documents",
count_filter=Filter(
must=[FieldCondition(key="category", match=MatchValue(value="tech"))]
),
exact=True
)
print(f"精确计数: {count_result.count}")
---
02.聚合查询
a.分组统计
a.功能说明
虽然Qdrant不直接支持SQL式聚合,但可以通过滚动查询和客户端处理实现。适合简单的统计需求。
b.代码示例
---
from collections import Counter
offset = None
category_counts = Counter()
while True:
result = client.scroll(
collection_name="documents",
limit=1000,
offset=offset,
with_payload=True
)
points, offset = result
for point in points:
category = point.payload.get('category', 'unknown')
category_counts[category] += 1
if offset is None:
break
print("类别统计:")
for category, count in category_counts.most_common():
print(f" {category}: {count}")
---
b.范围分析
a.功能说明
可以分析数值字段的分布。统计最大值、最小值、平均值等。通过滚动查询获取数据。适合数据探索和分析。
b.代码示例
---
import statistics
offset = None
prices = []
while True:
result = client.scroll(
collection_name="products",
limit=1000,
offset=offset,
with_payload=True
)
points, offset = result
for point in points:
price = point.payload.get('price')
if price is not None:
prices.append(price)
if offset is None:
break
if prices:
print(f"价格统计: 最小{min(prices):.2f}, 最大{max(prices):.2f}, 平均{statistics.mean(prices):.2f}")
---
5.4 推荐系统
01.推荐策略
a.基于向量推荐
a.功能说明
Qdrant提供recommend API实现推荐功能。基于正负样本计算推荐向量。支持多个正样本和负样本。适合个性化推荐场景。
b.代码示例
---
recommendations = client.recommend(
collection_name="products",
positive=[1, 5, 10],
negative=[3, 7],
limit=10
)
print(f"推荐 {len(recommendations)} 个商品")
for rec in recommendations:
print(f"ID: {rec.id}, Score: {rec.score:.4f}")
---
b.批量推荐
a.功能说明
支持批量推荐请求。一次为多个用户生成推荐。提升推荐效率。适合离线推荐场景。
b.代码示例
---
from qdrant_client.models import RecommendRequest
batch_requests = [
RecommendRequest(positive=[1, 2, 3], negative=[], limit=5),
RecommendRequest(positive=[4, 5, 6], negative=[1], limit=5)
]
print(f"批量推荐请求已准备,共 {len(batch_requests)} 个")
---
02.推荐优化
a.推荐策略
a.功能说明
可以调整推荐策略参数。控制正负样本的权重。设置推荐多样性。优化推荐质量。提升用户体验。
b.代码示例
---
from qdrant_client.models import RecommendStrategy
recommendations = client.recommend(
collection_name="products",
positive=[1, 5, 10],
strategy=RecommendStrategy.AVERAGE_VECTOR,
limit=10
)
print(f"策略推荐 {len(recommendations)} 个结果")
---
b.推荐评估
a.功能说明
需要评估推荐效果。统计推荐准确率和多样性。收集用户反馈。持续优化推荐模型。提升推荐质量。
b.代码示例
---
evaluation_metrics = {
"准确率": "推荐商品被点击的比例",
"召回率": "用户感兴趣商品被推荐的比例",
"多样性": "推荐结果的类别分布"
}
print("推荐评估指标:")
for metric, desc in evaluation_metrics.items():
print(f" {metric}: {desc}")
---
5.5 批量搜索
01.批量查询
a.批量向量搜索
a.功能说明
支持一次提交多个搜索请求。减少网络往返次数。提升查询效率。适合批量推理场景。是性能优化的重要手段。
b.代码示例
---
from qdrant_client.models import SearchRequest
query_vectors = [np.random.random(768).tolist() for _ in range(10)]
search_requests = [
SearchRequest(vector=qv, limit=5, with_payload=True)
for qv in query_vectors
]
batch_results = client.search_batch(
collection_name="documents",
requests=search_requests
)
print(f"批量搜索完成,共 {len(batch_results)} 个查询")
---
b.批量推荐
a.功能说明
支持批量推荐请求。一次为多个用户生成推荐。提升推荐效率。优化系统吞吐量。
b.代码示例
---
recommend_requests = [
RecommendRequest(positive=[i, i+1, i+2], negative=[], limit=5)
for i in range(1, 100, 10)
]
print(f"批量推荐请求已准备,共 {len(recommend_requests)} 个")
---
02.性能优化
a.查询优化
a.功能说明
优化查询性能的关键在于合理配置。选择合适的索引参数。控制返回结果数量。使用Payload过滤减少计算。监控查询性能。
b.代码示例
---
optimization_config = {
"索引参数": "ef=128-256平衡性能和准确度",
"返回数量": "limit=10-50,避免过大",
"Payload": "按需返回,不需要时设置False"
}
print("查询性能优化:")
for aspect, config in optimization_config.items():
print(f" {aspect}: {config}")
---
b.缓存策略
a.功能说明
实现查询结果缓存提升性能。缓存热门查询结果。设置合理的过期时间。减少重复计算。提升用户体验。
b.代码示例
---
cache_strategies = {
"热门查询": "缓存时间1小时",
"个性化查询": "缓存时间5-10分钟",
"实时数据": "不缓存或缓存1分钟"
}
print("缓存策略建议:")
for query_type, strategy in cache_strategies.items():
print(f" {query_type}: {strategy}")
---
6 高级特性
6.1 量化Quantization
01.量化类型
a.标量量化
a.功能说明
标量量化将浮点向量压缩为整数。减少内存占用和存储空间。提升查询速度。略微降低精度。适合大规模数据集。是性能优化的重要手段。
b.代码示例
---
from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType
client.update_collection(
collection_name="documents",
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(
type=ScalarType.INT8,
quantile=0.99,
always_ram=True
)
)
)
print("标量量化配置完成")
---
b.产品量化
a.功能说明
产品量化是更高级的压缩方法。将向量分段量化。压缩比更高。适合超大规模数据。需要更多计算资源。平衡存储和性能。
b.代码示例
---
from qdrant_client.models import ProductQuantization, ProductQuantizationConfig
client.update_collection(
collection_name="documents",
quantization_config=ProductQuantization(
product=ProductQuantizationConfig(
compression=CompressionRatio.X16,
always_ram=True
)
)
)
print("产品量化配置完成")
---
02.量化优化
a.性能影响
a.功能说明
量化显著减少内存占用。提升查询速度。略微降低召回率。需要权衡性能和精度。适合资源受限场景。
b.代码示例
---
quantization_impact = {
"内存占用": "减少75-90%",
"查询速度": "提升2-4倍",
"召回率": "降低1-3%",
"存储空间": "减少75-90%"
}
print("量化性能影响:")
for aspect, impact in quantization_impact.items():
print(f" {aspect}: {impact}")
---
b.量化策略
a.功能说明
选择合适的量化类型。根据数据规模和精度要求。配置量化参数。监控量化效果。定期评估和调整。
b.代码示例
---
quantization_strategies = {
"小数据集(<100万)": "不使用量化",
"中等数据集(100万-1000万)": "标量量化INT8",
"大数据集(>1000万)": "产品量化X16"
}
print("量化策略建议:")
for scale, strategy in quantization_strategies.items():
print(f" {scale}: {strategy}")
---
6.2 多向量Multiple Vectors
01.多向量配置
a.向量定义
a.功能说明
一个Point可以包含多个向量。每个向量有独立的名称和配置。支持不同维度和距离度量。适合多模态数据。提供灵活的数据组织。
b.代码示例
---
from qdrant_client.models import VectorParams, Distance
client.create_collection(
collection_name="multimodal",
vectors_config={
"text": VectorParams(size=768, distance=Distance.COSINE),
"image": VectorParams(size=512, distance=Distance.EUCLID),
"audio": VectorParams(size=256, distance=Distance.DOT)
}
)
print("多向量Collection创建完成")
---
b.向量插入
a.功能说明
插入多向量Point时需要提供所有向量。每个向量独立存储。可以分别检索。支持部分向量更新。提供灵活的数据管理。
b.代码示例
---
import numpy as np
from qdrant_client.models import PointStruct
point = PointStruct(
id=1,
vector={
"text": np.random.random(768).tolist(),
"image": np.random.random(512).tolist(),
"audio": np.random.random(256).tolist()
},
payload={"title": "Multimodal Content"}
)
client.upsert(collection_name="multimodal", points=[point])
print("多向量Point插入完成")
---
02.多向量检索
a.单向量检索
a.功能说明
可以指定检索某个向量。其他向量不参与计算。适合单模态查询。提供精准的检索能力。
b.代码示例
---
text_query = np.random.random(768).tolist()
results = client.search(
collection_name="multimodal",
query_vector=("text", text_query),
limit=10
)
print(f"文本向量检索找到 {len(results)} 个结果")
---
b.多向量组合
a.功能说明
可以组合多个向量进行检索。设置不同的权重。实现跨模态搜索。提供更丰富的检索方式。
b.代码示例
---
# 多向量组合检索(如果API支持)
# results = client.search(
# collection_name="multimodal",
# query_vectors={
# "text": (text_query, 0.7),
# "image": (image_query, 0.3)
# },
# limit=10
# )
print("多向量组合检索功能")
---
6.3 命名向量Named Vectors
01.命名向量管理
a.向量命名
a.功能说明
命名向量提供更清晰的语义。便于理解和维护。支持动态添加新向量。提供灵活的扩展能力。是多模态应用的基础。
b.代码示例
---
vector_names = {
"text_en": "英文文本向量",
"text_zh": "中文文本向量",
"image_clip": "CLIP图像向量",
"audio_wav2vec": "Wav2Vec音频向量"
}
print("命名向量说明:")
for name, desc in vector_names.items():
print(f" {name}: {desc}")
---
b.向量更新
a.功能说明
可以单独更新某个命名向量。不影响其他向量。支持部分更新。提供灵活的数据维护能力。
b.代码示例
---
# 更新单个命名向量
updated_point = PointStruct(
id=1,
vector={
"text": np.random.random(768).tolist()
# 只更新text向量,其他向量保持不变
}
)
# client.upsert(collection_name="multimodal", points=[updated_point])
print("命名向量更新功能")
---
02.命名向量应用
a.多语言支持
a.功能说明
使用命名向量支持多语言。每种语言独立的向量。可以分别检索或组合检索。适合国际化应用。
b.代码示例
---
multilingual_config = {
"en": VectorParams(size=768, distance=Distance.COSINE),
"zh": VectorParams(size=768, distance=Distance.COSINE),
"ja": VectorParams(size=768, distance=Distance.COSINE)
}
print("多语言向量配置:")
for lang, config in multilingual_config.items():
print(f" {lang}: {config}")
---
b.多模态融合
a.功能说明
命名向量实现多模态数据融合。文本、图像、音频统一管理。支持跨模态检索。提供丰富的应用场景。
b.代码示例
---
multimodal_use_cases = [
"以图搜文:用图像向量检索相关文本",
"以文搜图:用文本向量检索相关图像",
"视频检索:结合图像、音频、字幕向量",
"商品搜索:结合图片、描述、属性向量"
]
print("多模态应用场景:")
for i, use_case in enumerate(multimodal_use_cases, 1):
print(f" {i}. {use_case}")
---
6.4 稀疏向量Sparse Vectors
01.稀疏向量特性
a.稀疏表示
a.功能说明
稀疏向量大部分元素为零。只存储非零元素。节省存储空间。适合高维稀疏数据。如TF-IDF、BM25向量。
b.代码示例
---
from qdrant_client.models import SparseVector
# 稀疏向量表示
sparse_vec = SparseVector(
indices=[10, 25, 100, 500],
values=[0.5, 0.8, 0.3, 0.9]
)
print(f"稀疏向量: {len(sparse_vec.indices)} 个非零元素")
---
b.稀疏向量配置
a.功能说明
Qdrant支持稀疏向量存储和检索。可以与密集向量结合使用。实现混合检索。提供更好的检索效果。
b.代码示例
---
# 创建支持稀疏向量的Collection
from qdrant_client.models import SparseVectorParams
client.create_collection(
collection_name="hybrid_search",
vectors_config={
"dense": VectorParams(size=768, distance=Distance.COSINE)
},
sparse_vectors_config={
"sparse": SparseVectorParams()
}
)
print("混合向量Collection创建完成")
---
02.混合检索
a.密集+稀疏
a.功能说明
结合密集向量和稀疏向量检索。密集向量捕捉语义。稀疏向量捕捉关键词。混合检索提升准确率。是现代检索系统的趋势。
b.代码示例
---
# 插入混合向量
hybrid_point = PointStruct(
id=1,
vector={
"dense": np.random.random(768).tolist(),
"sparse": SparseVector(
indices=[10, 25, 100],
values=[0.5, 0.8, 0.3]
)
},
payload={"title": "Hybrid Document"}
)
# client.upsert(collection_name="hybrid_search", points=[hybrid_point])
print("混合向量插入功能")
---
b.检索策略
a.功能说明
可以调整密集和稀疏向量的权重。优化检索效果。适应不同的应用场景。提供灵活的检索策略。
b.代码示例
---
hybrid_strategies = {
"语义为主": "密集向量权重0.8,稀疏向量权重0.2",
"关键词为主": "密集向量权重0.3,稀疏向量权重0.7",
"平衡模式": "密集向量权重0.5,稀疏向量权重0.5"
}
print("混合检索策略:")
for strategy, weights in hybrid_strategies.items():
print(f" {strategy}: {weights}")
---
7 性能优化
7.1 索引优化
01.HNSW参数
a.构建参数
a.功能说明
HNSW索引的构建参数影响索引质量。m参数控制连接数。ef_construct控制构建时的搜索范围。参数越大索引越好但构建越慢。需要权衡构建时间和查询性能。
b.代码示例
---
from qdrant_client.models import HnswConfigDiff
client.update_collection(
collection_name="documents",
hnsw_config=HnswConfigDiff(
m=32,
ef_construct=200,
full_scan_threshold=10000
)
)
print("HNSW参数优化完成")
---
b.查询参数
a.功能说明
查询时的ef参数控制搜索范围。ef越大结果越准确但速度越慢。需要根据应用场景调整。平衡准确率和延迟。
b.代码示例
---
from qdrant_client.models import SearchParams
results = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10,
search_params=SearchParams(
hnsw_ef=128,
exact=False
)
)
print(f"优化搜索找到 {len(results)} 个结果")
---
02.索引管理
a.索引重建
a.功能说明
数据更新后可能需要重建索引。优化索引结构。提升查询性能。重建过程需要时间。建议在低峰期进行。
b.代码示例
---
# 触发索引优化
client.update_collection(
collection_name="documents",
optimizer_config=OptimizersConfigDiff(
indexing_threshold=20000
)
)
print("索引优化已触发")
---
b.索引监控
a.功能说明
监控索引状态和性能。统计索引大小和构建时间。分析查询性能指标。及时发现和解决问题。
b.代码示例
---
collection_info = client.get_collection(collection_name="documents")
print(f"Collection信息:")
print(f" Points数量: {collection_info.points_count}")
print(f" 索引状态: {collection_info.status}")
print(f" 优化器状态: {collection_info.optimizer_status}")
---
7.2 查询优化
01.查询策略
a.过滤优化
a.功能说明
合理使用Payload过滤减少计算量。为常用过滤字段创建索引。避免过于复杂的过滤条件。优化过滤逻辑。提升查询性能。
b.代码示例
---
# 创建Payload索引
client.create_payload_index(
collection_name="documents",
field_name="category",
field_schema="keyword"
)
client.create_payload_index(
collection_name="documents",
field_name="rating",
field_schema="float"
)
print("Payload索引创建完成")
---
b.批量查询
a.功能说明
使用批量查询减少网络往返。一次提交多个查询请求。提升整体吞吐量。适合批量推理场景。
b.代码示例
---
from qdrant_client.models import SearchRequest
queries = [np.random.random(768).tolist() for _ in range(10)]
requests = [
SearchRequest(vector=q, limit=5)
for q in queries
]
batch_results = client.search_batch(
collection_name="documents",
requests=requests
)
print(f"批量查询完成,共 {len(batch_results)} 个结果")
---
02.性能调优
a.参数调整
a.功能说明
根据实际场景调整查询参数。平衡准确率和延迟。监控查询性能指标。持续优化参数配置。
b.代码示例
---
tuning_params = {
"低延迟场景": "ef=64, limit=10",
"高准确率场景": "ef=256, limit=50",
"平衡场景": "ef=128, limit=20"
}
print("参数调优建议:")
for scenario, params in tuning_params.items():
print(f" {scenario}: {params}")
---
b.性能监控
a.功能说明
监控查询延迟和吞吐量。分析慢查询。识别性能瓶颈。及时优化和调整。
b.代码示例
---
import time
start = time.time()
results = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10
)
latency = (time.time() - start) * 1000
print(f"查询延迟: {latency:.2f}ms")
---
7.3 内存管理
01.内存优化
a.量化配置
a.功能说明
使用量化减少内存占用。标量量化可减少75%内存。产品量化可减少90%内存。需要权衡内存和精度。
b.代码示例
---
from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType
client.update_collection(
collection_name="documents",
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(
type=ScalarType.INT8,
always_ram=True
)
)
)
print("量化配置完成,内存占用减少约75%")
---
b.磁盘存储
a.功能说明
将部分数据存储在磁盘。减少内存占用。适合大规模数据集。略微降低查询性能。平衡成本和性能。
b.代码示例
---
client.create_collection(
collection_name="large_dataset",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
on_disk_payload=True
)
print("Payload存储在磁盘,节省内存")
---
02.资源监控
a.内存使用
a.功能说明
监控内存使用情况。及时发现内存泄漏。优化内存配置。避免OOM错误。
b.代码示例
---
import psutil
memory = psutil.virtual_memory()
print(f"内存使用情况:")
print(f" 总内存: {memory.total / 1024**3:.2f} GB")
print(f" 已使用: {memory.used / 1024**3:.2f} GB")
print(f" 使用率: {memory.percent}%")
---
b.资源限制
a.功能说明
设置合理的资源限制。避免单个查询占用过多资源。保护系统稳定性。提供服务质量保证。
b.代码示例
---
resource_limits = {
"最大查询数量": "limit <= 1000",
"最大批量大小": "batch_size <= 100",
"查询超时": "timeout = 30s"
}
print("资源限制建议:")
for limit, value in resource_limits.items():
print(f" {limit}: {value}")
---
8 集群部署
8.1 分布式模式
01.集群架构
a.节点角色
a.功能说明
Qdrant集群包含多个节点。每个节点可以处理请求。数据分片存储在不同节点。提供高可用性和扩展性。
b.代码示例
---
cluster_config = {
"节点数量": "建议3-5个节点",
"分片数量": "根据数据量确定",
"副本因子": "建议2-3个副本",
"一致性级别": "根据业务需求选择"
}
print("集群配置建议:")
for key, value in cluster_config.items():
print(f" {key}: {value}")
---
b.集群通信
a.功能说明
节点间通过gRPC通信。支持数据同步和复制。提供一致性保证。监控集群健康状态。
b.代码示例
---
# 连接集群
from qdrant_client import QdrantClient
cluster_client = QdrantClient(
url="http://cluster-lb.example.com:6333"
)
print("集群连接已建立")
---
02.集群管理
a.节点管理
a.功能说明
添加和移除节点。监控节点状态。处理节点故障。实现自动故障转移。
b.代码示例
---
# 获取集群信息
# cluster_info = cluster_client.get_cluster_info()
# print(f"集群节点数: {len(cluster_info.peers)}")
# for peer in cluster_info.peers:
# print(f" 节点: {peer.uri}, 状态: {peer.status}")
print("集群管理功能")
---
b.数据迁移
a.功能说明
支持数据在节点间迁移。实现负载均衡。优化数据分布。提升集群性能。
b.代码示例
---
migration_steps = [
"1. 评估当前数据分布",
"2. 规划迁移方案",
"3. 执行数据迁移",
"4. 验证数据完整性",
"5. 更新路由配置"
]
print("数据迁移步骤:")
for step in migration_steps:
print(f" {step}")
---
8.2 数据分片
01.分片策略
a.分片配置
a.功能说明
数据分片提升并发性能。每个分片独立处理请求。分片数量影响性能和资源使用。需要根据数据量和负载确定。
b.代码示例
---
client.create_collection(
collection_name="sharded_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
shard_number=4,
replication_factor=2
)
print("分片Collection创建完成")
---
b.分片管理
a.功能说明
监控分片状态。优化分片分布。处理分片故障。实现分片迁移。
b.代码示例
---
shard_management = {
"分片监控": "定期检查分片健康状态",
"负载均衡": "确保分片负载均匀",
"故障处理": "自动故障转移",
"分片迁移": "支持动态调整分片"
}
print("分片管理:")
for task, desc in shard_management.items():
print(f" {task}: {desc}")
---
02.分片优化
a.负载均衡
a.功能说明
确保分片负载均匀。避免热点分片。优化数据分布。提升整体性能。
b.代码示例
---
load_balancing_strategies = [
"基于数据量的分片",
"基于访问频率的分片",
"动态调整分片权重",
"实现分片自动迁移"
]
print("负载均衡策略:")
for i, strategy in enumerate(load_balancing_strategies, 1):
print(f" {i}. {strategy}")
---
b.性能监控
a.功能说明
监控分片性能指标。识别性能瓶颈。及时优化调整。保证服务质量。
b.代码示例
---
shard_metrics = [
"每个分片的请求数",
"每个分片的延迟",
"每个分片的数据量",
"每个分片的CPU和内存使用"
]
print("分片监控指标:")
for i, metric in enumerate(shard_metrics, 1):
print(f" {i}. {metric}")
---
8.3 副本配置
01.副本策略
a.副本因子
a.功能说明
副本因子决定数据冗余度。更多副本提供更高可用性。也增加存储成本。需要权衡可用性和成本。
b.代码示例
---
client.create_collection(
collection_name="ha_collection",
vectors_config=VectorParams(size=768, distance=Distance.COSINE),
shard_number=3,
replication_factor=3,
write_consistency_factor=2
)
print("高可用Collection创建完成")
---
b.一致性级别
a.功能说明
一致性级别控制写入保证。强一致性保证数据可靠但性能较低。最终一致性性能高但可能短暂不一致。根据业务需求选择。
b.代码示例
---
consistency_levels = {
"强一致性": "write_consistency_factor = replication_factor",
"最终一致性": "write_consistency_factor = 1",
"平衡模式": "write_consistency_factor = (replication_factor + 1) / 2"
}
print("一致性级别:")
for level, config in consistency_levels.items():
print(f" {level}: {config}")
---
02.副本管理
a.副本同步
a.功能说明
副本间自动同步数据。保证数据一致性。处理同步延迟。监控同步状态。
b.代码示例
---
replication_features = [
"自动数据同步",
"增量同步优化",
"同步状态监控",
"故障自动恢复"
]
print("副本同步特性:")
for i, feature in enumerate(replication_features, 1):
print(f" {i}. {feature}")
---
b.故障恢复
a.功能说明
副本提供故障恢复能力。节点故障时自动切换。保证服务可用性。最小化数据丢失。
b.代码示例
---
failover_process = [
"1. 检测节点故障",
"2. 标记故障节点",
"3. 切换到健康副本",
"4. 更新路由配置",
"5. 恢复故障节点"
]
print("故障恢复流程:")
for step in failover_process:
print(f" {step}")
---
9 AI框架集成
9.1 LangChain集成
01.LangChain配置
a.向量存储
a.功能说明
Qdrant作为LangChain的向量存储后端。存储文档嵌入向量。支持语义搜索。实现RAG应用。
b.代码示例
---
from langchain.vectorstores import Qdrant
from langchain.embeddings import OpenAIEmbeddings
from qdrant_client import QdrantClient
client = QdrantClient(host="localhost", port=6333)
embeddings = OpenAIEmbeddings()
vectorstore = Qdrant(
client=client,
collection_name="langchain_docs",
embeddings=embeddings
)
print("LangChain向量存储配置完成")
---
b.文档检索
a.功能说明
使用Qdrant检索相关文档。支持相似度搜索。结合LLM生成答案。实现问答系统。
b.代码示例
---
# 添加文档
texts = [
"Qdrant is a vector database",
"LangChain is a framework for LLM applications",
"RAG combines retrieval and generation"
]
vectorstore.add_texts(texts)
# 检索文档
query = "What is Qdrant?"
docs = vectorstore.similarity_search(query, k=3)
print(f"检索到 {len(docs)} 个相关文档")
for doc in docs:
print(f" {doc.page_content}")
---
02.RAG应用
a.检索增强
a.功能说明
RAG结合检索和生成。先检索相关文档。再生成答案。提升答案质量和可靠性。
b.代码示例
---
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever()
)
question = "What is Qdrant used for?"
answer = qa_chain.run(question)
print(f"问题: {question}")
print(f"答案: {answer}")
---
b.对话系统
a.功能说明
实现基于RAG的对话系统。支持多轮对话。结合历史上下文。提供准确的回答。
b.代码示例
---
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
conversation_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=vectorstore.as_retriever(),
memory=memory
)
print("对话系统已初始化")
---
9.2 LlamaIndex集成
01.LlamaIndex配置
a.向量索引
a.功能说明
Qdrant作为LlamaIndex的向量存储。构建文档索引。支持高效检索。实现知识库应用。
b.代码示例
---
from llama_index import VectorStoreIndex, ServiceContext
from llama_index.vector_stores import QdrantVectorStore
from llama_index.storage.storage_context import StorageContext
from qdrant_client import QdrantClient
client = QdrantClient(host="localhost", port=6333)
vector_store = QdrantVectorStore(
client=client,
collection_name="llamaindex_docs"
)
storage_context = StorageContext.from_defaults(
vector_store=vector_store
)
print("LlamaIndex向量存储配置完成")
---
b.文档索引
a.功能说明
索引文档到Qdrant。自动分块和嵌入。支持多种文档格式。实现语义搜索。
b.代码示例
---
from llama_index import Document
documents = [
Document(text="Qdrant is a vector database for AI applications"),
Document(text="LlamaIndex helps build LLM applications"),
Document(text="Vector search enables semantic retrieval")
]
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context
)
print(f"索引了 {len(documents)} 个文档")
---
02.查询引擎
a.语义查询
a.功能说明
使用查询引擎检索和生成答案。支持自然语言查询。结合检索和推理。提供准确的回答。
b.代码示例
---
query_engine = index.as_query_engine()
response = query_engine.query("What is Qdrant?")
print(f"查询: What is Qdrant?")
print(f"回答: {response}")
---
b.高级查询
a.功能说明
支持过滤、排序等高级查询。自定义检索策略。优化查询性能。提供灵活的查询能力。
b.代码示例
---
from llama_index.vector_stores.types import MetadataFilters, ExactMatchFilter
filters = MetadataFilters(
filters=[
ExactMatchFilter(key="category", value="tech")
]
)
filtered_query_engine = index.as_query_engine(
filters=filters
)
print("高级查询引擎已配置")
---
10 最佳实践
10.1 数据建模
01.向量设计
a.向量选择
a.功能说明
选择合适的向量模型。考虑维度和精度。平衡性能和效果。根据应用场景选择。
b.代码示例
---
vector_models = {
"文本": {
"BERT": "768维,适合通用文本",
"Sentence-BERT": "768维,适合句子相似度",
"OpenAI Ada": "1536维,高质量嵌入"
},
"图像": {
"ResNet": "2048维,适合图像分类",
"CLIP": "512维,适合多模态",
"ViT": "768维,适合图像理解"
}
}
print("向量模型选择:")
for data_type, models in vector_models.items():
print(f"\n{data_type}:")
for model, desc in models.items():
print(f" {model}: {desc}")
---
b.Payload设计
a.功能说明
合理设计Payload结构。存储必要的元数据。为常用字段创建索引。避免存储过大数据。
b.代码示例
---
payload_design = {
"基础字段": ["id", "title", "category", "created_at"],
"业务字段": ["author", "tags", "status"],
"索引字段": ["category", "status", "created_at"],
"避免字段": ["大文本内容", "二进制数据"]
}
print("Payload设计建议:")
for field_type, fields in payload_design.items():
print(f" {field_type}: {', '.join(fields)}")
---
02.数据组织
a.Collection设计
a.功能说明
合理划分Collection。按业务逻辑分离数据。避免单个Collection过大。便于管理和维护。
b.代码示例
---
collection_design = {
"按业务": "用户、商品、文档分别建Collection",
"按时间": "按月或按年划分历史数据",
"按语言": "多语言数据分别存储",
"按模态": "文本、图像、音频分别存储"
}
print("Collection设计策略:")
for strategy, desc in collection_design.items():
print(f" {strategy}: {desc}")
---
b.数据生命周期
a.功能说明
管理数据生命周期。定期清理过期数据。归档历史数据。优化存储成本。
b.代码示例
---
lifecycle_management = [
"定期清理过期数据",
"归档低频访问数据",
"压缩历史数据",
"监控数据增长",
"规划存储容量"
]
print("数据生命周期管理:")
for i, task in enumerate(lifecycle_management, 1):
print(f" {i}. {task}")
---
10.2 查询优化
01.查询设计
a.查询策略
a.功能说明
设计高效的查询策略。合理使用过滤条件。控制返回结果数量。优化查询性能。
b.代码示例
---
query_best_practices = [
"为常用过滤字段创建索引",
"使用批量查询提升吞吐量",
"设置合理的limit值",
"避免返回不需要的向量",
"使用缓存减少重复查询"
]
print("查询优化最佳实践:")
for i, practice in enumerate(query_best_practices, 1):
print(f" {i}. {practice}")
---
b.性能调优
a.功能说明
根据实际场景调优参数。监控查询性能。识别慢查询。持续优化改进。
b.代码示例
---
performance_tuning = {
"HNSW ef": "低延迟64-128,高准确率256-512",
"limit": "通常10-50,避免过大",
"batch_size": "批量查询10-100",
"timeout": "设置合理的超时时间"
}
print("性能调优参数:")
for param, recommendation in performance_tuning.items():
print(f" {param}: {recommendation}")
---
02.缓存策略
a.查询缓存
a.功能说明
缓存热门查询结果。减少重复计算。提升响应速度。降低系统负载。
b.代码示例
---
caching_strategies = {
"热门查询": "缓存1小时",
"个性化查询": "缓存5-10分钟",
"实时查询": "不缓存",
"静态数据": "缓存24小时"
}
print("缓存策略:")
for query_type, strategy in caching_strategies.items():
print(f" {query_type}: {strategy}")
---
b.缓存失效
a.功能说明
合理设置缓存过期时间。数据更新时清除相关缓存。避免返回过期数据。平衡缓存命中率和数据新鲜度。
b.代码示例
---
cache_invalidation = [
"数据更新时清除相关缓存",
"设置合理的TTL",
"监控缓存命中率",
"定期清理过期缓存"
]
print("缓存失效策略:")
for i, strategy in enumerate(cache_invalidation, 1):
print(f" {i}. {strategy}")
---
10.3 运维监控
01.监控指标
a.性能指标
a.功能说明
监控关键性能指标。及时发现性能问题。优化系统配置。保证服务质量。
b.代码示例
---
performance_metrics = {
"查询延迟": "P50/P95/P99延迟",
"查询吞吐量": "QPS",
"索引性能": "索引构建时间",
"内存使用": "内存占用率",
"CPU使用": "CPU利用率",
"磁盘IO": "读写IOPS"
}
print("性能监控指标:")
for metric, desc in performance_metrics.items():
print(f" {metric}: {desc}")
---
b.业务指标
a.功能说明
监控业务相关指标。分析用户行为。优化产品功能。提升用户体验。
b.代码示例
---
business_metrics = {
"搜索准确率": "用户点击率",
"推荐效果": "推荐转化率",
"数据规模": "Points数量增长",
"用户活跃度": "查询频率"
}
print("业务监控指标:")
for metric, desc in business_metrics.items():
print(f" {metric}: {desc}")
---
02.告警和响应
a.告警配置
a.功能说明
配置合理的告警规则。及时发现异常。快速响应处理。减少故障影响。
b.代码示例
---
alert_rules = {
"高延迟": "P99延迟 > 1000ms",
"低吞吐": "QPS < 100",
"高内存": "内存使用率 > 90%",
"磁盘满": "磁盘使用率 > 85%",
"节点故障": "节点不可达"
}
print("告警规则:")
for alert, condition in alert_rules.items():
print(f" {alert}: {condition}")
---
b.故障处理
a.功能说明
建立故障响应流程。快速定位问题。及时修复故障。总结经验教训。
b.代码示例
---
incident_response = [
"1. 接收告警通知",
"2. 快速评估影响",
"3. 定位问题根因",
"4. 执行修复方案",
"5. 验证服务恢复",
"6. 总结和改进"
]
print("故障响应流程:")
for step in incident_response:
print(f" {step}")
---