1 基础概念

1.1 Qdrant介绍

01.产品定位
    a.核心价值
        a.功能说明
            Qdrant是一款高性能的向量数据库,专为语义搜索和AI应用设计。采用Rust语言开发,性能优异且内存安全。支持丰富的过滤条件和Payload管理。提供简洁的RESTful API和gRPC接口。支持分布式部署和水平扩展。开源免费,社区活跃。适合RAG、推荐系统、图像检索等场景。
        b.代码示例
            ---
            # Qdrant核心特性
            
            features = {
                "性能": {
                    "语言": "Rust开发,性能优异",
                    "内存": "高效内存管理",
                    "并发": "支持高并发查询",
                    "延迟": "毫秒级响应"
                },
                "功能": {
                    "过滤": "丰富的过滤条件",
                    "Payload": "灵活的元数据管理",
                    "多向量": "支持多向量存储",
                    "量化": "支持向量量化"
                },
                "部署": {
                    "单机": "Docker快速部署",
                    "集群": "分布式集群",
                    "云服务": "Qdrant Cloud",
                    "嵌入式": "嵌入式模式"
                },
                "接口": {
                    "REST": "RESTful API",
                    "gRPC": "高性能gRPC",
                    "SDK": "Python/Rust/Go/JS",
                    "Web UI": "管理界面"
                }
            }
            
            print("Qdrant核心特性:")
            for category, items in features.items():
                print(f"\n{category}:")
                for key, value in items.items():
                    print(f"  {key}: {value}")
            ---
    b.应用场景
        a.功能说明
            Qdrant适合多种AI应用场景。语义搜索和问答系统是核心应用。支持推荐系统和个性化推荐。图像检索和相似图片搜索。异常检测和欺诈识别。文本分类和聚类分析。知识图谱和实体链接。RAG(检索增强生成)应用。
        b.代码示例
            ---
            # Qdrant应用场景
            
            use_cases = {
                "语义搜索": {
                    "描述": "基于语义的文档检索",
                    "示例": "智能问答、知识库搜索",
                    "优势": "理解语义,超越关键词匹配"
                },
                "推荐系统": {
                    "描述": "基于向量相似度的推荐",
                    "示例": "商品推荐、内容推荐",
                    "优势": "实时推荐,个性化精准"
                },
                "图像检索": {
                    "描述": "以图搜图功能",
                    "示例": "相似图片搜索、人脸识别",
                    "优势": "快速检索,高召回率"
                },
                "RAG应用": {
                    "描述": "检索增强生成",
                    "示例": "智能客服、文档问答",
                    "优势": "结合检索和生成,准确性高"
                },
                "异常检测": {
                    "描述": "基于向量距离的异常识别",
                    "示例": "欺诈检测、设备故障预警",
                    "优势": "实时检测,准确率高"
                }
            }
            
            print("Qdrant应用场景:")
            for scenario, info in use_cases.items():
                print(f"\n{scenario}:")
                for key, value in info.items():
                    print(f"  {key}: {value}")
            
            # 技术优势
            advantages = [
                "Rust语言开发,性能优异",
                "丰富的过滤条件,灵活查询",
                "支持Payload管理,元数据丰富",
                "分布式部署,水平扩展",
                "开源免费,社区活跃",
                "简洁API,易于集成",
                "支持多种距离度量",
                "提供Web UI管理界面"
            ]
            
            print("\n技术优势:")
            for i, adv in enumerate(advantages, 1):
                print(f"  {i}. {adv}")
            ---

02.技术架构
    a.系统组件
        a.功能说明
            Qdrant采用模块化架构设计。核心组件包括存储引擎、索引管理、查询引擎。存储引擎负责数据持久化和内存管理。索引管理支持HNSW等高效索引算法。查询引擎处理向量检索和过滤。支持WAL(Write-Ahead Log)保证数据安全。提供快照和备份机制。
        b.代码示例
            ---
            # Qdrant系统架构
            
            architecture = {
                "存储层": {
                    "向量存储": "高效向量数据存储",
                    "Payload存储": "元数据存储",
                    "WAL": "Write-Ahead Log",
                    "快照": "数据快照和备份"
                },
                "索引层": {
                    "HNSW": "分层可导航小世界图",
                    "量化": "向量量化压缩",
                    "过滤索引": "Payload过滤索引",
                    "多向量": "多向量索引"
                },
                "查询层": {
                    "向量搜索": "ANN近似最近邻搜索",
                    "过滤器": "丰富的过滤条件",
                    "评分": "自定义评分函数",
                    "批量查询": "批量检索优化"
                },
                "接口层": {
                    "REST API": "RESTful接口",
                    "gRPC": "高性能gRPC",
                    "Web UI": "管理界面",
                    "SDK": "多语言SDK"
                }
            }
            
            print("Qdrant系统架构:")
            for layer, components in architecture.items():
                print(f"\n{layer}:")
                for comp, desc in components.items():
                    print(f"  {comp}: {desc}")
            
            # 数据流程
            data_flow = [
                "1. 客户端发送请求(REST/gRPC)",
                "2. 接口层解析请求参数",
                "3. 查询层执行向量检索",
                "4. 索引层快速定位候选集",
                "5. 过滤器筛选满足条件的结果",
                "6. 评分排序返回Top-K",
                "7. 返回结果给客户端"
            ]
            
            print("\n数据流程:")
            for step in data_flow:
                print(f"  {step}")
            ---
    b.存储引擎
        a.功能说明
            Qdrant的存储引擎采用分段存储设计。数据分为多个Segment独立管理。每个Segment包含向量数据和Payload。支持内存和磁盘混合存储。使用mmap技术优化内存使用。支持数据压缩和量化。提供WAL保证数据安全。支持快照和增量备份。
        b.代码示例
            ---
            # Qdrant存储引擎配置
            
            storage_config = {
                "on_disk": False,  # 是否使用磁盘存储
                "optimizers": {
                    "deleted_threshold": 0.2,  # 删除阈值
                    "vacuum_min_vector_number": 1000,  # 最小清理向量数
                    "default_segment_number": 0,  # 默认段数量
                    "max_segment_size": 200000,  # 最大段大小
                    "memmap_threshold": 50000,  # mmap阈值
                    "indexing_threshold": 20000,  # 索引阈值
                    "flush_interval_sec": 5,  # 刷盘间隔
                    "max_optimization_threads": 1  # 优化线程数
                },
                "wal": {
                    "wal_capacity_mb": 32,  # WAL容量
                    "wal_segments_ahead": 0  # WAL段数
                },
                "performance": {
                    "max_search_threads": 0,  # 搜索线程数(0=自动)
                    "max_optimization_threads": 1  # 优化线程数
                }
            }
            
            print("存储引擎配置:")
            import json
            print(json.dumps(storage_config, indent=2))
            
            # Segment管理
            segment_lifecycle = {
                "创建": "达到indexing_threshold时创建新Segment",
                "索引": "构建HNSW索引",
                "优化": "合并小Segment,清理删除数据",
                "压缩": "向量量化和压缩",
                "持久化": "刷盘到磁盘",
                "备份": "快照和增量备份"
            }
            
            print("\nSegment生命周期:")
            for stage, desc in segment_lifecycle.items():
                print(f"  {stage}: {desc}")
            
            # 存储优化建议
            optimization_tips = [
                "小数据集(<100万)使用内存存储",
                "大数据集使用on_disk模式",
                "合理设置max_segment_size",
                "调整memmap_threshold优化内存",
                "定期执行优化操作",
                "使用量化减少存储空间",
                "配置WAL保证数据安全",
                "定期备份重要数据"
            ]
            
            print("\n存储优化建议:")
            for i, tip in enumerate(optimization_tips, 1):
                print(f"  {i}. {tip}")
            ---

1.2 核心特性

01.向量管理
    a.多向量支持
        a.功能说明
            Qdrant支持在单个Point中存储多个向量。适合多模态数据场景,如文本+图像。每个向量可以有不同的维度和距离度量。支持对不同向量分别检索。可以组合多个向量的检索结果。灵活的向量配置和管理。提升检索准确性和灵活性。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            from qdrant_client.models import Distance, VectorParams, PointStruct
            
            client = QdrantClient(host="localhost", port=6333)
            
            # 创建支持多向量的Collection
            client.create_collection(
                collection_name="multimodal_data",
                vectors_config={
                    "text": VectorParams(size=768, distance=Distance.COSINE),
                    "image": VectorParams(size=512, distance=Distance.COSINE),
                    "audio": VectorParams(size=256, distance=Distance.COSINE)
                }
            )
            
            # 插入多向量数据
            import numpy as np
            
            points = [
                PointStruct(
                    id=1,
                    vector={
                        "text": np.random.random(768).tolist(),
                        "image": np.random.random(512).tolist(),
                        "audio": np.random.random(256).tolist()
                    },
                    payload={"title": "Sample 1", "category": "video"}
                )
            ]
            
            client.upsert(
                collection_name="multimodal_data",
                points=points
            )
            
            # 按文本向量检索
            results = client.search(
                collection_name="multimodal_data",
                query_vector=("text", np.random.random(768).tolist()),
                limit=10
            )
            
            # 按图像向量检索
            results = client.search(
                collection_name="multimodal_data",
                query_vector=("image", np.random.random(512).tolist()),
                limit=10
            )
            
            print("多向量支持特点:")
            print("  - 单个Point存储多个向量")
            print("  - 不同向量可有不同维度")
            print("  - 支持分别检索各向量")
            print("  - 适合多模态数据场景")
            ---
    b.Payload管理
        a.功能说明
            Payload是Qdrant的元数据管理机制。支持任意JSON格式的元数据。可以存储文本、数字、布尔、数组等类型。支持嵌套结构和复杂对象。提供丰富的过滤条件。支持Payload索引加速查询。可以动态更新Payload。灵活的元数据管理能力。
        b.代码示例
            ---
            # Payload数据结构
            
            payload_example = {
                "title": "Product Name",
                "description": "Product description text",
                "price": 99.99,
                "rating": 4.5,
                "in_stock": True,
                "tags": ["electronics", "smartphone"],
                "metadata": {
                    "brand": "Apple",
                    "model": "iPhone 15",
                    "color": "Black"
                },
                "created_at": "2024-01-01T00:00:00Z"
            }
            
            # 插入带Payload的Point
            point = PointStruct(
                id=1,
                vector=np.random.random(768).tolist(),
                payload=payload_example
            )
            
            client.upsert(
                collection_name="products",
                points=[point]
            )
            
            # 更新Payload
            client.set_payload(
                collection_name="products",
                payload={"price": 89.99, "in_stock": False},
                points=[1]
            )
            
            # 删除Payload字段
            client.delete_payload(
                collection_name="products",
                keys=["tags"],
                points=[1]
            )
            
            # 创建Payload索引
            client.create_payload_index(
                collection_name="products",
                field_name="price",
                field_schema="float"
            )
            
            print("Payload管理功能:")
            print("  - 支持任意JSON格式")
            print("  - 丰富的数据类型")
            print("  - 嵌套结构支持")
            print("  - 动态更新Payload")
            print("  - Payload索引加速")
            ---

02.查询能力
    a.过滤器Filter
        a.功能说明
            Qdrant提供强大的过滤器功能。支持等值、范围、匹配等条件。可以组合多个过滤条件。支持嵌套过滤和复杂逻辑。过滤器在向量检索前执行。大幅减少检索范围,提升性能。支持Payload索引加速过滤。灵活的过滤条件组合。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, Range, MatchValue
            
            # 1. 等值过滤
            equal_filter = Filter(
                must=[
                    FieldCondition(
                        key="category",
                        match=MatchValue(value="electronics")
                    )
                ]
            )
            
            results = client.search(
                collection_name="products",
                query_vector=np.random.random(768).tolist(),
                query_filter=equal_filter,
                limit=10
            )
            
            # 2. 范围过滤
            range_filter = Filter(
                must=[
                    FieldCondition(
                        key="price",
                        range=Range(gte=50.0, lte=200.0)
                    )
                ]
            )
            
            # 3. 组合过滤
            combined_filter = Filter(
                must=[
                    FieldCondition(key="category", match=MatchValue(value="electronics")),
                    FieldCondition(key="price", range=Range(gte=50.0, lte=200.0)),
                    FieldCondition(key="in_stock", match=MatchValue(value=True))
                ]
            )
            
            # 4. 复杂逻辑过滤
            complex_filter = Filter(
                must=[
                    FieldCondition(key="category", match=MatchValue(value="electronics"))
                ],
                should=[
                    FieldCondition(key="brand", match=MatchValue(value="Apple")),
                    FieldCondition(key="brand", match=MatchValue(value="Samsung"))
                ],
                must_not=[
                    FieldCondition(key="rating", range=Range(lt=3.0))
                ]
            )
            
            results = client.search(
                collection_name="products",
                query_vector=np.random.random(768).tolist(),
                query_filter=complex_filter,
                limit=10
            )
            
            print("过滤器功能:")
            print("  - 等值过滤: match")
            print("  - 范围过滤: range")
            print("  - 组合过滤: must/should/must_not")
            print("  - 嵌套过滤: 支持复杂逻辑")
            print("  - Payload索引: 加速过滤")
            ---
    b.评分函数
        a.功能说明
            Qdrant支持自定义评分函数。可以调整向量距离的计算方式。支持多种距离度量:余弦、欧氏、点积。可以结合Payload进行混合评分。支持自定义评分逻辑。提供评分修改器Modifier。灵活的评分机制满足不同需求。
        b.代码示例
            ---
            from qdrant_client.models import ScoredPoint, SearchRequest
            
            # 1. 基础向量搜索(余弦距离)
            results = client.search(
                collection_name="products",
                query_vector=np.random.random(768).tolist(),
                limit=10
            )
            
            # 2. 使用评分修改器
            from qdrant_client.models import ScoreModifier
            
            results = client.search(
                collection_name="products",
                query_vector=np.random.random(768).tolist(),
                score_threshold=0.7,  # 最低分数阈值
                limit=10
            )
            
            # 3. 混合评分示例(伪代码)
            def custom_scoring(vector_score, payload):
                """自定义评分函数"""
                # 向量相似度得分
                base_score = vector_score
                
                # 价格因子(价格越低,得分越高)
                price_factor = 1.0 / (1.0 + payload.get("price", 100) / 100)
                
                # 评分因子
                rating_factor = payload.get("rating", 0) / 5.0
                
                # 库存因子
                stock_factor = 1.2 if payload.get("in_stock") else 0.8
                
                # 综合得分
                final_score = base_score * (0.6 + 0.2 * price_factor + 0.1 * rating_factor + 0.1 * stock_factor)
                
                return final_score
            
            # 4. 距离度量对比
            distance_metrics = {
                "Cosine": "余弦距离,适合文本向量",
                "Euclid": "欧氏距离,适合图像向量",
                "Dot": "点积距离,适合归一化向量"
            }
            
            print("距离度量:")
            for metric, desc in distance_metrics.items():
                print(f"  {metric}: {desc}")
            
            # 5. 评分阈值过滤
            results = client.search(
                collection_name="products",
                query_vector=np.random.random(768).tolist(),
                score_threshold=0.8,  # 只返回分数>0.8的结果
                limit=10
            )
            
            print("\n评分功能:")
            print("  - 多种距离度量")
            print("  - 评分阈值过滤")
            print("  - 自定义评分逻辑")
            print("  - 混合评分支持")
            ---

1.3 架构设计

01.分布式架构
    a.集群模式
        a.功能说明
            Qdrant支持分布式集群部署。数据自动分片到多个节点。支持副本机制保证高可用。节点间自动数据同步。支持水平扩展,动态添加节点。提供一致性哈希分片策略。支持读写分离提升性能。适合大规模数据和高并发场景。
        b.代码示例
            ---
            # Qdrant集群配置
            
            cluster_config = {
                "cluster": {
                    "enabled": True,
                    "p2p": {
                        "port": 6335
                    },
                    "consensus": {
                        "tick_period_ms": 100
                    }
                },
                "storage": {
                    "storage_path": "/qdrant/storage",
                    "snapshots_path": "/qdrant/snapshots",
                    "on_disk_payload": True
                },
                "service": {
                    "http_port": 6333,
                    "grpc_port": 6334
                }
            }
            
            print("集群配置:")
            import json
            print(json.dumps(cluster_config, indent=2))
            
            # 集群特性
            cluster_features = {
                "数据分片": "自动分片到多个节点",
                "副本机制": "支持多副本保证高可用",
                "自动同步": "节点间数据自动同步",
                "水平扩展": "动态添加节点",
                "一致性哈希": "数据均匀分布",
                "故障转移": "节点故障自动切换"
            }
            
            print("\n集群特性:")
            for feature, desc in cluster_features.items():
                print(f"  {feature}: {desc}")
            
            # 集群部署架构
            deployment_architecture = [
                "1. 多个Qdrant节点组成集群",
                "2. 使用一致性哈希分片数据",
                "3. 每个分片有多个副本",
                "4. 客户端连接任意节点",
                "5. 节点间自动路由请求",
                "6. 支持动态扩缩容"
            ]
            
            print("\n部署架构:")
            for step in deployment_architecture:
                print(f"  {step}")
            ---
    b.数据分片
        a.功能说明
            Qdrant使用Shard机制管理数据。Collection自动分为多个Shard。每个Shard独立存储和索引。支持自定义Shard数量。Shard可以分布在不同节点。支持Shard副本提升可用性。提供Shard迁移和平衡机制。灵活的分片策略适应不同规模。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            from qdrant_client.models import Distance, VectorParams
            
            client = QdrantClient(host="localhost", port=6333)
            
            # 创建Collection时指定Shard数量
            client.create_collection(
                collection_name="large_dataset",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                shard_number=4,  # 4个Shard
                replication_factor=2  # 每个Shard 2个副本
            )
            
            # 获取Collection信息
            collection_info = client.get_collection(collection_name="large_dataset")
            
            print("Collection信息:")
            print(f"  Shard数量: {collection_info.config.params.shard_number}")
            print(f"  副本因子: {collection_info.config.params.replication_factor}")
            
            # Shard分布策略
            shard_strategies = {
                "轮询": "数据轮流分配到各Shard",
                "哈希": "根据ID哈希值分配",
                "范围": "根据ID范围分配",
                "自定义": "自定义分片逻辑"
            }
            
            print("\nShard分布策略:")
            for strategy, desc in shard_strategies.items():
                print(f"  {strategy}: {desc}")
            
            # Shard管理操作
            shard_operations = [
                "创建Shard: 初始化时指定数量",
                "迁移Shard: 节点间迁移数据",
                "平衡Shard: 自动负载均衡",
                "副本管理: 增加或删除副本",
                "故障恢复: 副本自动接管"
            ]
            
            print("\nShard管理:")
            for op in shard_operations:
                print(f"  - {op}")
            
            # Shard数量建议
            shard_recommendations = {
                "< 100万向量": "1-2个Shard",
                "100万-1000万": "2-4个Shard",
                "1000万-1亿": "4-8个Shard",
                "> 1亿向量": "8-16个Shard"
            }
            
            print("\nShard数量建议:")
            for scale, recommendation in shard_recommendations.items():
                print(f"  {scale}: {recommendation}")
            ---

