1 课程概述
1.1 课程定位
01.课程性质
a.进阶实战课程
本课程定位于大语言模型工程化的进阶实战训练,面向已掌握基础深度学习知识的学员,聚焦模型从研发到生产环境的完整落地流程。
b.工程能力培养
区别于理论研究课程,本课程强调动手实践,通过真实项目案例培养学员解决实际工程问题的能力,包括模型部署、性能优化、分布式训练等核心技能。
c.岗位技能对齐
课程内容与企业LLM工程师岗位需求深度对齐,覆盖推理优化、训练加速、向量检索等生产环境必备技术栈。
02.技术栈覆盖
a.部署推理
a.推理框架
涵盖vLLM、TGI等主流高性能推理框架,掌握大模型在生产环境的快速部署与服务化能力。
b.服务封装
学习使用FastAPI构建RESTful接口,实现模型的标准化服务封装,支持高并发请求处理。
b.模型优化
a.压缩技术
深入量化、剪枝、蒸馏等模型压缩方法,实现模型体积缩减与推理加速的双重优化。
b.分布式训练
掌握数据并行、模型并行、流水线并行等分布式策略,使用DeepSpeed、FSDP等框架进行大规模模型训练。
c.基础设施
a.向量数据库
学习Milvus、Pinecone等向量数据库的使用,构建高效的语义检索系统,支持RAG应用。
b.GPU优化
理解CUDA编程基础,掌握显存管理、混合精度训练等GPU优化技术,提升资源利用率。
03.能力目标
a.独立部署能力
能够独立完成大语言模型的生产环境部署,包括推理框架选型、性能调优、服务监控等全流程操作。
b.性能优化能力
掌握模型压缩与加速的核心方法,能够根据业务需求选择合适的优化策略,实现推理速度与模型精度的平衡。
c.工程实践能力
具备解决实际生产问题的能力,包括分布式训练调试、GPU资源管理、系统容错设计等工程实践技能。
1.2 学习目标
01.知识目标
a.理论框架掌握
a.推理优化理论
理解大模型推理的计算特性,包括自回归生成机制、KV缓存原理、注意力机制优化等核心概念。
b.分布式训练原理
掌握数据并行、模型并行、流水线并行的实现机制,理解梯度同步、通信优化、内存管理等关键技术。
c.向量检索算法
理解高维向量检索的数学原理,包括ANN算法、HNSW图索引、IVF倒排索引等检索加速方法。
b.工具链熟练度
a.推理框架
熟练使用vLLM、TGI进行模型部署,理解各框架的适用场景与性能特点。
b.训练框架
掌握DeepSpeed ZeRO、FSDP等分布式训练框架的配置与调优方法。
c.向量数据库
熟练操作Milvus、Pinecone、Chroma等向量数据库,完成数据索引与检索任务。
02.技能目标
a.部署服务化
a.模型部署
a.功能说明
能够将预训练模型部署到生产环境,支持GPU加速推理,实现毫秒级响应。
b.代码示例
---
# vLLM部署Llama-2-7B模型
from vllm import LLM, SamplingParams
# 初始化模型(自动启用GPU加速和PagedAttention优化)
llm = LLM(
model="meta-llama/Llama-2-7b-hf",
tensor_parallel_size=2, # 使用2张GPU进行张量并行
gpu_memory_utilization=0.9, # GPU显存利用率90%
max_num_seqs=128 # 最大并发序列数
)
# 配置采样参数
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=512
)
# 批量推理
prompts = [
"Explain quantum computing in simple terms:",
"What are the benefits of microservices architecture?"
]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
print(f"Prompt: {output.prompt}")
print(f"Generated: {output.outputs[0].text}")
print(f"Tokens: {len(output.outputs[0].token_ids)}")
---
b.API服务封装
a.功能说明
使用FastAPI构建高性能RESTful接口,支持流式响应与异步处理。
b.代码示例
---
# FastAPI服务封装(支持流式响应)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
app = FastAPI(title="LLM Inference API")
class GenerateRequest(BaseModel):
prompt: str
max_tokens: int = 512
temperature: float = 0.7
stream: bool = False
# 初始化全局模型实例
llm = LLM(model="meta-llama/Llama-2-7b-hf")
@app.post("/v1/generate")
async def generate(request: GenerateRequest):
try:
if request.stream:
# 流式响应生成器
async def stream_generator():
sampling_params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens
)
for output in llm.generate([request.prompt], sampling_params):
yield f"data: {output.outputs[0].text}\n\n"
await asyncio.sleep(0.01)
return StreamingResponse(
stream_generator(),
media_type="text/event-stream"
)
else:
# 非流式响应
sampling_params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens
)
outputs = llm.generate([request.prompt], sampling_params)
return {
"text": outputs[0].outputs[0].text,
"tokens": len(outputs[0].outputs[0].token_ids),
"finish_reason": outputs[0].outputs[0].finish_reason
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 健康检查接口
@app.get("/health")
async def health_check():
return {"status": "healthy", "model": "Llama-2-7b"}
---
b.性能优化
a.模型量化
a.功能说明
掌握INT8/INT4量化技术,在保持模型精度的前提下减少显存占用50%-75%。
b.代码示例
---
# GPTQ量化Llama-2-13B模型
from transformers import AutoModelForCausalLM, AutoTokenizer
from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
# 量化配置
quantize_config = BaseQuantizeConfig(
bits=4, # 4-bit量化
group_size=128, # 量化分组大小
desc_act=False, # 激活值量化顺序
damp_percent=0.01 # 阻尼系数
)
# 加载原始模型
model_name = "meta-llama/Llama-2-13b-hf"
model = AutoGPTQForCausalLM.from_pretrained(
model_name,
quantize_config=quantize_config
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 准备校准数据(使用少量样本进行量化校准)
calibration_dataset = [
"The future of artificial intelligence is",
"Quantum computing will revolutionize",
"The key to sustainable energy is"
]
# 执行量化
model.quantize(
calibration_dataset,
batch_size=1
)
# 保存量化模型(显存占用从26GB降低到7GB)
quantized_model_dir = "./llama-2-13b-gptq-4bit"
model.save_quantized(quantized_model_dir)
tokenizer.save_pretrained(quantized_model_dir)
# 加载量化模型进行推理
quantized_model = AutoGPTQForCausalLM.from_quantized(
quantized_model_dir,
device="cuda:0"
)
inputs = tokenizer("Explain neural networks:", return_tensors="pt").to("cuda:0")
outputs = quantized_model.generate(**inputs, max_new_tokens=100)
print(tokenizer.decode(outputs[0]))
---
b.分布式训练
a.功能说明
使用DeepSpeed ZeRO-3在多GPU上训练大模型,支持千亿参数规模。
b.代码示例
---
# DeepSpeed ZeRO-3分布式训练配置
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from transformers import Trainer
import deepspeed
# DeepSpeed配置文件(ds_config.json)
ds_config = {
"train_batch_size": 64,
"train_micro_batch_size_per_gpu": 2,
"gradient_accumulation_steps": 8,
"optimizer": {
"type": "AdamW",
"params": {
"lr": 2e-5,
"betas": [0.9, 0.999],
"eps": 1e-8
}
},
"fp16": {
"enabled": True,
"loss_scale": 0,
"initial_scale_power": 16
},
"zero_optimization": {
"stage": 3, # ZeRO-3: 切分优化器状态、梯度和参数
"offload_optimizer": {
"device": "cpu", # 优化器状态卸载到CPU
"pin_memory": True
},
"offload_param": {
"device": "cpu", # 参数卸载到CPU
"pin_memory": True
},
"overlap_comm": True, # 通信与计算重叠
"contiguous_gradients": True,
"sub_group_size": 1e9,
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_prefetch_bucket_size": 5e8,
"stage3_param_persistence_threshold": 1e6
}
}
# 训练参数配置
training_args = TrainingArguments(
output_dir="./llama-2-7b-finetuned",
per_device_train_batch_size=2,
gradient_accumulation_steps=8,
num_train_epochs=3,
learning_rate=2e-5,
fp16=True,
logging_steps=10,
save_steps=500,
deepspeed=ds_config # 启用DeepSpeed
)
# 加载模型和分词器
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
# 启动分布式训练(使用4张GPU)
# 命令: deepspeed --num_gpus=4 train.py
---
c.数据管理
a.向量索引构建
a.功能说明
使用Milvus构建百万级向量索引,支持毫秒级语义检索。
b.代码示例
---
# Milvus向量索引构建与检索
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType
from sentence_transformers import SentenceTransformer
import numpy as np
# 连接Milvus服务
connections.connect(
alias="default",
host="localhost",
port="19530"
)
# 定义集合Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000)
]
schema = CollectionSchema(fields=fields, description="Document embeddings")
# 创建集合
collection = Collection(name="documents", schema=schema)
# 创建IVF_FLAT索引(倒排文件索引,适合百万级数据)
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2", # 欧氏距离
"params": {"nlist": 1024} # 聚类中心数量
}
collection.create_index(field_name="embedding", index_params=index_params)
# 加载Embedding模型
encoder = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
# 插入向量数据
documents = [
"Artificial intelligence is transforming industries",
"Machine learning enables predictive analytics",
"Deep learning powers modern NLP applications"
]
embeddings = encoder.encode(documents)
entities = [
embeddings.tolist(),
documents
]
collection.insert(entities)
collection.flush()
# 加载集合到内存
collection.load()
# 语义检索
query = "AI applications in business"
query_embedding = encoder.encode([query])
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=query_embedding.tolist(),
anns_field="embedding",
param=search_params,
limit=3, # 返回Top-3结果
output_fields=["text"]
)
for hits in results:
for hit in hits:
print(f"Distance: {hit.distance:.4f}, Text: {hit.entity.get('text')}")
---
03.素养目标
a.工程思维
培养从业务需求到技术实现的全流程思考能力,包括系统设计、性能评估、成本优化等工程决策能力。
b.问题解决能力
具备独立排查生产环境问题的能力,包括模型推理异常、分布式训练故障、GPU资源冲突等复杂场景的调试技能。
c.持续学习意识
建立跟踪前沿技术的习惯,关注开源社区动态,及时掌握新框架、新算法的应用实践。
1.3 前置要求
01.编程基础
a.Python精通
a.核心语法
熟练掌握Python面向对象编程、装饰器、生成器、异步编程等高级特性,能够编写高质量的模块化代码。
b.科学计算库
a.NumPy数组操作
熟练使用NumPy进行张量操作、矩阵运算、广播机制等,理解底层内存布局优化。
b.代码示例
---
# NumPy高级操作示例
import numpy as np
# 高效批量处理(使用向量化避免循环)
embeddings = np.random.randn(10000, 768) # 1万个768维向量
# 错误做法:Python循环计算余弦相似度(慢)
# similarities = []
# for i in range(len(embeddings)):
# for j in range(len(embeddings)):
# sim = np.dot(embeddings[i], embeddings[j]) / (np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[j]))
# similarities.append(sim)
# 正确做法:向量化计算(快100倍以上)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized_embeddings = embeddings / norms
similarity_matrix = np.dot(normalized_embeddings, normalized_embeddings.T)
# 内存优化:分块处理超大矩阵
def batch_cosine_similarity(vectors, batch_size=1000):
n = len(vectors)
result = np.zeros((n, n))
for i in range(0, n, batch_size):
for j in range(0, n, batch_size):
i_end = min(i + batch_size, n)
j_end = min(j + batch_size, n)
result[i:i_end, j:j_end] = np.dot(
vectors[i:i_end],
vectors[j:j_end].T
)
return result
# 广播机制应用
query = np.random.randn(1, 768)
distances = np.linalg.norm(embeddings - query, axis=1) # 自动广播
top_k_indices = np.argpartition(distances, 5)[:5] # Top-5最近邻
---
b.Linux操作
a.命令行工具
熟练使用ssh、scp、rsync进行远程操作,掌握tmux、screen进行会话管理。
b.GPU监控
a.功能说明
掌握nvidia-smi、nvtop等工具监控GPU使用情况,诊断显存泄漏、利用率低等问题。
b.代码示例
---
# GPU监控与诊断脚本
import subprocess
import re
import time
def monitor_gpu_memory(interval=2):
"""实时监控GPU显存使用情况"""
while True:
# 执行nvidia-smi命令
result = subprocess.run(
['nvidia-smi', '--query-gpu=index,name,memory.used,memory.total,utilization.gpu',
'--format=csv,noheader,nounits'],
capture_output=True,
text=True
)
print("\n" + "="*80)
print(f"GPU Status at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)
for line in result.stdout.strip().split('\n'):
gpu_id, name, mem_used, mem_total, util = line.split(', ')
mem_percent = (int(mem_used) / int(mem_total)) * 100
print(f"GPU {gpu_id} ({name}):")
print(f" Memory: {mem_used}MB / {mem_total}MB ({mem_percent:.1f}%)")
print(f" Utilization: {util}%")
# 报警:显存使用超过90%
if mem_percent > 90:
print(f" ⚠️ WARNING: High memory usage!")
time.sleep(interval)
# 启动监控
# monitor_gpu_memory()
# 查找占用GPU的进程
def find_gpu_processes():
result = subprocess.run(
['nvidia-smi', '--query-compute-apps=pid,used_memory',
'--format=csv,noheader,nounits'],
capture_output=True,
text=True
)
print("GPU Processes:")
for line in result.stdout.strip().split('\n'):
if line:
pid, mem = line.split(', ')
# 获取进程详细信息
ps_result = subprocess.run(
['ps', '-p', pid, '-o', 'comm='],
capture_output=True,
text=True
)
process_name = ps_result.stdout.strip()
print(f" PID {pid}: {process_name} (Memory: {mem}MB)")
find_gpu_processes()
---
c.Git版本控制
熟练使用Git进行代码管理,包括分支操作、冲突解决、版本回退等,能够参与团队协作开发。
02.深度学习基础
a.PyTorch框架
a.张量操作
熟练掌握张量创建、索引、变形、广播等操作,理解autograd自动微分机制。
b.模型训练
a.功能说明
掌握完整的训练流程,包括数据加载、前向传播、反向传播、优化器更新等。
b.代码示例
---
# PyTorch完整训练流程示例
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.optim import AdamW
from torch.cuda.amp import autocast, GradScaler
# 自定义数据集
class TextDataset(Dataset):
def __init__(self, texts, labels, tokenizer, max_length=512):
self.encodings = tokenizer(
texts,
truncation=True,
padding='max_length',
max_length=max_length,
return_tensors='pt'
)
self.labels = torch.tensor(labels)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
item = {key: val[idx] for key, val in self.encodings.items()}
item['labels'] = self.labels[idx]
return item
# 训练函数(支持混合精度)
def train_epoch(model, dataloader, optimizer, device, use_amp=True):
model.train()
total_loss = 0
scaler = GradScaler() if use_amp else None
for batch_idx, batch in enumerate(dataloader):
# 数据移动到GPU
batch = {k: v.to(device) for k, v in batch.items()}
# 梯度清零
optimizer.zero_grad()
# 混合精度前向传播
if use_amp:
with autocast():
outputs = model(**batch)
loss = outputs.loss
# 混合精度反向传播
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
total_loss += loss.item()
# 打印进度
if (batch_idx + 1) % 10 == 0:
print(f"Batch {batch_idx+1}/{len(dataloader)}, Loss: {loss.item():.4f}")
return total_loss / len(dataloader)
# 模型初始化
from transformers import AutoModelForSequenceClassification, AutoTokenizer
model_name = "bert-base-uncased"
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 数据准备
texts = ["This movie is great!", "Terrible waste of time."]
labels = [1, 0]
dataset = TextDataset(texts, labels, tokenizer)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
# 优化器配置
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)
# 训练
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
for epoch in range(3):
avg_loss = train_epoch(model, dataloader, optimizer, device)
print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")
---
b.Transformer架构
a.注意力机制
理解Self-Attention、Multi-Head Attention的计算原理,掌握KV缓存优化技术。
b.位置编码
理解绝对位置编码、相对位置编码、旋转位置编码(RoPE)等不同方案的优劣。
c.预训练模型
熟悉BERT、GPT、LLaMA等主流模型架构,理解预训练任务(MLM、CLM)与微调策略(全量微调、LoRA、P-Tuning)。
03.数学基础
a.线性代数
a.矩阵运算
掌握矩阵乘法、特征分解、奇异值分解(SVD)等,理解在模型压缩中的应用。
b.向量空间
理解向量内积、外积、投影等几何意义,掌握余弦相似度、欧氏距离等度量方法。
b.概率统计
a.概率分布
理解正态分布、伯努利分布等常见分布,掌握最大似然估计、贝叶斯推断等方法。
b.采样方法
掌握Top-k采样、Top-p采样、温度采样等文本生成采样策略。
c.优化理论
理解梯度下降、Adam优化器、学习率调度等优化算法,掌握梯度裁剪、梯度累积等训练技巧。
04.硬件知识
a.GPU架构
a.CUDA核心
理解GPU并行计算原理,掌握CUDA核心、Tensor Core的作用与调度机制。
b.显存层次
理解全局内存、共享内存、寄存器等显存层次结构,掌握显存带宽优化方法。
b.网络通信
a.NCCL库
理解多GPU通信原理,掌握AllReduce、AllGather等集合通信操作。
b.带宽优化
理解PCIe、NVLink等互联技术,掌握通信与计算重叠、梯度压缩等优化方法。
1.4 学习时长
01.总体时长规划
a.课程周期
本课程建议学习周期为8-12周,每周投入15-20小时学习时间,总计120-240小时完成全部内容的学习与实践。
b.阶段划分
a.基础阶段
第1-3周,学习模型部署与推理优化,掌握vLLM、TGI等推理框架的使用,预计投入40-60小时。
b.进阶阶段
第4-6周,学习模型压缩与分布式训练,掌握量化、剪枝、DeepSpeed等技术,预计投入50-80小时。
c.实战阶段
第7-9周,学习向量数据库与GPU优化,完成RAG系统构建与性能调优,预计投入30-50小时。
d.综合阶段
第10-12周,学习生产环境实践,完成完整项目部署与监控,预计投入30-50小时。
02.分章节学习时长
a.第1章
课程概述,1-2小时,快速了解课程内容与学习路径。
b.第2章
a.理论学习
模型部署原理,5-8小时,理解推理框架架构与优化机制。
b.实践操作
a.vLLM部署
3-5小时,完成本地环境搭建与模型部署,调试配置参数。
b.TGI部署
3-5小时,对比TGI与vLLM性能差异,理解适用场景。
c.服务封装
4-6小时,使用FastAPI封装推理接口,实现流式响应。
c.第3章
a.理论学习
模型压缩原理,6-10小时,理解量化、剪枝、蒸馏的数学基础。
b.实践操作
a.INT8量化
4-6小时,使用BitsAndBytes对模型进行动态量化。
b.GPTQ量化
5-8小时,对Llama-2-13B进行4-bit量化,对比精度损失。
c.AWQ量化
4-6小时,学习激活值感知量化,优化推理速度。
d.第4章
a.理论学习
分布式训练原理,8-12小时,理解数据并行、模型并行、流水线并行的实现机制。
b.实践操作
a.DeepSpeed ZeRO-1/2/3
6-10小时,配置不同ZeRO stage,对比显存优化效果。
b.FSDP训练
5-8小时,使用PyTorch FSDP训练大模型,理解分片策略。
c.多机训练
6-10小时,配置多机多卡环境,调试网络通信问题。
e.第5章
a.理论学习
向量检索原理,4-6小时,理解ANN算法、HNSW索引等核心概念。
b.实践操作
a.Milvus使用
4-6小时,部署Milvus服务,构建向量索引。
b.Pinecone使用
3-5小时,使用托管服务快速构建检索系统。
c.RAG应用
6-10小时,结合LLM与向量数据库构建问答系统。
f.第6章
a.理论学习
GPU优化原理,6-10小时,学习CUDA编程基础与显存管理。
b.实践操作
a.混合精度训练
3-5小时,配置FP16/BF16训练,对比训练速度。
b.梯度累积
2-4小时,使用梯度累积模拟大batch训练。
c.性能分析
4-6小时,使用Nsight、PyTorch Profiler分析性能瓶颈。
g.第7章
a.理论学习
生产环境架构,4-6小时,学习模型监控、负载均衡等工程实践。
b.实践操作
a.监控系统
4-6小时,集成Prometheus与Grafana监控模型服务。
b.容错设计
3-5小时,实现请求重试、熔断降级等容错机制。
c.成本优化
3-5小时,分析GPU利用率,优化资源配置降低成本。
h.第8章
学习路径与资源,2-4小时,规划后续学习方向,建立技术资源库。
03.实践建议
a.动手为主
a.代码复现
每个知识点必须动手实践,复现课程中的所有代码示例,理解每个参数的作用。
b.实验对比
a.功能说明
通过对比实验深化理解,例如对比不同量化方法的精度与速度差异。
b.代码示例
---
# 对比INT8与FP16推理性能
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import time
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# FP16推理
model_fp16 = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# INT8推理(使用BitsAndBytes)
model_int8 = AutoModelForCausalLM.from_pretrained(
model_name,
load_in_8bit=True,
device_map="auto"
)
prompt = "Explain the theory of relativity in simple terms:"
inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
# 测试FP16速度
start = time.time()
with torch.no_grad():
outputs_fp16 = model_fp16.generate(**inputs, max_new_tokens=100)
fp16_time = time.time() - start
# 测试INT8速度
start = time.time()
with torch.no_grad():
outputs_int8 = model_int8.generate(**inputs, max_new_tokens=100)
int8_time = time.time() - start
print(f"FP16 Inference Time: {fp16_time:.2f}s")
print(f"INT8 Inference Time: {int8_time:.2f}s")
print(f"Speedup: {fp16_time/int8_time:.2f}x")
# 对比显存占用
print(f"FP16 Memory: {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
torch.cuda.reset_peak_memory_stats()
_ = model_int8.generate(**inputs, max_new_tokens=100)
print(f"INT8 Memory: {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
---
b.问题导向
遇到问题及时记录,通过查阅文档、源码、社区讨论等方式解决,建立问题解决能力。
c.项目实战
a.完整项目
在学习过程中完成一个端到端项目,例如构建一个基于LLM的问答系统,涵盖模型部署、向量检索、API服务等全流程。
b.代码示例
a.功能说明
综合应用所学知识,构建生产级RAG系统。
b.实现框架
---
# 完整RAG系统架构示例
from fastapi import FastAPI
from pydantic import BaseModel
from vllm import LLM, SamplingParams
from pymilvus import Collection, connections
from sentence_transformers import SentenceTransformer
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 初始化组件
app = FastAPI(title="RAG QA System")
# 加载LLM(使用vLLM加速)
llm = LLM(
model="meta-llama/Llama-2-7b-chat-hf",
tensor_parallel_size=2,
gpu_memory_utilization=0.85
)
# 加载Embedding模型
encoder = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
# 连接Milvus
connections.connect(host="localhost", port="19530")
collection = Collection("knowledge_base")
collection.load()
class QueryRequest(BaseModel):
question: str
top_k: int = 3
max_tokens: int = 512
@app.post("/query")
async def query(request: QueryRequest):
try:
# 1. 向量检索相关文档
query_embedding = encoder.encode([request.question])
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=query_embedding.tolist(),
anns_field="embedding",
param=search_params,
limit=request.top_k,
output_fields=["text"]
)
# 2. 构建上下文
context = "\n\n".join([
hit.entity.get('text') for hit in results[0]
])
# 3. 生成Prompt
prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {request.question}
Answer:"""
# 4. LLM生成答案
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=request.max_tokens
)
outputs = llm.generate([prompt], sampling_params)
answer = outputs[0].outputs[0].text.strip()
logger.info(f"Query: {request.question}, Answer length: {len(answer)}")
return {
"question": request.question,
"answer": answer,
"context": context,
"sources": [hit.entity.get('text')[:100] for hit in results[0]]
}
except Exception as e:
logger.error(f"Query failed: {str(e)}")
return {"error": str(e)}
@app.get("/health")
async def health():
return {"status": "healthy"}
---
04.时间分配建议
a.每日学习
工作日每天2-3小时,周末每天4-6小时,保持持续学习节奏。
b.理论与实践比例
理论学习30%,代码实践50%,项目实战20%,注重动手能力培养。
c.复习巩固
每周预留2-3小时复习本周内容,整理笔记,总结遇到的问题与解决方案。
1.5 岗位关联
01.目标岗位
a.LLM工程师
a.核心职责
负责大语言模型的工程化落地,包括模型部署、性能优化、服务监控等全流程工作,确保模型在生产环境稳定运行。
b.技能要求
a.部署能力
熟练使用vLLM、TGI等推理框架,掌握模型量化、并发优化、显存管理等技术。
b.优化能力
能够根据业务需求进行性能调优,包括推理延迟优化、吞吐量提升、成本控制等。
c.工程能力
具备API服务开发、容器化部署、监控告警等工程实践能力。
c.薪资水平
一线城市30-60K/月,核心能力为推理优化与生产部署经验。
b.AI训练工程师
a.核心职责
负责大模型的预训练与微调,使用分布式训练框架在多GPU集群上训练百亿至千亿参数模型。
b.技能要求
a.分布式训练
熟练使用DeepSpeed、FSDP、Megatron-LM等框架,掌握数据并行、模型并行、流水线并行等技术。
b.训练调优
能够诊断训练故障,优化训练速度,包括梯度累积、混合精度、通信优化等。
c.数据处理
掌握大规模数据预处理、清洗、去重等技术,理解数据质量对模型效果的影响。
c.薪资水平
一线城市35-70K/月,核心能力为多机多卡训练经验与调优能力。
c.MLOps工程师
a.核心职责
负责机器学习模型的全生命周期管理,包括模型版本控制、自动化部署、监控运维、A/B测试等。
b.技能要求
a.DevOps能力
熟练使用Docker、Kubernetes进行容器化部署,掌握CI/CD流程设计。
b.监控运维
能够搭建Prometheus、Grafana等监控系统,实现模型性能监控与告警。
c.成本优化
分析资源使用情况,优化GPU利用率,降低推理与训练成本。
c.薪资水平
一线城市30-55K/月,核心能力为自动化部署与生产运维经验。
d.向量数据库工程师
a.核心职责
负责向量数据库的选型、部署、优化,支持RAG、推荐系统等应用场景的高性能检索需求。
b.技能要求
a.数据库能力
熟练使用Milvus、Pinecone、Weaviate等向量数据库,掌握索引优化、查询调优等技术。
b.检索算法
理解HNSW、IVF、PQ等向量检索算法,能够根据数据规模选择合适的索引类型。
c.系统集成
能够将向量数据库与LLM、Embedding模型集成,构建端到端检索增强系统。
c.薪资水平
一线城市28-50K/月,核心能力为大规模向量检索优化经验。
02.能力映射
a.课程内容与岗位技能对应
a.模型部署章节
对应LLM工程师的核心能力,掌握vLLM、TGI部署与FastAPI服务封装。
b.模型压缩章节
对应LLM工程师的优化能力,掌握量化、剪枝等模型压缩技术。
c.分布式训练章节
对应AI训练工程师的核心能力,掌握DeepSpeed、FSDP等分布式框架。
d.向量数据库章节
对应向量数据库工程师与LLM工程师的检索能力,掌握Milvus等工具使用。
e.GPU优化章节
对应AI训练工程师与LLM工程师的性能优化能力,掌握混合精度、显存管理等技术。
f.生产环境实践章节
对应MLOps工程师的运维能力,掌握监控、负载均衡、容错设计等技术。
b.技能等级要求
a.初级
a.功能说明
能够在指导下完成模型部署与基础优化,使用现有工具解决常规问题。
b.代码示例
---
# 初级工程师任务:使用vLLM部署开源模型
from vllm import LLM, SamplingParams
# 按照文档配置部署参数
llm = LLM(
model="meta-llama/Llama-2-7b-hf", # 指定模型路径
tensor_parallel_size=1, # 单GPU部署
gpu_memory_utilization=0.8 # 显存利用率80%
)
# 基础推理测试
prompts = ["Hello, how are you?"]
sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
print(output.outputs[0].text)
---
b.中级
a.功能说明
能够独立完成模型部署与性能优化,根据业务需求调整配置参数,诊断常见问题。
b.代码示例
---
# 中级工程师任务:优化推理性能并监控
from vllm import LLM, SamplingParams
import torch
import time
import psutil
# 根据GPU型号优化配置
def get_optimal_config():
gpu_name = torch.cuda.get_device_name(0)
if "A100" in gpu_name:
return {
"tensor_parallel_size": 2,
"gpu_memory_utilization": 0.95,
"max_num_batched_tokens": 8192,
"max_num_seqs": 256
}
elif "V100" in gpu_name:
return {
"tensor_parallel_size": 1,
"gpu_memory_utilization": 0.85,
"max_num_batched_tokens": 4096,
"max_num_seqs": 128
}
else:
return {
"tensor_parallel_size": 1,
"gpu_memory_utilization": 0.8,
"max_num_batched_tokens": 2048,
"max_num_seqs": 64
}
config = get_optimal_config()
llm = LLM(model="meta-llama/Llama-2-13b-hf", **config)
# 性能基准测试
def benchmark_inference(llm, num_requests=100):
prompts = ["Explain quantum computing:"] * num_requests
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=128
)
start = time.time()
outputs = llm.generate(prompts, sampling_params)
elapsed = time.time() - start
total_tokens = sum(len(o.outputs[0].token_ids) for o in outputs)
throughput = total_tokens / elapsed
print(f"Requests: {num_requests}")
print(f"Total time: {elapsed:.2f}s")
print(f"Throughput: {throughput:.2f} tokens/s")
print(f"Latency per request: {elapsed/num_requests:.3f}s")
# 显存使用情况
print(f"GPU Memory: {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
benchmark_inference(llm)
---
c.高级
a.功能说明
能够设计大规模模型服务架构,优化分布式训练流程,解决复杂的性能瓶颈与系统故障。
b.代码示例
---
# 高级工程师任务:设计高可用推理集群
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import aioredis
from prometheus_client import Counter, Histogram, generate_latest
import logging
# Prometheus监控指标
REQUEST_COUNT = Counter('llm_requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'Request latency')
ERROR_COUNT = Counter('llm_errors_total', 'Total errors')
app = FastAPI()
logger = logging.getLogger(__name__)
# Redis连接池(用于请求队列与缓存)
redis = None
@app.on_event("startup")
async def startup():
global redis
redis = await aioredis.create_redis_pool('redis://localhost')
@app.on_event("shutdown")
async def shutdown():
redis.close()
await redis.wait_closed()
class InferenceRequest(BaseModel):
prompt: str
max_tokens: int = 256
temperature: float = 0.7
# 请求去重与缓存
async def get_cached_response(prompt: str):
cache_key = f"llm_cache:{hash(prompt)}"
cached = await redis.get(cache_key)
if cached:
logger.info(f"Cache hit for prompt: {prompt[:50]}")
return cached.decode()
return None
async def set_cached_response(prompt: str, response: str):
cache_key = f"llm_cache:{hash(prompt)}"
await redis.setex(cache_key, 3600, response) # 1小时过期
@app.post("/v1/completions")
@REQUEST_LATENCY.time()
async def completions(request: InferenceRequest):
REQUEST_COUNT.inc()
try:
# 检查缓存
cached = await get_cached_response(request.prompt)
if cached:
return {"text": cached, "cached": True}
# 负载均衡:选择负载最低的GPU节点
# (实际生产中使用服务发现与负载均衡器)
# 推理请求
# response = await inference_service.generate(request)
# 模拟推理
response = "Generated response"
# 缓存结果
await set_cached_response(request.prompt, response)
return {"text": response, "cached": False}
except Exception as e:
ERROR_COUNT.inc()
logger.error(f"Inference failed: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# Prometheus监控端点
@app.get("/metrics")
async def metrics():
return generate_latest()
# 健康检查(支持Kubernetes探针)
@app.get("/health/liveness")
async def liveness():
return {"status": "alive"}
@app.get("/health/readiness")
async def readiness():
# 检查模型是否加载完成
# 检查GPU是否可用
# 检查Redis连接是否正常
return {"status": "ready"}
---
03.职业发展路径
a.技术路线
a.初级LLM工程师
掌握基础部署与推理优化,0-2年经验,年薪30-45万。
b.中级LLM工程师
独立负责模型服务架构设计,2-4年经验,年薪45-70万。
c.高级LLM工程师
负责大规模分布式训练与推理集群,4-7年经验,年薪70-120万。
d.技术专家
引领团队技术方向,解决核心技术难题,7年以上经验,年薪120-200万。
b.管理路线
a.技术经理
管理LLM工程团队,协调资源与项目进度,3-5年经验,年薪60-100万。
b.技术总监
负责AI基础设施建设,制定技术战略,5-8年经验,年薪100-200万。
c.创业路线
积累技术与行业经验后,创立AI应用公司或加入早期创业团队担任技术合伙人。
04.企业需求分析
a.互联网大厂
a.岗位需求
需要大规模模型训练与推理经验,熟悉分布式系统,具备千卡集群运维能力。
b.技术栈
DeepSpeed、Megatron-LM、vLLM、Triton、Kubernetes等。
c.薪资范围
高级工程师50-100万,技术专家100-200万。
b.AI创业公司
a.岗位需求
需要全栈能力,从模型训练到部署运维全流程负责,快速迭代能力强。
b.技术栈
主流开源框架,注重工程效率与成本控制。
c.薪资范围
核心工程师40-80万+期权。
c.传统企业AI部门
a.岗位需求
需要稳定的模型部署与维护能力,注重系统可靠性与安全性。
b.技术栈
商业化解决方案为主,开源框架为辅。
c.薪资范围
30-60万,注重稳定性。
2 模型部署
2.1 推理框架
01.框架概述
a.推理引擎分类
a.通用推理框架
支持多种模型架构的推理引擎,包括ONNX Runtime、TensorRT、OpenVINO等,适用于CV、NLP等多个领域。
b.LLM专用框架
针对大语言模型优化的推理框架,如vLLM、TGI、FasterTransformer、LMDeploy等,针对自回归生成特性进行深度优化。
b.核心优化技术
a.PagedAttention
vLLM提出的显存管理机制,将KV缓存组织为不连续的内存块,类似操作系统的分页机制,显著提升显存利用率。
b.Continuous Batching
TGI实现的动态批处理技术,在生成过程中动态添加新请求,避免等待整批请求完成,提升吞吐量。
c.FlashAttention
优化注意力计算的CUDA kernel,通过分块计算减少HBM访问,降低推理延迟。
02.vLLM框架
a.架构设计
a.核心组件
包含调度器(Scheduler)、执行器(Executor)、KV缓存管理器(KV Cache Manager)等模块,采用异步执行架构。
b.工作流程
请求进入调度队列,调度器根据显存状态分配KV缓存块,执行器批量执行前向传播,输出token后更新缓存。
b.PagedAttention原理
a.内存管理
a.功能说明
将KV缓存划分为固定大小的Block,每个Block存储固定数量的token,通过Block Table映射逻辑位置到物理位置。
b.代码示例
---
# PagedAttention内存管理示意(简化版)
import torch
import math
class PagedKVCache:
"""模拟PagedAttention的KV缓存管理"""
def __init__(self, num_blocks, block_size, num_heads, head_dim):
"""
num_blocks: 总内存块数量
block_size: 每块存储的token数量(通常16)
num_heads: 注意力头数量
head_dim: 每个头的维度
"""
self.num_blocks = num_blocks
self.block_size = block_size
# 物理内存块: [num_blocks, block_size, num_heads, head_dim]
self.key_cache = torch.zeros(
num_blocks, block_size, num_heads, head_dim,
dtype=torch.float16, device='cuda'
)
self.value_cache = torch.zeros_like(self.key_cache)
# 空闲块列表
self.free_blocks = list(range(num_blocks))
# 每个序列的Block Table: {seq_id: [block_id1, block_id2, ...]}
self.block_tables = {}
def allocate_sequence(self, seq_id, num_tokens):
"""为新序列分配内存块"""
num_blocks_needed = math.ceil(num_tokens / self.block_size)
if len(self.free_blocks) < num_blocks_needed:
raise RuntimeError("Out of memory: no free blocks available")
# 分配块
allocated_blocks = [
self.free_blocks.pop(0) for _ in range(num_blocks_needed)
]
self.block_tables[seq_id] = allocated_blocks
print(f"Sequence {seq_id}: allocated {num_blocks_needed} blocks")
return allocated_blocks
def free_sequence(self, seq_id):
"""释放序列占用的内存块"""
if seq_id in self.block_tables:
blocks = self.block_tables.pop(seq_id)
self.free_blocks.extend(blocks)
print(f"Sequence {seq_id}: freed {len(blocks)} blocks")
def get_kv_cache(self, seq_id, position):
"""根据逻辑位置获取KV缓存"""
block_idx = position // self.block_size
offset = position % self.block_size
physical_block = self.block_tables[seq_id][block_idx]
k = self.key_cache[physical_block, offset]
v = self.value_cache[physical_block, offset]
return k, v
def append_kv(self, seq_id, position, k, v):
"""追加新的KV缓存"""
block_idx = position // self.block_size
offset = position % self.block_size
# 检查是否需要分配新块
if block_idx >= len(self.block_tables[seq_id]):
new_block = self.free_blocks.pop(0)
self.block_tables[seq_id].append(new_block)
physical_block = self.block_tables[seq_id][block_idx]
self.key_cache[physical_block, offset] = k
self.value_cache[physical_block, offset] = v
# 示例使用
cache_manager = PagedKVCache(
num_blocks=100, # 100个物理块
block_size=16, # 每块16个token
num_heads=32, # 32个注意力头
head_dim=128 # 每头128维
)
# 序列1需要30个token的缓存(需要2个块)
cache_manager.allocate_sequence(seq_id=1, num_tokens=30)
# 序列2需要50个token的缓存(需要4个块)
cache_manager.allocate_sequence(seq_id=2, num_tokens=50)
# 序列1完成,释放内存
cache_manager.free_sequence(seq_id=1)
# 内存可以被新序列复用
cache_manager.allocate_sequence(seq_id=3, num_tokens=20)
---
b.优势分析
相比传统连续内存,PagedAttention避免内存碎片化,显存利用率从约20%提升至90%以上,支持更大的并发批大小。
c.部署实战
a.基础部署
a.功能说明
使用vLLM部署Llama-2模型,支持张量并行与流式响应。
b.代码示例
---
# vLLM完整部署示例
from vllm import LLM, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
import asyncio
# 方式1:同步推理(简单场景)
def sync_inference_demo():
# 初始化LLM引擎
llm = LLM(
model="meta-llama/Llama-2-13b-hf",
tensor_parallel_size=2, # 使用2张GPU进行张量并行
dtype="float16", # 使用FP16降低显存
gpu_memory_utilization=0.9, # 显存利用率90%
max_model_len=4096, # 最大上下文长度
max_num_batched_tokens=8192, # 批处理token数
max_num_seqs=128, # 最大并发序列数
trust_remote_code=True
)
# 配置采样参数
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
top_k=50,
max_tokens=512,
frequency_penalty=0.1, # 频率惩罚
presence_penalty=0.1 # 存在惩罚
)
# 批量推理
prompts = [
"[INST] Explain quantum entanglement in simple terms. [/INST]",
"[INST] What are the main differences between Python and JavaScript? [/INST]",
"[INST] How does blockchain technology work? [/INST]"
]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
tokens = output.outputs[0].token_ids
finish_reason = output.outputs[0].finish_reason
print(f"Prompt: {prompt[:50]}...")
print(f"Generated: {generated_text}")
print(f"Tokens: {len(tokens)}, Finish: {finish_reason}\n")
# 方式2:异步推理(高并发场景)
async def async_inference_demo():
# 异步引擎配置
engine_args = AsyncEngineArgs(
model="meta-llama/Llama-2-13b-hf",
tensor_parallel_size=2,
dtype="float16",
gpu_memory_utilization=0.9
)
# 创建异步引擎
engine = AsyncLLMEngine.from_engine_args(engine_args)
# 采样参数
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=256
)
# 提交多个异步请求
async def generate_text(prompt, request_id):
"""异步生成单个请求"""
results_generator = engine.generate(
prompt,
sampling_params,
request_id
)
final_output = None
async for request_output in results_generator:
final_output = request_output
text = final_output.outputs[0].text
print(f"[Request {request_id}] Generated: {text[:100]}...")
return text
# 并发执行100个请求
tasks = [
generate_text(f"Tell me about topic {i}", str(i))
for i in range(100)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
# 运行同步示例
sync_inference_demo()
# 运行异步示例
# asyncio.run(async_inference_demo())
---
b.性能监控
a.功能说明
监控vLLM的吞吐量、延迟、显存使用等指标。
b.代码示例
---
# vLLM性能监控脚本
import torch
import time
from vllm import LLM, SamplingParams
import psutil
import GPUtil
def benchmark_vllm(model_name, num_prompts=100):
"""基准测试vLLM性能"""
# 初始化模型
print(f"Loading model: {model_name}")
start_load = time.time()
llm = LLM(
model=model_name,
tensor_parallel_size=2,
gpu_memory_utilization=0.9
)
load_time = time.time() - start_load
print(f"Model loaded in {load_time:.2f}s\n")
# 采样参数
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=128
)
# 生成测试prompt
prompts = [
f"Question {i}: Explain artificial intelligence."
for i in range(num_prompts)
]
# 预热(避免首次推理开销影响结果)
print("Warming up...")
_ = llm.generate(prompts[:10], sampling_params)
# 正式测试
print(f"Benchmarking with {num_prompts} prompts...")
torch.cuda.reset_peak_memory_stats()
start_time = time.time()
outputs = llm.generate(prompts, sampling_params)
elapsed_time = time.time() - start_time
# 统计指标
total_input_tokens = sum(len(p.split()) * 1.3 for p in prompts) # 估算
total_output_tokens = sum(len(o.outputs[0].token_ids) for o in outputs)
total_tokens = total_input_tokens + total_output_tokens
throughput = total_output_tokens / elapsed_time
latency_per_request = elapsed_time / num_prompts
# GPU显存使用
max_memory_allocated = torch.cuda.max_memory_allocated() / (1024 ** 3)
# 打印结果
print("\n" + "="*60)
print("BENCHMARK RESULTS")
print("="*60)
print(f"Model: {model_name}")
print(f"Number of requests: {num_prompts}")
print(f"Total time: {elapsed_time:.2f}s")
print(f"Latency per request: {latency_per_request:.3f}s")
print(f"Throughput: {throughput:.2f} tokens/s")
print(f"Total output tokens: {int(total_output_tokens)}")
print(f"Peak GPU memory: {max_memory_allocated:.2f} GB")
print("="*60)
# GPU利用率
gpus = GPUtil.getGPUs()
for gpu in gpus:
print(f"GPU {gpu.id}: {gpu.name}")
print(f" Memory Used: {gpu.memoryUsed}MB / {gpu.memoryTotal}MB")
print(f" GPU Load: {gpu.load * 100:.1f}%")
# 运行基准测试
benchmark_vllm("meta-llama/Llama-2-7b-hf", num_prompts=100)
---
03.TGI框架
a.架构设计
a.Rust核心
TGI使用Rust编写核心推理引擎,内存安全且性能优异,Python层提供API接口。
b.Flash Attention集成
内置FlashAttention v2,降低注意力计算的内存占用与延迟。
b.Continuous Batching
a.功能说明
动态批处理机制,在生成过程中实时添加新请求,避免队头阻塞,提升吞吐量30%-50%。
b.工作原理
维护活跃请求队列,每生成一个token后检查新请求,将新请求加入当前batch,已完成的请求立即移除。
c.部署实战
a.Docker部署
a.功能说明
使用官方Docker镜像快速部署TGI服务。
b.代码示例
---
# TGI Docker部署脚本
#!/bin/bash
# 拉取TGI镜像
docker pull ghcr.io/huggingface/text-generation-inference:latest
# 设置环境变量
MODEL_ID="meta-llama/Llama-2-13b-chat-hf"
VOLUME_PATH="/data/models"
NUM_SHARDS=2 # 使用2张GPU
# 启动TGI服务
docker run --gpus all \
--shm-size 1g \
-p 8080:80 \
-v $VOLUME_PATH:/data \
-e HUGGING_FACE_HUB_TOKEN=$HF_TOKEN \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id $MODEL_ID \
--num-shard $NUM_SHARDS \
--max-concurrent-requests 128 \
--max-total-tokens 4096 \
--max-input-length 2048 \
--max-batch-prefill-tokens 8192 \
--dtype float16 \
--quantize bitsandbytes-nf4 # 启用4-bit量化
# 等待服务启动
echo "Waiting for TGI to start..."
sleep 30
# 健康检查
curl http://localhost:8080/health
# 测试推理
curl http://localhost:8080/generate \
-X POST \
-d '{"inputs":"What is deep learning?","parameters":{"max_new_tokens":100}}' \
-H 'Content-Type: application/json'
---
b.客户端调用
a.功能说明
使用HTTP客户端或Python SDK调用TGI服务。
b.代码示例
---
# TGI客户端调用示例
import requests
from typing import Iterator
import json
class TGIClient:
"""TGI推理客户端"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url
def generate(
self,
prompt: str,
max_new_tokens: int = 128,
temperature: float = 0.7,
top_p: float = 0.95,
repetition_penalty: float = 1.1,
stream: bool = False
):
"""生成文本"""
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_new_tokens,
"temperature": temperature,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
"do_sample": True,
"return_full_text": False
}
}
if stream:
return self._generate_stream(payload)
else:
return self._generate_sync(payload)
def _generate_sync(self, payload):
"""同步生成"""
response = requests.post(
f"{self.base_url}/generate",
json=payload,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
result = response.json()
return result[0]["generated_text"]
else:
raise Exception(f"Generation failed: {response.text}")
def _generate_stream(self, payload) -> Iterator[str]:
"""流式生成"""
response = requests.post(
f"{self.base_url}/generate_stream",
json=payload,
headers={"Content-Type": "application/json"},
stream=True
)
for line in response.iter_lines():
if line:
data = json.loads(line.decode('utf-8').replace('data:', ''))
if "token" in data:
yield data["token"]["text"]
# 使用示例
client = TGIClient()
# 同步生成
result = client.generate(
prompt="Explain quantum computing:",
max_new_tokens=200,
temperature=0.8
)
print(f"Generated: {result}")
# 流式生成
print("\nStreaming generation:")
for token in client.generate(
prompt="Write a short poem about AI:",
max_new_tokens=100,
stream=True
):
print(token, end='', flush=True)
---
04.框架对比
a.性能对比
a.吞吐量测试
vLLM在高并发场景下吞吐量通常高于TGI 10%-20%,得益于PagedAttention的显存优化。
b.延迟测试
TGI的首token延迟略低于vLLM,Rust核心带来更快的请求处理。
b.功能对比
a.vLLM优势
PagedAttention显存利用率高,支持更大并发,Python生态集成友好,易于定制化开发。
b.TGI优势
官方支持更多模型,Docker部署简单,Continuous Batching提升交互式场景吞吐量,生产级稳定性好。
c.选型建议
高并发批处理选vLLM,交互式聊天服务选TGI,资源受限场景选TGI(量化支持更好),需要深度定制选vLLM。
2.2 vLLM
01.核心特性
a.PagedAttention机制
通过分页内存管理,将KV缓存组织为不连续的Block,显存利用率提升至90%以上,支持更大的batch size。
b.高吞吐量
在高并发场景下,吞吐量比HuggingFace Transformers提升10-20倍,比FasterTransformer提升2-3倍。
02.安装配置
a.环境准备
a.功能说明
安装vLLM及其依赖,配置CUDA环境。
b.代码示例
---
# vLLM安装脚本
pip install vllm==0.2.7
pip install transformers accelerate
python -c "import vllm; print(vllm.__version__)"
---
b.张量并行部署
a.功能说明
使用多GPU进行张量并行,提升大模型推理性能。
b.代码示例
---
from vllm import LLM, SamplingParams
import torch
llm = LLM(
model="meta-llama/Llama-2-13b-hf",
tensor_parallel_size=2,
dtype="float16",
gpu_memory_utilization=0.9
)
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
outputs = llm.generate(["Explain AI:"], sampling_params)
print(outputs[0].outputs[0].text)
---
03.性能优化
a.量化推理
a.功能说明
使用AWQ量化降低显存占用,提升推理速度。
b.代码示例
---
from vllm import LLM, SamplingParams
llm = LLM(
model="TheBloke/Llama-2-13B-AWQ",
quantization="awq",
dtype="float16"
)
sampling_params = SamplingParams(temperature=0.7, max_tokens=128)
outputs = llm.generate(["What is ML?"], sampling_params)
print(f"Generated: {outputs[0].outputs[0].text}")
---
b.批处理调优
a.功能说明
调整批处理参数平衡吞吐量与延迟。
b.代码示例
---
from vllm import LLM, SamplingParams
llm = LLM(
model="meta-llama/Llama-2-7b-hf",
max_num_seqs=128,
max_num_batched_tokens=8192
)
prompts = [f"Question {i}:" for i in range(100)]
sampling_params = SamplingParams(max_tokens=128)
outputs = llm.generate(prompts, sampling_params)
print(f"Processed {len(outputs)} requests")
---
2.3 TGI
01.核心特性
a.Continuous Batching
动态批处理机制,在生成过程中实时添加新请求,避免队头阻塞,提升吞吐量30%-50%。
b.Flash Attention集成
内置FlashAttention v2,降低注意力计算的内存占用与延迟,支持更长上下文。
c.生产级稳定性
Rust核心引擎,内存安全且性能优异,官方维护,适合生产环境部署。
02.Docker部署
a.基础部署
a.功能说明
使用Docker快速部署TGI服务,支持多GPU和量化。
b.代码示例
---
# TGI Docker部署脚本
docker run --gpus all --shm-size 1g -p 8080:80 \
-v /data/models:/data \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-2-13b-chat-hf \
--num-shard 2 \
--max-concurrent-requests 128 \
--max-total-tokens 4096 \
--dtype float16
# 健康检查
curl http://localhost:8080/health
# 测试推理
curl http://localhost:8080/generate \
-X POST \
-d '{"inputs":"What is AI?","parameters":{"max_new_tokens":100}}' \
-H 'Content-Type: application/json'
---
b.量化部署
a.功能说明
启用4-bit量化降低显存占用。
b.代码示例
---
# TGI 4-bit量化部署
docker run --gpus all --shm-size 1g -p 8080:80 \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-2-70b-chat-hf \
--num-shard 4 \
--quantize bitsandbytes-nf4 \
--max-batch-prefill-tokens 8192
---
03.客户端调用
a.HTTP客户端
a.功能说明
使用Python requests库调用TGI服务。
b.代码示例
---
import requests
import json
class TGIClient:
def __init__(self, base_url="http://localhost:8080"):
self.base_url = base_url
def generate(self, prompt, max_tokens=128, temperature=0.7):
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_tokens,
"temperature": temperature,
"top_p": 0.95,
"do_sample": True
}
}
response = requests.post(
f"{self.base_url}/generate",
json=payload
)
return response.json()[0]["generated_text"]
def generate_stream(self, prompt, max_tokens=128):
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": max_tokens}
}
response = requests.post(
f"{self.base_url}/generate_stream",
json=payload,
stream=True
)
for line in response.iter_lines():
if line:
data = json.loads(line.decode('utf-8').replace('data:', ''))
if "token" in data:
yield data["token"]["text"]
client = TGIClient()
result = client.generate("Explain quantum computing:")
print(result)
for token in client.generate_stream("Write a poem:"):
print(token, end='', flush=True)
---
b.性能监控
a.功能说明
监控TGI服务的吞吐量和延迟指标。
b.代码示例
---
import requests
import time
def benchmark_tgi(base_url, num_requests=100):
client = TGIClient(base_url)
prompts = [f"Question {i}: Explain AI." for i in range(num_requests)]
start = time.time()
for prompt in prompts:
_ = client.generate(prompt, max_tokens=128)
elapsed = time.time() - start
throughput = num_requests / elapsed
latency = elapsed / num_requests
print(f"Requests: {num_requests}")
print(f"Total time: {elapsed:.2f}s")
print(f"Throughput: {throughput:.2f} req/s")
print(f"Latency: {latency:.3f}s")
benchmark_tgi("http://localhost:8080")
---
2.4 FastAPI服务化
01.服务架构
a.RESTful API设计
使用FastAPI构建标准化推理接口,支持同步和流式响应,提供健康检查和监控端点。
b.异步处理
利用Python asyncio实现高并发请求处理,提升服务吞吐量。
c.请求队列
使用Redis或内存队列管理请求,实现负载均衡和限流。
02.基础服务
a.同步推理接口
a.功能说明
实现标准的同步推理API,返回完整生成结果。
b.代码示例
---
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from vllm import LLM, SamplingParams
import uvicorn
app = FastAPI(title="LLM Inference Service")
llm = LLM(
model="meta-llama/Llama-2-7b-chat-hf",
tensor_parallel_size=1,
gpu_memory_utilization=0.9
)
class GenerateRequest(BaseModel):
prompt: str
max_tokens: int = 256
temperature: float = 0.7
top_p: float = 0.95
class GenerateResponse(BaseModel):
text: str
tokens: int
finish_reason: str
@app.post("/v1/generate", response_model=GenerateResponse)
async def generate(request: GenerateRequest):
try:
sampling_params = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens
)
outputs = llm.generate([request.prompt], sampling_params)
output = outputs[0].outputs[0]
return GenerateResponse(
text=output.text,
tokens=len(output.token_ids),
finish_reason=output.finish_reason
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
---
b.流式推理接口
a.功能说明
实现流式响应,逐token返回生成结果,降低首token延迟。
b.代码示例
---
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json
app = FastAPI()
class StreamRequest(BaseModel):
prompt: str
max_tokens: int = 256
temperature: float = 0.7
@app.post("/v1/generate_stream")
async def generate_stream(request: StreamRequest):
async def stream_generator():
sampling_params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens
)
for output in llm.generate([request.prompt], sampling_params):
chunk = {
"text": output.outputs[0].text,
"finish_reason": output.outputs[0].finish_reason
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.01)
return StreamingResponse(
stream_generator(),
media_type="text/event-stream"
)
---
03.高级特性
a.请求限流
a.功能说明
使用令牌桶算法限制请求速率,防止服务过载。
b.代码示例
---
from fastapi import FastAPI, HTTPException, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/v1/generate")
@limiter.limit("10/minute")
async def generate(request: Request, gen_request: GenerateRequest):
sampling_params = SamplingParams(
temperature=gen_request.temperature,
max_tokens=gen_request.max_tokens
)
outputs = llm.generate([gen_request.prompt], sampling_params)
return {"text": outputs[0].outputs[0].text}
---
b.请求缓存
a.功能说明
使用Redis缓存相同请求的结果,减少重复计算。
b.代码示例
---
from fastapi import FastAPI
import redis
import hashlib
import json
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cache_key(prompt: str, params: dict) -> str:
content = f"{prompt}:{json.dumps(params, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
@app.post("/v1/generate")
async def generate(request: GenerateRequest):
cache_key = get_cache_key(
request.prompt,
{"temp": request.temperature, "max_tokens": request.max_tokens}
)
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
sampling_params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens
)
outputs = llm.generate([request.prompt], sampling_params)
result = {"text": outputs[0].outputs[0].text}
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
---
c.监控指标
a.功能说明
集成Prometheus监控请求量、延迟、错误率等指标。
b.代码示例
---
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from fastapi.responses import Response
import time
app = FastAPI()
REQUEST_COUNT = Counter('llm_requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'Request latency')
ERROR_COUNT = Counter('llm_errors_total', 'Total errors')
@app.post("/v1/generate")
async def generate(request: GenerateRequest):
REQUEST_COUNT.inc()
start = time.time()
try:
sampling_params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens
)
outputs = llm.generate([request.prompt], sampling_params)
result = {"text": outputs[0].outputs[0].text}
REQUEST_LATENCY.observe(time.time() - start)
return result
except Exception as e:
ERROR_COUNT.inc()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
---
2.5 性能优化
01.推理加速
a.KV缓存优化
a.功能说明
优化KV缓存管理,减少显存占用,提升批处理能力。
b.代码示例
---
from vllm import LLM, SamplingParams
llm = LLM(
model="meta-llama/Llama-2-13b-hf",
tensor_parallel_size=2,
gpu_memory_utilization=0.95,
max_num_seqs=256,
max_num_batched_tokens=16384,
enable_prefix_caching=True
)
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
prompts = [f"Question {i}: Explain AI." for i in range(200)]
outputs = llm.generate(prompts, sampling_params)
print(f"Processed {len(outputs)} requests")
---
b.批处理策略
a.功能说明
动态调整批大小,平衡吞吐量与延迟。
b.代码示例
---
import time
from vllm import LLM, SamplingParams
def benchmark_batch_size(model_name, batch_sizes):
results = []
for batch_size in batch_sizes:
llm = LLM(
model=model_name,
max_num_seqs=batch_size,
gpu_memory_utilization=0.9
)
prompts = [f"Q{i}:" for i in range(batch_size * 2)]
sampling_params = SamplingParams(max_tokens=128)
start = time.time()
outputs = llm.generate(prompts, sampling_params)
elapsed = time.time() - start
throughput = len(outputs) / elapsed
results.append({
"batch_size": batch_size,
"throughput": throughput,
"latency": elapsed / len(outputs)
})
for r in results:
print(f"Batch {r['batch_size']}: {r['throughput']:.2f} req/s")
benchmark_batch_size("meta-llama/Llama-2-7b-hf", [32, 64, 128])
---
02.显存优化
a.量化部署
a.功能说明
使用INT8/INT4量化降低显存占用50%-75%。
b.代码示例
---
from vllm import LLM, SamplingParams
import torch
llm_fp16 = LLM(
model="meta-llama/Llama-2-13b-hf",
dtype="float16"
)
llm_awq = LLM(
model="TheBloke/Llama-2-13B-AWQ",
quantization="awq",
dtype="float16"
)
sampling_params = SamplingParams(max_tokens=128)
prompt = ["Explain quantum computing:"]
outputs_fp16 = llm_fp16.generate(prompt, sampling_params)
mem_fp16 = torch.cuda.max_memory_allocated() / 1024**3
torch.cuda.reset_peak_memory_stats()
outputs_awq = llm_awq.generate(prompt, sampling_params)
mem_awq = torch.cuda.max_memory_allocated() / 1024**3
print(f"FP16 Memory: {mem_fp16:.2f}GB")
print(f"AWQ Memory: {mem_awq:.2f}GB")
print(f"Memory Saved: {(1 - mem_awq/mem_fp16)*100:.1f}%")
---
b.模型卸载
a.功能说明
将部分权重卸载到CPU,支持更大模型部署。
b.代码示例
---
from vllm import LLM, SamplingParams
llm = LLM(
model="meta-llama/Llama-2-70b-hf",
tensor_parallel_size=4,
gpu_memory_utilization=0.95,
swap_space=16,
cpu_offload_gb=32
)
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
outputs = llm.generate(["Explain relativity:"], sampling_params)
print(outputs[0].outputs[0].text)
---
03.并发优化
a.异步推理
a.功能说明
使用异步引擎处理高并发请求。
b.代码示例
---
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm import SamplingParams
import asyncio
async def async_inference():
engine_args = AsyncEngineArgs(
model="meta-llama/Llama-2-7b-hf",
tensor_parallel_size=1,
gpu_memory_utilization=0.9
)
engine = AsyncLLMEngine.from_engine_args(engine_args)
sampling_params = SamplingParams(temperature=0.7, max_tokens=128)
async def generate_one(prompt, request_id):
results = engine.generate(prompt, sampling_params, request_id)
final_output = None
async for output in results:
final_output = output
return final_output.outputs[0].text
tasks = [
generate_one(f"Question {i}:", str(i))
for i in range(100)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(async_inference())
---
b.负载均衡
a.功能说明
使用多实例部署和负载均衡提升服务能力。
b.代码示例
---
from fastapi import FastAPI
import httpx
import random
app = FastAPI()
BACKEND_URLS = [
"http://gpu-node-1:8000",
"http://gpu-node-2:8000",
"http://gpu-node-3:8000"
]
@app.post("/v1/generate")
async def generate(request: dict):
backend = random.choice(BACKEND_URLS)
async with httpx.AsyncClient() as client:
response = await client.post(
f"{backend}/v1/generate",
json=request,
timeout=30.0
)
return response.json()
@app.get("/health")
async def health():
healthy_backends = []
async with httpx.AsyncClient() as client:
for url in BACKEND_URLS:
try:
r = await client.get(f"{url}/health", timeout=2.0)
if r.status_code == 200:
healthy_backends.append(url)
except:
pass
return {
"total": len(BACKEND_URLS),
"healthy": len(healthy_backends)
}
---
04.性能监控
a.指标采集
a.功能说明
采集推理延迟、吞吐量、GPU利用率等关键指标。
b.代码示例
---
from prometheus_client import Counter, Histogram, Gauge
import torch
import time
REQUEST_COUNT = Counter('inference_requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('inference_latency_seconds', 'Latency')
GPU_MEMORY = Gauge('gpu_memory_used_bytes', 'GPU memory', ['gpu_id'])
THROUGHPUT = Gauge('inference_throughput_tokens_per_sec', 'Throughput')
def monitor_inference(llm, prompts, sampling_params):
REQUEST_COUNT.inc(len(prompts))
start = time.time()
outputs = llm.generate(prompts, sampling_params)
elapsed = time.time() - start
REQUEST_LATENCY.observe(elapsed / len(prompts))
total_tokens = sum(len(o.outputs[0].token_ids) for o in outputs)
THROUGHPUT.set(total_tokens / elapsed)
for i in range(torch.cuda.device_count()):
mem = torch.cuda.memory_allocated(i)
GPU_MEMORY.labels(gpu_id=str(i)).set(mem)
return outputs
---
b.性能分析
a.功能说明
使用Profiler分析性能瓶颈。
b.代码示例
---
import torch
from torch.profiler import profile, ProfilerActivity
from vllm import LLM, SamplingParams
llm = LLM(model="meta-llama/Llama-2-7b-hf")
sampling_params = SamplingParams(max_tokens=128)
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
record_shapes=True,
profile_memory=True
) as prof:
outputs = llm.generate(["Explain AI:"], sampling_params)
print(prof.key_averages().table(
sort_by="cuda_time_total",
row_limit=10
))
prof.export_chrome_trace("trace.json")
---
3 模型压缩
3.1 量化技术
01.量化原理
a.数值表示
将FP32/FP16权重映射到INT8/INT4,通过缩放因子和零点实现精度转换,降低存储和计算开销。
b.量化方式
a.对称量化
零点为0,量化公式为Q=round(x/scale),适用于权重分布对称的场景。
b.非对称量化
零点非0,量化公式为Q=round(x/scale)+zero_point,适应任意分布,精度更高。
02.量化方法
a.训练后量化PTQ
a.功能说明
模型训练完成后直接量化,无需重新训练,速度快但精度损失较大。
b.代码示例
---
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
torch_dtype=torch.float16,
device_map="auto"
)
model = torch.quantization.quantize_dynamic(
model,
{torch.nn.Linear},
dtype=torch.qint8
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
inputs = tokenizer("Explain AI:", return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=100)
print(tokenizer.decode(outputs[0]))
---
b.量化感知训练QAT
a.功能说明
训练过程中模拟量化操作,模型学习适应量化误差,精度损失小但训练成本高。
b.代码示例
---
import torch
import torch.nn as nn
from torch.quantization import prepare_qat, convert
class QuantizedModel(nn.Module):
def __init__(self, model):
super().__init__()
self.model = model
self.quant = torch.quantization.QuantStub()
self.dequant = torch.quantization.DeQuantStub()
def forward(self, x):
x = self.quant(x)
x = self.model(x)
x = self.dequant(x)
return x
model = AutoModelForCausalLM.from_pretrained("gpt2")
qat_model = QuantizedModel(model)
qat_model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
qat_model = prepare_qat(qat_model)
for epoch in range(3):
for batch in train_loader:
outputs = qat_model(batch)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
qat_model = convert(qat_model)
---
03.量化粒度
a.逐层量化
a.功能说明
每层使用独立的缩放因子,精度高但参数多。
b.代码示例
---
import torch
def per_layer_quantize(weight, bits=8):
qmin = -(2 ** (bits - 1))
qmax = 2 ** (bits - 1) - 1
scale = (weight.max() - weight.min()) / (qmax - qmin)
zero_point = qmin - weight.min() / scale
q_weight = torch.clamp(
torch.round(weight / scale + zero_point),
qmin, qmax
).to(torch.int8)
return q_weight, scale, zero_point
weight = torch.randn(1024, 1024)
q_weight, scale, zero_point = per_layer_quantize(weight)
dequant_weight = (q_weight - zero_point) * scale
error = torch.abs(weight - dequant_weight).mean()
print(f"Quantization error: {error:.6f}")
---
b.逐通道量化
a.功能说明
每个输出通道独立量化,适用于卷积层,精度更高。
b.代码示例
---
def per_channel_quantize(weight, bits=8):
out_channels = weight.shape[0]
qmin = -(2 ** (bits - 1))
qmax = 2 ** (bits - 1) - 1
scales = []
zero_points = []
q_weights = []
for i in range(out_channels):
channel_weight = weight[i]
scale = (channel_weight.max() - channel_weight.min()) / (qmax - qmin)
zero_point = qmin - channel_weight.min() / scale
q_w = torch.clamp(
torch.round(channel_weight / scale + zero_point),
qmin, qmax
).to(torch.int8)
q_weights.append(q_w)
scales.append(scale)
zero_points.append(zero_point)
return torch.stack(q_weights), scales, zero_points
weight = torch.randn(512, 1024)
q_weight, scales, zero_points = per_channel_quantize(weight)
print(f"Quantized shape: {q_weight.shape}")
---
04.混合精度量化
a.敏感层分析
a.功能说明
识别对量化敏感的层,保持FP16精度,其他层量化为INT8。
b.代码示例
---
import torch
from transformers import AutoModelForCausalLM
def analyze_layer_sensitivity(model, calibration_data):
sensitivities = {}
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
original_weight = module.weight.data.clone()
q_weight, _, _ = per_layer_quantize(original_weight)
module.weight.data = q_weight.float()
with torch.no_grad():
outputs = model(**calibration_data)
loss = outputs.loss
sensitivities[name] = loss.item()
module.weight.data = original_weight
return sensitivities
model = AutoModelForCausalLM.from_pretrained("gpt2")
calibration_data = {"input_ids": torch.randint(0, 1000, (1, 128))}
sensitivities = analyze_layer_sensitivity(model, calibration_data)
sensitive_layers = sorted(sensitivities.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top 5 sensitive layers:")
for name, loss in sensitive_layers:
print(f"{name}: {loss:.4f}")
---
b.混合精度配置
a.功能说明
根据敏感度配置不同层的量化精度。
b.代码示例
---
def mixed_precision_quantize(model, sensitive_layers, bits=8):
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
if name in sensitive_layers:
continue
q_weight, scale, zero_point = per_layer_quantize(
module.weight.data, bits
)
module.weight.data = (q_weight - zero_point) * scale
return model
sensitive_layer_names = [name for name, _ in sensitive_layers]
quantized_model = mixed_precision_quantize(model, sensitive_layer_names)
---
3.2 剪枝技术
01.剪枝原理
a.结构化剪枝
移除整个神经元、通道或层,保持模型结构规整,硬件加速友好。
b.非结构化剪枝
移除单个权重参数,压缩率高但需要稀疏矩阵运算支持。
02.剪枝方法
a.幅度剪枝
a.功能说明
根据权重绝对值大小剪枝,简单高效。
b.代码示例
---
import torch
import torch.nn as nn
def magnitude_prune(model, sparsity=0.5):
for name, module in model.named_modules():
if isinstance(module, nn.Linear):
weight = module.weight.data
threshold = torch.quantile(torch.abs(weight), sparsity)
mask = torch.abs(weight) > threshold
module.weight.data *= mask.float()
return model
model = nn.Linear(1024, 1024)
pruned_model = magnitude_prune(model, sparsity=0.5)
sparsity_ratio = (pruned_model.weight == 0).float().mean()
print(f"Sparsity: {sparsity_ratio:.2%}")
---
b.梯度剪枝
a.功能说明
根据梯度信息剪枝,保留对损失影响大的权重。
b.代码示例
---
def gradient_prune(model, dataloader, sparsity=0.5):
gradients = {}
for name, param in model.named_parameters():
if 'weight' in name:
gradients[name] = torch.zeros_like(param)
for batch in dataloader:
outputs = model(batch)
loss = criterion(outputs, labels)
loss.backward()
for name, param in model.named_parameters():
if 'weight' in name and param.grad is not None:
gradients[name] += torch.abs(param.grad)
for name, param in model.named_parameters():
if 'weight' in name:
grad = gradients[name]
threshold = torch.quantile(grad, sparsity)
mask = grad > threshold
param.data *= mask.float()
return model
---
3.3 蒸馏技术
01.蒸馏原理
a.知识迁移
大模型(教师)的输出分布作为软标签,指导小模型(学生)学习,保留泛化能力。
b.温度参数
通过温度T软化输出分布,T越大分布越平滑,学生模型学习到更多暗知识。
02.蒸馏方法
a.响应蒸馏
a.功能说明
学生模型学习教师模型的输出logits分布。
b.代码示例
---
import torch
import torch.nn as nn
import torch.nn.functional as F
def distillation_loss(student_logits, teacher_logits, labels, T=3.0, alpha=0.7):
soft_loss = F.kl_div(
F.log_softmax(student_logits / T, dim=-1),
F.softmax(teacher_logits / T, dim=-1),
reduction='batchmean'
) * (T ** 2)
hard_loss = F.cross_entropy(student_logits, labels)
return alpha * soft_loss + (1 - alpha) * hard_loss
teacher_model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-13b-hf")
student_model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
for batch in train_loader:
with torch.no_grad():
teacher_outputs = teacher_model(**batch)
student_outputs = student_model(**batch)
loss = distillation_loss(
student_outputs.logits,
teacher_outputs.logits,
batch['labels']
)
loss.backward()
optimizer.step()
---
b.特征蒸馏
a.功能说明
学生模型学习教师模���的中间层特征表示。
b.代码示例
---
def feature_distillation_loss(student_features, teacher_features):
loss = 0
for s_feat, t_feat in zip(student_features, teacher_features):
loss += F.mse_loss(s_feat, t_feat)
return loss / len(student_features)
teacher_features = []
student_features = []
def get_features(module, input, output):
teacher_features.append(output)
for layer in teacher_model.model.layers[::4]:
layer.register_forward_hook(get_features)
with torch.no_grad():
teacher_model(**batch)
for layer in student_model.model.layers[::2]:
layer.register_forward_hook(lambda m, i, o: student_features.append(o))
student_model(**batch)
feat_loss = feature_distillation_loss(student_features, teacher_features)
---
3.4 INT8量化实战
01.BitsAndBytes量化
a.8bit量化
a.功能说明
使用BitsAndBytes库进行INT8量化,降低显存占用50%。
b.代码示例
---
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0,
llm_int8_has_fp16_weight=False
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-13b-hf",
quantization_config=quantization_config,
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-13b-hf")
inputs = tokenizer("Explain quantum computing:", return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=256)
print(tokenizer.decode(outputs[0]))
print(f"Memory: {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
---
b.4bit量化
a.功能说明
使用NF4量化,显存占用降低75%,精度损失小。
b.代码示例
---
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4"
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-70b-hf",
quantization_config=quantization_config,
device_map="auto"
)
inputs = tokenizer("What is AI?", return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.decode(outputs[0]))
---
02.精度评估
a.困惑度对比
a.功能说明
对比量化前后模型的困惑度,评估精度损失。
b.代码示例
---
import torch
from torch.utils.data import DataLoader
def evaluate_perplexity(model, dataloader):
model.eval()
total_loss = 0
total_tokens = 0
with torch.no_grad():
for batch in dataloader:
outputs = model(**batch)
loss = outputs.loss
total_loss += loss.item() * batch['input_ids'].numel()
total_tokens += batch['input_ids'].numel()
perplexity = torch.exp(torch.tensor(total_loss / total_tokens))
return perplexity.item()
fp16_model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
torch_dtype=torch.float16,
device_map="auto"
)
int8_model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
load_in_8bit=True,
device_map="auto"
)
fp16_ppl = evaluate_perplexity(fp16_model, eval_dataloader)
int8_ppl = evaluate_perplexity(int8_model, eval_dataloader)
print(f"FP16 Perplexity: {fp16_ppl:.2f}")
print(f"INT8 Perplexity: {int8_ppl:.2f}")
print(f"Degradation: {(int8_ppl - fp16_ppl) / fp16_ppl * 100:.2f}%")
---
3.5 GPTQ/AWQ
01.GPTQ量化
a.原理
基于最优脑量化OBQ,通过Hessian矩阵逆近似最小化量化误差,支持3-4bit量化。
b.实战
a.功能说明
使用AutoGPTQ对Llama-2进行4bit量化。
b.代码示例
---
from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
quantize_config = BaseQuantizeConfig(
bits=4,
group_size=128,
desc_act=False,
damp_percent=0.01
)
model = AutoGPTQForCausalLM.from_pretrained(
"meta-llama/Llama-2-13b-hf",
quantize_config=quantize_config
)
calibration_data = [
"The future of AI is",
"Quantum computing will",
"Machine learning enables"
]
model.quantize(calibration_data, batch_size=1)
model.save_quantized("./llama-2-13b-gptq")
quantized_model = AutoGPTQForCausalLM.from_quantized(
"./llama-2-13b-gptq",
device="cuda:0"
)
inputs = tokenizer("Explain AI:", return_tensors="pt").to("cuda")
outputs = quantized_model.generate(**inputs, max_new_tokens=200)
print(tokenizer.decode(outputs[0]))
---
02.AWQ量化
a.原理
激活值感知量化,保护重要权重通道,量化精度高于GPTQ。
b.实战
a.功能说明
使用AutoAWQ进行4bit量化,推理速度快。
b.代码示例
---
from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer
model = AutoAWQForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
quant_config = {
"zero_point": True,
"q_group_size": 128,
"w_bit": 4,
"version": "GEMM"
}
model.quantize(tokenizer, quant_config=quant_config)
model.save_quantized("./llama-2-7b-awq")
model = AutoAWQForCausalLM.from_quantized(
"./llama-2-7b-awq",
fuse_layers=True
)
inputs = tokenizer("What is ML?", return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.decode(outputs[0]))
---
03.性能对比
a.推理速度
a.功能说明
对比FP16、GPTQ、AWQ的推理速度。
b.代码示例
---
import time
models = {
"FP16": fp16_model,
"GPTQ": gptq_model,
"AWQ": awq_model
}
prompt = "Explain quantum computing in detail:"
inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
for name, model in models.items():
torch.cuda.synchronize()
start = time.time()
outputs = model.generate(**inputs, max_new_tokens=256)
torch.cuda.synchronize()
elapsed = time.time() - start
tokens = len(outputs[0])
throughput = tokens / elapsed
print(f"{name}: {elapsed:.2f}s, {throughput:.2f} tokens/s")
---
4 分布式训练
4.1 数据并行
01.原理
a.数据切分
将batch数据切分到多个GPU,每个GPU计算部分样本的梯度,通过AllReduce同步梯度后更新参数。
b.通信模式
使用Ring-AllReduce或Tree-AllReduce进行梯度聚合,通信量为O(N),N为参数量。
02.实现方式
a.DataParallel
a.功能说明
PyTorch原生数据并行,单进程多线程,存在GIL瓶颈。
b.代码示例
---
import torch
import torch.nn as nn
model = nn.Linear(1024, 1024)
if torch.cuda.device_count() > 1:
model = nn.DataParallel(model)
model = model.cuda()
inputs = torch.randn(64, 1024).cuda()
outputs = model(inputs)
print(f"Output shape: {outputs.shape}")
---
b.DistributedDataParallel
a.功能说明
多进程数据并行,性能优于DataParallel,支持梯度累积和混合精度。
b.代码示例
---
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
dist.init_process_group(backend='nccl')
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)
model = nn.Linear(1024, 1024).cuda()
model = DDP(model, device_ids=[local_rank])
for batch in dataloader:
outputs = model(batch)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
optimizer.zero_grad()
dist.destroy_process_group()
---
03.启动方式
a.torchrun启动
a.功能说明
使用torchrun启动多进程训练,自动设置环境变量。
b.代码示例
---
# train.py
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoModelForCausalLM
def main():
dist.init_process_group(backend='nccl')
local_rank = int(os.environ['LOCAL_RANK'])
model = AutoModelForCausalLM.from_pretrained("gpt2")
model = model.cuda(local_rank)
model = DDP(model, device_ids=[local_rank])
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
for epoch in range(3):
for batch in train_dataloader:
batch = {k: v.cuda(local_rank) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
dist.destroy_process_group()
if __name__ == "__main__":
main()
# 启动命令
# torchrun --nproc_per_node=4 train.py
---
4.2 模型并行
01.原理
a.张量并行
将单层权重切分到多个GPU,每个GPU计算部分输出,通过通信聚合结果。
b.流水线并行
将模型按层切分到多个GPU,数据按micro-batch流水执行,减少GPU空闲时间。
02.张量并行实现
a.列并行
a.功能说明
将权重矩阵按列切分,输出需要AllReduce聚合。
b.代码示例
---
import torch
import torch.distributed as dist
class ColumnParallelLinear(nn.Module):
def __init__(self, in_features, out_features, world_size):
super().__init__()
self.out_features_per_partition = out_features // world_size
self.weight = nn.Parameter(
torch.randn(self.out_features_per_partition, in_features)
)
def forward(self, x):
output = torch.matmul(x, self.weight.t())
dist.all_reduce(output, op=dist.ReduceOp.SUM)
return output
world_size = dist.get_world_size()
layer = ColumnParallelLinear(1024, 4096, world_size).cuda()
x = torch.randn(32, 1024).cuda()
output = layer(x)
---
b.行并行
a.功能说明
将权重矩阵按行切分,输入需要切分,输出直接拼接。
b.代码示例
---
class RowParallelLinear(nn.Module):
def __init__(self, in_features, out_features, world_size):
super().__init__()
self.in_features_per_partition = in_features // world_size
self.weight = nn.Parameter(
torch.randn(out_features, self.in_features_per_partition)
)
def forward(self, x):
rank = dist.get_rank()
world_size = dist.get_world_size()
x_partition = x[:, rank * self.in_features_per_partition:
(rank + 1) * self.in_features_per_partition]
output = torch.matmul(x_partition, self.weight.t())
return output
layer = RowParallelLinear(4096, 1024, world_size).cuda()
---
03.Megatron-LM
a.Transformer并行
a.功能说明
对Transformer层进行张量并行切分,优化通信效率。
b.代码示例
---
from megatron import get_args, initialize_megatron
from megatron.model import GPTModel
initialize_megatron()
args = get_args()
model = GPTModel(
num_tokentypes=0,
parallel_output=True,
pre_process=True,
post_process=True
)
for batch in train_dataloader:
loss = model(batch['input_ids'], batch['attention_mask'])
loss.backward()
optimizer.step()
---
4.3 流水线并行
01.原理
a.模型切分
将模型按层切分到多个GPU,形成流水线stage。
b.Micro-batch
将batch切分为多个micro-batch,流水执行减少GPU空闲。
02.GPipe实现
a.基础流水线
a.功能说明
使用torch.distributed.pipeline实现流水线并行。
b.代码示例
---
from torch.distributed.pipeline.sync import Pipe
class ModelStage1(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(*[nn.Linear(1024, 1024) for _ in range(6)])
def forward(self, x):
return self.layers(x)
class ModelStage2(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(*[nn.Linear(1024, 1024) for _ in range(6)])
def forward(self, x):
return self.layers(x)
model = nn.Sequential(
ModelStage1().cuda(0),
ModelStage2().cuda(1)
)
model = Pipe(model, chunks=8)
inputs = torch.randn(64, 1024).cuda(0)
outputs = model(inputs)
---
03.1F1B调度
a.原理
One Forward One Backward,前向和反向交替执行,减少显存峰值。
b.代码示例
---
from deepspeed.runtime.pipe.schedule import PipelineSchedule
class OneFOneBSchedule(PipelineSchedule):
def steps(self):
total_steps = self.micro_batches * 2
for step in range(total_steps):
if step < self.micro_batches:
yield self.forward_step(step)
else:
yield self.backward_step(step - self.micro_batches)
schedule = OneFOneBSchedule(micro_batches=8, stages=4)
---
4.4 DeepSpeed
01.ZeRO优化
a.ZeRO-1
切分优化器状态,显存��约4倍。
b.ZeRO-2
切分优化器状态和梯度,显存节约8倍。
c.ZeRO-3
切分优化器状态、梯度和参数,显存节约线性增长。
02.配置与使用
a.ZeRO-2配置
a.功能说明
配置ZeRO-2训练13B模型。
b.代码示例
---
# ds_config.json
{
"train_batch_size": 64,
"train_micro_batch_size_per_gpu": 2,
"gradient_accumulation_steps": 8,
"optimizer": {
"type": "AdamW",
"params": {"lr": 2e-5}
},
"fp16": {"enabled": true},
"zero_optimization": {
"stage": 2,
"offload_optimizer": {"device": "cpu"},
"overlap_comm": true
}
}
# train.py
import deepspeed
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-13b-hf")
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
config="ds_config.json"
)
for batch in train_dataloader:
outputs = model_engine(batch)
loss = outputs.loss
model_engine.backward(loss)
model_engine.step()
# 启动: deepspeed --num_gpus=4 train.py
---
b.ZeRO-3配置
a.功能说明
配置ZeRO-3训练70B模型。
b.代码示例
---
{
"train_batch_size": 64,
"train_micro_batch_size_per_gpu": 1,
"gradient_accumulation_steps": 16,
"fp16": {"enabled": true},
"zero_optimization": {
"stage": 3,
"offload_optimizer": {"device": "cpu"},
"offload_param": {"device": "cpu"},
"overlap_comm": true,
"sub_group_size": 1e9,
"stage3_max_live_parameters": 1e9,
"stage3_prefetch_bucket_size": 5e8
}
}
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-70b-hf")
model_engine, _, _, _ = deepspeed.initialize(
model=model,
config="ds_config_zero3.json"
)
---
03.性能优化
a.梯度累积
a.功能说明
使用梯度累积模拟大batch训练。
b.代码示例
---
gradient_accumulation_steps = 8
for i, batch in enumerate(train_dataloader):
outputs = model_engine(batch)
loss = outputs.loss / gradient_accumulation_steps
model_engine.backward(loss)
if (i + 1) % gradient_accumulation_steps == 0:
model_engine.step()
---
4.5 FSDP
01.原理
a.全分片
Fully Sharded Data Parallel,参数、梯度、优化器状态全部分片。
b.通信优化
使用AllGather和ReduceScatter优化通信,支持CPU offload。
02.基础使用
a.FSDP包装
a.功能说明
使用FSDP包装模型进行训练。
b.代码示例
---
import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import CPUOffload
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-13b-hf")
model = FSDP(
model,
cpu_offload=CPUOffload(offload_params=True),
mixed_precision=torch.distributed.fsdp.MixedPrecision(
param_dtype=torch.float16,
reduce_dtype=torch.float16,
buffer_dtype=torch.float16
)
)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
---
03.高级配置
a.分片策略
a.功能说明
配置不同的分片策略优化性能。
b.代码示例
---
from torch.distributed.fsdp import ShardingStrategy
model = FSDP(
model,
sharding_strategy=ShardingStrategy.FULL_SHARD,
cpu_offload=CPUOffload(offload_params=True),
backward_prefetch=BackwardPrefetch.BACKWARD_PRE,
forward_prefetch=True
)
---
4.6 训练实战
01.完整训练流程
a.数据准备
a.功能说明
准备分布式训练数据集。
b.代码示例
---
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank
)
train_dataloader = DataLoader(
train_dataset,
batch_size=2,
sampler=train_sampler,
num_workers=4
)
---
b.训练循环
a.功能说明
实现完整的分布式训练循环。
b.代码示例
---
import deepspeed
from transformers import get_linear_schedule_with_warmup
model_engine, optimizer, _, scheduler = deepspeed.initialize(
model=model,
config="ds_config.json"
)
for epoch in range(3):
model_engine.train()
train_sampler.set_epoch(epoch)
for step, batch in enumerate(train_dataloader):
batch = {k: v.cuda() for k, v in batch.items()}
outputs = model_engine(**batch)
loss = outputs.loss
model_engine.backward(loss)
model_engine.step()
if step % 100 == 0 and rank == 0:
print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")
if rank == 0:
model_engine.save_checkpoint(f"checkpoint-epoch-{epoch}")
---
02.性能监控
a.训练指标
a.功能说明
监控训练速度、显存使用等指标。
b.代码示例
---
import time
import torch
def monitor_training(model_engine, dataloader):
start_time = time.time()
total_tokens = 0
for batch in dataloader:
batch_start = time.time()
outputs = model_engine(**batch)
loss = outputs.loss
model_engine.backward(loss)
model_engine.step()
batch_time = time.time() - batch_start
tokens = batch['input_ids'].numel()
total_tokens += tokens
throughput = tokens / batch_time
print(f"Throughput: {throughput:.2f} tokens/s")
print(f"GPU Memory: {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
total_time = time.time() - start_time
avg_throughput = total_tokens / total_time
print(f"Average Throughput: {avg_throughput:.2f} tokens/s")
---
5 向量数据库
5.1 向量检索原理
01.向量表示
a.Embedding原理
将文本、图像等数据映射到高维向量空间,语义相似的数据在向量空间中距离更近。
b.距离度量
a.欧氏距离
L2距离,计算两点间直线距离,适用于稠密向量。
b.余弦相似度
计算向量夹角余弦值,范围[-1,1],不受向量模长影响。
02.检索算法
a.暴力搜索
a.功能说明
遍历所有向量计算距离,精确但速度慢,复杂度O(n)。
b.代码示例
---
import numpy as np
def brute_force_search(query, vectors, top_k=5):
distances = np.linalg.norm(vectors - query, axis=1)
indices = np.argsort(distances)[:top_k]
return indices, distances[indices]
vectors = np.random.randn(10000, 768)
query = np.random.randn(768)
indices, distances = brute_force_search(query, vectors)
print(f"Top-{len(indices)} indices: {indices}")
---
b.近似最近邻ANN
a.功能说明
牺牲少量精度换取速度提升,复杂度降至O(log n)。
b.代码示例
---
import faiss
dimension = 768
vectors = np.random.randn(100000, dimension).astype('float32')
index = faiss.IndexFlatL2(dimension)
index.add(vectors)
query = np.random.randn(1, dimension).astype('float32')
distances, indices = index.search(query, k=10)
print(f"Top-10 indices: {indices[0]}")
---
03.索引结构
a.IVF倒排索引
a.功能说明
将向量空间聚类,查询时只搜索最近的几个聚类中心。
b.代码示例
---
nlist = 100
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index.train(vectors)
index.add(vectors)
index.nprobe = 10
distances, indices = index.search(query, k=10)
---
b.HNSW图索引
a.功能说明
构建分层导航小世界图,查询时从顶层逐层搜索。
b.代码示例
---
index = faiss.IndexHNSWFlat(dimension, 32)
index.add(vectors)
distances, indices = index.search(query, k=10)
print(f"HNSW search results: {indices[0]}")
---
5.2 Milvus
01.安装部署
a.Docker部署
a.功能说明
使用Docker Compose快速部署Milvus。
b.代码示例
---
# docker-compose.yml
version: '3.5'
services:
etcd:
image: quay.io/coreos/etcd:latest
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
minio:
image: minio/minio:latest
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
command: minio server /minio_data
milvus:
image: milvusdb/milvus:latest
command: milvus run standalone
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
ports:
- "19530:19530"
# 启动: docker-compose up -d
---
02.基础操作
a.创建集合
a.功能说明
定义集合schema并创建索引。
b.代码示例
---
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
connections.connect(host="localhost", port="19530")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000)
]
schema = CollectionSchema(fields=fields)
collection = Collection(name="documents", schema=schema)
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)
---
b.插入数据
a.功能说明
批量插入向量数据。
b.代码示例
---
from sentence_transformers import SentenceTransformer
encoder = SentenceTransformer('all-mpnet-base-v2')
texts = ["AI is transforming industries", "ML enables predictions"]
embeddings = encoder.encode(texts).tolist()
entities = [embeddings, texts]
collection.insert(entities)
collection.flush()
---
03.检索查询
a.向量搜索
a.功能说明
执行语义检索查询。
b.代码示例
---
collection.load()
query_text = "artificial intelligence applications"
query_embedding = encoder.encode([query_text]).tolist()
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=query_embedding,
anns_field="embedding",
param=search_params,
limit=5,
output_fields=["text"]
)
for hits in results:
for hit in hits:
print(f"Distance: {hit.distance:.4f}, Text: {hit.entity.get('text')}")
---
5.3 Pinecone
01.云服务配置
a.初始化
a.功能说明
配置Pinecone API密钥并创建索引。
b.代码示例
---
import pinecone
pinecone.init(api_key="your-api-key", environment="us-west1-gcp")
if "documents" not in pinecone.list_indexes():
pinecone.create_index(
name="documents",
dimension=768,
metric="cosine"
)
index = pinecone.Index("documents")
---
02.数据操作
a.插入向量
a.功能说明
批量upsert向量数据。
b.代码示例
---
from sentence_transformers import SentenceTransformer
encoder = SentenceTransformer('all-mpnet-base-v2')
texts = ["AI transforms business", "ML predicts outcomes"]
embeddings = encoder.encode(texts)
vectors = [
(f"doc-{i}", embedding.tolist(), {"text": text})
for i, (embedding, text) in enumerate(zip(embeddings, texts))
]
index.upsert(vectors=vectors)
---
b.查询检索
a.功能说明
执行向量相似度查询。
b.代码示例
---
query_text = "machine learning applications"
query_embedding = encoder.encode([query_text])[0].tolist()
results = index.query(
vector=query_embedding,
top_k=5,
include_metadata=True
)
for match in results['matches']:
print(f"Score: {match['score']:.4f}, Text: {match['metadata']['text']}")
---
5.4 Chroma
01.本地部署
a.安装使用
a.功能说明
轻量级向量数据库,支持本地持久化。
b.代码示例
---
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.create_collection(
name="documents",
metadata={"hnsw:space": "cosine"}
)
texts = ["AI is powerful", "ML is useful"]
ids = [f"doc-{i}" for i in range(len(texts))]
collection.add(
documents=texts,
ids=ids
)
---
02.查询操作
a.语义搜索
a.功能说明
自动生成embedding并检索。
b.代码示例
---
results = collection.query(
query_texts=["artificial intelligence"],
n_results=5
)
for i, doc in enumerate(results['documents'][0]):
distance = results['distances'][0][i]
print(f"Distance: {distance:.4f}, Doc: {doc}")
---
5.5 Faiss
01.索引类型
a.Flat索引
a.功能说明
精确搜索,适合小规模数据。
b.代码示例
---
import faiss
import numpy as np
dimension = 768
vectors = np.random.randn(10000, dimension).astype('float32')
index = faiss.IndexFlatL2(dimension)
index.add(vectors)
query = np.random.randn(1, dimension).astype('float32')
distances, indices = index.search(query, k=10)
---
b.IVF索引
a.功能说明
倒排索引,适合大规模数据。
b.代码示例
---
nlist = 100
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index.train(vectors)
index.add(vectors)
index.nprobe = 10
distances, indices = index.search(query, k=10)
---
02.GPU加速
a.GPU索引
a.功能说明
使用GPU加速向量检索。
b.代码示例
---
res = faiss.StandardGpuResources()
index_cpu = faiss.IndexFlatL2(dimension)
index_gpu = faiss.index_cpu_to_gpu(res, 0, index_cpu)
index_gpu.add(vectors)
distances, indices = index_gpu.search(query, k=10)
---
5.6 实战应用
01.RAG系统
a.文档索引
a.功能说明
构建文档向量索引支持RAG检索。
b.代码示例
---
from pymilvus import connections, Collection
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer
connections.connect(host="localhost", port="19530")
collection = Collection("knowledge_base")
collection.load()
encoder = SentenceTransformer('all-mpnet-base-v2')
llm = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
def rag_query(question, top_k=3):
query_embedding = encoder.encode([question]).tolist()
results = collection.search(
data=query_embedding,
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=top_k,
output_fields=["text"]
)
context = "
".join([hit.entity.get('text') for hit in results[0]])
prompt = f"Context:
{context}
Question: {question}
Answer:"
inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
outputs = llm.generate(**inputs, max_new_tokens=256)
return tokenizer.decode(outputs[0], skip_special_tokens=True)
answer = rag_query("What is machine learning?")
print(answer)
---
02.性能优化
a.批量检索
a.功能说明
批量处理查询提升吞吐量。
b.代码示例
---
def batch_search(queries, batch_size=32):
results = []
for i in range(0, len(queries), batch_size):
batch = queries[i:i+batch_size]
embeddings = encoder.encode(batch).tolist()
batch_results = collection.search(
data=embeddings,
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=5
)
results.extend(batch_results)
return results
queries = [f"Question {i}" for i in range(100)]
results = batch_search(queries)
---
6 GPU优化
6.1 CUDA基础
01.CUDA架构
a.线程层次
Grid包含多个Block,Block包含多个Thread,通过threadIdx和blockIdx索引。
b.内存层次
全局内存、共享内存、寄存器,访问速度依次递增,容量依次递减。
02.基础编程
a.Kernel函数
a.功能说明
编写简单的CUDA kernel进行向量加法。
b.代码示例
---
import torch
@torch.jit.script
def vector_add_cuda(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
return a + b
a = torch.randn(1000000, device='cuda')
b = torch.randn(1000000, device='cuda')
c = vector_add_cuda(a, b)
print(f"Result shape: {c.shape}")
---
b.自定义算子
a.功能说明
使用PyTorch C++扩展编写CUDA算子。
b.代码示例
---
# custom_ops.cu
#include <torch/extension.h>
#include <cuda_runtime.h>
__global__ void add_kernel(float* a, float* b, float* c, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
c[idx] = a[idx] + b[idx];
}
}
torch::Tensor add_cuda(torch::Tensor a, torch::Tensor b) {
auto c = torch::zeros_like(a);
int n = a.numel();
int threads = 256;
int blocks = (n + threads - 1) / threads;
add_kernel<<<blocks, threads>>>(
a.data_ptr<float>(),
b.data_ptr<float>(),
c.data_ptr<float>(),
n
);
return c;
}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("add", &add_cuda, "CUDA add");
}
---
03.性能优化
a.内存合并
a.功能说明
优化内存访问模式,实现合并访问。
b.代码示例
---
import torch
def benchmark_memory_access():
size = 10000000
a = torch.randn(size, device='cuda')
torch.cuda.synchronize()
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
start.record()
b = a * 2
end.record()
torch.cuda.synchronize()
print(f"Time: {start.elapsed_time(end):.2f}ms")
benchmark_memory_access()
---
6.2 显存管理
01.显存分配
a.动态分配
a.功能说明
PyTorch自动管理显存分配与释放。
b.代码示例
---
import torch
print(f"Allocated: {torch.cuda.memory_allocated()/1024**3:.2f}GB")
print(f"Reserved: {torch.cuda.memory_reserved()/1024**3:.2f}GB")
x = torch.randn(10000, 10000, device='cuda')
print(f"After allocation: {torch.cuda.memory_allocated()/1024**3:.2f}GB")
del x
torch.cuda.empty_cache()
print(f"After release: {torch.cuda.memory_allocated()/1024**3:.2f}GB")
---
b.显存池
a.功能说明
配置显存分配器减少碎片。
b.代码示例
---
torch.cuda.set_per_process_memory_fraction(0.8, 0)
snapshot = torch.cuda.memory_snapshot()
print(f"Memory segments: {len(snapshot)}")
---
02.显存优化
a.梯度检查点
a.功能说明
使用gradient checkpointing减少显存占用。
b.代码示例
---
from torch.utils.checkpoint import checkpoint
import torch.nn as nn
class CheckpointModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.ModuleList([
nn.Linear(1024, 1024) for _ in range(10)
])
def forward(self, x):
for layer in self.layers:
x = checkpoint(layer, x)
return x
model = CheckpointModel().cuda()
x = torch.randn(32, 1024, device='cuda')
output = model(x)
---
b.激活重计算
a.功能说明
重计算激活值而非存储,节省显存。
b.代码示例
---
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
"gpt2",
gradient_checkpointing=True
).cuda()
inputs = torch.randint(0, 1000, (2, 128), device='cuda')
outputs = model(inputs, labels=inputs)
loss = outputs.loss
loss.backward()
---
6.3 混合精度训练
01.自动混合精度
a.AMP原理
使用FP16进行前向和反向计算,FP32存储权重和优化器状态,加速训练。
b.实现
a.功能说明
使用torch.cuda.amp进行混合精度训练。
b.代码示例
---
import torch
from torch.cuda.amp import autocast, GradScaler
model = torch.nn.Linear(1024, 1024).cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()
for epoch in range(3):
for batch in dataloader:
optimizer.zero_grad()
with autocast():
outputs = model(batch)
loss = criterion(outputs, labels)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
---
02.BF16训练
a.原理
BFloat16保留FP32的指数位,数值范围更大,适合训练。
b.实现
a.功能说明
使用BF16进行训练。
b.代码示例
---
model = model.to(torch.bfloat16).cuda()
for batch in dataloader:
batch = batch.to(torch.bfloat16).cuda()
outputs = model(batch)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
---
6.4 梯度累积
01.原理
a.模拟大batch
将batch切分为多个micro-batch,累积梯度后统一更新,模拟大batch训练。
b.显存节约
每次只处理小batch,显存占用降低,支持更大模型训练。
02.实现
a.基础实现
a.功能说明
实现梯度累积训练循环。
b.代码示例
---
accumulation_steps = 4
model.zero_grad()
for i, batch in enumerate(dataloader):
outputs = model(batch)
loss = outputs.loss / accumulation_steps
loss.backward()
if (i + 1) % accumulation_steps == 0:
optimizer.step()
model.zero_grad()
---
b.混合精度梯度累积
a.功能说明
结合AMP和梯度累积。
b.代码示例
---
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
accumulation_steps = 8
for i, batch in enumerate(dataloader):
with autocast():
outputs = model(batch)
loss = outputs.loss / accumulation_steps
scaler.scale(loss).backward()
if (i + 1) % accumulation_steps == 0:
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
---
6.5 性能分析工具
01.PyTorch Profiler
a.基础分析
a.功能说明
使用Profiler分析训练性能。
b.代码示例
---
from torch.profiler import profile, ProfilerActivity
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
record_shapes=True,
profile_memory=True
) as prof:
for _ in range(10):
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print(prof.key_averages().table(
sort_by="cuda_time_total",
row_limit=10
))
---
b.TensorBoard可视化
a.功能说明
导出trace文件到TensorBoard查看。
b.代码示例
---
from torch.profiler import profile, tensorboard_trace_handler
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
schedule=torch.profiler.schedule(wait=1, warmup=1, active=3),
on_trace_ready=tensorboard_trace_handler('./log')
) as prof:
for step, batch in enumerate(dataloader):
outputs = model(batch)
loss = outputs.loss
loss.backward()
optimizer.step()
prof.step()
# tensorboard --logdir=./log
---
02.Nsight Systems
a.系统级分析
a.功能说明
使用Nsight分析GPU利用率和瓶颈。
b.代码示例
---
# 启动分析
# nsys profile -o output python train.py
import torch
model = torch.nn.Linear(1024, 1024).cuda()
for _ in range(100):
x = torch.randn(128, 1024, device='cuda')
y = model(x)
torch.cuda.synchronize()
---
7 生产环境实践
7.1 模型监控
01.指标体系
a.性能指标
监控推理延迟、吞吐���、队列长度等关键性能指标。
b.资源指标
监控GPU利用率、显存占用、CPU使用率等资源消耗。
02.Prometheus集成
a.指标暴露
a.功能说明
使用Prometheus客户端暴露监控指标。
b.代码示例
---
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
REQUEST_COUNT = Counter('llm_requests_total', 'Total requests')
REQUEST_LATENCY = Histogram('llm_latency_seconds', 'Request latency')
GPU_UTILIZATION = Gauge('gpu_utilization_percent', 'GPU utilization')
start_http_server(8000)
def process_request(prompt):
REQUEST_COUNT.inc()
start = time.time()
result = model.generate(prompt)
REQUEST_LATENCY.observe(time.time() - start)
return result
---
03.Grafana可视化
a.仪表盘配置
a.功能说明
配置Grafana仪表盘展示监控数据。
b.代码示例
---
# prometheus.yml
scrape_configs:
- job_name: 'llm_service'
static_configs:
- targets: ['localhost:8000']
# 启动Prometheus
# prometheus --config.file=prometheus.yml
# Grafana查询示例
# rate(llm_requests_total[5m])
# histogram_quantile(0.95, llm_latency_seconds)
---
7.2 日志管理
01.结构化日志
a.日志格式
a.功能说明
使用JSON格式记录结构化日志。
b.代码示例
---
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module
}
return json.dumps(log_data)
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("Model inference started")
---
02.日志聚合
a.ELK集成
a.功能说明
将日志发送到Elasticsearch进行聚合分析。
b.代码示例
---
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
def log_to_elasticsearch(level, message, metadata):
doc = {
'timestamp': datetime.utcnow(),
'level': level,
'message': message,
'metadata': metadata
}
es.index(index='llm-logs', document=doc)
log_to_elasticsearch('INFO', 'Request processed', {'latency': 0.5})
---
7.3 负载均衡
01.Nginx配置
a.轮询策略
a.功能说明
配置Nginx实现多实例负载均衡。
b.代码示例
---
# nginx.conf
upstream llm_backend {
server 192.168.1.10:8000;
server 192.168.1.11:8000;
server 192.168.1.12:8000;
}
server {
listen 80;
location / {
proxy_pass http://llm_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
---
02.动态路由
a.智能分发
a.功能说明
根据请求特征动态选择后端实例。
b.代码示例
---
from fastapi import FastAPI
import httpx
app = FastAPI()
BACKENDS = {
'small': 'http://gpu-node-1:8000',
'large': 'http://gpu-node-2:8000'
}
@app.post("/generate")
async def generate(prompt: str, max_tokens: int):
backend = BACKENDS['large'] if max_tokens > 512 else BACKENDS['small']
async with httpx.AsyncClient() as client:
response = await client.post(f"{backend}/generate",
json={"prompt": prompt, "max_tokens": max_tokens})
return response.json()
---
7.4 容错机制
01.重试策略
a.指数退避
a.功能说明
实现请求重试与指数退避。
b.代码示例
---
import time
import httpx
async def retry_request(url, data, max_retries=3):
for attempt in range(max_retries):
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, timeout=30.0)
return response.json()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
---
02.熔断降级
a.熔断器
a.功能说明
实现熔断器模式防止级联故障。
b.代码示例
---
from datetime import datetime, timedelta
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = 0
self.last_failure_time = None
self.state = 'CLOSED'
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failures = 0
self.state = 'CLOSED'
def on_failure(self):
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = 'OPEN'
breaker = CircuitBreaker()
result = breaker.call(model.generate, prompt)
---
7.5 成本优化
01.资源调度
a.动态扩缩容
a.功能说明
根据负载动态调整GPU实例数量。
b.代码示例
---
import boto3
def scale_gpu_instances(target_utilization=0.7):
cloudwatch = boto3.client('cloudwatch')
autoscaling = boto3.client('autoscaling')
response = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='GPUUtilization',
Dimensions=[{'Name': 'AutoScalingGroupName', 'Value': 'llm-asg'}],
StartTime=datetime.utcnow() - timedelta(minutes=5),
EndTime=datetime.utcnow(),
Period=300,
Statistics=['Average']
)
avg_util = response['Datapoints'][0]['Average']
if avg_util > target_utilization:
autoscaling.set_desired_capacity(
AutoScalingGroupName='llm-asg',
DesiredCapacity=current_capacity + 1
)
---
02.成本分析
a.使用统计
a.功能说明
统计GPU使用成本与效率。
b.代码示例
---
def calculate_cost(gpu_hours, gpu_type='A100'):
pricing = {
'A100': 3.06,
'V100': 2.48,
'T4': 0.526
}
hourly_rate = pricing.get(gpu_type, 0)
total_cost = gpu_hours * hourly_rate
return {
'gpu_type': gpu_type,
'hours': gpu_hours,
'hourly_rate': hourly_rate,
'total_cost': total_cost
}
cost = calculate_cost(100, 'A100')
print(f"Total cost: ${cost['total_cost']:.2f}")
---
8 学习路径与资源
8.1 推荐学习顺序
01.基础阶段
a.第1-2周
学习模型部署基础,掌握vLLM和TGI的使用,完成本地模型部署实验。
b.第3-4周
学习模型压缩技术,实践INT8量化和GPTQ/AWQ量化,对比性能差异。
02.进阶阶段
a.第5-6周
学习分布式训练,掌握DeepSpeed ZeRO和FSDP的配置与使用。
b.第7-8周
学习向量数据库,使用Milvus构建RAG系统,完成端到端应用。
03.实战阶段
a.第9-10周
学习GPU优化技术,实践混合精度训练和梯度累积。
b.第11-12周
学习生产环境实践,完成监控、日志、负载均衡等工程化部署。
8.2 在线课程推荐
01.官方课程
a.HuggingFace课程
免费的Transformers和NLP课程,涵盖模型微调、部署等内容。
b.DeepLearning.AI
吴恩达的LLM系列课程,包括Prompt Engineering、LangChain等。
02.实战项目
a.GitHub开源项目
学习vLLM、TGI、DeepSpeed等项目源码,理解工程实现细节。
b.Kaggle竞赛
参与NLP相关竞赛,积累实战经验。
8.3 技术文档
01.官方文档
a.PyTorch文档
https://pytorch.org/docs - PyTorch官方文档
b.HuggingFace文档
https://huggingface.co/docs - Transformers库文档
c.vLLM文档
https://docs.vllm.ai - vLLM部署文档
02.技术博客
a.公司技术博客
OpenAI、Anthropic、Meta AI等公司的技术博客
b.个人博客
Jay Alammar、Sebastian Raschka等专家的技术博客
8.4 开源项目
01.推理框架
a.vLLM
https://github.com/vllm-project/vllm - 高性能推理框架
b.TGI
https://github.com/huggingface/text-generation-inference - HuggingFace推理服务
02.训练框架
a.DeepSpeed
https://github.com/microsoft/DeepSpeed - 微软分布式训练框架
b.Megatron-LM
https://github.com/NVIDIA/Megatron-LM - NVIDIA大模型训练框架
03.应用框架
a.LangChain
https://github.com/langchain-ai/langchain - LLM应用开发框架
b.LlamaIndex
https://github.com/run-llama/llama_index - RAG应用框架
8.5 常见问题
01.部署问题
a.显存不足
使用量化、梯度检查点、模型并行等技术降低显存占用。
b.推理速度慢
使用vLLM、TGI等优化框架,启用批处理和KV缓存优化。
02.训练问题
a.训练不收敛
检查学习率、batch size、梯度裁剪等超参数配置。
b.多卡通信慢
优化网络配置,使用NVLink或InfiniBand高速互联。
03.工程问题
a.服务稳定性
实现重试、熔断、限流等容错机制,配置监控告警。
b.成本控制
使用Spot实例、动态扩缩容、模型压缩等降低成本。