02.性能优化
    a.索引算法
        a.功能说明
            Qdrant使用HNSW(分层可导航小世界图)索引算法。HNSW提供高效的近似最近邻搜索。支持动态插入和删除。查询性能优异,毫秒级响应。支持参数调优优化性能。内存占用可控。适合大规模向量检索。是业界主流的ANN算法。
        b.代码示例
            ---
            from qdrant_client.models import HnswConfigDiff
            
            # HNSW索引配置
            hnsw_config = HnswConfigDiff(
                m=16,  # 每个节点的连接数
                ef_construct=100,  # 构建时的搜索宽度
                full_scan_threshold=10000  # 全扫描阈值
            )
            
            # 创建Collection时配置HNSW
            client.create_collection(
                collection_name="optimized_collection",
                vectors_config=VectorParams(
                    size=768,
                    distance=Distance.COSINE,
                    hnsw_config=hnsw_config
                )
            )
            
            # HNSW参数说明
            hnsw_params = {
                "m": {
                    "描述": "每个节点的连接数",
                    "默认值": 16,
                    "范围": "4-64",
                    "影响": "越大索引越精确,内存占用越高"
                },
                "ef_construct": {
                    "描述": "构建时的搜索宽度",
                    "默认值": 100,
                    "范围": "100-500",
                    "影响": "越大构建越慢,索引越精确"
                },
                "ef": {
                    "描述": "查询时的搜索宽度",
                    "默认值": "动态",
                    "范围": "100-500",
                    "影响": "越大查询越慢,召回率越高"
                }
            }
            
            print("HNSW参数:")
            for param, info in hnsw_params.items():
                print(f"\n{param}:")
                for key, value in info.items():
                    print(f"  {key}: {value}")
            
            # 查询时指定ef参数
            from qdrant_client.models import SearchParams
            
            results = client.search(
                collection_name="optimized_collection",
                query_vector=np.random.random(768).tolist(),
                search_params=SearchParams(
                    hnsw_ef=128,  # 查询时的ef值
                    exact=False  # 是否精确搜索
                ),
                limit=10
            )
            
            # HNSW优化建议
            optimization_tips = [
                "小数据集(<10万): m=16, ef_construct=100",
                "中数据集(10万-100万): m=32, ef_construct=200",
                "大数据集(>100万): m=48, ef_construct=300",
                "高召回率要求: 增大ef值",
                "低延迟要求: 减小ef值",
                "内存受限: 减小m值",
                "定期优化索引: 提升性能"
            ]
            
            print("\nHNSW优化建议:")
            for i, tip in enumerate(optimization_tips, 1):
                print(f"  {i}. {tip}")
            ---
    b.量化压缩
        a.功能说明
            Qdrant支持向量量化压缩。可以大幅减少内存占用。支持标量量化(Scalar Quantization)。将float32压缩为uint8。内存占用减少75%。查询性能略有下降。适合内存受限场景。平衡内存和性能。
        b.代码示例
            ---
            from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, QuantizationSearchParams
            
            # 启用标量量化
            client.create_collection(
                collection_name="quantized_collection",
                vectors_config=VectorParams(
                    size=768,
                    distance=Distance.COSINE
                ),
                quantization_config=ScalarQuantization(
                    scalar=ScalarQuantizationConfig(
                        type="int8",  # 量化类型
                        quantile=0.99,  # 分位数
                        always_ram=True  # 始终保持在内存
                    )
                )
            )
            
            # 量化配置说明
            quantization_config = {
                "type": {
                    "int8": "8位整数量化",
                    "描述": "将float32压缩为int8",
                    "压缩比": "4:1",
                    "精度损失": "轻微"
                },
                "quantile": {
                    "描述": "量化分位数",
                    "默认值": 0.99,
                    "范围": "0.9-1.0",
                    "影响": "越大精度越高"
                },
                "always_ram": {
                    "描述": "是否始终保持在内存",
                    "默认值": True,
                    "影响": "True时性能更好"
                }
            }
            
            print("量化配置:")
            import json
            print(json.dumps(quantization_config, indent=2, ensure_ascii=False))
            
            # 查询时使用量化
            results = client.search(
                collection_name="quantized_collection",
                query_vector=np.random.random(768).tolist(),
                search_params=QuantizationSearchParams(
                    ignore=False,  # 是否忽略量化
                    rescore=True,  # 是否重新评分
                    oversampling=2.0  # 过采样倍数
                ),
                limit=10
            )
            
            # 量化效果对比
            quantization_comparison = {
                "内存占用": {
                    "原始": "100%",
                    "int8量化": "25%",
                    "节省": "75%"
                },
                "查询性能": {
                    "原始": "100%",
                    "int8量化": "90-95%",
                    "损失": "5-10%"
                },
                "召回率": {
                    "原始": "100%",
                    "int8量化": "98-99%",
                    "损失": "1-2%"
                }
            }
            
            print("\n量化效果对比:")
            for metric, values in quantization_comparison.items():
                print(f"\n{metric}:")
                for key, value in values.items():
                    print(f"  {key}: {value}")
            
            # 量化使用建议
            quantization_tips = [
                "内存受限时启用量化",
                "大规模数据集推荐使用",
                "对召回率要求不极致时使用",
                "启用rescore提升精度",
                "调整oversampling平衡性能",
                "定期评估量化效果",
                "小数据集不建议量化"
            ]
            
            print("\n量化使用建议:")
            for i, tip in enumerate(quantization_tips, 1):
                print(f"  {i}. {tip}")
            ---

2 快速开始

2.1 安装部署

01.Docker部署
    a.单机部署
        a.功能说明
            Docker是最简单的Qdrant部署方式。拉取官方镜像快速启动。支持数据持久化和端口映射。适合开发测试和小规模应用。配置简单,维护方便。支持环境变量配置。可以快速升级和回滚。是推荐的部署方式。
        b.代码示例
            ---
            # 1. 拉取Qdrant镜像
            # docker pull qdrant/qdrant
            
            # 2. 启动Qdrant容器
            docker_run_command = """
            docker run -d \\
              --name qdrant \\
              -p 6333:6333 \\
              -p 6334:6334 \\
              -v $(pwd)/qdrant_storage:/qdrant/storage \\
              -v $(pwd)/qdrant_snapshots:/qdrant/snapshots \\
              qdrant/qdrant
            """
            
            print("Docker启动命令:")
            print(docker_run_command)
            
            # 3. 使用环境变量配置
            docker_run_with_env = """
            docker run -d \\
              --name qdrant \\
              -p 6333:6333 \\
              -p 6334:6334 \\
              -e QDRANT__SERVICE__HTTP_PORT=6333 \\
              -e QDRANT__SERVICE__GRPC_PORT=6334 \\
              -e QDRANT__LOG_LEVEL=INFO \\
              -v $(pwd)/qdrant_storage:/qdrant/storage \\
              qdrant/qdrant
            """
            
            print("\n带环境变量的启动:")
            print(docker_run_with_env)
            
            # 4. Docker Compose配置
            docker_compose_config = """
            version: '3.8'

            services:
              qdrant:
                image: qdrant/qdrant:latest
                container_name: qdrant
                ports:
                  - "6333:6333"
                  - "6334:6334"
                volumes:
                  - ./qdrant_storage:/qdrant/storage
                  - ./qdrant_snapshots:/qdrant/snapshots
                environment:
                  - QDRANT__SERVICE__HTTP_PORT=6333
                  - QDRANT__SERVICE__GRPC_PORT=6334
                  - QDRANT__LOG_LEVEL=INFO
                restart: unless-stopped
            """
            
            print("\nDocker Compose配置:")
            print(docker_compose_config)
            
            # 5. 验证部署
            verification_steps = [
                "1. 检查容器状态: docker ps",
                "2. 查看日志: docker logs qdrant",
                "3. 访问Web UI: http://localhost:6333/dashboard",
                "4. 测试API: curl http://localhost:6333/",
                "5. 检查健康状态: curl http://localhost:6333/health"
            ]
            
            print("\n验证部署:")
            for step in verification_steps:
                print(f"  {step}")
            
            # 6. 常用Docker命令
            docker_commands = {
                "启动": "docker start qdrant",
                "停止": "docker stop qdrant",
                "重启": "docker restart qdrant",
                "查看日志": "docker logs -f qdrant",
                "进入容器": "docker exec -it qdrant /bin/bash",
                "删除容器": "docker rm -f qdrant"
            }
            
            print("\n常用命令:")
            for action, cmd in docker_commands.items():
                print(f"  {action}: {cmd}")
            ---
    b.配置文件
        a.功能说明
            Qdrant支持通过配置文件自定义设置。配置文件为YAML格式。可以配置服务端口、存储路径、日志级别等。支持集群配置和性能优化。配置文件优先级高于环境变量。提供默认配置和自定义配置。灵活的配置机制满足不同需求。
        b.代码示例
            ---
            # Qdrant配置文件 (config.yaml)
            
            qdrant_config = """
            service:
              http_port: 6333
              grpc_port: 6334
              enable_tls: false

            storage:
              storage_path: /qdrant/storage
              snapshots_path: /qdrant/snapshots
              on_disk_payload: true
              wal_capacity_mb: 32
              wal_segments_ahead: 0

              # 性能优化
              optimizers:
                deleted_threshold: 0.2
                vacuum_min_vector_number: 1000
                default_segment_number: 0
                max_segment_size: 200000
                memmap_threshold: 50000
                indexing_threshold: 20000
                flush_interval_sec: 5
                max_optimization_threads: 1

              # HNSW索引配置
              hnsw_index:
                m: 16
                ef_construct: 100
                full_scan_threshold: 10000

            log_level: INFO

            cluster:
              enabled: false
              # p2p:
              #   port: 6335
              # consensus:
              #   tick_period_ms: 100
            """
            
            print("Qdrant配置文件:")
            print(qdrant_config)
            
            # 使用自定义配置启动
            docker_with_config = """
            docker run -d \\
              --name qdrant \\
              -p 6333:6333 \\
              -p 6334:6334 \\
              -v $(pwd)/config.yaml:/qdrant/config/production.yaml \\
              -v $(pwd)/qdrant_storage:/qdrant/storage \\
              qdrant/qdrant
            """
            
            print("\n使用自定义配置:")
            print(docker_with_config)
            
            # 配置项说明
            config_items = {
                "service": "服务端口和TLS配置",
                "storage": "存储路径和优化器配置",
                "log_level": "日志级别(DEBUG/INFO/WARN/ERROR)",
                "cluster": "集群模式配置",
                "hnsw_index": "HNSW索引默认参数"
            }
            
            print("\n配置项说明:")
            for item, desc in config_items.items():
                print(f"  {item}: {desc}")
            
            # 配置优先级
            config_priority = [
                "1. 命令行参数(最高优先级)",
                "2. 环境变量",
                "3. 配置文件",
                "4. 默认值(最低优先级)"
            ]
            
            print("\n配置优先级:")
            for priority in config_priority:
                print(f"  {priority}")
            ---

02.本地安装
    a.二进制安装
        a.功能说明
            Qdrant提供预编译的二进制文件。支持Linux、macOS、Windows平台。下载后直接运行,无需依赖。适合生产环境部署。性能优于Docker方式。支持systemd服务管理。提供自动更新机制。
        b.代码示例
            ---
            # 1. 下载Qdrant二进制文件
            download_commands = {
                "Linux": "wget https://github.com/qdrant/qdrant/releases/download/v1.7.0/qdrant-x86_64-unknown-linux-gnu.tar.gz",
                "macOS": "wget https://github.com/qdrant/qdrant/releases/download/v1.7.0/qdrant-x86_64-apple-darwin.tar.gz",
                "Windows": "下载 qdrant-x86_64-pc-windows-msvc.zip"
            }
            
            print("下载命令:")
            for platform, cmd in download_commands.items():
                print(f"  {platform}: {cmd}")
            
            # 2. 解压和安装
            install_steps = """
            # 解压
            tar -xzf qdrant-x86_64-unknown-linux-gnu.tar.gz

            # 移动到系统路径
            sudo mv qdrant /usr/local/bin/

            # 创建数据目录
            sudo mkdir -p /var/lib/qdrant/storage
            sudo mkdir -p /var/lib/qdrant/snapshots

            # 创建配置目录
            sudo mkdir -p /etc/qdrant
            """
            
            print("\n安装步骤:")
            print(install_steps)
            
            # 3. 创建systemd服务
            systemd_service = """
            [Unit]
            Description=Qdrant Vector Database
            After=network.target

            [Service]
            Type=simple
            User=qdrant
            Group=qdrant
            ExecStart=/usr/local/bin/qdrant --config-path /etc/qdrant/config.yaml
            Restart=on-failure
            RestartSec=5s

            [Install]
            WantedBy=multi-user.target
            """
            
            print("\nsystemd服务配置 (/etc/systemd/system/qdrant.service):")
            print(systemd_service)
            
            # 4. 服务管理命令
            service_commands = {
                "启动服务": "sudo systemctl start qdrant",
                "停止服务": "sudo systemctl stop qdrant",
                "重启服务": "sudo systemctl restart qdrant",
                "查看状态": "sudo systemctl status qdrant",
                "开机启动": "sudo systemctl enable qdrant",
                "查看日志": "sudo journalctl -u qdrant -f"
            }
            
            print("\n服务管理:")
            for action, cmd in service_commands.items():
                print(f"  {action}: {cmd}")
            
            # 5. 验证安装
            verification = [
                "1. 检查版本: qdrant --version",
                "2. 测试启动: qdrant --config-path /etc/qdrant/config.yaml",
                "3. 访问API: curl http://localhost:6333/",
                "4. 查看日志: tail -f /var/log/qdrant/qdrant.log"
            ]
            
            print("\n验证安装:")
            for step in verification:
                print(f"  {step}")
            ---
    b.源码编译
        a.功能说明
            从源码编译Qdrant可以获得最佳性能。需要Rust开发环境。支持自定义编译选项。可以启用特定CPU优化。适合高性能要求场景。编译时间较长。需要一定的技术能力。
        b.代码示例
            ---
            # 1. 安装Rust环境
            rust_install = """
            # 安装Rust
            curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

            # 配置环境变量
            source $HOME/.cargo/env

            # 验证安装
            rustc --version
            cargo --version
            """
            
            print("安装Rust:")
            print(rust_install)
            
            # 2. 克隆Qdrant源码
            clone_steps = """
            # 克隆仓库
            git clone https://github.com/qdrant/qdrant.git
            cd qdrant

            # 切换到稳定版本
            git checkout v1.7.0
            """
            
            print("\n克隆源码:")
            print(clone_steps)
            
            # 3. 编译Qdrant
            build_commands = {
                "开发版本": "cargo build",
                "发布版本": "cargo build --release",
                "优化编译": "RUSTFLAGS='-C target-cpu=native' cargo build --release",
                "静态链接": "cargo build --release --target x86_64-unknown-linux-musl"
            }
            
            print("\n编译命令:")
            for build_type, cmd in build_commands.items():
                print(f"  {build_type}: {cmd}")
            
            # 4. 运行编译后的程序
            run_command = """
            # 运行发布版本
            ./target/release/qdrant --config-path ./config/config.yaml

            # 或者安装到系统
            sudo cp ./target/release/qdrant /usr/local/bin/
            """
            
            print("\n运行程序:")
            print(run_command)
            
            # 5. 编译优化选项
            optimization_flags = {
                "target-cpu=native": "针对当前CPU优化",
                "lto=true": "启用链接时优化",
                "codegen-units=1": "单个代码生成单元",
                "opt-level=3": "最高优化级别"
            }
            
            print("\n编译优化:")
            for flag, desc in optimization_flags.items():
                print(f"  {flag}: {desc}")
            
            # 6. 编译注意事项
            build_notes = [
                "编译时间较长(10-30分钟)",
                "需要足够的内存(建议4GB+)",
                "发布版本比开发版本快10-100倍",
                "使用target-cpu=native获得最佳性能",
                "编译前确保依赖库完整",
                "定期更新Rust工具链"
            ]
            
            print("\n注意事项:")
            for i, note in enumerate(build_notes, 1):
                print(f"  {i}. {note}")
            ---

2.2 客户端连接

01.Python客户端
    a.安装配置
        a.功能说明
            Qdrant提供官方Python客户端库。支持同步和异步操作。提供完整的API封装。安装简单,使用方便。支持类型提示和代码补全。与主流AI框架集成良好。是最常用的客户端。
        b.代码示例
            ---
            # 1. 安装qdrant-client
            # pip install qdrant-client
            
            from qdrant_client import QdrantClient
            from qdrant_client.models import Distance, VectorParams, PointStruct
            import numpy as np
            
            # 2. 连接到Qdrant服务器
            # 本地连接
            client = QdrantClient(host="localhost", port=6333)
            
            # 远程连接
            client = QdrantClient(
                url="https://your-cluster.qdrant.io",
                api_key="your-api-key"
            )
            
            # 3. 使用内存模式(开发测试)
            client = QdrantClient(":memory:")
            
            # 4. 验证连接
            try:
                collections = client.get_collections()
                print(f"连接成功!当前有 {len(collections.collections)} 个Collection")
            except Exception as e:
                print(f"连接失败: {e}")
            
            # 5. 客户端配置选项
            client_config = {
                "host": "服务器地址",
                "port": "端口号(默认6333)",
                "grpc_port": "gRPC端口(默认6334)",
                "prefer_grpc": "优先使用gRPC(默认False)",
                "https": "是否使用HTTPS",
                "api_key": "API密钥",
                "timeout": "请求超时时间(秒)",
                "prefix": "API路径前缀"
            }
            
            print("\n客户端配置:")
            for key, desc in client_config.items():
                print(f"  {key}: {desc}")
            
            # 6. 高级配置示例
            from qdrant_client import QdrantClient
            
            client = QdrantClient(
                host="localhost",
                port=6333,
                grpc_port=6334,
                prefer_grpc=True,  # 使用gRPC提升性能
                timeout=60,  # 60秒超时
                https=False,
                api_key=None
            )
            
            print("\n客户端已配置完成")
            ---
    b.异步客户端
        a.功能说明
            Qdrant支持异步Python客户端。基于asyncio实现。适合高并发场景。可以并行执行多个请求。提升整体吞吐量。与FastAPI等异步框架集成良好。API与同步版本基本一致。
        b.代码示例
            ---
            import asyncio
            from qdrant_client import AsyncQdrantClient
            from qdrant_client.models import Distance, VectorParams, PointStruct
            import numpy as np
            
            # 1. 创建异步客户端
            async def main():
                client = AsyncQdrantClient(host="localhost", port=6333)
                
                # 2. 异步创建Collection
                await client.create_collection(
                    collection_name="async_test",
                    vectors_config=VectorParams(size=128, distance=Distance.COSINE)
                )
                
                # 3. 异步插入数据
                points = [
                    PointStruct(
                        id=i,
                        vector=np.random.random(128).tolist(),
                        payload={"index": i}
                    )
                    for i in range(100)
                ]
                
                await client.upsert(
                    collection_name="async_test",
                    points=points
                )
                
                # 4. 异步搜索
                results = await client.search(
                    collection_name="async_test",
                    query_vector=np.random.random(128).tolist(),
                    limit=10
                )
                
                print(f"找到 {len(results)} 个结果")
                
                # 5. 并发执行多个搜索
                search_tasks = [
                    client.search(
                        collection_name="async_test",
                        query_vector=np.random.random(128).tolist(),
                        limit=10
                    )
                    for _ in range(10)
                ]
                
                # 并发执行
                all_results = await asyncio.gather(*search_tasks)
                print(f"并发执行了 {len(all_results)} 个搜索")
                
                # 6. 关闭客户端
                await client.close()
            
            # 运行异步函数
            # asyncio.run(main())
            
            # 异步客户端优势
            async_advantages = [
                "高并发场景性能更好",
                "可以并行执行多个请求",
                "与异步框架集成良好",
                "提升整体吞吐量",
                "资源利用率更高"
            ]
            
            print("\n异步客户端优势:")
            for i, adv in enumerate(async_advantages, 1):
                print(f"  {i}. {adv}")
            
            # 使用场景
            use_cases = {
                "Web应用": "FastAPI/Sanic等异步框架",
                "批量处理": "并发处理大量数据",
                "实时系统": "需要低延迟的实时应用",
                "微服务": "异步微服务架构"
            }
            
            print("\n使用场景:")
            for scenario, desc in use_cases.items():
                print(f"  {scenario}: {desc}")
            ---

02.其他客户端
    a.REST API
        a.功能说明
            Qdrant提供完整的RESTful API。支持所有核心功能。使用标准HTTP协议。可以用任何语言调用。适合跨语言集成。提供OpenAPI文档。支持curl等命令行工具。是最通用的接口。
        b.代码示例
            ---
            import requests
            import json
            import numpy as np
            
            # 1. 创建Collection
            create_collection_url = "http://localhost:6333/collections/rest_test"
            create_payload = {
                "vectors": {
                    "size": 128,
                    "distance": "Cosine"
                }
            }
            
            response = requests.put(create_collection_url, json=create_payload)
            print(f"创建Collection: {response.status_code}")
            
            # 2. 插入数据
            upsert_url = "http://localhost:6333/collections/rest_test/points"
            points_payload = {
                "points": [
                    {
                        "id": 1,
                        "vector": np.random.random(128).tolist(),
                        "payload": {"name": "Item 1"}
                    },
                    {
                        "id": 2,
                        "vector": np.random.random(128).tolist(),
                        "payload": {"name": "Item 2"}
                    }
                ]
            }
            
            response = requests.put(upsert_url, json=points_payload)
            print(f"插入数据: {response.status_code}")
            
            # 3. 搜索
            search_url = "http://localhost:6333/collections/rest_test/points/search"
            search_payload = {
                "vector": np.random.random(128).tolist(),
                "limit": 10,
                "with_payload": True,
                "with_vector": False
            }
            
            response = requests.post(search_url, json=search_payload)
            results = response.json()
            print(f"搜索结果: {len(results['result'])} 个")
            
            # 4. 获取Collection信息
            info_url = "http://localhost:6333/collections/rest_test"
            response = requests.get(info_url)
            info = response.json()
            print(f"\nCollection信息:")
            print(json.dumps(info['result'], indent=2))
            
            # 5. 常用REST API端点
            api_endpoints = {
                "GET /collections": "列出所有Collection",
                "PUT /collections/{name}": "创建Collection",
                "GET /collections/{name}": "获取Collection信息",
                "DELETE /collections/{name}": "删除Collection",
                "PUT /collections/{name}/points": "插入/更新点",
                "POST /collections/{name}/points/search": "搜索",
                "POST /collections/{name}/points/scroll": "滚动查询",
                "DELETE /collections/{name}/points": "删除点"
            }
            
            print("\nREST API端点:")
            for endpoint, desc in api_endpoints.items():
                print(f"  {endpoint}: {desc}")
            
            # 6. curl示例
            curl_examples = """
            # 创建Collection
            curl -X PUT 'http://localhost:6333/collections/test' \\
              -H 'Content-Type: application/json' \\
              -d '{
                "vectors": {
                  "size": 128,
                  "distance": "Cosine"
                }
              }'

            # 搜索
            curl -X POST 'http://localhost:6333/collections/test/points/search' \\
              -H 'Content-Type: application/json' \\
              -d '{
                "vector": [0.1, 0.2, ...],
                "limit": 10
              }'
            """
            
            print("\ncurl示例:")
            print(curl_examples)
            ---
    b.gRPC接口
        a.功能说明
            Qdrant支持gRPC接口。性能优于REST API。适合高性能场景。使用Protocol Buffers序列化。支持流式传输。延迟更低,吞吐量更高。需要生成客户端代码。适合内部服务调用。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            
            # 1. 使用gRPC连接
            client = QdrantClient(
                host="localhost",
                port=6333,
                grpc_port=6334,
                prefer_grpc=True  # 优先使用gRPC
            )
            
            # 2. gRPC性能对比
            performance_comparison = {
                "延迟": {
                    "REST": "5-10ms",
                    "gRPC": "1-3ms",
                    "提升": "50-70%"
                },
                "吞吐量": {
                    "REST": "1000 QPS",
                    "gRPC": "3000-5000 QPS",
                    "提升": "3-5倍"
                },
                "序列化": {
                    "REST": "JSON(文本)",
                    "gRPC": "Protobuf(二进制)",
                    "优势": "更紧凑高效"
                }
            }
            
            print("gRPC性能对比:")
            for metric, values in performance_comparison.items():
                print(f"\n{metric}:")
                for key, value in values.items():
                    print(f"  {key}: {value}")
            
            # 3. gRPC使用场景
            grpc_use_cases = [
                "高性能要求的生产环境",
                "微服务间内部调用",
                "大规模批量操作",
                "实时搜索系统",
                "低延迟要求的应用"
            ]
            
            print("\ngRPC使用场景:")
            for i, case in enumerate(grpc_use_cases, 1):
                print(f"  {i}. {case}")
            
            # 4. REST vs gRPC选择建议
            selection_guide = {
                "REST": [
                    "开发测试环境",
                    "跨语言集成",
                    "简单的CRUD操作",
                    "需要人工调试",
                    "对性能要求不高"
                ],
                "gRPC": [
                    "生产环境",
                    "高性能要求",
                    "大规模数据操作",
                    "内部服务调用",
                    "低延迟需求"
                ]
            }
            
            print("\n选择建议:")
            for protocol, scenarios in selection_guide.items():
                print(f"\n{protocol}:")
                for scenario in scenarios:
                    print(f"  - {scenario}")
            
            # 5. gRPC配置示例
            grpc_config = {
                "prefer_grpc": True,  # 优先使用gRPC
                "grpc_port": 6334,  # gRPC端口
                "timeout": 60,  # 超时时间
                "max_message_size": 100 * 1024 * 1024  # 最大消息大小
            }
            
            print("\ngRPC配置:")
            for key, value in grpc_config.items():
                print(f"  {key}: {value}")
            ---

2.3 基础操作

01.Collection操作
    a.创建Collection
        a.功能说明
            Collection是Qdrant中存储向量的容器。创建时需指定向量维度和距离度量。支持多向量配置。可以配置HNSW索引参数。支持量化和优化器配置。创建后可以动态调整部分参数。是使用Qdrant的第一步。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            from qdrant_client.models import Distance, VectorParams, HnswConfigDiff
            
            client = QdrantClient(host="localhost", port=6333)
            
            # 1. 创建简单Collection
            client.create_collection(
                collection_name="simple_collection",
                vectors_config=VectorParams(
                    size=768,  # 向量维度
                    distance=Distance.COSINE  # 距离度量
                )
            )
            
            # 2. 创建带HNSW配置的Collection
            client.create_collection(
                collection_name="optimized_collection",
                vectors_config=VectorParams(
                    size=768,
                    distance=Distance.COSINE,
                    hnsw_config=HnswConfigDiff(
                        m=32,  # 连接数
                        ef_construct=200,  # 构建时搜索宽度
                        full_scan_threshold=10000  # 全扫描阈值
                    )
                )
            )
            
            # 3. 创建多向量Collection
            client.create_collection(
                collection_name="multimodal_collection",
                vectors_config={
                    "text": VectorParams(size=768, distance=Distance.COSINE),
                    "image": VectorParams(size=512, distance=Distance.COSINE)
                }
            )
            
            # 4. 创建分片Collection
            client.create_collection(
                collection_name="sharded_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                shard_number=4,  # 4个分片
                replication_factor=2  # 每个分片2个副本
            )
            
            print("Collection创建完成")
            
            # 5. 距离度量说明
            distance_metrics = {
                "Cosine": "余弦距离,适合文本向量,范围[-1,1]",
                "Euclid": "欧氏距离,适合图像向量,范围[0,∞]",
                "Dot": "点积距离,适合归一化向量,范围[-∞,∞]"
            }
            
            print("\n距离度量:")
            for metric, desc in distance_metrics.items():
                print(f"  {metric}: {desc}")
            
            # 6. 创建参数说明
            create_params = {
                "collection_name": "Collection名称",
                "vectors_config": "向量配置(单向量或多向量)",
                "shard_number": "分片数量(默认1)",
                "replication_factor": "副本因子(默认1)",
                "write_consistency_factor": "写一致性因子",
                "on_disk_payload": "是否将Payload存储在磁盘"
            }
            
            print("\n创建参数:")
            for param, desc in create_params.items():
                print(f"  {param}: {desc}")
            ---
    b.查看Collection
        a.功能说明
            可以查看Collection的详细信息。包括向量配置、索引参数、数据统计。查看分片和副本信息。监控Collection状态。获取优化器配置。了解存储使用情况。是运维和调试的重要工具。
        b.代码示例
            ---
            # 1. 列出所有Collection
            collections = client.get_collections()
            print("所有Collection:")
            for collection in collections.collections:
                print(f"  - {collection.name}")
            
            # 2. 获取Collection详细信息
            collection_info = client.get_collection(collection_name="simple_collection")
            
            print("\nCollection详细信息:")
            print(f"  名称: {collection_info.config.params.vectors.size}")
            print(f"  向量维度: {collection_info.config.params.vectors.size}")
            print(f"  距离度量: {collection_info.config.params.vectors.distance}")
            print(f"  数据量: {collection_info.points_count}")
            print(f"  分片数: {collection_info.config.params.shard_number}")
            print(f"  副本因子: {collection_info.config.params.replication_factor}")
            
            # 3. 获取Collection统计信息
            print("\n存储统计:")
            print(f"  向量数量: {collection_info.points_count}")
            print(f"  索引状态: {collection_info.status}")
            print(f"  优化器状态: {collection_info.optimizer_status}")
            
            # 4. 检查Collection是否存在
            def collection_exists(collection_name):
                try:
                    client.get_collection(collection_name)
                    return True
                except Exception:
                    return False
            
            if collection_exists("simple_collection"):
                print("\nCollection存在")
            else:
                print("\nCollection不存在")
            
            # 5. Collection状态说明
            status_description = {
                "green": "健康状态,所有分片正常",
                "yellow": "部分分片不可用,但服务正常",
                "red": "严重问题,部分数据不可访问"
            }
            
            print("\n状态说明:")
            for status, desc in status_description.items():
                print(f"  {status}: {desc}")
            ---

02.数据操作
    a.插入数据
        a.功能说明
            向Collection中插入向量数据。支持单条和批量插入。每个Point包含ID、向量和Payload。ID可以是整数或UUID。支持自动生成ID。批量插入性能更好。插入操作是幂等的。
        b.代码示例
            ---
            import numpy as np
            from qdrant_client.models import PointStruct
            import uuid
            
            # 1. 插入单条数据
            point = PointStruct(
                id=1,
                vector=np.random.random(768).tolist(),
                payload={"name": "Item 1", "category": "A"}
            )
            
            client.upsert(
                collection_name="simple_collection",
                points=[point]
            )
            
            # 2. 批量插入数据
            points = [
                PointStruct(
                    id=i,
                    vector=np.random.random(768).tolist(),
                    payload={"name": f"Item {i}", "index": i}
                )
                for i in range(1000)
            ]
            
            client.upsert(
                collection_name="simple_collection",
                points=points,
                wait=True  # 等待插入完成
            )
            
            print(f"插入了 {len(points)} 条数据")
            
            # 3. 使用UUID作为ID
            uuid_points = [
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector=np.random.random(768).tolist(),
                    payload={"name": f"UUID Item {i}"}
                )
                for i in range(100)
            ]
            
            client.upsert(
                collection_name="simple_collection",
                points=uuid_points
            )
            
            # 4. 插入多向量数据
            multimodal_point = PointStruct(
                id=1,
                vector={
                    "text": np.random.random(768).tolist(),
                    "image": np.random.random(512).tolist()
                },
                payload={"title": "Multimodal Item"}
            )
            
            client.upsert(
                collection_name="multimodal_collection",
                points=[multimodal_point]
            )
            
            # 5. 批量插入性能优化
            batch_size = 100
            total_points = 10000
            
            for i in range(0, total_points, batch_size):
                batch = [
                    PointStruct(
                        id=j,
                        vector=np.random.random(768).tolist(),
                        payload={"batch": i // batch_size, "index": j}
                    )
                    for j in range(i, min(i + batch_size, total_points))
                ]
                
                client.upsert(
                    collection_name="simple_collection",
                    points=batch,
                    wait=False  # 异步插入,提升性能
                )
            
            print(f"\n批量插入 {total_points} 条数据完成")
            
            # 6. 插入最佳实践
            best_practices = [
                "使用批量插入提升性能",
                "批量大小建议100-1000",
                "wait=False异步插入更快",
                "定期检查插入结果",
                "使用UUID避免ID冲突",
                "合理设置Payload大小"
            ]
            
            print("\n插入最佳实践:")
            for i, practice in enumerate(best_practices, 1):
                print(f"  {i}. {practice}")
            ---
    b.查询数据
        a.功能说明
            Qdrant提供多种数据查询方式。向量搜索是核心功能。支持过滤条件组合。可以滚动查询所有数据。支持按ID获取数据。提供分页和限制功能。灵活的查询能力满足不同需求。
        b.代码示例
            ---
            # 1. 向量搜索
            query_vector = np.random.random(768).tolist()
            
            results = client.search(
                collection_name="simple_collection",
                query_vector=query_vector,
                limit=10,
                with_payload=True,  # 返回Payload
                with_vector=False  # 不返回向量
            )
            
            print("搜索结果:")
            for result in results:
                print(f"  ID: {result.id}, Score: {result.score:.4f}, Payload: {result.payload}")
            
            # 2. 带过滤的搜索
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            
            filtered_results = client.search(
                collection_name="simple_collection",
                query_vector=query_vector,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="category",
                            match=MatchValue(value="A")
                        )
                    ]
                ),
                limit=10
            )
            
            print(f"\n过滤搜索找到 {len(filtered_results)} 个结果")
            
            # 3. 按ID获取数据
            point = client.retrieve(
                collection_name="simple_collection",
                ids=[1, 2, 3],
                with_payload=True,
                with_vector=True
            )
            
            print(f"\n获取了 {len(point)} 个Point")
            
            # 4. 滚动查询(遍历所有数据)
            offset = None
            all_points = []
            
            while True:
                result = client.scroll(
                    collection_name="simple_collection",
                    limit=100,
                    offset=offset,
                    with_payload=True,
                    with_vector=False
                )
                
                points, offset = result
                all_points.extend(points)
                
                if offset is None:
                    break
            
            print(f"\n滚动查询获取了 {len(all_points)} 个Point")
            
            # 5. 计数查询
            from qdrant_client.models import CountRequest
            
            count = client.count(
                collection_name="simple_collection",
                count_filter=Filter(
                    must=[
                        FieldCondition(key="category", match=MatchValue(value="A"))
                    ]
                ),
                exact=True
            )
            
            print(f"\n符合条件的数据量: {count.count}")
            
            # 6. 查询参数说明
            query_params = {
                "limit": "返回结果数量",
                "offset": "分页偏移量",
                "with_payload": "是否返回Payload",
                "with_vector": "是否返回向量",
                "score_threshold": "最低分数阈值",
                "query_filter": "过滤条件"
            }
            
            print("\n查询参数:")
            for param, desc in query_params.items():
                print(f"  {param}: {desc}")
            ---

3 Collection管理

3.1 创建Collection

01.基础配置
    a.向量参数
        a.功能说明
            创建Collection时需要配置向量参数。指定向量维度和距离度量。支持单向量和多向量配置。可以配置HNSW索引参数。支持量化和优化器配置。配置影响性能和存储。合理配置是性能优化的基础。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            from qdrant_client.models import Distance, VectorParams, HnswConfigDiff
            
            client = QdrantClient(host="localhost", port=6333)
            
            # 1. 基础向量配置
            client.create_collection(
                collection_name="basic_collection",
                vectors_config=VectorParams(
                    size=768,  # 向量维度
                    distance=Distance.COSINE  # 距离度量
                )
            )
            
            # 2. 带HNSW配置
            client.create_collection(
                collection_name="hnsw_collection",
                vectors_config=VectorParams(
                    size=768,
                    distance=Distance.COSINE,
                    hnsw_config=HnswConfigDiff(
                        m=32,
                        ef_construct=200,
                        full_scan_threshold=10000
                    )
                )
            )
            
            # 3. 多向量配置
            client.create_collection(
                collection_name="multi_vector",
                vectors_config={
                    "text": VectorParams(size=768, distance=Distance.COSINE),
                    "image": VectorParams(size=512, distance=Distance.EUCLID)
                }
            )
            
            print("Collection创建完成")
            ---
    b.分片配置
        a.功能说明
            Qdrant支持数据分片和副本配置。分片提升并发性能和存储容量。副本提供高可用性和容错能力。可以配置分片数量和副本因子。支持动态调整分片和副本。合理配置分片和副本平衡性能和可用性。
        b.代码示例
            ---
            # 创建分片Collection
            client.create_collection(
                collection_name="sharded_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                shard_number=4,  # 4个分片
                replication_factor=2,  # 每个分片2个副本
                write_consistency_factor=1  # 写一致性因子
            )
            
            # 分片配置说明
            shard_config = {
                "shard_number": "分片数量,影响并发性能",
                "replication_factor": "副本因子,影响可用性",
                "write_consistency_factor": "写一致性,影响写入性能"
            }
            
            print("分片配置:")
            for key, desc in shard_config.items():
                print(f"  {key}: {desc}")
            ---

02.高级配置
    a.优化器配置
        a.功能说明
            Qdrant提供优化器配置选项。控制索引构建和数据压缩。配置内存和磁盘使用策略。调整刷盘频率和合并策略。优化器配置影响性能和资源使用。合理配置优化器提升系统效率。
        b.代码示例
            ---
            from qdrant_client.models import OptimizersConfigDiff
            
            client.create_collection(
                collection_name="optimized_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                optimizers_config=OptimizersConfigDiff(
                    deleted_threshold=0.2,
                    vacuum_min_vector_number=1000,
                    default_segment_number=0,
                    max_segment_size=200000,
                    memmap_threshold=50000,
                    indexing_threshold=20000,
                    flush_interval_sec=5,
                    max_optimization_threads=1
                )
            )
            
            print("优化器配置完成")
            ---
    b.存储配置
        a.功能说明
            Qdrant支持灵活的存储配置。可以选择内存或磁盘存储。支持Payload存储在磁盘。配置WAL容量和段数。调整mmap阈值优化内存使用。存储配置影响性能和成本。根据数据规模选择合适的存储策略。
        b.代码示例
            ---
            # 创建磁盘存储Collection
            client.create_collection(
                collection_name="disk_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                on_disk_payload=True  # Payload存储在磁盘
            )
            
            # 存储策略
            storage_strategies = {
                "内存存储": "性能最好,成本高,适合小数据集",
                "磁盘存储": "成本低,性能略降,适合大数据集",
                "混合存储": "向量在内存,Payload在磁盘"
            }
            
            print("存储策略:")
            for strategy, desc in storage_strategies.items():
                print(f"  {strategy}: {desc}")
            ---

3.2 向量配置

01.距离度量
    a.余弦距离
        a.功能说明
            余弦距离衡量向量方向的相似度。范围[-1,1],1表示完全相同。适合文本向量和归一化向量。不受向量长度影响。是最常用的距离度量。语义搜索的首选。计算效率高。
        b.代码示例
            ---
            from qdrant_client.models import Distance, VectorParams
            
            # 使用余弦距离
            client.create_collection(
                collection_name="cosine_collection",
                vectors_config=VectorParams(
                    size=768,
                    distance=Distance.COSINE
                )
            )
            
            # 余弦距离特点
            cosine_features = {
                "范围": "[-1, 1]",
                "1": "完全相同",
                "0": "正交",
                "-1": "完全相反",
                "适用": "文本向量、归一化向量",
                "优势": "不受向量模长影响"
            }
            
            print("余弦距离特点:")
            for key, value in cosine_features.items():
                print(f"  {key}: {value}")
            
            # 余弦相似度计算示例
            import numpy as np
            
            def cosine_similarity(vec1, vec2):
                """计算余弦相似度"""
                dot_product = np.dot(vec1, vec2)
                norm1 = np.linalg.norm(vec1)
                norm2 = np.linalg.norm(vec2)
                return dot_product / (norm1 * norm2)
            
            vec_a = np.array([1, 2, 3])
            vec_b = np.array([4, 5, 6])
            
            similarity = cosine_similarity(vec_a, vec_b)
            print(f"\n余弦相似度: {similarity:.4f}")
            ---
    b.欧氏距离
        a.功能说明
            欧氏距离衡量向量在空间中的直线距离。范围[0,∞],0表示完全相同。适合图像向量和未归一化向量。受向量长度影响。计算直观易理解。适合需要考虑向量模长的场景。
        b.代码示例
            ---
            # 使用欧氏距离
            client.create_collection(
                collection_name="euclid_collection",
                vectors_config=VectorParams(
                    size=512,
                    distance=Distance.EUCLID
                )
            )
            
            # 欧氏距离特点
            euclid_features = {
                "范围": "[0, ∞]",
                "0": "完全相同",
                "越大": "越不相似",
                "适用": "图像向量、未归一化向量",
                "优势": "计算直观,物理意义明确"
            }
            
            print("欧氏距离特点:")
            for key, value in euclid_features.items():
                print(f"  {key}: {value}")
            
            # 欧氏距离计算示例
            def euclidean_distance(vec1, vec2):
                """计算欧氏距离"""
                return np.linalg.norm(vec1 - vec2)
            
            vec_a = np.array([1, 2, 3])
            vec_b = np.array([4, 5, 6])
            
            distance = euclidean_distance(vec_a, vec_b)
            print(f"\n欧氏距离: {distance:.4f}")
            
            # 使用场景对比
            use_cases = {
                "余弦距离": ["文本嵌入", "语义搜索", "推荐系统", "归一化向量"],
                "欧氏距离": ["图像特征", "人脸识别", "异常检测", "未归一化向量"]
            }
            
            print("\n使用场景:")
            for metric, cases in use_cases.items():
                print(f"  {metric}:")
                for case in cases:
                    print(f"    - {case}")
            ---

02.向量类型
    a.密集向量
        a.功能说明
            密集向量是最常见的向量类型。每个维度都有值。适合大多数AI应用。支持各种距离度量。索引效率高。是Qdrant的默认向量类型。广泛应用于各种场景。
        b.代码示例
            ---
            import numpy as np
            from qdrant_client.models import PointStruct
            
            # 创建密集向量
            dense_vector = np.random.random(768).tolist()
            
            point = PointStruct(
                id=1,
                vector=dense_vector,
                payload={"type": "dense", "model": "text-embedding-ada-002"}
            )
            
            client.upsert(
                collection_name="cosine_collection",
                points=[point]
            )
            
            print("密集向量插入完成")
            
            # 密集向量特点
            dense_features = [
                "每个维度都有值",
                "适合深度学习模型输出",
                "支持HNSW等高效索引",
                "查询性能优异",
                "是最常用的向量类型"
            ]
            
            print("\n密集向量特点:")
            for i, feature in enumerate(dense_features, 1):
                print(f"  {i}. {feature}")
            
            # 常见密集向量来源
            vector_sources = {
                "文本": "BERT、GPT、Sentence Transformers",
                "图像": "ResNet、ViT、CLIP",
                "音频": "Wav2Vec、HuBERT",
                "多模态": "CLIP、ALIGN"
            }
            
            print("\n常见向量来源:")
            for data_type, models in vector_sources.items():
                print(f"  {data_type}: {models}")
            ---
    b.命名向量
        a.功能说明
            命名向量允许一个Point存储多个向量。每个向量有独立的名称。支持不同维度和距离度量。适合多模态数据。可以分别检索各向量。提供更灵活的数据组织方式。是多模态应用的关键特性。
        b.代码示例
            ---
            # 创建命名向量Collection
            client.create_collection(
                collection_name="named_vectors",
                vectors_config={
                    "text": VectorParams(size=768, distance=Distance.COSINE),
                    "image": VectorParams(size=512, distance=Distance.EUCLID),
                    "audio": VectorParams(size=256, distance=Distance.DOT)
                }
            )
            
            # 插入命名向量
            point = PointStruct(
                id=1,
                vector={
                    "text": np.random.random(768).tolist(),
                    "image": np.random.random(512).tolist(),
                    "audio": np.random.random(256).tolist()
                },
                payload={"title": "Multimodal Data", "type": "video"}
            )
            
            client.upsert(collection_name="named_vectors", points=[point])
            
            # 按文本向量搜索
            text_results = client.search(
                collection_name="named_vectors",
                query_vector=("text", np.random.random(768).tolist()),
                limit=10
            )
            
            # 按图像向量搜索
            image_results = client.search(
                collection_name="named_vectors",
                query_vector=("image", np.random.random(512).tolist()),
                limit=10
            )
            
            print(f"文本搜索找到 {len(text_results)} 个结果")
            print(f"图像搜索找到 {len(image_results)} 个结果")
            
            # 命名向量应用场景
            named_vector_use_cases = [
                "多模态搜索:文本+图像+音频",
                "跨模态检索:以图搜文、以文搜图",
                "混合推荐:结合多种特征",
                "多语言支持:不同语言的向量",
                "多粒度表示:词级+句级+文档级"
            ]
            
            print("\n命名向量应用场景:")
            for i, use_case in enumerate(named_vector_use_cases, 1):
                print(f"  {i}. {use_case}")
            
            # 命名向量优势
            advantages = {
                "灵活性": "一个Point存储多种向量",
                "独立性": "每个向量独立配置和检索",
                "高效性": "避免数据冗余和同步问题",
                "扩展性": "易于添加新的向量类型"
            }
            
            print("\n命名向量优势:")
            for advantage, desc in advantages.items():
                print(f"  {advantage}: {desc}")
            ---

3.3 Payload Schema

01.Payload结构
    a.数据类型
        a.功能说明
            Payload支持丰富的数据类型。包括字符串、数字、布尔、数组、对象。支持嵌套结构。可以存储复杂的元数据。类型灵活,易于扩展。是向量数据的重要补充。提供业务逻辑支持。
        b.代码示例
            ---
            # Payload示例
            payload = {
                "title": "Product Name",  # 字符串
                "price": 99.99,  # 浮点数
                "rating": 5,  # 整数
                "in_stock": True,  # 布尔值
                "tags": ["electronics", "smartphone"],  # 数组
                "metadata": {  # 嵌套对象
                    "brand": "Apple",
                    "model": "iPhone 15",
                    "specs": {
                        "storage": "256GB",
                        "color": "Black",
                        "ram": "8GB"
                    }
                },
                "created_at": "2024-01-01T00:00:00Z"  # 时间戳
            }
            
            point = PointStruct(
                id=1,
                vector=np.random.random(768).tolist(),
                payload=payload
            )
            
            client.upsert(collection_name="products", points=[point])
            
            print("Payload插入完成")
            
            # Payload数据类型说明
            data_types = {
                "string": "字符串类型,适合文本字段",
                "integer": "整数类型,适合计数、ID等",
                "float": "浮点数类型,适合价格、评分等",
                "bool": "布尔类型,适合状态标志",
                "array": "数组类型,适合标签、类别等",
                "object": "对象类型,适合嵌套结构",
                "null": "空值类型"
            }
            
            print("\nPayload数据类型:")
            for dtype, desc in data_types.items():
                print(f"  {dtype}: {desc}")
            
            # Payload最佳实践
            best_practices = [
                "合理组织Payload结构",
                "避免存储过大的数据",
                "使用有意义的字段名",
                "保持数据类型一致",
                "为常用字段创建索引",
                "定期清理无用字段"
            ]
            
            print("\nPayload最佳实践:")
            for i, practice in enumerate(best_practices, 1):
                print(f"  {i}. {practice}")
            ---
    b.索引配置
        a.功能说明
            Payload可以创建索引加速查询。支持多种字段类型的索引。索引提升过滤性能。可以为嵌套字段创建索引。索引占用额外存储空间。合理创建索引平衡性能和存储。是优化查询性能的关键。
        b.代码示例
            ---
            # 创建Payload索引
            client.create_payload_index(
                collection_name="products",
                field_name="price",
                field_schema="float"
            )
            
            client.create_payload_index(
                collection_name="products",
                field_name="tags",
                field_schema="keyword"
            )
            
            client.create_payload_index(
                collection_name="products",
                field_name="in_stock",
                field_schema="bool"
            )
            
            # 为嵌套字段创建索引
            client.create_payload_index(
                collection_name="products",
                field_name="metadata.brand",
                field_schema="keyword"
            )
            
            print("Payload索引创建完成")
            
            # 索引类型说明
            index_types = {
                "keyword": {
                    "描述": "关键词索引,适合字符串",
                    "适用": "品牌、类别、标签等",
                    "特点": "精确匹配,支持数组"
                },
                "integer": {
                    "描述": "整数索引",
                    "适用": "ID、数量、年份等",
                    "特点": "支持范围查询"
                },
                "float": {
                    "描述": "浮点数索引",
                    "适用": "价格、评分、距离等",
                    "特点": "支持范围查询"
                },
                "bool": {
                    "描述": "布尔值索引",
                    "适用": "状态标志",
                    "特点": "二值查询"
                },
                "geo": {
                    "描述": "地理位置索引",
                    "适用": "经纬度坐标",
                    "特点": "支持地理范围查询"
                }
            }
            
            print("\n索引类型:")
            for idx_type, info in index_types.items():
                print(f"\n{idx_type}:")
                for key, value in info.items():
                    print(f"  {key}: {value}")
            
            # 索引性能影响
            index_impact = {
                "查询性能": "提升10-100倍",
                "存储空间": "增加10-30%",
                "写入性能": "略微下降5-10%",
                "内存占用": "增加索引大小"
            }
            
            print("\n索引性能影响:")
            for aspect, impact in index_impact.items():
                print(f"  {aspect}: {impact}")
            
            # 索引创建建议
            index_recommendations = [
                "为常用过滤字段创建索引",
                "避免为所有字段创建索引",
                "大数据集优先创建索引",
                "定期评估索引使用情况",
                "删除未使用的索引",
                "索引创建后需要时间生效"
            ]
            
            print("\n索引创建建议:")
            for i, rec in enumerate(index_recommendations, 1):
                print(f"  {i}. {rec}")
            ---

02.Payload操作
    a.更新Payload
        a.功能说明
            Qdrant支持动态更新Payload。可以添加、修改、删除字段。支持批量更新。更新操作不影响向量。提供灵活的元数据管理。适合动态变化的业务场景。是数据维护的重要工具。
        b.代码示例
            ---
            # 1. 设置Payload(添加或更新字段)
            client.set_payload(
                collection_name="products",
                payload={"price": 89.99, "discount": True, "discount_rate": 0.1},
                points=[1, 2, 3]
            )
            
            print("Payload更新完成")
            
            # 2. 删除Payload字段
            client.delete_payload(
                collection_name="products",
                keys=["discount", "discount_rate"],
                points=[1, 2, 3]
            )
            
            print("Payload字段删除完成")
            
            # 3. 清空Payload(保留向量)
            client.clear_payload(
                collection_name="products",
                points=[1]
            )
            
            print("Payload已清空")
            
            # 4. 批量更新Payload
            for i in range(100):
                client.set_payload(
                    collection_name="products",
                    payload={"last_updated": "2024-01-01", "version": 2},
                    points=[i],
                    wait=False  # 异步更新
                )
            
            print("批量更新完成")
            
            # 5. 条件更新Payload
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            
            # 先查询符合条件的Point
            results = client.scroll(
                collection_name="products",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="in_stock", match=MatchValue(value=True))
                    ]
                ),
                limit=1000
            )
            
            points, _ = results
            point_ids = [p.id for p in points]
            
            # 批量更新
            client.set_payload(
                collection_name="products",
                payload={"available": True},
                points=point_ids
            )
            
            print(f"更新了 {len(point_ids)} 个Point的Payload")
            
            # Payload更新最佳实践
            update_best_practices = [
                "使用批量更新提升性能",
                "wait=False异步更新更快",
                "定期清理无用字段",
                "更新前备份重要数据",
                "监控更新操作的影响",
                "避免频繁更新大量数据"
            ]
            
            print("\nPayload更新最佳实践:")
            for i, practice in enumerate(update_best_practices, 1):
                print(f"  {i}. {practice}")
            ---
    b.Payload查询
        a.功能说明
            可以基于Payload进行查询和过滤。支持等值、范围、匹配等条件。可以组合多个过滤条件。Payload查询与向量搜索结合。提供强大的数据检索能力。是实现业务逻辑的关键。支持复杂的过滤表达式。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, Range, MatchValue, MatchAny
            
            # 1. 等值查询
            results = client.scroll(
                collection_name="products",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="brand", match=MatchValue(value="Apple"))
                    ]
                ),
                limit=100
            )
            
            points, offset = results
            print(f"等值查询找到 {len(points)} 个结果")
            
            # 2. 范围查询
            results = client.scroll(
                collection_name="products",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="price", range=Range(gte=50, lte=200))
                    ]
                ),
                limit=100
            )
            
            points, offset = results
            print(f"范围查询找到 {len(points)} 个结果")
            
            # 3. 数组匹配查询
            results = client.scroll(
                collection_name="products",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="tags", match=MatchAny(any=["electronics", "smartphone"]))
                    ]
                ),
                limit=100
            )
            
            points, offset = results
            print(f"数组匹配查询找到 {len(points)} 个结果")
            
            # 4. 组合查询(AND/OR/NOT)
            complex_filter = Filter(
                must=[  # AND条件
                    FieldCondition(key="in_stock", match=MatchValue(value=True)),
                    FieldCondition(key="price", range=Range(gte=50, lte=200))
                ],
                should=[  # OR条件
                    FieldCondition(key="brand", match=MatchValue(value="Apple")),
                    FieldCondition(key="brand", match=MatchValue(value="Samsung"))
                ],
                must_not=[  # NOT条件
                    FieldCondition(key="rating", range=Range(lt=3.0))
                ]
            )
            
            results = client.scroll(
                collection_name="products",
                scroll_filter=complex_filter,
                limit=100
            )
            
            points, offset = results
            print(f"组合查询找到 {len(points)} 个结果")
            
            # 5. 嵌套字段查询
            results = client.scroll(
                collection_name="products",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="metadata.brand", match=MatchValue(value="Apple")),
                        FieldCondition(key="metadata.specs.storage", match=MatchValue(value="256GB"))
                    ]
                ),
                limit=100
            )
            
            points, offset = results
            print(f"嵌套字段查询找到 {len(points)} 个结果")
            
            # Payload查询操作符
            query_operators = {
                "match": "精确匹配",
                "match_any": "匹配任意一个",
                "range": "范围查询(gte/gt/lte/lt)",
                "geo_radius": "地理半径查询",
                "geo_bounding_box": "地理边界框查询",
                "values_count": "数组长度查询"
            }
            
            print("\nPayload查询操作符:")
            for operator, desc in query_operators.items():
                print(f"  {operator}: {desc}")
            
            # 查询性能优化建议
            query_optimization = [
                "为常用过滤字段创建索引",
                "避免过于复杂的过滤条件",
                "使用must而不是should提升性能",
                "范围查询尽量缩小范围",
                "避免查询嵌套层级过深的字段",
                "定期分析慢查询"
            ]
            
            print("\n查询性能优化:")
            for i, tip in enumerate(query_optimization, 1):
                print(f"  {i}. {tip}")
            ---

4 数据操作

4.1 点操作Points

01.插入数据
    a.单点插入
        a.功能说明
            Point是Qdrant中的基本数据单元。包含ID、向量和Payload三部分。ID可以是整数或UUID字符串。向量是核心数据。Payload存储元数据。插入操作是幂等的,相同ID会覆盖。支持同步和异步插入。
        b.代码示例
            ---
            from qdrant_client import QdrantClient
            from qdrant_client.models import PointStruct
            import numpy as np
            
            client = QdrantClient(host="localhost", port=6333)
            
            # 1. 创建单个Point
            point = PointStruct(
                id=1,  # 整数ID
                vector=np.random.random(768).tolist(),
                payload={
                    "title": "Sample Document",
                    "category": "tech",
                    "author": "John Doe",
                    "created_at": "2024-01-01"
                }
            )
            
            # 2. 插入Point
            client.upsert(
                collection_name="documents",
                points=[point],
                wait=True  # 等待插入完成
            )
            
            print("Point插入完成")
            
            # 3. 使用UUID作为ID
            import uuid
            
            uuid_point = PointStruct(
                id=str(uuid.uuid4()),  # UUID字符串
                vector=np.random.random(768).tolist(),
                payload={"title": "UUID Document"}
            )
            
            client.upsert(
                collection_name="documents",
                points=[uuid_point]
            )
            
            print("UUID Point插入完成")
            
            # Point结构说明
            point_structure = {
                "id": "唯一标识符(整数或UUID字符串)",
                "vector": "向量数据(列表或字典)",
                "payload": "元数据(JSON对象,可选)"
            }
            
            print("\nPoint结构:")
            for field, desc in point_structure.items():
                print(f"  {field}: {desc}")
            
            # 插入参数说明
            upsert_params = {
                "collection_name": "目标Collection名称",
                "points": "Point列表",
                "wait": "是否等待完成(True/False)",
                "ordering": "写入顺序保证(weak/medium/strong)"
            }
            
            print("\n插入参数:")
            for param, desc in upsert_params.items():
                print(f"  {param}: {desc}")
            ---
    b.批量插入
        a.功能说明
            批量插入可以显著提升性能。一次插入多个Point。减少网络往返次数。支持异步插入提升吞吐量。建议批量大小100-1000。过大的批量可能导致内存问题。合理的批量大小平衡性能和资源。
        b.代码示例
            ---
            # 1. 批量创建Points
            batch_size = 100
            points = [
                PointStruct(
                    id=i,
                    vector=np.random.random(768).tolist(),
                    payload={
                        "index": i,
                        "batch": i // batch_size,
                        "category": f"cat_{i % 5}"
                    }
                )
                for i in range(1000)
            ]
            
            # 2. 批量插入
            client.upsert(
                collection_name="documents",
                points=points,
                wait=False  # 异步插入,提升性能
            )
            
            print(f"批量插入 {len(points)} 个Points")
            
            # 3. 分批插入大量数据
            total_points = 100000
            batch_size = 100
            
            for i in range(0, total_points, batch_size):
                batch = [
                    PointStruct(
                        id=j,
                        vector=np.random.random(768).tolist(),
                        payload={"index": j}
                    )
                    for j in range(i, min(i + batch_size, total_points))
                ]
                
                client.upsert(
                    collection_name="documents",
                    points=batch,
                    wait=False
                )
                
                if (i + batch_size) % 10000 == 0:
                    print(f"已插入 {i + batch_size} 个Points")
            
            print(f"\n总共插入 {total_points} 个Points")
            
            # 批量插入性能对比
            performance_comparison = {
                "单点插入": {
                    "速度": "100-500 points/s",
                    "适用": "少量数据、实时插入"
                },
                "批量插入(100)": {
                    "速度": "5,000-10,000 points/s",
                    "适用": "中等规模数据"
                },
                "批量插入(1000)": {
                    "速度": "10,000-50,000 points/s",
                    "适用": "大规模数据导入"
                }
            }
            
            print("\n性能对比:")
            for method, metrics in performance_comparison.items():
                print(f"\n{method}:")
                for key, value in metrics.items():
                    print(f"  {key}: {value}")
            
            # 批量插入最佳实践
            best_practices = [
                "批量大小建议100-1000",
                "使用wait=False异步插入",
                "监控内存使用情况",
                "大数据集分批插入",
                "定期检查插入进度",
                "插入完成后等待索引构建"
            ]
            
            print("\n批量插入最佳实践:")
            for i, practice in enumerate(best_practices, 1):
                print(f"  {i}. {practice}")
            ---

02.查询数据
    a.按ID查询
        a.功能说明
            可以通过ID直接获取Point数据。支持单个或多个ID查询。返回完整的Point信息。可以选择是否返回向量和Payload。查询速度快,适合精确查找。是最基础的查询方式。
        b.代码示例
            ---
            # 1. 查询单个Point
            point = client.retrieve(
                collection_name="documents",
                ids=[1],
                with_payload=True,
                with_vector=True
            )
            
            print(f"查询到Point: ID={point[0].id}")
            print(f"Payload: {point[0].payload}")
            
            # 2. 查询多个Points
            points = client.retrieve(
                collection_name="documents",
                ids=[1, 2, 3, 4, 5],
                with_payload=True,
                with_vector=False  # 不返回向量,节省带宽
            )
            
            print(f"\n查询到 {len(points)} 个Points")
            for p in points:
                print(f"  ID: {p.id}, Payload: {p.payload}")
            
            # 3. 查询UUID Points
            uuid_ids = [str(uuid.uuid4()) for _ in range(5)]
            
            # 先插入
            uuid_points = [
                PointStruct(
                    id=uid,
                    vector=np.random.random(768).tolist(),
                    payload={"uuid": uid}
                )
                for uid in uuid_ids
            ]
            client.upsert(collection_name="documents", points=uuid_points)
            
            # 再查询
            retrieved = client.retrieve(
                collection_name="documents",
                ids=uuid_ids,
                with_payload=True
            )
            
            print(f"\n查询到 {len(retrieved)} 个UUID Points")
            
            # 查询参数说明
            retrieve_params = {
                "collection_name": "Collection名称",
                "ids": "ID列表(整数或字符串)",
                "with_payload": "是否返回Payload(默认True)",
                "with_vector": "是否返回向量(默认False)"
            }
            
            print("\n查询参数:")
            for param, desc in retrieve_params.items():
                print(f"  {param}: {desc}")
            
            # 查询性能特点
            query_features = [
                "O(1)时间复杂度,速度极快",
                "支持批量查询",
                "可选择返回内容,节省带宽",
                "适合精确查找",
                "不受Collection大小影响"
            ]
            
            print("\n查询性能特点:")
            for i, feature in enumerate(query_features, 1):
                print(f"  {i}. {feature}")
            ---
    b.滚动查询
        a.功能说明
            滚动查询用于遍历Collection中的所有数据。支持分页和过滤。可以指定每页大小。返回数据和下一页偏移量。适合数据导出和全量扫描。支持Payload过滤。是数据迁移的重要工具。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            
            # 1. 基础滚动查询
            offset = None
            all_points = []
            
            while True:
                result = client.scroll(
                    collection_name="documents",
                    limit=100,  # 每页100条
                    offset=offset,
                    with_payload=True,
                    with_vector=False
                )
                
                points, offset = result
                all_points.extend(points)
                
                print(f"已获取 {len(all_points)} 个Points")
                
                if offset is None:  # 没有更多数据
                    break
            
            print(f"\n总共获取 {len(all_points)} 个Points")
            
            # 2. 带过滤的滚动查询
            offset = None
            filtered_points = []
            
            while True:
                result = client.scroll(
                    collection_name="documents",
                    scroll_filter=Filter(
                        must=[
                            FieldCondition(
                                key="category",
                                match=MatchValue(value="tech")
                            )
                        ]
                    ),
                    limit=100,
                    offset=offset,
                    with_payload=True
                )
                
                points, offset = result
                filtered_points.extend(points)
                
                if offset is None:
                    break
            
            print(f"\n过滤后获取 {len(filtered_points)} 个Points")
            
            # 3. 数据导出示例
            import json
            
            offset = None
            export_data = []
            
            while True:
                result = client.scroll(
                    collection_name="documents",
                    limit=1000,
                    offset=offset,
                    with_payload=True,
                    with_vector=True
                )
                
                points, offset = result
                
                for p in points:
                    export_data.append({
                        "id": p.id,
                        "vector": p.vector,
                        "payload": p.payload
                    })
                
                if offset is None:
                    break
            
            # 保存到文件
            # with open("export.json", "w") as f:
            #     json.dump(export_data, f)
            
            print(f"\n导出 {len(export_data)} 个Points")
            
            # 滚动查询参数
            scroll_params = {
                "collection_name": "Collection名称",
                "limit": "每页数量(默认10)",
                "offset": "分页偏移量",
                "scroll_filter": "过滤条件",
                "with_payload": "是否返回Payload",
                "with_vector": "是否返回向量"
            }
            
            print("\n滚动查询参数:")
            for param, desc in scroll_params.items():
                print(f"  {param}: {desc}")
            
            # 滚动查询最佳实践
            scroll_best_practices = [
                "合理设置limit,建议100-1000",
                "大数据集分批处理",
                "使用过滤减少数据量",
                "不需要向量时设置with_vector=False",
                "定期保存进度,支持断点续传",
                "监控内存使用"
            ]
            
            print("\n滚动查询最佳实践:")
            for i, practice in enumerate(scroll_best_practices, 1):
                print(f"  {i}. {practice}")
            ---

4.2 批量上传

01.批量策略
    a.分批上传
        a.功能说明
            大规模数据需要分批上传。避免单次请求过大。减少内存压力。支持断点续传。可以并行上传提升速度。监控上传进度。合理的批量策略提升效率。
        b.代码示例
            ---
            import numpy as np
            from qdrant_client.models import PointStruct
            
            # 1. 分批上传配置
            total_count = 1000000  # 100万条数据
            batch_size = 1000  # 每批1000条
            
            # 2. 分批上传
            for i in range(0, total_count, batch_size):
                batch = [
                    PointStruct(
                        id=j,
                        vector=np.random.random(768).tolist(),
                        payload={"index": j, "batch": i // batch_size}
                    )
                    for j in range(i, min(i + batch_size, total_count))
                ]
                
                client.upsert(
                    collection_name="large_dataset",
                    points=batch,
                    wait=False  # 异步上传
                )
                
                # 显示进度
                if (i + batch_size) % 100000 == 0:
                    progress = (i + batch_size) / total_count * 100
                    print(f"上传进度: {progress:.1f}% ({i + batch_size}/{total_count})")
            
            print("\n批量上传完成")
            
            # 批量大小建议
            batch_size_guide = {
                "< 1万": "批量100-500",
                "1万-10万": "批量500-1000",
                "10万-100万": "批量1000-5000",
                "> 100万": "批量5000-10000"
            }
            
            print("\n批量大小建议:")
            for scale, recommendation in batch_size_guide.items():
                print(f"  {scale}: {recommendation}")
            ---
    b.并行上传
        a.功能说明
            并行上传可以显著提升速度。使用多线程或多进程。充分利用网络带宽。注意控制并发数。避免过多并发导致服务器压力。合理的并行策略平衡速度和稳定性。
        b.代码示例
            ---
            from concurrent.futures import ThreadPoolExecutor, as_completed
            import time
            
            def upload_batch(batch_id, batch_data):
                """上传单个批次"""
                try:
                    client.upsert(
                        collection_name="parallel_upload",
                        points=batch_data,
                        wait=False
                    )
                    return batch_id, True, len(batch_data)
                except Exception as e:
                    return batch_id, False, str(e)
            
            # 准备数据
            total_count = 100000
            batch_size = 1000
            batches = []
            
            for i in range(0, total_count, batch_size):
                batch = [
                    PointStruct(
                        id=j,
                        vector=np.random.random(768).tolist(),
                        payload={"index": j}
                    )
                    for j in range(i, min(i + batch_size, total_count))
                ]
                batches.append((i // batch_size, batch))
            
            # 并行上传
            max_workers = 4  # 4个并发线程
            start_time = time.time()
            
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(upload_batch, batch_id, batch_data)
                    for batch_id, batch_data in batches
                ]
                
                completed = 0
                for future in as_completed(futures):
                    batch_id, success, result = future.result()
                    completed += 1
                    
                    if success:
                        print(f"批次 {batch_id} 上传成功 ({completed}/{len(batches)})")
                    else:
                        print(f"批次 {batch_id} 上传失败: {result}")
            
            elapsed = time.time() - start_time
            print(f"\n并行上传完成,耗时: {elapsed:.2f}秒")
            print(f"平均速度: {total_count / elapsed:.0f} points/s")
            
            # 并行上传建议
            parallel_recommendations = [
                "并发数建议2-8",
                "根据网络带宽调整",
                "监控服务器负载",
                "实现重试机制",
                "记录失败批次",
                "上传完成后验证数据"
            ]
            
            print("\n并行上传建议:")
            for i, rec in enumerate(parallel_recommendations, 1):
                print(f"  {i}. {rec}")
            ---

02.数据导入
    a.从文件导入
        a.功能说明
            支持从各种文件格式导入数据。常见格式包括JSON、CSV、Parquet。需要解析文件并转换为Point格式。支持流式读取大文件。可以边读边上传。提供数据验证和清洗。是数据迁移的常用方式。
        b.代码示例
            ---
            import json
            
            # 1. 从JSON文件导入
            def import_from_json(file_path, collection_name, batch_size=1000):
                """从JSON文件导入数据"""
                with open(file_path, 'r') as f:
                    data = json.load(f)
                
                batch = []
                for item in data:
                    point = PointStruct(
                        id=item['id'],
                        vector=item['vector'],
                        payload=item.get('payload', {})
                    )
                    batch.append(point)
                    
                    if len(batch) >= batch_size:
                        client.upsert(
                            collection_name=collection_name,
                            points=batch,
                            wait=False
                        )
                        print(f"已导入 {len(batch)} 条数据")
                        batch = []
                
                # 上传剩余数据
                if batch:
                    client.upsert(
                        collection_name=collection_name,
                        points=batch
                    )
                    print(f"已导入 {len(batch)} 条数据")
            
            # 使用示例
            # import_from_json("data.json", "imported_collection")
            
            # 2. 从CSV文件导入
            import csv
            
            def import_from_csv(file_path, collection_name, vector_dim=768):
                """从CSV文件导入数据"""
                batch = []
                batch_size = 1000
                
                with open(file_path, 'r') as f:
                    reader = csv.DictReader(f)
                    
                    for idx, row in enumerate(reader):
                        # 假设CSV有id, vector, title等列
                        vector = [float(x) for x in row['vector'].split(',')]
                        
                        point = PointStruct(
                            id=int(row['id']),
                            vector=vector,
                            payload={
                                'title': row.get('title', ''),
                                'category': row.get('category', '')
                            }
                        )
                        batch.append(point)
                        
                        if len(batch) >= batch_size:
                            client.upsert(
                                collection_name=collection_name,
                                points=batch,
                                wait=False
                            )
                            batch = []
                
                if batch:
                    client.upsert(collection_name=collection_name, points=batch)
            
            print("文件导入功能已定义")
            
            # 支持的文件格式
            supported_formats = {
                "JSON": "适合小到中等规模数据",
                "JSONL": "适合大规模数据,流式读取",
                "CSV": "适合表格数据",
                "Parquet": "适合大规模数据,压缩高效",
                "HDF5": "适合科学计算数据"
            }
            
            print("\n支持的文件格式:")
            for fmt, desc in supported_formats.items():
                print(f"  {fmt}: {desc}")
            ---
    b.从数据库导入
        a.功能说明
            支持从关系型数据库导入数据。常见数据库包括MySQL、PostgreSQL、MongoDB。需要建立数据库连接。查询数据并转换为向量。支持增量导入。可以定期同步数据。是企业级数据集成的常用方式。
        b.代码示例
            ---
            # 从PostgreSQL导入示例
            # import psycopg2
            
            def import_from_postgres(connection_string, query, collection_name):
                """从PostgreSQL导入数据"""
                # conn = psycopg2.connect(connection_string)
                # cursor = conn.cursor()
                # cursor.execute(query)
                
                batch = []
                batch_size = 1000
                
                # 模拟数据库查询结果
                # for row in cursor:
                #     point = PointStruct(
                #         id=row[0],  # id列
                #         vector=row[1],  # vector列
                #         payload={
                #             'title': row[2],
                #             'content': row[3]
                #         }
                #     )
                #     batch.append(point)
                #     
                #     if len(batch) >= batch_size:
                #         client.upsert(
                #             collection_name=collection_name,
                #             points=batch,
                #             wait=False
                #         )
                #         batch = []
                
                # if batch:
                #     client.upsert(collection_name=collection_name, points=batch)
                
                # cursor.close()
                # conn.close()
                
                print("PostgreSQL导入功能已定义")
            
            # 数据库导入配置
            db_import_config = {
                "connection_pool": "使用连接池提升性能",
                "batch_query": "批量查询减少往返",
                "cursor_streaming": "流式游标处理大数据",
                "error_handling": "实现重试和错误处理",
                "progress_tracking": "记录导入进度",
                "incremental_sync": "支持增量同步"
            }
            
            print("\n数据库导入配置:")
            for config, desc in db_import_config.items():
                print(f"  {config}: {desc}")
            
            # 常见数据库连接示例
            db_connections = {
                "PostgreSQL": "psycopg2.connect(host='localhost', database='mydb')",
                "MySQL": "pymysql.connect(host='localhost', database='mydb')",
                "MongoDB": "pymongo.MongoClient('mongodb://localhost:27017')",
                "SQLite": "sqlite3.connect('database.db')"
            }
            
            print("\n数据库连接示例:")
            for db, conn_str in db_connections.items():
                print(f"  {db}: {conn_str}")
            ---

4.3 更新删除

01.更新操作
    a.更新向量
        a.功能说明
            可以更新Point的向量数据。使用upsert操作覆盖原有向量。保持ID不变。Payload可以同时更新。更新操作是原子的。支持批量更新。是数据维护的重要功能。
        b.代码示例
            ---
            # 1. 更新单个Point的向量
            updated_point = PointStruct(
                id=1,
                vector=np.random.random(768).tolist(),  # 新向量
                payload={"title": "Updated Document", "version": 2}
            )
            
            client.upsert(
                collection_name="documents",
                points=[updated_point]
            )
            
            print("向量更新完成")
            
            # 2. 批量更新向量
            updated_points = [
                PointStruct(
                    id=i,
                    vector=np.random.random(768).tolist(),
                    payload={"updated": True}
                )
                for i in range(1, 101)
            ]
            
            client.upsert(
                collection_name="documents",
                points=updated_points,
                wait=True
            )
            
            print(f"批量更新 {len(updated_points)} 个向量")
            
            # 更新注意事项
            update_notes = [
                "upsert操作会完全覆盖原Point",
                "ID必须与原Point相同",
                "向量维度必须匹配",
                "Payload会被完全替换",
                "更新后需要重建索引",
                "大量更新影响查询性能"
            ]
            
            print("\n更新注意事项:")
            for i, note in enumerate(update_notes, 1):
                print(f"  {i}. {note}")
            ---
    b.更新Payload
        a.功能说明
            可以单独更新Payload而不影响向量。支持部分字段更新。可以添加、修改、删除字段。支持批量更新。更新操作高效。不需要重建向量索引。是元数据管理的关键功能。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            
            # 1. 设置Payload(添加或更新字段)
            client.set_payload(
                collection_name="documents",
                payload={
                    "status": "published",
                    "updated_at": "2024-01-15",
                    "views": 1000
                },
                points=[1, 2, 3]
            )
            
            print("Payload更新完成")
            
            # 2. 删除Payload字段
            client.delete_payload(
                collection_name="documents",
                keys=["views", "status"],
                points=[1, 2, 3]
            )
            
            print("Payload字段删除完成")
            
            # 3. 清空Payload
            client.clear_payload(
                collection_name="documents",
                points=[1]
            )
            
            print("Payload已清空")
            
            # 4. 条件更新Payload
            # 先查询符合条件的Points
            results = client.scroll(
                collection_name="documents",
                scroll_filter=Filter(
                    must=[
                        FieldCondition(key="category", match=MatchValue(value="tech"))
                    ]
                ),
                limit=1000
            )
            
            points, _ = results
            point_ids = [p.id for p in points]
            
            # 批量更新
            client.set_payload(
                collection_name="documents",
                payload={"featured": True},
                points=point_ids
            )
            
            print(f"条件更新了 {len(point_ids)} 个Points")
            
            # Payload更新操作
            payload_operations = {
                "set_payload": "设置或更新字段",
                "delete_payload": "删除指定字段",
                "clear_payload": "清空所有Payload",
                "overwrite_payload": "完全覆盖Payload"
            }
            
            print("\nPayload更新操作:")
            for op, desc in payload_operations.items():
                print(f"  {op}: {desc}")
            ---

02.删除操作
    a.按ID删除
        a.功能说明
            可以通过ID删除Point。支持单个或批量删除。删除操作是永久的。删除后ID可以重用。支持异步删除。删除操作影响索引。需要定期优化清理删除的数据。
        b.代码示例
            ---
            # 1. 删除单个Point
            client.delete(
                collection_name="documents",
                points_selector=[1]
            )
            
            print("Point删除完成")
            
            # 2. 批量删除Points
            client.delete(
                collection_name="documents",
                points_selector=[2, 3, 4, 5]
            )
            
            print("批量删除完成")
            
            # 3. 删除大量Points
            ids_to_delete = list(range(1000, 2000))
            
            client.delete(
                collection_name="documents",
                points_selector=ids_to_delete,
                wait=True
            )
            
            print(f"删除了 {len(ids_to_delete)} 个Points")
            
            # 删除后优化
            # Qdrant会自动优化,也可以手动触发
            # client.update_collection(
            #     collection_name="documents",
            #     optimizer_config=OptimizersConfigDiff(
            #         deleted_threshold=0.2
            #     )
            # )
            
            print("\n删除操作完成")
            ---
    b.条件删除
        a.功能说明
            可以根据Payload条件删除Points。支持复杂的过滤条件。批量删除符合条件的数据。删除操作是永久的。需要谨慎使用。适合数据清理和维护。是数据管理的重要工具。
        b.代码示例
            ---
            from qdrant_client.models import FilterSelector
            
            # 1. 条件删除
            client.delete(
                collection_name="documents",
                points_selector=FilterSelector(
                    filter=Filter(
                        must=[
                            FieldCondition(
                                key="status",
                                match=MatchValue(value="draft")
                            )
                        ]
                    )
                )
            )
            
            print("条件删除完成")
            
            # 2. 删除过期数据
            client.delete(
                collection_name="documents",
                points_selector=FilterSelector(
                    filter=Filter(
                        must=[
                            FieldCondition(
                                key="created_at",
                                range=Range(lt="2023-01-01")
                            )
                        ]
                    )
                )
            )
            
            print("过期数据删除完成")
            
            # 3. 删除前统计数量
            count_result = client.count(
                collection_name="documents",
                count_filter=Filter(
                    must=[
                        FieldCondition(key="status", match=MatchValue(value="draft"))
                    ]
                ),
                exact=True
            )
            
            print(f"\n将删除 {count_result.count} 个Points")
            
            # 删除最佳实践
            delete_best_practices = [
                "删除前先统计数量",
                "重要数据删除前备份",
                "使用条件删除需谨慎",
                "大量删除后优化Collection",
                "定期清理无用数据",
                "监控删除操作的影响"
            ]
            
            print("\n删除最佳实践:")
            for i, practice in enumerate(delete_best_practices, 1):
                print(f"  {i}. {practice}")
            
            # 删除操作对比
            delete_comparison = {
                "按ID删除": {
                    "速度": "快",
                    "适用": "精确删除",
                    "风险": "低"
                },
                "条件删除": {
                    "速度": "较慢",
                    "适用": "批量清理",
                    "风险": "高,需谨慎"
                }
            }
            
            print("\n删除操作对比:")
            for method, metrics in delete_comparison.items():
                print(f"\n{method}:")
                for key, value in metrics.items():
                    print(f"  {key}: {value}")
            ---

5 搜索查询

5.1 向量搜索

01.基础搜索
    a.相似度搜索
        a.功能说明
            向量搜索是Qdrant的核心功能。通过查询向量找到最相似的Points。支持多种距离度量。返回Top-K最相似结果。可以指定返回数量。支持Payload过滤。是语义搜索的基础。
        b.代码示例
            ---
            import numpy as np
            
            query_vector = np.random.random(768).tolist()
            
            results = client.search(
                collection_name="documents",
                query_vector=query_vector,
                limit=10
            )
            
            print(f"找到 {len(results)} 个相似结果")
            for i, result in enumerate(results, 1):
                print(f"{i}. ID: {result.id}, Score: {result.score:.4f}")
            ---
    b.命名向量搜索
        a.功能说明
            对于多向量Collection,可以指定搜索哪个向量。每个向量独立检索。支持跨向量组合搜索。适合多模态应用。提供更灵活的检索方式。
        b.代码示例
            ---
            text_query = np.random.random(768).tolist()
            
            text_results = client.search(
                collection_name="multimodal",
                query_vector=("text", text_query),
                limit=10
            )
            
            print(f"文本搜索找到 {len(text_results)} 个结果")
            ---

02.高级搜索
    a.带过滤的搜索
        a.功能说明
            可以在向量搜索时添加Payload过滤。先过滤再搜索提升精度。支持复杂的过滤条件。过滤和向量搜索结合。是实现业务逻辑的关键。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            
            results = client.search(
                collection_name="documents",
                query_vector=query_vector,
                query_filter=Filter(
                    must=[
                        FieldCondition(key="category", match=MatchValue(value="tech"))
                    ]
                ),
                limit=10
            )
            
            print(f"过滤后找到 {len(results)} 个结果")
            ---
    b.相似度阈值
        a.功能说明
            可以设置最小相似度阈值。只返回超过阈值的结果。过滤低质量匹配。提升结果相关性。适合高精度要求场景。
        b.代码示例
            ---
            results = client.search(
                collection_name="documents",
                query_vector=query_vector,
                limit=100,
                score_threshold=0.8
            )
            
            print(f"阈值过滤后找到 {len(results)} 个结果")
            ---

5.2 过滤器Filter

01.过滤条件
    a.基础条件
        a.功能说明
            过滤器支持多种基础条件。包括等值匹配、范围查询、数组匹配。可以组合多个条件。支持嵌套字段过滤。是实现业务逻辑的基础。
        b.代码示例
            ---
            from qdrant_client.models import Filter, FieldCondition, Range, MatchValue
            
            filter_exact = Filter(
                must=[
                    FieldCondition(key="category", match=MatchValue(value="tech"))
                ]
            )
            
            filter_range = Filter(
                must=[
                    FieldCondition(key="price", range=Range(gte=100, lte=500))
                ]
            )
            
            print("过滤条件已定义")
            ---
    b.组合条件
        a.功能说明
            支持AND、OR、NOT逻辑组合。可以构建复杂的过滤表达式。must表示AND,should表示OR,must_not表示NOT。提供强大的查询能力。
        b.代码示例
            ---
            complex_filter = Filter(
                must=[
                    FieldCondition(key="in_stock", match=MatchValue(value=True))
                ],
                should=[
                    FieldCondition(key="category", match=MatchValue(value="electronics"))
                ],
                must_not=[
                    FieldCondition(key="discontinued", match=MatchValue(value=True))
                ]
            )
            
            print("组合过滤条件已定义")
            ---

02.高级过滤
    a.地理位置过滤
        a.功能说明
            支持地理位置过滤。可以按半径或边界框查询。适合LBS应用。需要Payload包含地理坐标。提供位置相关的检索能力。
        b.代码示例
            ---
            from qdrant_client.models import GeoRadius, GeoPoint
            
            geo_filter = Filter(
                must=[
                    FieldCondition(
                        key="location",
                        geo_radius=GeoRadius(
                            center=GeoPoint(lon=116.4074, lat=39.9042),
                            radius=5000.0
                        )
                    )
                ]
            )
            
            print("地理过滤条件已定义")
            ---
    b.数组长度过滤
        a.功能说明
            可以根据数组字段的长度过滤。适合标签数量等场景。支持范围查询。提供更灵活的过滤方式。
        b.代码示例
            ---
            from qdrant_client.models import ValuesCount
            
            array_filter = Filter(
                must=[
                    FieldCondition(
                        key="tags",
                        values_count=ValuesCount(gte=3, lte=10)
                    )
                ]
            )
            
            print("数组长度过滤已定义")
            ---

5.3 Payload查询

01.查询方式
    a.滚动查询
        a.功能说明
            滚动查询用于遍历符合条件的所有数据。支持分页和过滤。可以指定每页大小。适合数据导出和全量扫描。是数据分析的重要工具。
        b.代码示例
            ---
            offset = None
            all_results = []
            
            while True:
                result = client.scroll(
                    collection_name="documents",
                    scroll_filter=Filter(
                        must=[FieldCondition(key="category", match=MatchValue(value="tech"))]
                    ),
                    limit=100,
                    offset=offset
                )
                
                points, offset = result
                all_results.extend(points)
                
                if offset is None:
                    break
            
            print(f"获取 {len(all_results)} 个结果")
            ---
    b.计数查询
        a.功能说明
            可以统计符合条件的Point数量。支持精确和近似计数。精确计数速度较慢。近似计数速度快。适合数据统计分析。
        b.代码示例
            ---
            count_result = client.count(
                collection_name="documents",
                count_filter=Filter(
                    must=[FieldCondition(key="category", match=MatchValue(value="tech"))]
                ),
                exact=True
            )
            
            print(f"精确计数: {count_result.count}")
            ---

02.聚合查询
    a.分组统计
        a.功能说明
            虽然Qdrant不直接支持SQL式聚合,但可以通过滚动查询和客户端处理实现。适合简单的统计需求。
        b.代码示例
            ---
            from collections import Counter
            
            offset = None
            category_counts = Counter()
            
            while True:
                result = client.scroll(
                    collection_name="documents",
                    limit=1000,
                    offset=offset,
                    with_payload=True
                )
                
                points, offset = result
                
                for point in points:
                    category = point.payload.get('category', 'unknown')
                    category_counts[category] += 1
                
                if offset is None:
                    break
            
            print("类别统计:")
            for category, count in category_counts.most_common():
                print(f"  {category}: {count}")
            ---
    b.范围分析
        a.功能说明
            可以分析数值字段的分布。统计最大值、最小值、平均值等。通过滚动查询获取数据。适合数据探索和分析。
        b.代码示例
            ---
            import statistics
            
            offset = None
            prices = []
            
            while True:
                result = client.scroll(
                    collection_name="products",
                    limit=1000,
                    offset=offset,
                    with_payload=True
                )
                
                points, offset = result
                
                for point in points:
                    price = point.payload.get('price')
                    if price is not None:
                        prices.append(price)
                
                if offset is None:
                    break
            
            if prices:
                print(f"价格统计: 最小{min(prices):.2f}, 最大{max(prices):.2f}, 平均{statistics.mean(prices):.2f}")
            ---

5.4 推荐系统

01.推荐策略
    a.基于向量推荐
        a.功能说明
            Qdrant提供recommend API实现推荐功能。基于正负样本计算推荐向量。支持多个正样本和负样本。适合个性化推荐场景。
        b.代码示例
            ---
            recommendations = client.recommend(
                collection_name="products",
                positive=[1, 5, 10],
                negative=[3, 7],
                limit=10
            )
            
            print(f"推荐 {len(recommendations)} 个商品")
            for rec in recommendations:
                print(f"ID: {rec.id}, Score: {rec.score:.4f}")
            ---
    b.批量推荐
        a.功能说明
            支持批量推荐请求。一次为多个用户生成推荐。提升推荐效率。适合离线推荐场景。
        b.代码示例
            ---
            from qdrant_client.models import RecommendRequest
            
            batch_requests = [
                RecommendRequest(positive=[1, 2, 3], negative=[], limit=5),
                RecommendRequest(positive=[4, 5, 6], negative=[1], limit=5)
            ]
            
            print(f"批量推荐请求已准备,共 {len(batch_requests)} 个")
            ---

02.推荐优化
    a.推荐策略
        a.功能说明
            可以调整推荐策略参数。控制正负样本的权重。设置推荐多样性。优化推荐质量。提升用户体验。
        b.代码示例
            ---
            from qdrant_client.models import RecommendStrategy
            
            recommendations = client.recommend(
                collection_name="products",
                positive=[1, 5, 10],
                strategy=RecommendStrategy.AVERAGE_VECTOR,
                limit=10
            )
            
            print(f"策略推荐 {len(recommendations)} 个结果")
            ---
    b.推荐评估
        a.功能说明
            需要评估推荐效果。统计推荐准确率和多样性。收集用户反馈。持续优化推荐模型。提升推荐质量。
        b.代码示例
            ---
            evaluation_metrics = {
                "准确率": "推荐商品被点击的比例",
                "召回率": "用户感兴趣商品被推荐的比例",
                "多样性": "推荐结果的类别分布"
            }
            
            print("推荐评估指标:")
            for metric, desc in evaluation_metrics.items():
                print(f"  {metric}: {desc}")
            ---

5.5 批量搜索

01.批量查询
    a.批量向量搜索
        a.功能说明
            支持一次提交多个搜索请求。减少网络往返次数。提升查询效率。适合批量推理场景。是性能优化的重要手段。
        b.代码示例
            ---
            from qdrant_client.models import SearchRequest
            
            query_vectors = [np.random.random(768).tolist() for _ in range(10)]
            
            search_requests = [
                SearchRequest(vector=qv, limit=5, with_payload=True)
                for qv in query_vectors
            ]
            
            batch_results = client.search_batch(
                collection_name="documents",
                requests=search_requests
            )
            
            print(f"批量搜索完成,共 {len(batch_results)} 个查询")
            ---
    b.批量推荐
        a.功能说明
            支持批量推荐请求。一次为多个用户生成推荐。提升推荐效率。优化系统吞吐量。
        b.代码示例
            ---
            recommend_requests = [
                RecommendRequest(positive=[i, i+1, i+2], negative=[], limit=5)
                for i in range(1, 100, 10)
            ]
            
            print(f"批量推荐请求已准备,共 {len(recommend_requests)} 个")
            ---

02.性能优化
    a.查询优化
        a.功能说明
            优化查询性能的关键在于合理配置。选择合适的索引参数。控制返回结果数量。使用Payload过滤减少计算。监控查询性能。
        b.代码示例
            ---
            optimization_config = {
                "索引参数": "ef=128-256平衡性能和准确度",
                "返回数量": "limit=10-50,避免过大",
                "Payload": "按需返回,不需要时设置False"
            }
            
            print("查询性能优化:")
            for aspect, config in optimization_config.items():
                print(f"  {aspect}: {config}")
            ---
    b.缓存策略
        a.功能说明
            实现查询结果缓存提升性能。缓存热门查询结果。设置合理的过期时间。减少重复计算。提升用户体验。
        b.代码示例
            ---
            cache_strategies = {
                "热门查询": "缓存时间1小时",
                "个性化查询": "缓存时间5-10分钟",
                "实时数据": "不缓存或缓存1分钟"
            }
            
            print("缓存策略建议:")
            for query_type, strategy in cache_strategies.items():
                print(f"  {query_type}: {strategy}")
            ---

6 高级特性

6.1 量化Quantization

01.量化类型
    a.标量量化
        a.功能说明
            标量量化将浮点向量压缩为整数。减少内存占用和存储空间。提升查询速度。略微降低精度。适合大规模数据集。是性能优化的重要手段。
        b.代码示例
            ---
            from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType
            
            client.update_collection(
                collection_name="documents",
                quantization_config=ScalarQuantization(
                    scalar=ScalarQuantizationConfig(
                        type=ScalarType.INT8,
                        quantile=0.99,
                        always_ram=True
                    )
                )
            )
            
            print("标量量化配置完成")
            ---
    b.产品量化
        a.功能说明
            产品量化是更高级的压缩方法。将向量分段量化。压缩比更高。适合超大规模数据。需要更多计算资源。平衡存储和性能。
        b.代码示例
            ---
            from qdrant_client.models import ProductQuantization, ProductQuantizationConfig
            
            client.update_collection(
                collection_name="documents",
                quantization_config=ProductQuantization(
                    product=ProductQuantizationConfig(
                        compression=CompressionRatio.X16,
                        always_ram=True
                    )
                )
            )
            
            print("产品量化配置完成")
            ---

02.量化优化
    a.性能影响
        a.功能说明
            量化显著减少内存占用。提升查询速度。略微降低召回率。需要权衡性能和精度。适合资源受限场景。
        b.代码示例
            ---
            quantization_impact = {
                "内存占用": "减少75-90%",
                "查询速度": "提升2-4倍",
                "召回率": "降低1-3%",
                "存储空间": "减少75-90%"
            }
            
            print("量化性能影响:")
            for aspect, impact in quantization_impact.items():
                print(f"  {aspect}: {impact}")
            ---
    b.量化策略
        a.功能说明
            选择合适的量化类型。根据数据规模和精度要求。配置量化参数。监控量化效果。定期评估和调整。
        b.代码示例
            ---
            quantization_strategies = {
                "小数据集(<100万)": "不使用量化",
                "中等数据集(100万-1000万)": "标量量化INT8",
                "大数据集(>1000万)": "产品量化X16"
            }
            
            print("量化策略建议:")
            for scale, strategy in quantization_strategies.items():
                print(f"  {scale}: {strategy}")
            ---

6.2 多向量Multiple Vectors

01.多向量配置
    a.向量定义
        a.功能说明
            一个Point可以包含多个向量。每个向量有独立的名称和配置。支持不同维度和距离度量。适合多模态数据。提供灵活的数据组织。
        b.代码示例
            ---
            from qdrant_client.models import VectorParams, Distance
            
            client.create_collection(
                collection_name="multimodal",
                vectors_config={
                    "text": VectorParams(size=768, distance=Distance.COSINE),
                    "image": VectorParams(size=512, distance=Distance.EUCLID),
                    "audio": VectorParams(size=256, distance=Distance.DOT)
                }
            )
            
            print("多向量Collection创建完成")
            ---
    b.向量插入
        a.功能说明
            插入多向量Point时需要提供所有向量。每个向量独立存储。可以分别检索。支持部分向量更新。提供灵活的数据管理。
        b.代码示例
            ---
            import numpy as np
            from qdrant_client.models import PointStruct
            
            point = PointStruct(
                id=1,
                vector={
                    "text": np.random.random(768).tolist(),
                    "image": np.random.random(512).tolist(),
                    "audio": np.random.random(256).tolist()
                },
                payload={"title": "Multimodal Content"}
            )
            
            client.upsert(collection_name="multimodal", points=[point])
            
            print("多向量Point插入完成")
            ---

02.多向量检索
    a.单向量检索
        a.功能说明
            可以指定检索某个向量。其他向量不参与计算。适合单模态查询。提供精准的检索能力。
        b.代码示例
            ---
            text_query = np.random.random(768).tolist()
            
            results = client.search(
                collection_name="multimodal",
                query_vector=("text", text_query),
                limit=10
            )
            
            print(f"文本向量检索找到 {len(results)} 个结果")
            ---
    b.多向量组合
        a.功能说明
            可以组合多个向量进行检索。设置不同的权重。实现跨模态搜索。提供更丰富的检索方式。
        b.代码示例
            ---
            # 多向量组合检索(如果API支持)
            # results = client.search(
            #     collection_name="multimodal",
            #     query_vectors={
            #         "text": (text_query, 0.7),
            #         "image": (image_query, 0.3)
            #     },
            #     limit=10
            # )
            
            print("多向量组合检索功能")
            ---

6.3 命名向量Named Vectors

01.命名向量管理
    a.向量命名
        a.功能说明
            命名向量提供更清晰的语义。便于理解和维护。支持动态添加新向量。提供灵活的扩展能力。是多模态应用的基础。
        b.代码示例
            ---
            vector_names = {
                "text_en": "英文文本向量",
                "text_zh": "中文文本向量",
                "image_clip": "CLIP图像向量",
                "audio_wav2vec": "Wav2Vec音频向量"
            }
            
            print("命名向量说明:")
            for name, desc in vector_names.items():
                print(f"  {name}: {desc}")
            ---
    b.向量更新
        a.功能说明
            可以单独更新某个命名向量。不影响其他向量。支持部分更新。提供灵活的数据维护能力。
        b.代码示例
            ---
            # 更新单个命名向量
            updated_point = PointStruct(
                id=1,
                vector={
                    "text": np.random.random(768).tolist()
                    # 只更新text向量,其他向量保持不变
                }
            )
            
            # client.upsert(collection_name="multimodal", points=[updated_point])
            
            print("命名向量更新功能")
            ---

02.命名向量应用
    a.多语言支持
        a.功能说明
            使用命名向量支持多语言。每种语言独立的向量。可以分别检索或组合检索。适合国际化应用。
        b.代码示例
            ---
            multilingual_config = {
                "en": VectorParams(size=768, distance=Distance.COSINE),
                "zh": VectorParams(size=768, distance=Distance.COSINE),
                "ja": VectorParams(size=768, distance=Distance.COSINE)
            }
            
            print("多语言向量配置:")
            for lang, config in multilingual_config.items():
                print(f"  {lang}: {config}")
            ---
    b.多模态融合
        a.功能说明
            命名向量实现多模态数据融合。文本、图像、音频统一管理。支持跨模态检索。提供丰富的应用场景。
        b.代码示例
            ---
            multimodal_use_cases = [
                "以图搜文:用图像向量检索相关文本",
                "以文搜图:用文本向量检索相关图像",
                "视频检索:结合图像、音频、字幕向量",
                "商品搜索:结合图片、描述、属性向量"
            ]
            
            print("多模态应用场景:")
            for i, use_case in enumerate(multimodal_use_cases, 1):
                print(f"  {i}. {use_case}")
            ---

6.4 稀疏向量Sparse Vectors

01.稀疏向量特性
    a.稀疏表示
        a.功能说明
            稀疏向量大部分元素为零。只存储非零元素。节省存储空间。适合高维稀疏数据。如TF-IDF、BM25向量。
        b.代码示例
            ---
            from qdrant_client.models import SparseVector
            
            # 稀疏向量表示
            sparse_vec = SparseVector(
                indices=[10, 25, 100, 500],
                values=[0.5, 0.8, 0.3, 0.9]
            )
            
            print(f"稀疏向量: {len(sparse_vec.indices)} 个非零元素")
            ---
    b.稀疏向量配置
        a.功能说明
            Qdrant支持稀疏向量存储和检索。可以与密集向量结合使用。实现混合检索。提供更好的检索效果。
        b.代码示例
            ---
            # 创建支持稀疏向量的Collection
            from qdrant_client.models import SparseVectorParams
            
            client.create_collection(
                collection_name="hybrid_search",
                vectors_config={
                    "dense": VectorParams(size=768, distance=Distance.COSINE)
                },
                sparse_vectors_config={
                    "sparse": SparseVectorParams()
                }
            )
            
            print("混合向量Collection创建完成")
            ---

02.混合检索
    a.密集+稀疏
        a.功能说明
            结合密集向量和稀疏向量检索。密集向量捕捉语义。稀疏向量捕捉关键词。混合检索提升准确率。是现代检索系统的趋势。
        b.代码示例
            ---
            # 插入混合向量
            hybrid_point = PointStruct(
                id=1,
                vector={
                    "dense": np.random.random(768).tolist(),
                    "sparse": SparseVector(
                        indices=[10, 25, 100],
                        values=[0.5, 0.8, 0.3]
                    )
                },
                payload={"title": "Hybrid Document"}
            )
            
            # client.upsert(collection_name="hybrid_search", points=[hybrid_point])
            
            print("混合向量插入功能")
            ---
    b.检索策略
        a.功能说明
            可以调整密集和稀疏向量的权重。优化检索效果。适应不同的应用场景。提供灵活的检索策略。
        b.代码示例
            ---
            hybrid_strategies = {
                "语义为主": "密集向量权重0.8,稀疏向量权重0.2",
                "关键词为主": "密集向量权重0.3,稀疏向量权重0.7",
                "平衡模式": "密集向量权重0.5,稀疏向量权重0.5"
            }
            
            print("混合检索策略:")
            for strategy, weights in hybrid_strategies.items():
                print(f"  {strategy}: {weights}")
            ---

7 性能优化

7.1 索引优化

01.HNSW参数
    a.构建参数
        a.功能说明
            HNSW索引的构建参数影响索引质量。m参数控制连接数。ef_construct控制构建时的搜索范围。参数越大索引越好但构建越慢。需要权衡构建时间和查询性能。
        b.代码示例
            ---
            from qdrant_client.models import HnswConfigDiff
            
            client.update_collection(
                collection_name="documents",
                hnsw_config=HnswConfigDiff(
                    m=32,
                    ef_construct=200,
                    full_scan_threshold=10000
                )
            )
            
            print("HNSW参数优化完成")
            ---
    b.查询参数
        a.功能说明
            查询时的ef参数控制搜索范围。ef越大结果越准确但速度越慢。需要根据应用场景调整。平衡准确率和延迟。
        b.代码示例
            ---
            from qdrant_client.models import SearchParams
            
            results = client.search(
                collection_name="documents",
                query_vector=query_vector,
                limit=10,
                search_params=SearchParams(
                    hnsw_ef=128,
                    exact=False
                )
            )
            
            print(f"优化搜索找到 {len(results)} 个结果")
            ---

02.索引管理
    a.索引重建
        a.功能说明
            数据更新后可能需要重建索引。优化索引结构。提升查询性能。重建过程需要时间。建议在低峰期进行。
        b.代码示例
            ---
            # 触发索引优化
            client.update_collection(
                collection_name="documents",
                optimizer_config=OptimizersConfigDiff(
                    indexing_threshold=20000
                )
            )
            
            print("索引优化已触发")
            ---
    b.索引监控
        a.功能说明
            监控索引状态和性能。统计索引大小和构建时间。分析查询性能指标。及时发现和解决问题。
        b.代码示例
            ---
            collection_info = client.get_collection(collection_name="documents")
            
            print(f"Collection信息:")
            print(f"  Points数量: {collection_info.points_count}")
            print(f"  索引状态: {collection_info.status}")
            print(f"  优化器状态: {collection_info.optimizer_status}")
            ---

7.2 查询优化

01.查询策略
    a.过滤优化
        a.功能说明
            合理使用Payload过滤减少计算量。为常用过滤字段创建索引。避免过于复杂的过滤条件。优化过滤逻辑。提升查询性能。
        b.代码示例
            ---
            # 创建Payload索引
            client.create_payload_index(
                collection_name="documents",
                field_name="category",
                field_schema="keyword"
            )
            
            client.create_payload_index(
                collection_name="documents",
                field_name="rating",
                field_schema="float"
            )
            
            print("Payload索引创建完成")
            ---
    b.批量查询
        a.功能说明
            使用批量查询减少网络往返。一次提交多个查询请求。提升整体吞吐量。适合批量推理场景。
        b.代码示例
            ---
            from qdrant_client.models import SearchRequest
            
            queries = [np.random.random(768).tolist() for _ in range(10)]
            
            requests = [
                SearchRequest(vector=q, limit=5)
                for q in queries
            ]
            
            batch_results = client.search_batch(
                collection_name="documents",
                requests=requests
            )
            
            print(f"批量查询完成,共 {len(batch_results)} 个结果")
            ---

02.性能调优
    a.参数调整
        a.功能说明
            根据实际场景调整查询参数。平衡准确率和延迟。监控查询性能指标。持续优化参数配置。
        b.代码示例
            ---
            tuning_params = {
                "低延迟场景": "ef=64, limit=10",
                "高准确率场景": "ef=256, limit=50",
                "平衡场景": "ef=128, limit=20"
            }
            
            print("参数调优建议:")
            for scenario, params in tuning_params.items():
                print(f"  {scenario}: {params}")
            ---
    b.性能监控
        a.功能说明
            监控查询延迟和吞吐量。分析慢查询。识别性能瓶颈。及时优化和调整。
        b.代码示例
            ---
            import time
            
            start = time.time()
            
            results = client.search(
                collection_name="documents",
                query_vector=query_vector,
                limit=10
            )
            
            latency = (time.time() - start) * 1000
            
            print(f"查询延迟: {latency:.2f}ms")
            ---

7.3 内存管理

01.内存优化
    a.量化配置
        a.功能说明
            使用量化减少内存占用。标量量化可减少75%内存。产品量化可减少90%内存。需要权衡内存和精度。
        b.代码示例
            ---
            from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType
            
            client.update_collection(
                collection_name="documents",
                quantization_config=ScalarQuantization(
                    scalar=ScalarQuantizationConfig(
                        type=ScalarType.INT8,
                        always_ram=True
                    )
                )
            )
            
            print("量化配置完成,内存占用减少约75%")
            ---
    b.磁盘存储
        a.功能说明
            将部分数据存储在磁盘。减少内存占用。适合大规模数据集。略微降低查询性能。平衡成本和性能。
        b.代码示例
            ---
            client.create_collection(
                collection_name="large_dataset",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                on_disk_payload=True
            )
            
            print("Payload存储在磁盘,节省内存")
            ---

02.资源监控
    a.内存使用
        a.功能说明
            监控内存使用情况。及时发现内存泄漏。优化内存配置。避免OOM错误。
        b.代码示例
            ---
            import psutil
            
            memory = psutil.virtual_memory()
            
            print(f"内存使用情况:")
            print(f"  总内存: {memory.total / 1024**3:.2f} GB")
            print(f"  已使用: {memory.used / 1024**3:.2f} GB")
            print(f"  使用率: {memory.percent}%")
            ---
    b.资源限制
        a.功能说明
            设置合理的资源限制。避免单个查询占用过多资源。保护系统稳定性。提供服务质量保证。
        b.代码示例
            ---
            resource_limits = {
                "最大查询数量": "limit <= 1000",
                "最大批量大小": "batch_size <= 100",
                "查询超时": "timeout = 30s"
            }
            
            print("资源限制建议:")
            for limit, value in resource_limits.items():
                print(f"  {limit}: {value}")
            ---

8 集群部署

8.1 分布式模式

01.集群架构
    a.节点角色
        a.功能说明
            Qdrant集群包含多个节点。每个节点可以处理请求。数据分片存储在不同节点。提供高可用性和扩展性。
        b.代码示例
            ---
            cluster_config = {
                "节点数量": "建议3-5个节点",
                "分片数量": "根据数据量确定",
                "副本因子": "建议2-3个副本",
                "一致性级别": "根据业务需求选择"
            }
            
            print("集群配置建议:")
            for key, value in cluster_config.items():
                print(f"  {key}: {value}")
            ---
    b.集群通信
        a.功能说明
            节点间通过gRPC通信。支持数据同步和复制。提供一致性保证。监控集群健康状态。
        b.代码示例
            ---
            # 连接集群
            from qdrant_client import QdrantClient
            
            cluster_client = QdrantClient(
                url="http://cluster-lb.example.com:6333"
            )
            
            print("集群连接已建立")
            ---

02.集群管理
    a.节点管理
        a.功能说明
            添加和移除节点。监控节点状态。处理节点故障。实现自动故障转移。
        b.代码示例
            ---
            # 获取集群信息
            # cluster_info = cluster_client.get_cluster_info()
            
            # print(f"集群节点数: {len(cluster_info.peers)}")
            # for peer in cluster_info.peers:
            #     print(f"  节点: {peer.uri}, 状态: {peer.status}")
            
            print("集群管理功能")
            ---
    b.数据迁移
        a.功能说明
            支持数据在节点间迁移。实现负载均衡。优化数据分布。提升集群性能。
        b.代码示例
            ---
            migration_steps = [
                "1. 评估当前数据分布",
                "2. 规划迁移方案",
                "3. 执行数据迁移",
                "4. 验证数据完整性",
                "5. 更新路由配置"
            ]
            
            print("数据迁移步骤:")
            for step in migration_steps:
                print(f"  {step}")
            ---

8.2 数据分片

01.分片策略
    a.分片配置
        a.功能说明
            数据分片提升并发性能。每个分片独立处理请求。分片数量影响性能和资源使用。需要根据数据量和负载确定。
        b.代码示例
            ---
            client.create_collection(
                collection_name="sharded_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                shard_number=4,
                replication_factor=2
            )
            
            print("分片Collection创建完成")
            ---
    b.分片管理
        a.功能说明
            监控分片状态。优化分片分布。处理分片故障。实现分片迁移。
        b.代码示例
            ---
            shard_management = {
                "分片监控": "定期检查分片健康状态",
                "负载均衡": "确保分片负载均匀",
                "故障处理": "自动故障转移",
                "分片迁移": "支持动态调整分片"
            }
            
            print("分片管理:")
            for task, desc in shard_management.items():
                print(f"  {task}: {desc}")
            ---

02.分片优化
    a.负载均衡
        a.功能说明
            确保分片负载均匀。避免热点分片。优化数据分布。提升整体性能。
        b.代码示例
            ---
            load_balancing_strategies = [
                "基于数据量的分片",
                "基于访问频率的分片",
                "动态调整分片权重",
                "实现分片自动迁移"
            ]
            
            print("负载均衡策略:")
            for i, strategy in enumerate(load_balancing_strategies, 1):
                print(f"  {i}. {strategy}")
            ---
    b.性能监控
        a.功能说明
            监控分片性能指标。识别性能瓶颈。及时优化调整。保证服务质量。
        b.代码示例
            ---
            shard_metrics = [
                "每个分片的请求数",
                "每个分片的延迟",
                "每个分片的数据量",
                "每个分片的CPU和内存使用"
            ]
            
            print("分片监控指标:")
            for i, metric in enumerate(shard_metrics, 1):
                print(f"  {i}. {metric}")
            ---

8.3 副本配置

01.副本策略
    a.副本因子
        a.功能说明
            副本因子决定数据冗余度。更多副本提供更高可用性。也增加存储成本。需要权衡可用性和成本。
        b.代码示例
            ---
            client.create_collection(
                collection_name="ha_collection",
                vectors_config=VectorParams(size=768, distance=Distance.COSINE),
                shard_number=3,
                replication_factor=3,
                write_consistency_factor=2
            )
            
            print("高可用Collection创建完成")
            ---
    b.一致性级别
        a.功能说明
            一致性级别控制写入保证。强一致性保证数据可靠但性能较低。最终一致性性能高但可能短暂不一致。根据业务需求选择。
        b.代码示例
            ---
            consistency_levels = {
                "强一致性": "write_consistency_factor = replication_factor",
                "最终一致性": "write_consistency_factor = 1",
                "平衡模式": "write_consistency_factor = (replication_factor + 1) / 2"
            }
            
            print("一致性级别:")
            for level, config in consistency_levels.items():
                print(f"  {level}: {config}")
            ---

02.副本管理
    a.副本同步
        a.功能说明
            副本间自动同步数据。保证数据一致性。处理同步延迟。监控同步状态。
        b.代码示例
            ---
            replication_features = [
                "自动数据同步",
                "增量同步优化",
                "同步状态监控",
                "故障自动恢复"
            ]
            
            print("副本同步特性:")
            for i, feature in enumerate(replication_features, 1):
                print(f"  {i}. {feature}")
            ---
    b.故障恢复
        a.功能说明
            副本提供故障恢复能力。节点故障时自动切换。保证服务可用性。最小化数据丢失。
        b.代码示例
            ---
            failover_process = [
                "1. 检测节点故障",
                "2. 标记故障节点",
                "3. 切换到健康副本",
                "4. 更新路由配置",
                "5. 恢复故障节点"
            ]
            
            print("故障恢复流程:")
            for step in failover_process:
                print(f"  {step}")
            ---

9 AI框架集成

9.1 LangChain集成

01.LangChain配置
    a.向量存储
        a.功能说明
            Qdrant作为LangChain的向量存储后端。存储文档嵌入向量。支持语义搜索。实现RAG应用。
        b.代码示例
            ---
            from langchain.vectorstores import Qdrant
            from langchain.embeddings import OpenAIEmbeddings
            from qdrant_client import QdrantClient
            
            client = QdrantClient(host="localhost", port=6333)
            embeddings = OpenAIEmbeddings()
            
            vectorstore = Qdrant(
                client=client,
                collection_name="langchain_docs",
                embeddings=embeddings
            )
            
            print("LangChain向量存储配置完成")
            ---
    b.文档检索
        a.功能说明
            使用Qdrant检索相关文档。支持相似度搜索。结合LLM生成答案。实现问答系统。
        b.代码示例
            ---
            # 添加文档
            texts = [
                "Qdrant is a vector database",
                "LangChain is a framework for LLM applications",
                "RAG combines retrieval and generation"
            ]
            
            vectorstore.add_texts(texts)
            
            # 检索文档
            query = "What is Qdrant?"
            docs = vectorstore.similarity_search(query, k=3)
            
            print(f"检索到 {len(docs)} 个相关文档")
            for doc in docs:
                print(f"  {doc.page_content}")
            ---

02.RAG应用
    a.检索增强
        a.功能说明
            RAG结合检索和生成。先检索相关文档。再生成答案。提升答案质量和可靠性。
        b.代码示例
            ---
            from langchain.chains import RetrievalQA
            from langchain.llms import OpenAI
            
            llm = OpenAI(temperature=0)
            
            qa_chain = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type="stuff",
                retriever=vectorstore.as_retriever()
            )
            
            question = "What is Qdrant used for?"
            answer = qa_chain.run(question)
            
            print(f"问题: {question}")
            print(f"答案: {answer}")
            ---
    b.对话系统
        a.功能说明
            实现基于RAG的对话系统。支持多轮对话。结合历史上下文。提供准确的回答。
        b.代码示例
            ---
            from langchain.chains import ConversationalRetrievalChain
            from langchain.memory import ConversationBufferMemory
            
            memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True
            )
            
            conversation_chain = ConversationalRetrievalChain.from_llm(
                llm=llm,
                retriever=vectorstore.as_retriever(),
                memory=memory
            )
            
            print("对话系统已初始化")
            ---

9.2 LlamaIndex集成

01.LlamaIndex配置
    a.向量索引
        a.功能说明
            Qdrant作为LlamaIndex的向量存储。构建文档索引。支持高效检索。实现知识库应用。
        b.代码示例
            ---
            from llama_index import VectorStoreIndex, ServiceContext
            from llama_index.vector_stores import QdrantVectorStore
            from llama_index.storage.storage_context import StorageContext
            from qdrant_client import QdrantClient
            
            client = QdrantClient(host="localhost", port=6333)
            
            vector_store = QdrantVectorStore(
                client=client,
                collection_name="llamaindex_docs"
            )
            
            storage_context = StorageContext.from_defaults(
                vector_store=vector_store
            )
            
            print("LlamaIndex向量存储配置完成")
            ---
    b.文档索引
        a.功能说明
            索引文档到Qdrant。自动分块和嵌入。支持多种文档格式。实现语义搜索。
        b.代码示例
            ---
            from llama_index import Document
            
            documents = [
                Document(text="Qdrant is a vector database for AI applications"),
                Document(text="LlamaIndex helps build LLM applications"),
                Document(text="Vector search enables semantic retrieval")
            ]
            
            index = VectorStoreIndex.from_documents(
                documents,
                storage_context=storage_context
            )
            
            print(f"索引了 {len(documents)} 个文档")
            ---

02.查询引擎
    a.语义查询
        a.功能说明
            使用查询引擎检索和生成答案。支持自然语言查询。结合检索和推理。提供准确的回答。
        b.代码示例
            ---
            query_engine = index.as_query_engine()
            
            response = query_engine.query("What is Qdrant?")
            
            print(f"查询: What is Qdrant?")
            print(f"回答: {response}")
            ---
    b.高级查询
        a.功能说明
            支持过滤、排序等高级查询。自定义检索策略。优化查询性能。提供灵活的查询能力。
        b.代码示例
            ---
            from llama_index.vector_stores.types import MetadataFilters, ExactMatchFilter
            
            filters = MetadataFilters(
                filters=[
                    ExactMatchFilter(key="category", value="tech")
                ]
            )
            
            filtered_query_engine = index.as_query_engine(
                filters=filters
            )
            
            print("高级查询引擎已配置")
            ---

10 最佳实践

10.1 数据建模

01.向量设计
    a.向量选择
        a.功能说明
            选择合适的向量模型。考虑维度和精度。平衡性能和效果。根据应用场景选择。
        b.代码示例
            ---
            vector_models = {
                "文本": {
                    "BERT": "768维,适合通用文本",
                    "Sentence-BERT": "768维,适合句子相似度",
                    "OpenAI Ada": "1536维,高质量嵌入"
                },
                "图像": {
                    "ResNet": "2048维,适合图像分类",
                    "CLIP": "512维,适合多模态",
                    "ViT": "768维,适合图像理解"
                }
            }
            
            print("向量模型选择:")
            for data_type, models in vector_models.items():
                print(f"\n{data_type}:")
                for model, desc in models.items():
                    print(f"  {model}: {desc}")
            ---
    b.Payload设计
        a.功能说明
            合理设计Payload结构。存储必要的元数据。为常用字段创建索引。避免存储过大数据。
        b.代码示例
            ---
            payload_design = {
                "基础字段": ["id", "title", "category", "created_at"],
                "业务字段": ["author", "tags", "status"],
                "索引字段": ["category", "status", "created_at"],
                "避免字段": ["大文本内容", "二进制数据"]
            }
            
            print("Payload设计建议:")
            for field_type, fields in payload_design.items():
                print(f"  {field_type}: {', '.join(fields)}")
            ---

02.数据组织
    a.Collection设计
        a.功能说明
            合理划分Collection。按业务逻辑分离数据。避免单个Collection过大。便于管理和维护。
        b.代码示例
            ---
            collection_design = {
                "按业务": "用户、商品、文档分别建Collection",
                "按时间": "按月或按年划分历史数据",
                "按语言": "多语言数据分别存储",
                "按模态": "文本、图像、音频分别存储"
            }
            
            print("Collection设计策略:")
            for strategy, desc in collection_design.items():
                print(f"  {strategy}: {desc}")
            ---
    b.数据生命周期
        a.功能说明
            管理数据生命周期。定期清理过期数据。归档历史数据。优化存储成本。
        b.代码示例
            ---
            lifecycle_management = [
                "定期清理过期数据",
                "归档低频访问数据",
                "压缩历史数据",
                "监控数据增长",
                "规划存储容量"
            ]
            
            print("数据生命周期管理:")
            for i, task in enumerate(lifecycle_management, 1):
                print(f"  {i}. {task}")
            ---

10.2 查询优化

01.查询设计
    a.查询策略
        a.功能说明
            设计高效的查询策略。合理使用过滤条件。控制返回结果数量。优化查询性能。
        b.代码示例
            ---
            query_best_practices = [
                "为常用过滤字段创建索引",
                "使用批量查询提升吞吐量",
                "设置合理的limit值",
                "避免返回不需要的向量",
                "使用缓存减少重复查询"
            ]
            
            print("查询优化最佳实践:")
            for i, practice in enumerate(query_best_practices, 1):
                print(f"  {i}. {practice}")
            ---
    b.性能调优
        a.功能说明
            根据实际场景调优参数。监控查询性能。识别慢查询。持续优化改进。
        b.代码示例
            ---
            performance_tuning = {
                "HNSW ef": "低延迟64-128,高准确率256-512",
                "limit": "通常10-50,避免过大",
                "batch_size": "批量查询10-100",
                "timeout": "设置合理的超时时间"
            }
            
            print("性能调优参数:")
            for param, recommendation in performance_tuning.items():
                print(f"  {param}: {recommendation}")
            ---

02.缓存策略
    a.查询缓存
        a.功能说明
            缓存热门查询结果。减少重复计算。提升响应速度。降低系统负载。
        b.代码示例
            ---
            caching_strategies = {
                "热门查询": "缓存1小时",
                "个性化查询": "缓存5-10分钟",
                "实时查询": "不缓存",
                "静态数据": "缓存24小时"
            }
            
            print("缓存策略:")
            for query_type, strategy in caching_strategies.items():
                print(f"  {query_type}: {strategy}")
            ---
    b.缓存失效
        a.功能说明
            合理设置缓存过期时间。数据更新时清除相关缓存。避免返回过期数据。平衡缓存命中率和数据新鲜度。
        b.代码示例
            ---
            cache_invalidation = [
                "数据更新时清除相关缓存",
                "设置合理的TTL",
                "监控缓存命中率",
                "定期清理过期缓存"
            ]
            
            print("缓存失效策略:")
            for i, strategy in enumerate(cache_invalidation, 1):
                print(f"  {i}. {strategy}")
            ---

10.3 运维监控

01.监控指标
    a.性能指标
        a.功能说明
            监控关键性能指标。及时发现性能问题。优化系统配置。保证服务质量。
        b.代码示例
            ---
            performance_metrics = {
                "查询延迟": "P50/P95/P99延迟",
                "查询吞吐量": "QPS",
                "索引性能": "索引构建时间",
                "内存使用": "内存占用率",
                "CPU使用": "CPU利用率",
                "磁盘IO": "读写IOPS"
            }
            
            print("性能监控指标:")
            for metric, desc in performance_metrics.items():
                print(f"  {metric}: {desc}")
            ---
    b.业务指标
        a.功能说明
            监控业务相关指标。分析用户行为。优化产品功能。提升用户体验。
        b.代码示例
            ---
            business_metrics = {
                "搜索准确率": "用户点击率",
                "推荐效果": "推荐转化率",
                "数据规模": "Points数量增长",
                "用户活跃度": "查询频率"
            }
            
            print("业务监控指标:")
            for metric, desc in business_metrics.items():
                print(f"  {metric}: {desc}")
            ---

02.告警和响应
    a.告警配置
        a.功能说明
            配置合理的告警规则。及时发现异常。快速响应处理。减少故障影响。
        b.代码示例
            ---
            alert_rules = {
                "高延迟": "P99延迟 > 1000ms",
                "低吞吐": "QPS < 100",
                "高内存": "内存使用率 > 90%",
                "磁盘满": "磁盘使用率 > 85%",
                "节点故障": "节点不可达"
            }
            
            print("告警规则:")
            for alert, condition in alert_rules.items():
                print(f"  {alert}: {condition}")
            ---
    b.故障处理
        a.功能说明
            建立故障响应流程。快速定位问题。及时修复故障。总结经验教训。
        b.代码示例
            ---
            incident_response = [
                "1. 接收告警通知",
                "2. 快速评估影响",
                "3. 定位问题根因",
                "4. 执行修复方案",
                "5. 验证服务恢复",
                "6. 总结和改进"
            ]
            
            print("故障响应流程:")
            for step in incident_response:
                print(f"  {step}")
            ---