1 平台介绍
1.1 核心功能
01.追踪调试
a.链路追踪
a.功能说明
LangSmith提供完整的LLM应用执行链路追踪,记录每个LangChain组件的输入输出、调用关系、耗时等。自动捕获Chain、Agent、LLM、Tool等组件的执行细节。支持嵌套调用、并行执行的可视化展示。通过Web界面查看完整调用树,定位问题节点。追踪数据包含提示词、响应内容、Token使用量、延迟、错误信息等。LangSmith追踪是调试复杂LLM应用的核心功能。
b.代码示例
---
# Example: trace a LangChain application in LangSmith.
import os
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import ChatPromptTemplate
# 1. Configure LangSmith tracing through environment variables.
# NOTE(review): "your_langsmith_api_key" is a placeholder; in real code read
# the key from the environment or a secret store instead of hard-coding it.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_langsmith_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my_project"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
# 2. Run the LangChain application; with the variables above set, the run is
# traced automatically.
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的AI助手"),
("human", "{question}")
])
chain = LLMChain(llm=llm, prompt=prompt)
# This invocation is sent to LangSmith as a trace.
result = chain.invoke({"question": "什么是LangSmith?"})
print(result)
# Inspect the trace at https://smith.langchain.com
# 3. A trace contains:
# - chain execution time
# - LLM call details (prompt, response, token counts)
# - inputs and outputs of intermediate steps
# - error stack trace (if any)
# - metadata (model, parameters, ...)
# 4. Custom trace tags: pass an explicit tracer through the call's config so
# that this run carries the listed tags and goes to the given project.
from langchain.callbacks import LangChainTracer
tracer = LangChainTracer(
project_name="my_project",
tags=["production", "v2.0", "experiment-A"]
)
result = chain.invoke(
{"question": "测试问题"},
config={"callbacks": [tracer]}
)
# 5. Trace a plain Python function with the @traceable decorator.
from langsmith import traceable


@traceable(run_type="chain", name="custom_processing")
def custom_process(text: str) -> str:
    """Upper-case *text*; the call is recorded as a "chain" run named custom_processing."""
    return text.upper()


result = custom_process("hello world")  # this call is traced to LangSmith as well


# 6. Nested tracing: decorated functions called from inside another decorated
# function show up as its children in the trace tree.
@traceable(name="preprocess")
def preprocess(text: str):
    """Normalize the input: strip surrounding whitespace and lower-case it."""
    return text.strip().lower()


@traceable(name="postprocess")
def postprocess(result: dict):
    """Extract the generated text from an LLMChain result dict."""
    return result["text"]


@traceable(name="main_pipeline")
def main_pipeline(input_text: str):
    """Run preprocess -> LLM chain -> postprocess and return the final text."""
    llm_output = chain.invoke({"question": preprocess(input_text)})
    return postprocess(llm_output)


# After this call the complete call tree is visible in LangSmith.
output = main_pipeline(" 请介绍LangSmith ")
---
b.数据集管理
a.功能说明
LangSmith提供数据集管理功能,用于存储测试用例、评估数据、生产日志等。支持多种数据格式(文本、JSON、CSV等)。数据集可用于批量评估、回归测试、模型对比。版本管理确保数据集可追溯。支持数据导入导出、分享、权限控制。数据集是系统化测试和持续优化的基础。
b.代码示例
---
from langsmith import Client

# 1. Create the LangSmith client.
# NOTE(review): placeholder credentials; prefer environment variables
# (LANGCHAIN_API_KEY) over hard-coded keys in real code.
client = Client(
    api_key="your_api_key",
    api_url="https://api.smith.langchain.com",
)

# 2. Create the dataset. create_dataset fails when the name already exists,
# so check first: this keeps the script re-runnable and avoids adding the
# seed examples a second time.
dataset_name = "customer_service_qa"
dataset_is_new = not client.has_dataset(dataset_name=dataset_name)
if dataset_is_new:
    dataset = client.create_dataset(
        dataset_name=dataset_name,
        description="客服问答测试数据集",
    )
else:
    dataset = client.read_dataset(dataset_name=dataset_name)
print(f"数据集ID:{dataset.id}")

# 3. Seed examples (input question -> reference answer).
examples = [
    {
        "inputs": {"question": "如何退货?"},
        "outputs": {"answer": "请在订单页面申请退货,填写退货原因。"},
    },
    {
        "inputs": {"question": "配送需要多久?"},
        "outputs": {"answer": "一般3-5个工作日送达。"},
    },
    {
        "inputs": {"question": "可以开发票吗?"},
        "outputs": {"answer": "可以,下单时选择开具发票即可。"},
    },
]
if dataset_is_new:
    # One batched request instead of one create_example call per example.
    client.create_examples(
        inputs=[ex["inputs"] for ex in examples],
        outputs=[ex["outputs"] for ex in examples],
        dataset_id=dataset.id,
    )
    print(f"已添加{len(examples)}个示例")

# 4. Read the dataset back.
dataset = client.read_dataset(dataset_name=dataset_name)
examples = list(client.list_examples(dataset_id=dataset.id))
print(f"数据集包含{len(examples)}个示例")
for ex in examples:
    print(f" 输入:{ex.inputs}")
    print(f" 输出:{ex.outputs}")

# 5. Bulk upload from CSV (columns: question, answer, optional category),
# again as a single batched request.
import pandas as pd

df = pd.read_csv("qa_data.csv")
if len(df):
    categories = df["category"] if "category" in df.columns else [None] * len(df)
    client.create_examples(
        inputs=[{"question": q} for q in df["question"]],
        outputs=[{"answer": a} for a in df["answer"]],
        # Empty CSV cells arrive as NaN; store them as null instead.
        metadata=[
            {"source": "csv", "category": None if pd.isna(c) else c}
            for c in categories
        ],
        dataset_id=dataset.id,
    )
# 6. Dataset versioning.
# The SDK has no create_dataset_version(). LangSmith records a new dataset
# version on every change; a version is addressed by its timestamp (as_of)
# and can be given a readable tag with update_dataset_tag.
from datetime import datetime, timezone

# Tag the dataset's current state as "v1.0". Tags carry no description, so
# the former description ("初始版本" / initial version) has no equivalent.
client.update_dataset_tag(
    dataset_id=dataset.id,
    as_of=datetime.now(timezone.utc),
    tag="v1.0",
)
# List all versions. Each entry exposes the tags pointing at it and its
# as_of timestamp; there are no .version / .created_at attributes.
versions = client.list_dataset_versions(dataset_id=dataset.id)
for ver in versions:
    print(f"版本:{ver.tags}, 创建于:{ver.as_of}")
---
02.测试评估
a.自动评估
a.功能说明
LangSmith支持对LLM应用进行自动化评估,对比输出与期望结果。提供多种内置评估器(准确性、相似度、有害内容检测等)。支持自定义评估函数,实现业务特定的评估逻辑。批量运行评估,生成评估报告。追踪评估历史,对比不同版本性能。自动评估加速迭代周期,提升应用质量。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
client = Client()

# 1. The application under evaluation.
llm = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template(
    "根据问题给出简洁的答案:{question}"
)


def my_app(inputs: dict) -> dict:
    """Answer ``inputs["question"]`` with the LLM and return ``{"answer": text}``.

    ``format_messages`` keeps the prompt as chat messages. ``prompt.format()``
    renders every message to one string with role prefixes ("Human: ..."),
    and that prefix would leak into the text sent to the model.
    """
    messages = prompt.format_messages(question=inputs["question"])
    return {"answer": llm.invoke(messages).content}
# 2. Use a built-in evaluator.
from langsmith.evaluation import LangChainStringEvaluator
# Accuracy evaluator: an LLM-graded "labeled_criteria" check with a single
# custom criterion named "accuracy". The "labeled" variant presumably grades
# against the example's reference output; confirm in the LangChain
# evaluator docs.
accuracy_evaluator = LangChainStringEvaluator(
"labeled_criteria",
config={
"criteria": {
"accuracy": "答案是否准确回答了问题?"
}
}
)
# 3. Evaluate my_app over every example of the dataset.
dataset_name = "customer_service_qa"
results = evaluate(
my_app,
data=dataset_name,
evaluators=[accuracy_evaluator],
experiment_prefix="gpt35_eval",
description="GPT-3.5评估测试"
)
print(f"评估完成:{results}")
# 4. Custom evaluator.
# Keywords a good answer to a return-policy question is expected to mention.
_CUSTOM_CHECK_KEYWORDS = ("退货", "订单", "申请")


def custom_evaluator(run, example):
    """Score an answer 1.0 if it mentions a keyword and has a sane length.

    Args:
        run: the traced run; ``run.outputs["answer"]`` is the prediction.
        example: the dataset example. This heuristic does not use its
            reference answer.

    Returns:
        A feedback dict with "key", "score" (1.0 or 0.0) and "comment".
        A run without outputs (for example, one that raised) scores 0.0
        instead of crashing the evaluator with a TypeError.
    """
    prediction = (run.outputs or {}).get("answer") or ""
    contains_keywords = any(kw in prediction for kw in _CUSTOM_CHECK_KEYWORDS)
    # The length must be strictly between 10 and 200 characters.
    length_ok = 10 < len(prediction) < 200
    score = 1.0 if (contains_keywords and length_ok) else 0.0
    return {
        "key": "custom_check",
        "score": score,
        "comment": f"关键词:{contains_keywords}, 长度:{length_ok}",
    }
# Run the custom evaluator on its own.
results = evaluate(
    my_app,
    data=dataset_name,
    evaluators=[custom_evaluator],
    experiment_prefix="custom_eval",
)

# 5. Combine several evaluators.
from langsmith.evaluation import evaluate

# "relevance" and "conciseness" are criteria, not evaluator types, so
# LangChainStringEvaluator("relevance") cannot be loaded. Use the "criteria"
# evaluator and pass the criterion name through its config.
evaluators = [
    accuracy_evaluator,
    custom_evaluator,
    LangChainStringEvaluator("criteria", config={"criteria": "relevance"}),
    LangChainStringEvaluator("criteria", config={"criteria": "conciseness"}),
]
results = evaluate(
    my_app,
    data=dataset_name,
    evaluators=evaluators,
    experiment_prefix="comprehensive_eval",
    max_concurrency=5,  # evaluate up to 5 examples concurrently
)

# 6. Analyse the results in the LangSmith web UI:
# - per-example scores
# - details of failing examples
# - aggregate metric statistics
# - comparison across experiments
---
b.监控告警
a.功能说明
LangSmith提供生产环境的实时监控功能,追踪应用性能、错误率、成本等指标。设置告警规则,在异常发生时及时通知。监控LLM Token使用量、API调用延迟、失败率等关键指标。支持自定义监控指标,满足特定业务需求。集成Slack、邮件等通知渠道。生产监控确保系统稳定运行,快速响应问题。
b.代码示例
---
# Example: production monitoring setup.
from langsmith import Client
import os
# 1. Production configuration.
# NOTE(review): placeholder API key; load it from the environment in real code.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"
# 2. Application calls are monitored automatically.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
# Every call is traced into the "production" project.
response = llm.invoke("用户问题")
# 3. Custom monitoring metrics.
from langsmith import traceable
from langsmith.run_helpers import get_current_run_tree


@traceable(name="business_logic")
def process_request(user_input: str):
    """Invoke the LLM on *user_input* and attach business metrics to the run.

    The metrics are merged into the run's metadata with ``add_metadata``.
    Assigning ``run_tree.extra = {...}`` replaced the whole ``extra`` dict,
    which discarded the metadata and runtime information already stored there.
    """
    run_tree = get_current_run_tree()
    result = llm.invoke(user_input)
    # get_current_run_tree() may return None (e.g. outside a traced context).
    if run_tree:
        run_tree.add_metadata({
            "user_satisfaction": 0.85,
            "response_quality": "good",
            "business_metric": 100,
        })
    return result
# 4. Error tracking.
# NOTE(review): process_data is not defined in this snippet; define it first.
@traceable(name="error_prone_operation")
def risky_operation(data: dict):
    """Return process_data(data); any exception propagates unchanged.

    No local handler is needed: the decorator records the failure on the
    traced run before the exception reaches the caller.
    """
    return process_data(data)
# 5. Query monitoring data through the API.
from datetime import datetime, timezone

client = Client()
# list_runs expects datetime objects (a string start_time fails inside the
# SDK) and has no documented end_time filter, so the upper bound of the
# window is applied on the client side.
window_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
window_end = datetime(2024, 2, 1, tzinfo=timezone.utc)  # exclusive: includes all of Jan 31


def _as_utc(moment):
    """Treat naive timestamps as UTC so they compare with the aware bounds."""
    return moment if moment.tzinfo else moment.replace(tzinfo=timezone.utc)


runs = client.list_runs(
    project_name="production",
    start_time=window_start,
    filter='eq(status, "error")',  # errors only
)
error_count = sum(1 for run in runs if _as_utc(run.start_time) < window_end)
print(f"错误数量:{error_count}")

# 6. Alerts are configured in the LangSmith web UI, e.g.:
# - error rate > 5%
# - mean latency > 3 s
# - token usage above budget
# - notifications to Slack or e-mail
---
1.2 使用场景
01.开发调试
a.快速定位问题
a.功能说明
开发阶段使用LangSmith快速定位LLM应用的问题。查看完整调用链路,识别错误节点。分析提示词效果,对比不同参数的输出。追踪Token消耗,优化成本。检查中间步骤的数据流转,发现逻辑错误。LangSmith的可视化界面让调试更直观高效,大幅缩短开发周期。
b.代码示例
---
import os
from langchain.chat_models import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate
# 1. Enable tracing for debugging.
os.environ.update({
    "LANGCHAIN_TRACING_V2": "true",
    "LANGCHAIN_PROJECT": "development",
    "LANGCHAIN_API_KEY": "your_api_key",
})


# 2. Build an agent (a more complex application).
def search_tool(query: str) -> str:
    """Mock search tool: echo the query back inside a fake result string."""
    return "搜索结果:{}".format(query)
import ast
import operator

# Operators the calculator accepts; every other syntax node is rejected.
_ARITH_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_ARITH_UNARYOPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_MAX_EXPONENT = 100  # guards against CPU/memory blow-ups such as 9**9**9


def _safe_arithmetic_eval(expression: str):
    """Evaluate a pure arithmetic expression without executing arbitrary code.

    Raises ValueError for any construct other than int/float literals,
    + - * / // % ** and unary +/-.
    """

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _ARITH_BINOPS:
            left, right = _eval(node.left), _eval(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > _MAX_EXPONENT:
                raise ValueError("exponent too large")
            return _ARITH_BINOPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _ARITH_UNARYOPS:
            return _ARITH_UNARYOPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported expression: {ast.dump(node)}")

    return _eval(ast.parse(expression, mode="eval"))


def calculator_tool(expression: str) -> str:
    """Calculator tool: evaluate an arithmetic expression and return it as text.

    The expression is produced by the LLM and must be treated as untrusted.
    ``eval`` would execute arbitrary Python, so the input is parsed with
    ``ast`` and only arithmetic is allowed.

    Returns:
        ``str(result)`` on success, or "计算错误" when the expression is
        invalid, disallowed or fails (e.g. division by zero).
    """
    try:
        return str(_safe_arithmetic_eval(expression))
    except (SyntaxError, ValueError, TypeError, ArithmeticError, RecursionError, MemoryError):
        return "计算错误"
# Tool specs as (name, function, description) triples.
_TOOL_SPECS = [
    ("search", search_tool, "搜索信息"),
    ("calculator", calculator_tool, "数学计算"),
]
tools = [Tool(name=n, func=f, description=d) for n, f, d in _TOOL_SPECS]

llm = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个AI助手,可以使用工具回答问题"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 3. Run the agent; the whole execution is traced to LangSmith.
try:
    result = agent_executor.invoke({"input": "搜索Python教程,然后计算 25 + 17"})
    print(f"结果:{result}")
except Exception as exc:
    # The failure, including the full stack trace, is also recorded in LangSmith.
    print(f"错误:{exc}")

# 4. In the LangSmith web UI, inspect:
# - the agent's reasoning steps
# - the order of tool calls
# - inputs and outputs of every step
# - token usage details
# - where and why an error occurred
# 5. Compare the effect of different system prompts. Each prompt produces its
# own trace, and the traces can be compared side by side in LangSmith.
prompts = ["你是一个专业助手", "你是一个友好的AI", "你是一个严谨的专家"]
for system_text in prompts:
    chain = ChatPromptTemplate.from_messages([
        ("system", system_text),
        ("human", "{input}"),
    ]) | llm
    result = chain.invoke({"input": "介绍一下人工智能"})
    print(f"提示词'{system_text[:10]}...'的结果:{result.content[:50]}...")

# 6. Performance analysis: compare the recorded durations of the two runs.
from langsmith import traceable
import time


@traceable(name="slow_operation")
def slow_function(data: str):
    """Simulated slow operation: sleep for 2 s, then upper-case *data*."""
    time.sleep(2)
    return data.upper()


@traceable(name="fast_operation")
def fast_function(data: str):
    """Fast operation: lower-case *data*."""
    return data.lower()


for traced_fn in (slow_function, fast_function):
    traced_fn("test")
---
b.团队协作
a.功能说明
团队使用LangSmith共享调试信息、提示词、测试数据。成员可以查看他人的trace,协作定位问题。Prompt Hub存储和版本管理提示词模板。数据集共享用于统一的测试标准。评论和标注功能支持异步沟通。权限管理确保数据安全。团队协作提升开发效率,保持工作同步。
b.代码示例
---
from langsmith import Client
from langchain.chat_models import ChatOpenAI  # was missing: ChatOpenAI is used below

client = Client()
# 1. Shared projects: invite team members to the project in the web UI;
#    members can then view all of its traces.
# 2. Prompt Hub: create a prompt template in the web UI named
#    "customer_service_greeting", then reference it from code.
from langchain.prompts import load_prompt  # NOTE: loads local files; Hub prompts use pull_prompt
# Load the prompt from the Hub.
prompt = client.pull_prompt("customer_service_greeting")
# Use the prompt.
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm
result = chain.invoke({"customer_name": "张三"})

# 3. Prompt versioning: push the updated prompt as a new commit.
from langchain.prompts import ChatPromptTemplate

new_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是专业的客服代表,态度友好"),
    ("human", "客户{customer_name}有问题:{question}"),
])
# NOTE(review): push_prompt's `tags` label the prompt itself; they do not tag
# this commit. Commit tags such as "v1.0"/"v2.0" must be created separately
# (e.g. in the UI); otherwise pull by commit hash.
client.push_prompt(
    "customer_service_greeting",
    object=new_prompt,
    tags=["v2.0", "improved"],
)
# Pin a version with "<name>:<commit tag or hash>". pull_prompt takes no
# `version` keyword argument.
prompt_v1 = client.pull_prompt("customer_service_greeting:v1.0")
prompt_v2 = client.pull_prompt("customer_service_greeting:v2.0")
# 4. Shared datasets.
dataset_name = "team_test_cases"
# Member A creates the dataset. create_dataset fails if the name already
# exists, so only create (and seed) it the first time.
if not client.has_dataset(dataset_name=dataset_name):
    dataset = client.create_dataset(
        dataset_name=dataset_name,
        description="团队共享测试用例",
    )
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": "测试问题1"},
        outputs={"answer": "期望答案1"},
    )
# Member B reads and uses it.
shared_dataset = client.read_dataset(dataset_name=dataset_name)
examples = list(client.list_examples(dataset_id=shared_dataset.id))
print(f"团队数据集包含{len(examples)}个用例")

# 5. Comments and feedback. In the web UI, comments can be attached to a
# trace, e.g. "这个提示词效果不好,建议修改" or "这个错误需要优先修复".
# Feedback can also be added through the API.
# NOTE(review): "specific_run_id" is a placeholder. create_feedback needs the
# UUID of an existing run (e.g. copied from the UI or taken from
# client.list_runs(...)).
run_id = "specific_run_id"
client.create_feedback(
    run_id=run_id,
    key="review",
    score=0.8,
    comment="输出质量不错,但可以更简洁",
)

# 6. Access control is configured in the web UI:
# - admin: full access
# - developer: read/write
# - viewer: read-only
# - permissions assigned per project
---
02.测试评估
a.回归测试
a.功能说明
使用LangSmith进行回归测试,确保代码变更不影响现有功能。维护测试数据集,每次发布前运行评估。对比新旧版本的输出,识别性能退化。自动化测试流程,集成到CI/CD。追踪测试历史,分析质量趋势。回归测试保障系统稳定,降低发布风险。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
client = Client()
# 1. Regression-test dataset: create it only if it does not exist yet.
# An explicit existence check replaces the bare `except:`, which also turned
# auth or network errors into an attempt to create the dataset.
dataset_name = "regression_test_suite"
if client.has_dataset(dataset_name=dataset_name):
    dataset = client.read_dataset(dataset_name=dataset_name)
else:
    dataset = client.create_dataset(
        dataset_name=dataset_name,
        description="回归测试套件",
    )
# 2. The two application versions under comparison.
def _run_prompt(model: str, temperature: float, template: str, inputs: dict) -> dict:
    """Build ``template | ChatOpenAI(model, temperature)``, run it, wrap the reply."""
    llm = ChatOpenAI(model=model, temperature=temperature)
    prompt = ChatPromptTemplate.from_template(template)
    reply = (prompt | llm).invoke(inputs)
    return {"answer": reply.content}


def app_v1(inputs: dict) -> dict:
    """Version 1: gpt-3.5-turbo, temperature 0.7, plain prompt."""
    return _run_prompt("gpt-3.5-turbo", 0.7, "回答:{question}", inputs)


def app_v2(inputs: dict) -> dict:
    """Version 2 (improved): gpt-4, temperature 0.5, asks for a concise answer."""
    return _run_prompt("gpt-4", 0.5, "请简洁回答:{question}", inputs)
# 3. Evaluators.
def _answer_of(outputs) -> str:
    """Return ``outputs["answer"]``, or "" when outputs or the answer are missing."""
    return (outputs or {}).get("answer") or ""


def accuracy_evaluator(run, example):
    """Accuracy: difflib similarity ratio (0..1) between prediction and reference.

    A run with no answer (e.g. it raised, so ``run.outputs`` is None) scores
    0.0 instead of crashing the evaluator with a TypeError.
    """
    from difflib import SequenceMatcher

    predicted = _answer_of(run.outputs)
    expected = _answer_of(example.outputs)
    # Without this guard an empty prediction would score 1.0 against an
    # empty reference.
    similarity = SequenceMatcher(None, predicted, expected).ratio() if predicted else 0.0
    return {
        "key": "accuracy",
        "score": similarity,
    }


def length_evaluator(run, example):
    """Length: 1.0 for 50-150 characters, falling off linearly outside that range.

    Below 50 characters the score is length/50. Above 150 it drops by 1/150
    per extra character and is clamped at 0 from 300 characters on. A
    missing answer counts as length 0.
    """
    length = len(_answer_of(run.outputs))
    if 50 <= length <= 150:
        score = 1.0
    elif length < 50:
        score = length / 50
    else:
        score = max(0, 1 - (length - 150) / 150)
    return {
        "key": "length",
        "score": score,
    }
# 4. Evaluate version 1 (the baseline).
# NOTE(review): experiment_prefix is only a prefix. The SDK appends a unique
# suffix to build the experiment name (see results_v1.experiment_name), so
# do not look the experiment up by the bare prefix.
results_v1 = evaluate(
app_v1,
data=dataset_name,
evaluators=[accuracy_evaluator, length_evaluator],
experiment_prefix="v1_baseline"
)
print("V1评估结果:", results_v1)
# 5. Evaluate version 2 (the candidate) on the same dataset with the same evaluators.
results_v2 = evaluate(
app_v2,
data=dataset_name,
evaluators=[accuracy_evaluator, length_evaluator],
experiment_prefix="v2_candidate"
)
print("V2评估结果:", results_v2)
# 6. Compare the two experiments.
# The LangSmith UI compares experiments side by side, shows per-example
# differences and highlights regressed examples.
#
# For a programmatic comparison, read the scores straight from the
# ExperimentResults, keyed by example. Listing runs by
# project_name="v1_baseline" finds nothing, because the prefix is not the
# experiment name. zip() over two run lists pairs runs in arbitrary order,
# and feedback_stats values are aggregate dicts, not numbers.
import sys

MAX_ALLOWED_REGRESSIONS = 2  # CI fails when more examples than this regress


def _scores_by_example(results, key="accuracy"):
    """Map example id -> score of evaluator *key* from an ExperimentResults."""
    scores = {}
    for row in results:
        for res in row["evaluation_results"]["results"]:
            if res.key == key and res.score is not None:
                scores[row["example"].id] = res.score
    return scores


v1_scores = _scores_by_example(results_v1)
v2_scores = _scores_by_example(results_v2)
shared_examples = v1_scores.keys() & v2_scores.keys()
improvements = sum(1 for ex_id in shared_examples if v2_scores[ex_id] > v1_scores[ex_id])
regressions = sum(1 for ex_id in shared_examples if v2_scores[ex_id] < v1_scores[ex_id])
print(f"改进:{improvements}个用例")
print(f"退化:{regressions}个用例")

# 7. CI/CD integration: the exit code tells the CI job whether to pass.
if regressions > MAX_ALLOWED_REGRESSIONS:
    print("回归测试失败:退化用例过多")
    sys.exit(1)
else:
    print("回归测试通过")
    sys.exit(0)
---
b.生产监控
a.功能说明
生产环境使用LangSmith持续监控应用运行状态。追踪实时请求、响应时间、错误率、Token消耗。设置告警规则,异常时立即通知。分析用户行为模式,识别常见问题。监控成本支出,避免超预算。生产监控确保服务质量,快速响应故障。
b.代码示例
---
import os
from langsmith import Client
from langchain.chat_models import ChatOpenAI
# 1. Production configuration.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production"
# os.environ only accepts strings. Assigning os.getenv(...) directly raises
# TypeError when the variable is unset, so copy the key only if present and
# keep the application running without authenticated tracing otherwise.
_langsmith_api_key = os.getenv("LANGSMITH_API_KEY")
if _langsmith_api_key:
    os.environ["LANGCHAIN_API_KEY"] = _langsmith_api_key
else:
    import warnings
    warnings.warn("LANGSMITH_API_KEY is not set; LANGCHAIN_API_KEY left unchanged")
# 2. Application code; calls are monitored automatically.
llm = ChatOpenAI(model="gpt-4")


def handle_user_request(user_input: str):
    """Send *user_input* to the LLM and return the reply text.

    On any exception the error is printed and a fixed apology message is
    returned instead. The failing LLM call itself is still traced.
    """
    try:
        return llm.invoke(user_input).content
    except Exception as err:
        print(f"错误:{err}")
        return "抱歉,服务暂时不可用"


result = handle_user_request("你好")  # a sample user request
# 3. Custom monitoring metrics.
from langsmith import traceable
from langsmith.run_helpers import get_current_run_tree


@traceable(name="production_request")
def monitored_request(user_id: str, request: str):
    """Handle *request* for *user_id* and attach per-request metrics to the run.

    The metrics are merged into the run's metadata with ``add_metadata``.
    Assigning ``run_tree.extra = {...}`` replaced the whole ``extra`` dict
    and discarded what LangSmith had already stored there.
    """
    run_tree = get_current_run_tree()
    response = handle_user_request(request)
    # get_current_run_tree() may return None (e.g. outside a traced context).
    if run_tree:
        run_tree.add_metadata({
            "user_id": user_id,
            "request_length": len(request),
            "response_length": len(response),
            "cached": False,
        })
    return response
# 4. Query monitoring data.
import statistics
from datetime import datetime, timedelta, timezone

client = Client()
# Errors in the last 24 hours. Use an aware UTC "now": a naive local time
# carries no UTC offset, so the window could be shifted by the machine's
# offset when the server interprets it.
today = datetime.now(timezone.utc)
yesterday = today - timedelta(days=1)
error_runs = client.list_runs(
    project_name="production",
    start_time=yesterday,
    filter='eq(status, "error")',
)
error_count = sum(1 for _ in error_runs)
print(f"过去24小时错误数:{error_count}")

# 5. Performance statistics over root (top-level) runs only, i.e. one entry
# per request. NOTE(review): a root run's total_tokens already aggregates
# its child runs, so summing the children as well would double-count.
all_runs = client.list_runs(
    project_name="production",
    start_time=yesterday,
    is_root=True,
    limit=1000,
)
latencies = []
total_tokens = 0
for run in all_runs:
    if run.latency:
        latencies.append(run.latency)
    if run.total_tokens:
        total_tokens += run.total_tokens
if latencies:
    # Run.latency is in seconds; convert before printing milliseconds.
    latencies_ms = [seconds * 1000 for seconds in latencies]
    avg_latency = statistics.mean(latencies_ms)
    # statistics.quantiles needs at least two data points.
    if len(latencies_ms) >= 2:
        p95_latency = statistics.quantiles(latencies_ms, n=20)[18]
    else:
        p95_latency = latencies_ms[0]
    print(f"平均延迟:{avg_latency:.0f}ms")
    print(f"P95延迟:{p95_latency:.0f}ms")
print(f"总Token:{total_tokens}")
# 6. Alerting script.
from datetime import datetime, timedelta, timezone


def check_and_alert(window=timedelta(hours=1), max_error_rate=0.05, min_requests=10):
    """Check recent production traffic and call send_alert on anomalies.

    Args:
        window: how far back to look (default: the last hour).
        max_error_rate: alert when the share of failed requests exceeds this
            (default 5%).
        min_requests: alert when fewer requests than this were seen,
            including no traffic at all.

    Only root runs are counted, so each request counts once rather than once
    per nested step.
    """
    recent_time = datetime.now(timezone.utc) - window
    recent_runs = list(client.list_runs(
        project_name="production",
        start_time=recent_time,
        is_root=True,
    ))
    total = len(recent_runs)
    errors = sum(1 for r in recent_runs if r.status == "error")
    if total > 0:
        error_rate = errors / total
        if error_rate > max_error_rate:
            send_alert(f"错误率过高:{error_rate:.1%}")
    # Outside the `total > 0` guard so that zero traffic also raises an alert.
    if total < min_requests:
        send_alert(f"请求量异常低:{total}")


def send_alert(message: str):
    """Deliver an alert. Currently prints; integrate Slack, e-mail, etc. here."""
    print(f"🚨 告警:{message}")


# Run the check (schedule it periodically, e.g. with cron).
check_and_alert()
---
1.3 环境配置
01.API配置
a.获取API Key
a.功能说明
使用LangSmith需要先注册账号并获取API Key。访问smith.langchain.com注册,在设置页面生成API Key。API Key用于身份验证,调用LangSmith API。支持创建多个Key,分配不同权限。Key可以设置有效期,过期自动失效。妥善保管Key,避免泄露。API Key是使用LangSmith的前提。
b.代码示例
---
# 1. Sign up for a LangSmith account:
#    visit https://smith.langchain.com, click "Sign Up" and verify your e-mail.
# 2. Create an API key:
#    avatar (top right) -> "Settings" -> "API Keys" -> "Create API Key",
#    enter a name (e.g. "development") and copy the key (it is shown only once).
# 3. Configure it in code.
import os

# Option 1: environment variables (recommended).
os.environ["LANGCHAIN_API_KEY"] = "ls_xxx..."
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my_project"

# Option 2: a .env file with the following content
"""
LANGCHAIN_API_KEY=ls_xxx...
LANGCHAIN_TRACING_V2=true
LANGCHAIN_PROJECT=my_project
"""
# loaded with python-dotenv. By default load_dotenv() does not override
# variables that are already set (e.g. by option 1); pass override=True if
# the .env file should take precedence.
from dotenv import load_dotenv

load_dotenv()

# Option 3: a JSON config file.
# WARNING: this stores the key in plain text. Keep the file out of version
# control (e.g. list it in .gitignore) and restrict its permissions.
import json

config = {
    "api_key": "ls_xxx...",
    "tracing": True,
    "project": "my_project",
}
with open("langsmith_config.json", "w", encoding="utf-8") as f:
    json.dump(config, f)
# Read the configuration back.
with open("langsmith_config.json", "r", encoding="utf-8") as f:
    config = json.load(f)
os.environ["LANGCHAIN_API_KEY"] = config["api_key"]
# 4. Verify the configuration with one lightweight authenticated request.
from langsmith import Client

try:
    client = Client()
    # Fetching a single project is enough to prove the key and endpoint work.
    projects = list(client.list_projects(limit=1))
    print("✓ LangSmith配置成功")
except Exception as exc:
    print(f"✗ 配置失败:{exc}")
# 5. Per-environment configuration.
import os
import warnings

# APP_ENV -> (variable holding that environment's key, LangSmith project).
_ENV_SETTINGS = {
    "development": ("LANGSMITH_DEV_KEY", "dev"),
    "production": ("LANGSMITH_PROD_KEY", "prod"),
}
ENV = os.getenv("APP_ENV", "development")
if ENV in _ENV_SETTINGS:
    key_var, project = _ENV_SETTINGS[ENV]
    api_key = os.getenv(key_var)
    # os.environ only accepts strings: assigning None (variable unset) raises
    # TypeError, so copy the key only when it is present.
    if api_key:
        os.environ["LANGCHAIN_API_KEY"] = api_key
    else:
        warnings.warn(f"{key_var} is not set; LANGCHAIN_API_KEY left unchanged")
    os.environ["LANGCHAIN_PROJECT"] = project

# 6. Key rotation:
# - periodically generate a new key in the web UI
# - update the environment variables
# - delete the old key
# Key-management best practices:
# - use a secret manager (e.g. AWS Secrets Manager)
# - never commit keys to version control
# - grant only the permissions that are needed
# - rotate keys regularly
---
b.项目设置
a.功能说明
LangSmith使用Project组织追踪数据,每个Project独立管理trace、数据集、评估等。创建不同Project区分开发、测试、生产环境。配置Project默认标签、采样率等参数。支持Project归档、删除、导出。合理的Project结构便于数据管理和权限控制。
b.代码示例
---
import os
from langsmith import Client
client = Client()
# 1. Create a project.
# Naming the project in LANGCHAIN_PROJECT is enough: it is created
# automatically.
os.environ["LANGCHAIN_PROJECT"] = "customer_service"
# Or create it explicitly through the API. upsert=True avoids a conflict
# error when the project already exists (e.g. from a previous run).
project = client.create_project(
    project_name="customer_service",
    description="客服系统追踪",
    upsert=True,
)
# 2. List all projects.
projects = list(client.list_projects())
for proj in projects:
    print(f"项目:{proj.name}")
    print(f" ID:{proj.id}")
    # Projects (TracerSession) expose start_time; there is no created_at field.
    print(f" 创建时间:{proj.start_time}")
# 3. Environment isolation: each deployment sets its own project. The three
# assignments below are alternatives, one per environment; executed in
# sequence, only the last one would take effect.
# development: traces go to the "dev" project
os.environ["LANGCHAIN_PROJECT"] = "dev"
# staging: traces go to the "staging" project
os.environ["LANGCHAIN_PROJECT"] = "staging"
# production: traces go to the "prod" project
os.environ["LANGCHAIN_PROJECT"] = "prod"


# 4. Dynamic project name.
def get_project_name():
    """Return "<APP_ENV>_<user>", e.g. "dev_alice".

    APP_ENV defaults to "dev". The user comes from USER (POSIX) or, when that
    is unset or empty, USERNAME (Windows); "unknown" if neither is set.
    """
    env = os.getenv("APP_ENV", "dev")
    user = os.getenv("USER") or os.getenv("USERNAME") or "unknown"
    return f"{env}_{user}"


os.environ["LANGCHAIN_PROJECT"] = get_project_name()
# 5. Project settings configurable in the web UI include, e.g.:
# - retention period: old data is deleted automatically
# - notifications: error alerts
# Sampling is decided on the client. The helper below is a coin flip made
# once per process, not an API call: the whole process is either traced or
# not. NOTE(review): for per-trace sampling, recent LangSmith SDKs read a
# sampling-rate environment variable (LANGSMITH_TRACING_SAMPLING_RATE /
# LANGCHAIN_TRACING_SAMPLING_RATE); confirm for your SDK version.
import random


def should_trace(sampling_rate: float = 0.1):
    """Return True with probability *sampling_rate* (default 10%)."""
    return random.random() < sampling_rate


# Decide once for this process whether tracing is on.
os.environ["LANGCHAIN_TRACING_V2"] = "true" if should_trace() else "false"
# 6. Project management: archive old projects.
def archive_old_projects(max_age_days: int = 90):
    """Export, then delete, every project created more than *max_age_days* ago.

    Projects expose ``start_time`` (there is no ``created_at``). A project is
    only deleted after its export returned without raising.
    """
    from datetime import datetime, timedelta, timezone

    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    # Materialize first: deleting while paging through list_projects() could
    # shift later pages and skip projects.
    for proj in list(client.list_projects()):
        created = proj.start_time
        if created.tzinfo is None:
            # NOTE(review): assumes naive timestamps from the API are UTC.
            created = created.replace(tzinfo=timezone.utc)
        if created < cutoff:
            export_project_data(proj.name)
            client.delete_project(project_id=proj.id)
            print(f"已归档:{proj.name}")


def export_project_data(project_name: str):
    """Write up to 10000 runs of *project_name* to ``<project_name>_export.json``."""
    import json

    runs = client.list_runs(
        project_name=project_name,
        limit=10000,
    )
    data = [
        {
            "id": str(run.id),
            "name": run.name,
            "inputs": run.inputs,
            "outputs": run.outputs,
            "created_at": run.start_time.isoformat() if run.start_time else None,
        }
        for run in runs
    ]
    # UTF-8 explicitly: with ensure_ascii=False the platform default encoding
    # may not represent the text. default=str deliberately stringifies
    # non-JSON values inside inputs/outputs (datetimes, UUIDs) instead of
    # aborting the export.
    with open(f"{project_name}_export.json", "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False, default=str)
---
02.本地部署
a.企业私有化
a.功能说明
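LangSmith支持企业私有化部署(Self-Hosted,需企业版授权),数据存储在企业内部。官方提供基于Kubernetes(Helm)和Docker Compose的部署方式。自托管版本除PostgreSQL外还依赖Redis、ClickHouse等存储组件。集成企业SSO实现统一身份认证。配置网络策略,确保数据不出内网。私有化部署满足数据合规和安全要求,适用于金融、政府等敏感行业。下文中的镜像名称、环境变量及信创适配方案仅为示意,实际部署请以官方自托管文档和厂商支持情况为准。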
b.代码示例
---
# 1. Deployment architecture: LangChain apps send traces to a privately
# hosted LangSmith server that stores them in the company database.
# NOTE(review): this diagram is simplified. Official self-hosted LangSmith
# (enterprise licence) also runs further services (e.g. Redis, ClickHouse);
# verify against the official self-hosting docs.
"""
+------------------+
| LangChain Apps |
+------------------+
|
| HTTP/gRPC
v
+------------------+
| LangSmith Server |(私有部署)
+------------------+
|
v
+------------------+
| PostgreSQL DB |(企业数据库)
+------------------+
"""
# 2. Docker Compose deployment (docker-compose.yml).
# NOTE(review): the image name and environment variables below are
# illustrative, not the official self-hosting configuration. The line
# "- [email protected]" looks like an e-mail address mangled by web
# e-mail obfuscation; restore the intended setting (presumably an admin
# e-mail variable).
"""
version: '3.8'
services:
langsmith:
image: langchain/langsmith:latest
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/langsmith
- SECRET_KEY=your_secret_key
- [email protected]
depends_on:
- db
db:
image: postgres:14
environment:
- POSTGRES_USER=langsmith
- POSTGRES_PASSWORD=secure_password
- POSTGRES_DB=langsmith
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
"""
# Start the services:
# docker-compose up -d
# 3. Application configuration (connect to the private deployment).
import os
# Point the SDK at the private LangSmith endpoint.
# NOTE(review): plain http sends the API key unencrypted over the internal
# network; prefer https where possible.
os.environ["LANGCHAIN_ENDPOINT"] = "http://langsmith.company.internal:8000"
os.environ["LANGCHAIN_API_KEY"] = "internal_api_key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my_project"
# Normal usage; traces are sent to the internal service.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
result = llm.invoke("test")
# 4. Domestic IT stack (Xinchuang) adaptation:
# use the Dameng (DM) database instead of PostgreSQL (docker-compose-dm.yml).
# NOTE(review): the "dm-compatible" image and DB_TYPE=dameng are not known
# official artifacts; confirm support with the vendor before relying on them.
"""
version: '3.8'
services:
langsmith:
image: langchain/langsmith:dm-compatible
environment:
- DATABASE_URL=dm://SYSDBA:SYSDBA@dm:5236/LANGSMITH
- DB_TYPE=dameng
dm:
image: dameng/dm8:latest
ports:
- "5236:5236"
environment:
- INSTANCE_NAME=LANGSMITH
volumes:
- dmdata:/opt/dmdbms/data
volumes:
dmdata:
"""
# 5. Deployment on Kylin OS
# Install Docker on the Kylin server:
# sudo yum install docker-ce
# Load the domestic-stack image:
# docker load -i langsmith-kylin.tar
# Run the container:
# docker run -d --name langsmith \
# -p 8000:8000 \
# -e DATABASE_URL=dm://... \
# langsmith-kylin:latest
# 6. High-availability deployment:
# run several replicas on Kubernetes (langsmith-deployment.yaml).
"""
apiVersion: apps/v1
kind: Deployment
metadata:
name: langsmith
spec:
replicas: 3
selector:
matchLabels:
app: langsmith
template:
metadata:
labels:
app: langsmith
spec:
containers:
- name: langsmith
image: langsmith:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: langsmith-secret
key: db_url
---
apiVersion: v1
kind: Service
metadata:
name: langsmith
spec:
type: LoadBalancer
selector:
app: langsmith
ports:
- port: 80
targetPort: 8000
"""
# Deploy:
# kubectl apply -f langsmith-deployment.yaml
# 7. Monitoring and maintenance
# View logs:
# docker logs langsmith
# Back up the database.
# NOTE(review): with docker-compose the container is usually not named "db",
# and pg_dump likely needs "-U langsmith" to match POSTGRES_USER; adjust.
# docker exec db pg_dump langsmith > backup.sql
# Update to a new version:
# docker pull langchain/langsmith:latest
# docker-compose up -d
---
b.离线使用
a.功能说明
完全离线环境下无法连接LangSmith云服务,可以用自建的轻量追踪方案替代其核心能力。禁用云端追踪,把追踪数据保存到本地文件或数据库(如下文的JSON文件与SQLite示例)。使用本地UI查看trace和分析数据,并定期导出数据用于备份和分析。该方案适用于无网络环境和数据敏感场景,保障业务连续性和数据安全;如需完整的LangSmith功能,请考虑私有化部署。
b.代码示例
---
import os
import json
from datetime import datetime
# 1. Disable cloud tracing.
os.environ["LANGCHAIN_TRACING_V2"] = "false"


# 2. Local file storage.
class LocalTracer:
    """Persist run records as individual JSON files in a local directory."""

    def __init__(self, storage_dir: str = "./traces"):
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)

    def log_run(self, run_data: dict):
        """Write *run_data* to a new ``run_<timestamp>_<random>.json`` file.

        File names used to have one-second resolution, so two runs logged in
        the same second overwrote each other. Microseconds plus a random
        suffix keep names unique while keeping them chronologically sortable.
        """
        import uuid

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = os.path.join(
            self.storage_dir, f"run_{timestamp}_{uuid.uuid4().hex[:8]}.json"
        )
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(run_data, f, indent=2, ensure_ascii=False)

    def list_runs(self):
        """Load every stored run, oldest first (by file name)."""
        import glob

        runs = []
        for file in sorted(glob.glob(os.path.join(self.storage_dir, "run_*.json"))):
            with open(file, "r", encoding="utf-8") as f:
                runs.append(json.load(f))
        return runs


# Module-level tracer instance used by the local tracing helpers.
local_tracer = LocalTracer()
# 3. Custom tracing decorator.
import functools


def local_traceable(name: str = None, tracer=None):
    """Decorator factory that records every call of the wrapped function.

    Args:
        name: run name to record; defaults to the function's ``__name__``.
        tracer: object with a ``log_run(dict)`` method; defaults to the
            module-level ``local_tracer``, looked up at call time.

    Exceptions raised by the wrapped function are recorded with status
    "error" and then re-raised unchanged.
    """

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            start_time = datetime.now()
            # Pessimistic defaults so the finally block can always record.
            result, status, error = None, "error", None
            try:
                result = func(*args, **kwargs)
                status = "success"
                return result
            except Exception as e:
                error = str(e)
                raise
            finally:
                end_time = datetime.now()
                run_data = {
                    "name": name or func.__name__,
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                    "duration": (end_time - start_time).total_seconds(),
                    "inputs": {"args": str(args), "kwargs": str(kwargs)},
                    "outputs": str(result),
                    "status": status,
                    "error": error,
                }
                (tracer if tracer is not None else local_tracer).log_run(run_data)

        return wrapper

    return decorator


# Usage.
@local_traceable(name="process_data")
def process_data(text: str):
    """Example traced function: upper-case *text*."""
    return text.upper()


result = process_data("hello")
# 4. Local SQLite storage.
import sqlite3


class SQLiteTracer:
    """Store run records in a SQLite ``runs`` table.

    The connection is opened in ``__init__``; call ``close()`` or use the
    instance as a context manager to release it.
    """

    def __init__(self, db_path: str = "traces.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        """Create the ``runs`` table if it does not exist yet."""
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    start_time TEXT,
                    duration REAL,
                    inputs TEXT,
                    outputs TEXT,
                    status TEXT,
                    error TEXT
                )
            """)

    def log_run(self, run_data: dict):
        """Insert one run; ``inputs`` is stored as JSON, ``error`` is optional."""
        with self.conn:
            self.conn.execute("""
                INSERT INTO runs
                (name, start_time, duration, inputs, outputs, status, error)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                run_data["name"],
                run_data["start_time"],
                run_data["duration"],
                json.dumps(run_data["inputs"]),
                run_data["outputs"],
                run_data["status"],
                run_data.get("error"),
            ))

    def query_runs(self, limit: int = 100):
        """Return up to *limit* rows, newest first, as tuples in column order."""
        cursor = self.conn.execute("""
            SELECT * FROM runs
            ORDER BY id DESC
            LIMIT ?
        """, (limit,))
        return cursor.fetchall()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()


sqlite_tracer = SQLiteTracer()
# 5. Local UI: a small Streamlit viewer for the SQLite traces.
# Save the script below as trace_viewer.py.
"""
# trace_viewer.py
import streamlit as st
import sqlite3
import pandas as pd
st.title("本地Trace查看器")
conn = sqlite3.connect("traces.db")
df = pd.read_sql_query("SELECT * FROM runs", conn)
conn.close()
st.dataframe(df)
# 详情查看
# Create the selectbox only once: two widgets with the same label and
# options raise Streamlit's DuplicateWidgetID error.
run_id = st.selectbox("选择运行", df["id"])
if run_id is not None:
    run = df[df["id"] == run_id].iloc[0]
    st.json({
        "名称": run["name"],
        "状态": run["status"],
        "耗时": f"{run['duration']:.3f}秒",
        "输入": run["inputs"],
        "输出": run["outputs"]
    })
"""
# Start the UI with:
# streamlit run trace_viewer.py


# 6. Data export.
def export_traces(output_file: str = "traces_export.json"):
    """Write every run stored by ``local_tracer`` to *output_file* as JSON.

    The file is opened as UTF-8 explicitly: with ensure_ascii=False the
    platform's default encoding may be unable to represent the text.
    """
    runs = local_tracer.list_runs()
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(runs, f, indent=2, ensure_ascii=False)
    print(f"已导出{len(runs)}条trace到{output_file}")


export_traces()
---
2 追踪调试
2.1 自动追踪
01.环境变量配置
a.基础配置
a.功能说明
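通过设置环境变量启用LangSmith自动追踪,无需修改代码。LANGCHAIN_TRACING_V2设置为true启用追踪。LANGCHAIN_API_KEY配置认证密钥。LANGCHAIN_PROJECT指定项目名称。LANGCHAIN_ENDPOINT配置服务地址(私有部署时使用)。新版SDK也支持对应的LANGSMITH_前缀变量(如LANGSMITH_TRACING、LANGSMITH_API_KEY、LANGSMITH_PROJECT、LANGSMITH_ENDPOINT),LANGCHAIN_前缀变量仍然兼容。自动追踪会捕获所有LangChain组件的执行,以零侵入方式集成现有应用。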
b.代码示例
---
import os
# 1. Minimal configuration.
# NOTE(review): placeholder API key; load it from the environment in real code.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my_project"
# From here on every LangChain call is traced.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
result = llm.invoke("Hello")  # traced automatically
# 2. Full configuration.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
# os.environ only accepts strings. Assigning os.getenv(...) directly raises
# TypeError when LANGSMITH_KEY is unset, so copy the key only if present.
_langsmith_key = os.getenv("LANGSMITH_KEY")
if _langsmith_key:
    os.environ["LANGCHAIN_API_KEY"] = _langsmith_key
else:
    import warnings
    warnings.warn("LANGSMITH_KEY is not set; LANGCHAIN_API_KEY left unchanged")
os.environ["LANGCHAIN_PROJECT"] = "production"
# 3. Conditional tracing: trace only when the script is started with --debug.
import sys

os.environ["LANGCHAIN_TRACING_V2"] = {True: "true", False: "false"}["--debug" in sys.argv]


# 4. Runtime switch.
def enable_tracing(enable: bool = True):
    """Turn LangChain tracing on (truthy *enable*) or off (falsy)."""
    if enable:
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
    else:
        os.environ["LANGCHAIN_TRACING_V2"] = "false"


enable_tracing(True)  # e.g. always on during development
# In production, tracing is opt-in via ENABLE_TRACE=true.
enable_tracing(os.getenv("ENABLE_TRACE", "false") == "true")
# 5. Multi-project configuration.
class ProjectConfig:
    """Map environment names (dev/test/prod) to LangSmith project names."""

    def __init__(self):
        self.projects = dict(
            dev="development_project",
            test="testing_project",
            prod="production_project",
        )

    def set_project(self, env: str):
        """Point LANGCHAIN_PROJECT at *env*'s project ("default" if unknown)."""
        os.environ["LANGCHAIN_PROJECT"] = self.projects.get(env, "default")


config = ProjectConfig()
config.set_project("dev")
# 6. Verify the configuration.
def verify_tracing_config():
    """Check the tracing environment variables; print a status line.

    Returns True only when LANGCHAIN_API_KEY and LANGCHAIN_PROJECT are set
    and LANGCHAIN_TRACING_V2 is exactly "true".
    """
    missing = [
        key for key in ("LANGCHAIN_API_KEY", "LANGCHAIN_PROJECT")
        if not os.getenv(key)
    ]
    if missing:
        print(f"❌ 缺少配置:{', '.join(missing)}")
        return False
    tracing_on = os.getenv("LANGCHAIN_TRACING_V2") == "true"
    if not tracing_on:
        print("⚠️ 追踪未启用")
        return False
    print("✓ 追踪配置正确")
    return True


verify_tracing_config()
---
b.Chain追踪
a.功能说明
LangSmith自动追踪LangChain的Chain执行,记录完整调用链路。捕获输入、输出、中间步骤、提示词、模型响应等。展示Chain的嵌套结构和数据流转。统计Token使用量、执行时间、成本估算。Chain追踪帮助理解复杂工作流,优化性能和成本。
b.代码示例
---
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain, SequentialChain
# Enable tracing.
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "chain_tracing"
# 1. Tracing a simple chain.
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
prompt = ChatPromptTemplate.from_template(
"将以下文本翻译成{language}:{text}"
)
chain = LLMChain(llm=llm, prompt=prompt)
# The run is traced automatically.
result = chain.run(language="英文", text="你好世界")
# In LangSmith you can inspect:
# - chain name and type
# - input arguments (language, text)
# - the formatted prompt
# - the LLM response
# - the output
# - token usage (prompt tokens, completion tokens)
# - execution time
# 2. Tracing a SequentialChain.
# Step 1: generate an outline (stored under the "outline" output key).
outline_prompt = ChatPromptTemplate.from_template(
"为主题'{topic}'生成文章大纲"
)
outline_chain = LLMChain(llm=llm, prompt=outline_prompt, output_key="outline")
# Step 2: write the article from the outline (stored under "content").
content_prompt = ChatPromptTemplate.from_template(
"根据大纲撰写文章:{outline}"
)
content_chain = LLMChain(llm=llm, prompt=content_prompt, output_key="content")
# Combine both steps; the "outline" produced by step 1 fills the {outline}
# variable of step 2.
overall_chain = SequentialChain(
chains=[outline_chain, content_chain],
input_variables=["topic"],
output_variables=["outline", "content"],
verbose=True
)
# Run it.
result = overall_chain({"topic": "人工智能"})
# Nested structure shown in LangSmith (illustrative; the actual run names
# depend on the chain classes):
# SequentialChain
# ├─ outline_chain
# │ ├─ Prompt formatting
# │ ├─ LLM call
# │ └─ Output: outline
# └─ content_chain
# ├─ Prompt formatting
# ├─ LLM call
# └─ Output: content
# 3. A chain with memory.
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
conversation_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好的AI助手"),
("placeholder", "{chat_history}"),
("human", "{input}")
])
conversation_chain = LLMChain(
llm=llm,
prompt=conversation_prompt,
memory=memory
)
# Multi-turn conversation: the second turn relies on the history that the
# memory stored after the first turn.
conversation_chain.run("我叫张三")
conversation_chain.run("我的名字是什么?")
# The traces show:
# - the full context of each turn
# - how the memory state changes
# - token usage growing as the context accumulates
# 4. Give the traced run a custom name.
# Metadata is attached to the run as key/value data; it does not rename the
# run. The run name shown in LangSmith is set with config["run_name"].
custom_chain = LLMChain(
    llm=llm,
    prompt=prompt,
    metadata={"chain_name": "translation_chain"},
)
# invoke() returns a dict; ["text"] gives the same string that .run() returned.
result = custom_chain.invoke(
    {"language": "日语", "text": "谢谢"},
    config={"run_name": "translation_chain"},
)["text"]
# 5. Tracing a failed chain call.
try:
    # "language" is deliberately left out to provoke a missing-input error.
    chain.run(text="测试")
except Exception as err:
    # The error, including its stack trace, is recorded on the traced run.
    print(f"错误:{err}")
---
02.组件追踪
a.Agent追踪
a.功能说明
Agent执行涉及多步推理和工具调用,追踪尤为重要。LangSmith记录Agent的思考过程、决策逻辑、工具选择、执行结果。可视化Agent的ReAct循环(Reasoning-Action-Observation)。追踪每个工具的调用参数和返回值。分析Agent的执行效率,识别优化机会。
b.代码示例
---
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.chat_models import ChatOpenAI
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
# 1. Tools.
def search_tool(query: str) -> str:
    """Mock knowledge-base lookup; unknown terms yield a not-found message."""
    knowledge = {
        "Python": "Python是一种高级编程语言",
        "AI": "人工智能是计算机科学的分支",
    }
    if query in knowledge:
        return knowledge[query]
    return f"未找到'{query}'的信息"
import ast
import operator

# Operators the calculator accepts; every other syntax node is rejected.
_ARITH_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_ARITH_UNARYOPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_MAX_EXPONENT = 100  # guards against CPU/memory blow-ups such as 9**9**9


def _safe_arithmetic_eval(expression: str):
    """Evaluate a pure arithmetic expression without executing arbitrary code.

    Raises ValueError for any construct other than int/float literals,
    + - * / // % ** and unary +/-.
    """

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _ARITH_BINOPS:
            left, right = _eval(node.left), _eval(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > _MAX_EXPONENT:
                raise ValueError("exponent too large")
            return _ARITH_BINOPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _ARITH_UNARYOPS:
            return _ARITH_UNARYOPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported expression: {ast.dump(node)}")

    return _eval(ast.parse(expression, mode="eval"))


def calculator_tool(expression: str) -> str:
    """Calculator tool: evaluate an arithmetic expression for the agent.

    The expression is produced by the LLM and must be treated as untrusted.
    ``eval`` would execute arbitrary Python, so the input is parsed with
    ``ast`` and only arithmetic is allowed.

    Returns:
        "计算结果:<value>" on success, or "计算错误:<reason>" when the
        expression is invalid, disallowed or fails. Errors are returned as
        text rather than raised, so the agent sees them as an observation.
    """
    try:
        result = _safe_arithmetic_eval(expression)
        return f"计算结果:{result}"
    except (SyntaxError, ValueError, TypeError, ArithmeticError, RecursionError, MemoryError) as e:
        return f"计算错误:{e}"
# Register both functions as agent tools; the descriptions tell the model
# what input each tool expects.
tools = [
Tool(
name="search",
func=search_tool,
description="搜索知识库,输入:搜索词"
),
Tool(
name="calculator",
func=calculator_tool,
description="执行数学计算,输入:数学表达式"
)
]
# 2. Create the agent (temperature 0 for reproducible tool choices).
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个AI助手,可以使用工具回答问题"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
agent = create_openai_functions_agent(llm, tools, prompt)
# max_iterations caps the number of reasoning/tool steps per request.
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
# 3. Run the agent (traced automatically).
result = agent_executor.invoke({
"input": "搜索Python的信息,然后计算 100 + 200"
})
# Expected trace shape in LangSmith (illustrative):
# AgentExecutor
# ├─ Iteration 1
# │ ├─ LLM reasoning (decides to call the search tool)
# │ ├─ Tool: search("Python")
# │ └─ Observation: "Python是一种..."
# ├─ Iteration 2
# │ ├─ LLM reasoning (decides to call the calculator tool)
# │ ├─ Tool: calculator("100 + 200")
# │ └─ Observation: "计算结果:300"
# └─ Final Answer
# 4. Tracing a more complex task.
result = agent_executor.invoke({
"input": "搜索AI的信息,如果找到了,计算信息长度乘以2"
})
# The trace shows:
# - how the agent breaks down the task
# - the order of tool calls
# - inputs and outputs of every step
# - how the final answer was reached
# 5. Analysing agent failures.
# calculator_tool returns its error as a "计算错误:..." string instead of
# raising, so a failed calculation shows up as a normal tool observation.
result = agent_executor.invoke({
"input": "计算一个非常复杂的数学问题:sin(cos(tan(123)))"
})
# When a tool fails, the trace shows:
# - which tool failed
# - the error message
# - how the agent handled the error
# - whether it retried or changed strategy
# 6. Custom trace tags for agent runs, sent to the "agent_experiments" project.
from langchain.callbacks import LangChainTracer
tracer = LangChainTracer(
project_name="agent_experiments",
tags=["multi_tool", "complex_reasoning"]
)
result = agent_executor.invoke(
{"input": "测试问题"},
config={"callbacks": [tracer]}
)
---
b.Tool追踪
a.功能说明
工具是Agent的执行单元,追踪工具调用有助于优化工具设计。记录工具的输入参数、执行时间、返回结果、错误信息。统计工具使用频率,识别热点工具。分析工具性能,发现慢查询。追踪工具失败原因,改进错误处理。工具追踪提升Agent的可靠性和效率。
b.代码示例
---
from langchain.tools import StructuredTool
from langsmith import traceable
import time
# 1. Basic tool tracing: run_type="tool" records the call as a tool run.
@traceable(run_type="tool", name="database_query")
def query_database(query: str) -> str:
    """Pretend to run `query` and return the rows as their str() form.

    The query text is not used; a fixed two-row result is returned after a
    0.5 s sleep that simulates database latency.
    """
    time.sleep(0.5)
    rows = [{"id": 1, "name": "用户A"}, {"id": 2, "name": "用户B"}]
    return str(rows)


# This call is traced.
result = query_database("SELECT * FROM users")
# 2. Structured tool: a pydantic schema describes the arguments to the LLM.
from pydantic import BaseModel, Field


class SearchInput(BaseModel):
    """Argument schema for the structured `search` tool."""

    query: str = Field(description="搜索关键词")
    max_results: int = Field(default=10, description="最大结果数")


@traceable(run_type="tool")
def advanced_search(query: str, max_results: int = 10) -> str:
    """Fake search: build `max_results` placeholder hits and report the count."""
    hits = list(map("结果{}".format, range(max_results)))
    return f"找到{len(hits)}条结果"


search_tool = StructuredTool.from_function(
    func=advanced_search,
    name="search",
    description="搜索信息",
    args_schema=SearchInput,
)
# 3. Tool performance monitoring
@traceable(name="slow_api_call")
def call_external_api(endpoint: str) -> dict:
    """GET `endpoint` (5 s timeout) and return the decoded JSON body.

    Raises:
        requests.HTTPError: for 4xx/5xx responses. Previously an error
            response whose body happened to be JSON was returned as if it
            were data.
        Any other exception from the request or JSON decoding is printed
        and re-raised, so the traced run is marked as failed.
    """
    import requests

    start = time.perf_counter()  # monotonic clock, immune to wall-clock changes
    try:
        response = requests.get(endpoint, timeout=5)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f"API调用失败:{e}")
        raise
    elapsed = time.perf_counter() - start
    print(f"API调用耗时:{elapsed:.2f}秒")
    return data
# 4. Tool error tracing: exceptions are re-raised so the traced run records them.
@traceable(name="risky_operation")
def risky_tool(data: str) -> str:
    """Run `process_data` (defined elsewhere) on non-empty input.

    Empty input raises ValueError. Every exception is printed with a label
    (validation vs. unknown) and then re-raised.
    """
    try:
        if not data:
            raise ValueError("数据为空")
        return process_data(data)
    except Exception as exc:
        if isinstance(exc, ValueError):
            print(f"验证错误:{exc}")
        else:
            print(f"未知错误:{exc}")
        raise
# 5. Batch tool tracing: each per-item call is itself traced, so it nests
# under the batch run.
@traceable(name="batch_processing")
def batch_tool(items: list) -> list:
    """Process every item in order and return the per-item results."""
    return [process_single_item(item, index=position) for position, item in enumerate(items)]


@traceable(name="process_item")
def process_single_item(item: str, index: int) -> str:
    """Process one item; `index` is only recorded in the trace inputs."""
    return f"处理完成:{item}"


# The trace shows the batch run with one child run per item.
batch_tool(["item1", "item2", "item3"])
# 6. Tool usage statistics pulled back from LangSmith.
from langsmith import Client

client = Client()
# Up to 1000 tool runs from the project.
runs = client.list_runs(
    project_name="agent_experiments",
    filter='eq(run_type, "tool")',
    limit=1000,
)
# Per-tool call counts and error counts.
tool_counts = {}
tool_errors = {}
for run in runs:
    tool_name = run.name
    tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
    if run.status == "error":
        tool_errors[tool_name] = tool_errors.get(tool_name, 0) + 1

print("工具使用统计:")
ranked = sorted(tool_counts.items(), key=lambda pair: pair[1], reverse=True)
for tool, count in ranked:
    errors = tool_errors.get(tool, 0)
    error_rate = errors / count if count > 0 else 0
    print(f" {tool}: {count}次调用, 错误率{error_rate:.1%}")
---
2.2 手动追踪
01.装饰器追踪
a.@traceable装饰器
a.功能说明
使用@traceable装饰器手动标记需要追踪的函数。适用于自定义函数、业务逻辑、非LangChain组件。指定run_type(llm、chain、tool等)分类运行。设置name自定义显示名称。添加tags分组管理。装饰器追踪扩展LangSmith能力,追踪完整应用流程。
b.代码示例
---
from langsmith import traceable
import time
# 1. Minimal tracing with a bare @traceable decorator.
@traceable
def my_function(text: str) -> str:
    """Return `text` upper-cased; each call is recorded as a LangSmith run."""
    return text.upper()


# This call shows up in LangSmith.
result = my_function("hello")
# 2. Explicit run_type / name: a chain whose steps are traced as tools.
@traceable(run_type="chain", name="数据处理流程")
def data_pipeline(data: dict) -> dict:
    """Clean, transform, then validate `data`; each step is a child run."""
    cleaned = clean_data(data)
    transformed = transform_data(cleaned)
    return validate_data(transformed)


@traceable(run_type="tool", name="数据清洗")
def clean_data(data: dict) -> dict:
    """Drop entries whose value is None."""
    return {k: v for k, v in data.items() if v is not None}


@traceable(run_type="tool", name="数据转换")
def transform_data(data: dict) -> dict:
    """Replace every value with its upper-cased string form."""
    return {k: str(v).upper() for k, v in data.items()}


@traceable(run_type="tool", name="数据验证")
def validate_data(data: dict) -> dict:
    """Return `data` unchanged if it is non-empty.

    Raises:
        ValueError: if `data` is empty. This was an ``assert``, which
            ``python -O`` strips, silently letting empty data through.
    """
    if not data:
        raise ValueError("数据为空")
    return data


result = data_pipeline({"name": "test", "value": 123, "empty": None})
# LangSmith shows the nested call tree:
# 数据处理流程 (chain)
# ├─ 数据清洗 (tool)
# ├─ 数据转换 (tool)
# └─ 数据验证 (tool)
# 3. Tags let these runs be filtered later (e.g. has(tags, "production")).
@traceable(name="critical_operation", tags=["production", "high_priority"])
def important_function(data: str):
    """Delegate to `process` (defined elsewhere) under a tagged run."""
    return process(data)
# 4. Metadata injection
@traceable(name="with_metadata")
def function_with_metadata(user_id: str, action: str):
    """Record user/action metadata on the current run, then run `action`.

    `perform_action` is defined elsewhere (not shown in this snippet).
    """
    from langsmith.run_helpers import get_current_run_tree

    run = get_current_run_tree()
    if run:
        # Merge into extra["metadata"] instead of assigning a brand-new dict
        # to `run.extra`. That assignment discarded whatever the SDK had
        # already stored there and put the keys outside "metadata".
        extra = run.extra if run.extra is not None else {}
        extra.setdefault("metadata", {}).update(
            {
                "user_id": user_id,
                "action": action,
                "timestamp": time.time(),
            }
        )
        run.extra = extra
    return perform_action(action)
# 5. @traceable works on coroutines as well.
import asyncio


@traceable(name="async_operation")
async def async_function(data: str) -> str:
    """Wait one second, then return `data` upper-cased."""
    await asyncio.sleep(1)
    return data.upper()


# Drive the coroutine to completion.
result = asyncio.run(async_function("test"))
# 6. Tracing methods: the inner call nests under the outer one.
class DataProcessor:
    """Toy processor whose public and private methods are both traced."""

    @traceable(name="process_method")
    def process(self, data: str) -> str:
        """Public entry point; delegates to the traced private helper."""
        return self._internal_process(data)

    @traceable(name="internal_process")
    def _internal_process(self, data: str) -> str:
        """Lower-case `data`."""
        return data.lower()


processor = DataProcessor()
result = processor.process("TEST")
# 7. Conditional tracing
def conditional_traceable(enable: bool = True, **traceable_kwargs):
    """Build a decorator that applies ``traceable`` only when `enable` is true.

    Args:
        enable: when False, the decorated function is returned unchanged, so it
            has no tracing overhead.
        **traceable_kwargs: optional arguments forwarded to ``traceable``
            (e.g. ``name=``, ``run_type=``, ``tags=``). Without them, the
            function is wrapped as ``traceable(func)``, exactly as before.
    """
    def decorator(func):
        if not enable:
            return func
        if traceable_kwargs:
            return traceable(**traceable_kwargs)(func)
        return traceable(func)

    return decorator
# Development: tracing enabled.
@conditional_traceable(enable=True)
def dev_function(data):
    """Traced wrapper around `process` (defined elsewhere)."""
    return process(data)


# Production: tracing disabled to avoid the overhead.
@conditional_traceable(enable=False)
def prod_function(data):
    """Untraced wrapper around `process` (defined elsewhere)."""
    return process(data)
---
b.上下文管理
a.功能说明
使用RunTree手动管理追踪上下文,精细控制追踪生命周期。创建根运行和子运行,构建调用树。设置运行的输入、输出、元数据。手动结束运行,记录执行时间。上下文管理适用于复杂场景,如循环、条件分支、动态调用等。
b.代码示例
---
from langsmith import Client, RunTree
from langsmith.run_helpers import get_current_run_tree

client = Client()

# 1. A single hand-built run: create it, do the work, end it, upload it.
root_run = RunTree(
    name="manual_process",
    run_type="chain",
    inputs={"input": "test data"},
    project_name="manual_tracing",
)
result = "processed data"  # stand-in for the real work
root_run.end(outputs={"output": result})  # records outputs and end time
root_run.post()  # upload to LangSmith
# 2. Nested runs: children are created from the parent, so they share its trace.
root = RunTree(
    name="parent_operation",
    run_type="chain",
    inputs={"data": "input"},
    project_name="manual_tracing",
)

# Child run 1
child1 = root.create_child(name="step1", run_type="tool", inputs={"step": 1})
step1_result = "step1 done"  # stand-in for step 1
child1.end(outputs={"result": step1_result})

# Child run 2 receives step 1's result as an input.
child2 = root.create_child(
    name="step2",
    run_type="tool",
    inputs={"step": 2, "previous": step1_result},
)
step2_result = "step2 done"  # stand-in for step 2
child2.end(outputs={"result": step2_result})

# Finish the parent and upload the whole tree. RunTree.post() excludes
# child runs by default (exclude_child_runs=True), so a plain root.post()
# would never send step1/step2.
root.end(outputs={"final": "all done"})
root.post(exclude_child_runs=False)
# 3. Tracing inside a loop: one child run per item.
root = RunTree(
    name="batch_processing",
    run_type="chain",
    inputs={"items": ["a", "b", "c"]},
    project_name="manual_tracing",
)
results = []
for i, item in enumerate(["a", "b", "c"]):
    child = root.create_child(
        name=f"process_item_{i}",
        run_type="tool",
        inputs={"item": item, "index": i},
    )
    processed = item.upper()
    results.append(processed)
    child.end(outputs={"result": processed})

# End the batch and upload it with its children. post() skips child runs
# unless exclude_child_runs=False is passed.
root.end(outputs={"results": results})
root.post(exclude_child_runs=False)
# 4. Recording a failure on a manual run.
root = RunTree(
    name="operation_with_error",
    run_type="chain",
    inputs={"data": "test"},
    project_name="manual_tracing",
)
try:
    # risky_operation is defined elsewhere and may raise.
    result = risky_operation("test")
    root.end(outputs={"result": result})
except Exception as e:
    # Store the message both as output and as the run's error field.
    message = str(e)
    root.end(outputs={"error": message}, error=message)
finally:
    # Upload regardless of outcome.
    root.post()
# 5. Tracing a workflow whose steps are only known at runtime.
def dynamic_workflow(steps: list):
    """Run each step via `execute_step` (defined elsewhere) under one traced root.

    Each step is a dict with a required "name" and optional "type"
    (default "tool") and "inputs" (default {}).

    Returns:
        {step name: result} for every step.

    Raises:
        Whatever `execute_step` raises. The failing child run and the root
        run are ended with the error, and the trace is still uploaded.
    """
    root = RunTree(
        name="dynamic_workflow",
        run_type="chain",
        inputs={"steps": steps},
        project_name="manual_tracing",
    )
    results = {}
    try:
        for step in steps:
            child = root.create_child(
                name=step["name"],
                run_type=step.get("type", "tool"),
                inputs=step.get("inputs", {}),
            )
            try:
                result = execute_step(step)
            except Exception as e:
                child.end(error=str(e))
                raise
            results[step["name"]] = result
            child.end(outputs={"result": result})
    except Exception as e:
        root.end(outputs=results, error=str(e))
        raise
    else:
        root.end(outputs=results)
    finally:
        # Upload the root together with its children; post() excludes
        # child runs by default.
        root.post(exclude_child_runs=False)
    return results


steps = [
    {"name": "load", "type": "tool", "inputs": {"source": "db"}},
    {"name": "process", "type": "chain", "inputs": {"data": "..."}},
    {"name": "save", "type": "tool", "inputs": {"dest": "file"}},
]
results = dynamic_workflow(steps)
# 6. Reading the current run inside @traceable functions.
# traceable is not imported earlier in this snippet; import it here.
from langsmith import traceable


@traceable(name="parent")
def parent_function():
    """Add custom metadata to the current run, then call child_function()."""
    current_run = get_current_run_tree()
    if current_run:
        # Merge into extra["metadata"] rather than replacing `extra`, which
        # would discard what the SDK had already recorded on the run.
        extra = current_run.extra if current_run.extra is not None else {}
        extra.setdefault("metadata", {})["custom"] = "metadata"
        current_run.extra = extra
    child_function()


@traceable(name="child")
def child_function():
    """Print the name of the run active inside this function."""
    current_run = get_current_run_tree()
    if current_run:
        print(f"当前Run: {current_run.name}")


parent_function()
---
02.批量追踪
a.批量上传
a.功能说明
先在本地收集多个运行记录,再集中上传到LangSmith,以控制上传时机、降低上传频率。收集本地运行数据,定期批量提交。适用于离线场景、高频调用、性能优化。设置缓冲区大小和提交间隔。注意:下方示例在提交时仍对每条记录调用一次run.post(),缓冲的作用是把上传集中到固定时机,而不是把多条记录合并成一次请求。批量追踪提升系统性能,降低网络开销。
b.代码示例
---
from langsmith import Client, RunTree
import time
from typing import List
client = Client()

# 1. Build 100 finished runs locally first, then upload them.
runs: List[RunTree] = []
for i in range(100):
    run = RunTree(
        name=f"batch_run_{i}",
        run_type="chain",
        inputs={"index": i},
        project_name="batch_tracing",
    )
    result = f"result_{i}"  # simulated work
    run.end(outputs={"result": result})
    runs.append(run)

# Upload step: note that each run is still posted individually.
for run in runs:
    run.post()
print(f"批量上传{len(runs)}个运行记录")
# 2. Buffered uploads
class BufferedTracer:
    """Collect finished runs and post them once `buffer_size` are pending.

    Use it as a context manager (``with BufferedTracer() as t:``) to flush
    leftovers deterministically. ``__del__`` only makes a best-effort flush,
    because it may run during interpreter shutdown.
    """

    def __init__(self, buffer_size: int = 10):
        self.buffer_size = buffer_size
        self.buffer: List[RunTree] = []

    def add_run(self, run: RunTree):
        """Queue `run`; flush automatically when the buffer is full."""
        self.buffer.append(run)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        """Post every buffered run.

        If a post fails, the runs not yet posted go back into the buffer and
        the exception propagates. A later flush therefore retries them
        without re-posting the runs that already succeeded.
        """
        if not self.buffer:
            return
        pending, self.buffer = self.buffer, []
        for index, run in enumerate(pending):
            try:
                run.post()
            except Exception:
                self.buffer = pending[index:] + self.buffer
                raise
        print(f"上传{len(pending)}个运行记录")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.flush()
        return False

    def __del__(self):
        """Best-effort flush of remaining runs at garbage collection."""
        # getattr guards against a partially constructed instance (failed __init__).
        if getattr(self, "buffer", None):
            self.flush()
# Buffered tracing in action: 50 runs, flushed every 20.
tracer = BufferedTracer(buffer_size=20)
for i in range(50):
    run = RunTree(
        name=f"buffered_run_{i}",
        run_type="chain",
        inputs={"index": i},
        project_name="buffered_tracing",
    )
    run.end(outputs={"result": i})
    tracer.add_run(run)
# Send the final partial batch.
tracer.flush()
# 3. Timed batch uploads
import threading


class ScheduledTracer:
    """Buffer finished runs and post them from a background timer.

    Every `interval` seconds a daemon ``threading.Timer`` drains the buffer.
    stop() disables re-arming and posts whatever is left.
    """

    def __init__(self, interval: float = 5.0):
        self.interval = interval
        self.buffer: List[RunTree] = []
        self.lock = threading.Lock()
        self.timer = None
        self._stopped = False
        self.start_timer()

    def add_run(self, run: RunTree):
        """Queue a finished run (safe to call from any thread)."""
        with self.lock:
            self.buffer.append(run)

    def start_timer(self):
        """Arm a one-shot daemon timer that calls upload(); no-op after stop()."""
        # The _stopped check and the arming happen under the lock, so either
        # stop() sees this new timer and cancels it, or this call sees
        # _stopped and does nothing.
        with self.lock:
            if self._stopped:
                return
            self.timer = threading.Timer(self.interval, self.upload)
            self.timer.daemon = True
            self.timer.start()

    def _drain(self):
        """Post all buffered runs; on failure, re-queue the ones not yet posted."""
        with self.lock:
            pending, self.buffer = self.buffer, []
        # Post outside the lock so add_run() is not blocked by network I/O.
        for index, run in enumerate(pending):
            try:
                run.post()
            except Exception:
                with self.lock:
                    self.buffer[:0] = pending[index:]
                raise
        if pending:
            print(f"定时上传{len(pending)}个记录")

    def upload(self):
        """Timer callback: drain the buffer, then re-arm the timer even if posting failed."""
        try:
            self._drain()
        finally:
            self.start_timer()

    def stop(self):
        """Stop periodic uploads and synchronously post what is still buffered.

        Previously this called upload(), which re-armed the timer, so the
        tracer never actually stopped.
        """
        with self.lock:
            self._stopped = True
            timer = self.timer
        if timer:
            timer.cancel()
        self._drain()
# Timed tracing in action: runs trickle in while the timer uploads every 10 s.
scheduled_tracer = ScheduledTracer(interval=10.0)
for i in range(100):
    run = RunTree(
        name=f"scheduled_run_{i}",
        run_type="chain",
        inputs={"index": i},
        project_name="scheduled_tracing",
    )
    run.end(outputs={"result": i})
    scheduled_tracer.add_run(run)
    time.sleep(0.1)  # simulate runs arriving over time
# Stop the timer and upload whatever remains.
scheduled_tracer.stop()
# 4. Async batch uploads
import asyncio
from typing import List


class AsyncBatchTracer:
    """Queue finished runs and upload them concurrently in batches of `batch_size`."""

    def __init__(self, batch_size: int = 50):
        self.batch_size = batch_size
        self.queue: List[RunTree] = []

    async def add_run(self, run: RunTree):
        """Queue `run`; upload the batch once `batch_size` runs are waiting."""
        self.queue.append(run)
        if len(self.queue) >= self.batch_size:
            await self.flush()

    async def flush(self):
        """Upload every queued run concurrently.

        The batch is detached before awaiting, so runs added while the uploads
        are in flight land in a fresh queue instead of being cleared
        unsent. Runs whose upload failed are put back in the queue, and the
        first error is re-raised.
        """
        if not self.queue:
            return
        batch, self.queue = self.queue, []
        outcomes = await asyncio.gather(
            *(self._upload_one(run) for run in batch), return_exceptions=True
        )
        failed = [
            (run, outcome)
            for run, outcome in zip(batch, outcomes)
            if isinstance(outcome, BaseException)
        ]
        if failed:
            self.queue[:0] = [run for run, _ in failed]
            raise failed[0][1]
        print(f"异步上传{len(batch)}个记录")

    async def _upload_one(self, run: RunTree):
        """Post one run without blocking the event loop."""
        # RunTree.post() is a synchronous call; run it in the default
        # executor so other coroutines keep running meanwhile.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, run.post)
# Async tracing in action.
async def async_tracing():
    """Create 100 finished runs and upload them in batches of 20."""
    tracer = AsyncBatchTracer(batch_size=20)
    for i in range(100):
        run = RunTree(
            name=f"async_run_{i}",
            run_type="chain",
            inputs={"index": i},
            project_name="async_tracing",
        )
        run.end(outputs={"result": i})
        await tracer.add_run(run)
    await tracer.flush()  # upload any final partial batch


asyncio.run(async_tracing())
# 5. Batch tracing for domestic-stack (Xinchuang) environments:
# buffer runs in a local DM (Dameng) database before uploading them.
import dmPython
class DmBatchTracer:
    """Persist each run to a local DM table and post runs to LangSmith in batches.

    add_run() inserts the run into `trace_buffer` and keeps it in memory.
    Once `batch_size` runs are pending, upload_batch() posts them and flags
    the table rows as uploaded.
    """
    def __init__(self, connection_string: str, batch_size: int = 100):
        # NOTE(review): the commented example below passes a
        # "dm://user:pwd@host:port/db" URL; confirm dmPython.connect accepts
        # that format.
        self.conn = dmPython.connect(connection_string)
        self.batch_size = batch_size
        self.buffer = []  # runs inserted locally but not yet posted
        self._init_table()
    def _init_table(self):
        """Create the local `trace_buffer` table if it does not exist yet."""
        cursor = self.conn.cursor()
        cursor.execute("""
CREATE TABLE IF NOT EXISTS trace_buffer (
id INTEGER IDENTITY(1,1) PRIMARY KEY,
run_data TEXT,
uploaded INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
        self.conn.commit()
    def add_run(self, run: RunTree):
        """Store `run` in the local table and queue it for upload."""
        import json
        cursor = self.conn.cursor()
        # Serialises the object's attribute dict; any value json cannot encode
        # is stored via str().
        # NOTE(review): for a RunTree this likely includes non-data attributes
        # (client, parent/child references); confirm what should be persisted.
        cursor.execute(
            "INSERT INTO trace_buffer (run_data) VALUES (?)",
            (json.dumps(run.__dict__, default=str),)
        )
        self.conn.commit()
        self.buffer.append(run)
        if len(self.buffer) >= self.batch_size:
            self.upload_batch()
    def upload_batch(self):
        """Post every in-memory run, then flag pending table rows as uploaded."""
        # If run.post() raises partway, nothing is flagged and self.buffer
        # still holds every run, so a retry re-posts the runs already sent.
        for run in self.buffer:
            run.post()
        # NOTE(review): `WHERE uploaded = 0` flags every pending row,
        # including rows written by an earlier process whose runs were never
        # in this instance's buffer and were therefore never posted.
        # TODO: restrict the update to the ids inserted by this instance.
        cursor = self.conn.cursor()
        cursor.execute(
            "UPDATE trace_buffer SET uploaded = 1 WHERE uploaded = 0"
        )
        self.conn.commit()
        print(f"批量上传{len(self.buffer)}条记录")
        self.buffer.clear()
# dm_tracer = DmBatchTracer(
# "dm://SYSDBA:SYSDBA@localhost:5236/TRACE",
# batch_size=50
# )
---
2.3 运行记录
01.查询运行
a.API查询
a.功能说明
使用LangSmith API查询运行记录,支持过滤、排序、分页等操作。按项目、时间范围、状态、标签等条件筛选。获取运行的详细信息(输入、输出、元数据等)。统计分析运行数据,生成报表。API查询实现自动化监控、数据导出、自定义分析等功能。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta

client = Client()

# 1. Basic query: the 10 most recent runs of the project.
runs = client.list_runs(project_name="my_project", limit=10)
for run in runs:
    details = [
        f"运行ID: {run.id}",
        f"名称: {run.name}",
        f"状态: {run.status}",
        f"开始时间: {run.start_time}",
    ]
    # One line per field, then a blank separator line.
    print("\n".join(details), end="\n\n")
# 2. Time-range query: runs started within the last 7 days.
week_ago = datetime.now() - timedelta(days=7)
runs = client.list_runs(project_name="my_project", start_time=week_ago, limit=100)
# NOTE: limit=100 caps this count at 100.
print(f"过去7天运行数:{sum(1 for _ in runs)}")

# 3. Status filters: successful runs and failed runs.
success_runs = client.list_runs(
    project_name="my_project", filter='eq(status, "success")', limit=50
)
error_runs = client.list_runs(
    project_name="my_project", filter='eq(status, "error")', limit=50
)
# Each result is an iterator, consumed once by the counts below.
print(f"成功: {sum(1 for _ in success_runs)}")
print(f"错误: {sum(1 for _ in error_runs)}")
# 4. Compound filters
# Slow runs: latency above 5 seconds. The filter language expresses latency
# in seconds (e.g. "5s"), not milliseconds, so the old 5000 meant something
# very different.
slow_runs = client.list_runs(
    project_name="my_project",
    filter='gt(latency, "5s")',
    limit=20,
)
for run in slow_runs:
    # Run.latency is reported in seconds; convert for the ms label.
    print(f"慢查询: {run.name}, 耗时: {run.latency * 1000:.0f}ms")

# High token usage (> 1000 tokens)
high_token_runs = client.list_runs(
    project_name="my_project",
    filter='gt(total_tokens, 1000)',
    limit=20,
)
# 5. Tag query: runs carrying the "production" tag.
tagged_runs = client.list_runs(
    project_name="my_project", filter='has(tags, "production")', limit=50
)

# 6. Full details of a single run.
run_id = "specific_run_id"
run = client.read_run(run_id)
print("运行详情:")
for line in (
    f" 名称: {run.name}",
    f" 类型: {run.run_type}",
    f" 输入: {run.inputs}",
    f" 输出: {run.outputs}",
    f" 错误: {run.error}",
    f" 元数据: {run.extra}",
    " Token统计:",
    f" Prompt: {run.prompt_tokens}",
    f" Completion: {run.completion_tokens}",
    f" Total: {run.total_tokens}",
):
    print(line)
# 7. Fetching every run
def fetch_all_runs(project_name: str, page_size: int = 100):
    """Return every run of `project_name` as a list.

    ``client.list_runs`` already returns an iterator that pages through the
    API on its own. The previous manual offset loop relied on an ``offset``
    argument that list_runs does not define, so it could refetch the same
    page forever. `page_size` now only controls how often progress is
    printed.
    """
    all_runs = []
    for run in client.list_runs(project_name=project_name):
        all_runs.append(run)
        if page_size > 0 and len(all_runs) % page_size == 0:
            print(f"已获取{len(all_runs)}条记录...")
    return all_runs
# Download every run of the project and report the total.
all_runs = fetch_all_runs("my_project")
print(f"总计{len(all_runs)}条运行记录")
# 8. Statistics query
def analyze_runs(project_name: str):
    """Aggregate basic statistics over up to 1000 runs of a project.

    Returns:
        dict with total / success / error counts, success_rate (0-1),
        avg_latency in milliseconds (averaged over the runs that report a
        latency), and the summed total_tokens.
    """
    runs = client.list_runs(project_name=project_name, limit=1000)
    total = success = error = 0
    latency_sum_ms = 0.0
    latency_count = 0
    total_tokens = 0
    for run in runs:
        total += 1
        if run.status == "success":
            success += 1
        elif run.status == "error":
            error += 1
        if run.latency:
            # Run.latency is in seconds; the report uses milliseconds.
            latency_sum_ms += run.latency * 1000
            latency_count += 1
        if run.total_tokens:
            total_tokens += run.total_tokens
    return {
        "total": total,
        "success": success,
        "error": error,
        "success_rate": success / total if total > 0 else 0,
        # Dividing by `total` (as before) let runs without a latency drag the mean down.
        "avg_latency": latency_sum_ms / latency_count if latency_count else 0,
        "total_tokens": total_tokens,
    }
# Print the project summary.
stats = analyze_runs("my_project")
print("统计信息:")
print(f" 总运行数: {stats['total']}")
print(f" 成功率: {stats['success_rate']:.1%}")
print(f" 平均延迟: {stats['avg_latency']:.0f}ms")
print(f" 总Token: {stats['total_tokens']}")
---
b.Web界面查看
a.功能说明
LangSmith Web界面提供可视化的运行记录查看。查看调用树、执行时间线、输入输出详情。搜索和过滤运行,快速定位问题。对比不同运行的差异。添加评论和标注,团队协作。导出运行数据,离线分析。Web界面是日常使用的主要方式,直观高效。
b.代码示例
---
# 访问 https://smith.langchain.com
# 1. 项目视图
# 左侧选择项目
# 显示项目的运行列表
# 可以看到:
# - 运行名称
# - 状态(成功/失败)
# - 开始时间
# - 耗时
# - Token使用量
# 2. 运行详情
# 点击任意运行查看详情
# 包含:
# - 输入输出
# - 调用树(树形结构)
# - 时间线(横向展示)
# - 元数据
# - 反馈和评论
# 3. 调用树视图
# 展示嵌套的调用关系
# 例如:
# AgentExecutor
# ├─ LLM Call (思考)
# ├─ Tool: search
# │ └─ API Call
# ├─ LLM Call (思考)
# ├─ Tool: calculator
# └─ LLM Call (最终答案)
# 4. 时间线视图
# 横向时间轴展示执行过程
# 可以看到:
# - 哪个步骤最耗时
# - 并行执行的部分
# - 等待时间
# 5. 搜索和过滤
# 顶部搜索栏
# 支持:
# - 文本搜索(搜索输入/输出内容)
# - 状态过滤(成功/失败)
# - 时间范围
# - 标签过滤
# - 运行类型(llm/chain/tool等)
# 6. 对比运行
# 选择多个运行
# 点击"Compare"
# 并排查看差异:
# - 输入的不同
# - 输出的不同
# - 性能差异
# - 成本差异
# 7. 添加反馈
# 在运行详情页
# 点击"Add Feedback"
# 可以:
# - 打分(1-5星)
# - 添加评论
# - 标记问题类型
# - @团队成员
# 8. 导出数据
# 选择运行
# 点击"Export"
# 选择格式:
# - JSON
# - CSV
# - 下载trace完整数据
# Generating web links from code
from langsmith import Client

client = Client()
runs = client.list_runs(project_name="my_project", limit=5)
print("运行链接:")
for run in runs:
    # A run's page lives under /o/<organization id>/projects/p/<project id>/r/<run id>.
    # "/o/<run id>" (used before) is not a valid URL. get_run_url resolves
    # the organization and project for us.
    url = client.get_run_url(run=run, project_name="my_project")
    print(f" {run.name}: {url}")
# Opening a run in the browser from code
import webbrowser


def open_run_in_browser(run_id: str):
    """Open the LangSmith web page of run `run_id` in the default browser."""
    run = client.read_run(run_id)
    # The URL needs the organization and project ids, not just the run id;
    # run.session_id is the id of the project the run belongs to.
    url = client.get_run_url(run=run, project_id=run.session_id)
    webbrowser.open(url)


# Open the most recent run, if the project has any.
latest_run = next(iter(client.list_runs(project_name="my_project", limit=1)), None)
if latest_run is None:
    print("未找到运行记录")
else:
    open_run_in_browser(str(latest_run.id))
---
02.运行分析
a.性能分析
a.功能说明
分析运行的性能指标,识别瓶颈和优化机会。统计平均延迟、P95延迟、吞吐量等。分析Token使用模式,优化成本。识别慢查询,优化提示词或模型选择。对比不同版本的性能变化。性能分析指导优化决策,提升应用效率。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
import statistics
client = Client()
# 1. Latency analysis
def analyze_latency(project_name: str):
    """Summarise the latency distribution of a project's runs over the last 7 days.

    Returns:
        dict with count, min, max, mean, median, stdev, p50, p95 and p99 in
        milliseconds, or None if no run in the window reports a latency.
    """
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(days=7),
        limit=1000,
    )
    # Run.latency is in seconds (None for unfinished runs); report milliseconds.
    latencies = [run.latency * 1000 for run in runs if run.latency]
    if not latencies:
        return None
    if len(latencies) > 1:
        stdev = statistics.stdev(latencies)
        p50 = statistics.quantiles(latencies, n=2)[0]
        p95 = statistics.quantiles(latencies, n=20)[18]
        p99 = statistics.quantiles(latencies, n=100)[98]
    else:
        # quantiles() and stdev() need at least two data points; with a single
        # sample every percentile is that sample and the spread is zero.
        stdev = 0
        p50 = p95 = p99 = latencies[0]
    return {
        "count": len(latencies),
        "min": min(latencies),
        "max": max(latencies),
        "mean": statistics.mean(latencies),
        "median": statistics.median(latencies),
        "stdev": stdev,
        "p50": p50,
        "p95": p95,
        "p99": p99,
    }
# Print the latency summary; analyze_latency returns None when there is no
# data, and subscripting None would raise TypeError.
latency_stats = analyze_latency("my_project")
if latency_stats is None:
    print("延迟统计(ms): 无数据")
else:
    print("延迟统计(ms):")
    print(f" 样本数: {latency_stats['count']}")
    print(f" 平均: {latency_stats['mean']:.0f}")
    print(f" 中位数: {latency_stats['median']:.0f}")
    print(f" P95: {latency_stats['p95']:.0f}")
    print(f" P99: {latency_stats['p99']:.0f}")
    print(f" 最大: {latency_stats['max']:.0f}")
# 2. Token analysis
def analyze_tokens(
    project_name: str,
    input_price_per_1k: float = 0.03,
    output_price_per_1k: float = 0.06,
):
    """Summarise token usage and estimated cost over up to 1000 runs.

    Only runs with a truthy total_tokens are counted. A missing prompt or
    completion count is treated as 0.

    Args:
        project_name: LangSmith project to query.
        input_price_per_1k: price per 1K prompt tokens (default: GPT-4 example $0.03).
        output_price_per_1k: price per 1K completion tokens (default: GPT-4 example $0.06).

    Returns:
        dict with token totals, average tokens per run, total cost and
        average cost per run, or None if no run reported tokens.
    """
    runs = client.list_runs(project_name=project_name, limit=1000)
    total_tokens = prompt_tokens = completion_tokens = count = 0
    for run in runs:
        if not run.total_tokens:
            continue
        total_tokens += run.total_tokens
        prompt_tokens += run.prompt_tokens or 0
        completion_tokens += run.completion_tokens or 0
        count += 1
    if count == 0:
        return None
    input_cost = (prompt_tokens / 1000) * input_price_per_1k
    output_cost = (completion_tokens / 1000) * output_price_per_1k
    total_cost = input_cost + output_cost
    return {
        "total_tokens": total_tokens,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "avg_tokens_per_run": total_tokens / count,
        "total_cost": total_cost,
        "avg_cost_per_run": total_cost / count,
    }
# Print the token summary. "\n" is a real newline; the old "\\n" printed a
# literal backslash-n. Guard against the None returned when there is no data.
token_stats = analyze_tokens("my_project")
print("\nToken统计:")
if token_stats is None:
    print(" 无数据")
else:
    print(f" 总Token: {token_stats['total_tokens']:,}")
    print(f" 平均Token/运行: {token_stats['avg_tokens_per_run']:.0f}")
    print(f" 总成本: ${token_stats['total_cost']:.2f}")
    print(f" 平均成本/运行: ${token_stats['avg_cost_per_run']:.4f}")
# 3. Error-rate analysis
def analyze_error_rate(project_name: str):
    """Count errored runs among up to 1000 runs and group them by error prefix.

    The error "type" is the text before the first ":" of the error message
    (the whole message if it has no colon; "Unknown" if it is empty).

    Returns:
        dict with total, errors, error_rate (0-1) and error_types
        ({type: count}).
    """
    runs = client.list_runs(project_name=project_name, limit=1000)
    total = 0
    errors = 0
    error_types = {}
    for run in runs:
        total += 1
        if run.status != "error":
            continue
        errors += 1
        error_msg = run.error or "Unknown"
        error_type = error_msg.split(":", 1)[0]
        error_types[error_type] = error_types.get(error_type, 0) + 1
    return {
        "total": total,
        "errors": errors,
        "error_rate": errors / total if total > 0 else 0,
        "error_types": error_types,
    }
# Print the error summary ("\n" is a real newline; "\\n" printed a backslash).
error_stats = analyze_error_rate("my_project")
print("\n错误统计:")
print(f" 总运行: {error_stats['total']}")
print(f" 错误数: {error_stats['errors']}")
print(f" 错误率: {error_stats['error_rate']:.1%}")
print(" 错误类型:")
for err_type, count in sorted(error_stats["error_types"].items(), key=lambda x: x[1], reverse=True):
    print(f" {err_type}: {count}")
# 4. Daily trend analysis
def analyze_time_trends(project_name: str, days: int = 7):
    """Per-day run count, error rate and average latency over the last `days` days.

    Days are bucketed by run.start_time.date().

    Returns:
        List of {"date", "count", "error_rate", "avg_latency" (ms)} sorted by date.
    """
    from collections import defaultdict

    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(days=days),
        limit=10000,
    )
    daily_stats = defaultdict(lambda: {"count": 0, "errors": 0, "latencies": []})
    for run in runs:
        bucket = daily_stats[run.start_time.date()]
        bucket["count"] += 1
        if run.status == "error":
            bucket["errors"] += 1
        if run.latency:
            # Run.latency is in seconds; trends are reported in ms.
            bucket["latencies"].append(run.latency * 1000)
    trends = []
    for date in sorted(daily_stats):
        stats = daily_stats[date]
        trends.append({
            "date": date,
            "count": stats["count"],
            "error_rate": stats["errors"] / stats["count"] if stats["count"] > 0 else 0,
            "avg_latency": statistics.mean(stats["latencies"]) if stats["latencies"] else 0,
        })
    return trends
# Print the daily trend table. "\n" is a real newline, and the date column is
# padded to the header width so the columns line up.
trends = analyze_time_trends("my_project", days=7)
print("\n每日趋势:")
print(f"{'日期':<12} {'运行数':>8} {'错误率':>8} {'平均延迟':>10}")
print("-" * 45)
for t in trends:
    print(f"{t['date']!s:<12} {t['count']:>8} {t['error_rate']:>7.1%} {t['avg_latency']:>9.0f}ms")
# 5. Version comparison
def compare_versions(project_v1: str, project_v2: str):
    """Print latency, token and cost changes of project_v2 relative to project_v1.

    A project with no data, or a zero baseline, is shown as "N/A". Before,
    these cases raised TypeError (subscripting None) or ZeroDivisionError.
    """
    v1_latency = analyze_latency(project_v1) or {}
    v2_latency = analyze_latency(project_v2) or {}
    v1_tokens = analyze_tokens(project_v1) or {}
    v2_tokens = analyze_tokens(project_v2) or {}

    def _row(label, old, new, precision):
        # One table row: V1 value, V2 value, and percentage change.
        old_text = "N/A" if old is None else f"{old:.{precision}f}"
        new_text = "N/A" if new is None else f"{new:.{precision}f}"
        if old is None or new is None or old == 0:
            change_text = "N/A"
        else:
            change_text = f"{(new - old) / old * 100:.1f}%"
        print(f"{label:<20} {old_text:>15} {new_text:>15} {change_text:>15}")

    print("\n版本对比:")
    print(f"{'指标':<20} {'V1':>15} {'V2':>15} {'变化':>15}")
    print("-" * 70)
    _row("平均延迟(ms)", v1_latency.get("mean"), v2_latency.get("mean"), 0)
    _row("平均Token", v1_tokens.get("avg_tokens_per_run"), v2_tokens.get("avg_tokens_per_run"), 0)
    _row("平均成本($)", v1_tokens.get("avg_cost_per_run"), v2_tokens.get("avg_cost_per_run"), 4)
# compare_versions("my_project_v1", "my_project_v2")
# 6. Performance report
def generate_performance_report(project_name: str):
    """Collect latency, token, error and daily-trend statistics into a JSON file.

    Writes "<project>_performance_<YYYYMMDD>.json" (UTF-8) to the current
    directory and returns the report dict. Sections with no data are null.
    """
    import json
    from datetime import datetime

    report = {
        "project": project_name,
        "generated_at": datetime.now().isoformat(),
        "latency": analyze_latency(project_name),
        "tokens": analyze_tokens(project_name),
        "errors": analyze_error_rate(project_name),
        # date objects are not JSON serialisable; store them as ISO strings.
        "trends": [
            {**t, "date": t["date"].isoformat()}
            for t in analyze_time_trends(project_name)
        ],
    }
    filename = f"{project_name}_performance_{datetime.now().strftime('%Y%m%d')}.json"
    # ensure_ascii=False writes non-ASCII text (e.g. Chinese error messages)
    # verbatim, so open the file as UTF-8 rather than the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    print(f"\n报告已保存:{filename}")
    return report
# Write the JSON performance report for the demo project.
generate_performance_report("my_project")
---
2.4 日志分析
01.日志查询
a.过滤条件
a.功能说明
LangSmith支持强大的日志查询语言,使用函数调用式的过滤表达式(如 eq(status, "error")、and(...))筛选运行记录。支持等于(eq)、大于(gt)、小于(lt)、包含(has)、范围(between)等操作符。可以组合多个条件(and、or、not)。过滤字段包括状态、延迟、Token、标签、时间等;其中延迟以秒为单位书写(如 gt(latency, "5s")),而不是毫秒。高效的过滤快速定位目标运行,提升分析效率。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta

client = Client()

# 1. Basic filtering by run status.
success_runs = client.list_runs(
    project_name="my_project",
    filter='eq(status, "success")',
)
error_runs = client.list_runs(
    project_name="my_project",
    filter='eq(status, "error")',
)
# 2. Numeric comparisons
# Latency above 3 seconds. Latency in the filter language is given in
# seconds (e.g. "3s"); the old bare 3000 did not mean 3000 milliseconds.
slow_runs = client.list_runs(
    project_name="my_project",
    filter='gt(latency, "3s")',
)
# More than 500 tokens
high_token_runs = client.list_runs(
    project_name="my_project",
    filter='gt(total_tokens, 500)',
)
# Between 100 and 500 tokens (both bounds exclusive)
medium_token_runs = client.list_runs(
    project_name="my_project",
    filter='and(gt(total_tokens, 100), lt(total_tokens, 500))',
)
# 3. String match: run name contains "search".
search_runs = client.list_runs(project_name="my_project", filter='contains(name, "search")')

# 4. Tag filters: one tag, and two tags that must both be present.
prod_runs = client.list_runs(project_name="my_project", filter='has(tags, "production")')
tagged_runs = client.list_runs(
    project_name="my_project",
    filter='and(has(tags, "production"), has(tags, "v2"))',
)

# 5. Run-type filters: LLM calls only, then tool calls only.
llm_runs = client.list_runs(project_name="my_project", filter='eq(run_type, "llm")')
tool_runs = client.list_runs(project_name="my_project", filter='eq(run_type, "tool")')
# 6. Compound condition: slow production runs that failed.
# Latency is expressed in seconds ("5s"), not milliseconds.
complex_filter = '''
and(
eq(status, "error"),
gt(latency, "5s"),
has(tags, "production")
)
'''
critical_errors = client.list_runs(
    project_name="my_project",
    filter=complex_filter,
)
print(f"关键错误数:{sum(1 for _ in critical_errors)}")

# 7. Time range combined with a filter: errors from the last 24 hours.
yesterday = datetime.now() - timedelta(days=1)
recent_errors = client.list_runs(
    project_name="my_project",
    start_time=yesterday,
    filter='eq(status, "error")',
)
# 8. Reusable query helper
def find_runs(
    project: str,
    status: str = None,
    min_latency: int = None,
    max_latency: int = None,
    tags: list = None,
    limit: int = 100,
):
    """Query runs with optional status / latency / tag conditions, ANDed together.

    Args:
        project: LangSmith project name.
        status: exact run status, e.g. "error".
        min_latency / max_latency: exclusive latency bounds in milliseconds.
            They are converted to the filter language's seconds form
            (e.g. 2000 -> "2s"). NOTE(review): fractional values produce
            strings like "1.5s"; confirm the API accepts them.
        tags: every tag listed must be present on the run.
        limit: maximum number of runs to return.

    Returns:
        The iterator returned by ``client.list_runs``.
    """
    filters = []
    if status:
        filters.append(f'eq(status, "{status}")')
    if min_latency:
        filters.append(f'gt(latency, "{min_latency / 1000:g}s")')
    if max_latency:
        filters.append(f'lt(latency, "{max_latency / 1000:g}s")')
    if tags:
        filters.extend(f'has(tags, "{tag}")' for tag in tags)

    if not filters:
        filter_expr = None
    elif len(filters) == 1:
        filter_expr = filters[0]
    else:
        filter_expr = "and(" + ", ".join(filters) + ")"

    return client.list_runs(project_name=project, filter=filter_expr, limit=limit)
# Using the helper: slow production errors (latency over 2000 ms).
runs = find_runs(
    project="my_project",
    status="error",
    min_latency=2000,
    tags=["production"],
)
match_count = sum(1 for _ in runs)
print(f"查询结果:{match_count}条")
---
b.日志聚合
a.功能说明
聚合多条运行记录,进行统计分析。按时间段、标签、运行类型等维度分组。计算每组的统计指标(计数、平均值、百分位数等)。生成图表和报表。日志聚合揭示系统整体状况和趋势,支持决策制定。
b.代码示例
---
from langsmith import Client
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
client = Client()
# 1. Aggregate by date
def aggregate_by_date(project_name: str, days: int = 7):
    """Per-day statistics for runs started within the last `days` days.

    Returns:
        List sorted by date of {"date" (ISO string), "total_runs",
        "success_rate", "error_rate", "avg_latency" (ms), "total_tokens"}.
    """
    start_time = datetime.now() - timedelta(days=days)
    runs = client.list_runs(project_name=project_name, start_time=start_time, limit=10000)
    daily_data = defaultdict(lambda: {
        "runs": 0,
        "successes": 0,
        "errors": 0,
        "latencies": [],
        "tokens": 0,
    })
    for run in runs:
        bucket = daily_data[run.start_time.date()]
        bucket["runs"] += 1
        if run.status == "success":
            bucket["successes"] += 1
        elif run.status == "error":
            bucket["errors"] += 1
        if run.latency:
            # Run.latency is in seconds; aggregate in ms.
            bucket["latencies"].append(run.latency * 1000)
        if run.total_tokens:
            bucket["tokens"] += run.total_tokens

    results = []
    for date in sorted(daily_data):
        data = daily_data[date]
        n = data["runs"]
        results.append({
            "date": date.isoformat(),
            "total_runs": n,
            "success_rate": data["successes"] / n if n > 0 else 0,
            "error_rate": data["errors"] / n if n > 0 else 0,
            "avg_latency": statistics.mean(data["latencies"]) if data["latencies"] else 0,
            "total_tokens": data["tokens"],
        })
    return results
# Print the per-day report ("\n" is a real newline; "\\n" printed a backslash).
daily_report = aggregate_by_date("my_project", days=7)
print("\n每日统计:")
for day in daily_report:
    print(f"{day['date']}:")
    print(f" 运行数: {day['total_runs']}")
    print(f" 成功率: {day['success_rate']:.1%}")
    print(f" 平均延迟: {day['avg_latency']:.0f}ms")
    print(f" Token: {day['total_tokens']:,}")
    print()
# 2. Aggregate by tag
def aggregate_by_tags(project_name: str):
    """Per-tag run count, error rate and average latency over up to 1000 runs.

    A run with several tags counts toward each of them; untagged runs are
    ignored. The average latency (ms) covers only runs that report a
    latency. Before, it divided by every run, understating the value.

    Returns:
        List of {"tag", "count", "error_rate", "avg_latency"} sorted by count, descending.
    """
    runs = client.list_runs(project_name=project_name, limit=1000)
    tag_stats = defaultdict(lambda: {
        "count": 0,
        "errors": 0,
        "total_latency": 0,
        "latency_count": 0,
    })
    for run in runs:
        for tag in run.tags or []:
            stats = tag_stats[tag]
            stats["count"] += 1
            if run.status == "error":
                stats["errors"] += 1
            if run.latency:
                # Run.latency is in seconds; aggregate in ms.
                stats["total_latency"] += run.latency * 1000
                stats["latency_count"] += 1

    results = [
        {
            "tag": tag,
            "count": stats["count"],
            "error_rate": stats["errors"] / stats["count"],
            "avg_latency": (
                stats["total_latency"] / stats["latency_count"]
                if stats["latency_count"]
                else 0
            ),
        }
        for tag, stats in tag_stats.items()
    ]
    results.sort(key=lambda x: x["count"], reverse=True)
    return results
# Print the ten busiest tags ("\n" is a real newline; "\\n" printed a backslash).
tag_report = aggregate_by_tags("my_project")
print("\n标签统计:")
for item in tag_report[:10]:
    print(f"{item['tag']}:")
    print(f" 运行数: {item['count']}")
    print(f" 错误率: {item['error_rate']:.1%}")
    print(f" 平均延迟: {item['avg_latency']:.0f}ms")
# 3. Aggregate by run type
def aggregate_by_run_type(project_name: str):
    """Print count, average latency (ms) and average tokens per run type.

    Covers up to 1000 runs. A missing run_type is reported as "unknown";
    averages are printed only when at least one run supplied the metric.
    """
    runs = client.list_runs(project_name=project_name, limit=1000)
    type_stats = defaultdict(lambda: {"count": 0, "latencies": [], "tokens": []})
    for run in runs:
        stats = type_stats[run.run_type or "unknown"]
        stats["count"] += 1
        if run.latency:
            # Run.latency is in seconds; report ms.
            stats["latencies"].append(run.latency * 1000)
        if run.total_tokens:
            stats["tokens"].append(run.total_tokens)

    print("\n按类型统计:")
    for run_type, stats in type_stats.items():
        print(f"{run_type}:")
        print(f" 数量: {stats['count']}")
        if stats["latencies"]:
            print(f" 平均延迟: {statistics.mean(stats['latencies']):.0f}ms")
        if stats["tokens"]:
            print(f" 平均Token: {statistics.mean(stats['tokens']):.0f}")
        print()
# Print the per-run-type breakdown for the demo project.
aggregate_by_run_type("my_project")
# 4. Aggregate by hour (heat-map data)
def aggregate_by_hour(project_name: str):
    """Count runs per hour of day over the last 24 hours.

    The hour is taken from run.start_time as returned by the API.
    NOTE(review): that timestamp is presumably UTC, so buckets may not match
    local wall-clock hours; confirm before presenting as local time.

    Returns:
        {hour (0-23): run count}, ordered by hour; hours with no runs are omitted.
    """
    runs = client.list_runs(
        project_name=project_name,
        start_time=datetime.now() - timedelta(days=1),
        limit=10000,
    )
    hourly_data = defaultdict(int)
    for run in runs:
        hourly_data[run.start_time.hour] += 1
    return dict(sorted(hourly_data.items()))
# Print a text bar chart: one block per 10 runs ("\n" is a real newline).
hourly_counts = aggregate_by_hour("my_project")
print("\n每小时运行数:")
for hour, count in hourly_counts.items():
    bar = "█" * (count // 10)
    print(f"{hour:02d}:00 {bar} {count}")
---
02.异常分析
a.错误归类
a.功能说明
分析错误运行,归类错误原因。提取错误消息,识别常见错误模式。统计每类错误的出现频率和影响范围。追踪错误趋势,判断是否有系统性问题。优先修复高频错误和严重错误。错误归类帮助快速定位和解决问题。
b.代码示例
---
from langsmith import Client
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import re

# defaultdict, datetime and timedelta are used further down in this snippet
# but were never imported, causing NameError.

client = Client()

# 1. Collect up to 1000 errored runs into plain dicts.
error_runs = client.list_runs(
    project_name="my_project",
    filter='eq(status, "error")',
    limit=1000,
)
errors = [
    {
        "id": str(run.id),
        "name": run.name,
        "error": run.error or "Unknown",
        "timestamp": run.start_time,
    }
    for run in error_runs
]
# 2. Error classification
def classify_error(error_msg: str) -> str:
    """Map a raw error message to a coarse error category.

    Matching is a case-insensitive substring search and the first matching
    rule wins. For example, "connection timed out" is classified as a
    timeout, not a network error. An empty or None message is "其他错误".
    """
    rules = (
        (("timeout", "timed out"), "超时错误"),
        (("connection", "network"), "网络错误"),
        (("rate limit", "quota"), "限流错误"),
        (("validation", "invalid"), "验证错误"),
        (("permission", "unauthorized"), "权限错误"),
        (("not found", "404"), "资源不存在"),
    )
    lowered = (error_msg or "").lower()
    for keywords, category in rules:
        if any(keyword in lowered for keyword in keywords):
            return category
    return "其他错误"
# Count errors per category and keep up to 3 examples of each.
error_types = Counter()
error_examples = defaultdict(list)
for err in errors:
    err_type = classify_error(err["error"])
    error_types[err_type] += 1
    samples = error_examples[err_type]
    if len(samples) < 3:
        samples.append(err)
# 3. Print the classification table ("\n" is a real newline; "\\n" printed a backslash).
print("\n错误分类统计:")
print(f"{'错误类型':<15} {'数量':>8} {'占比':>8}")
print("-" * 35)
total_errors = sum(error_types.values())
for err_type, count in error_types.most_common():
    percentage = (count / total_errors) * 100
    print(f"{err_type:<15} {count:>8} {percentage:>7.1f}%")

# 4. Show up to two representative messages per category.
print("\n典型错误示例:")
for err_type, examples in error_examples.items():
    print(f"\n【{err_type}】")
    for ex in examples[:2]:
        print(f" 运行: {ex['name']}")
        message = ex["error"]
        # Only add an ellipsis when the message was actually truncated.
        suffix = "..." if len(message) > 100 else ""
        print(f" 消息: {message[:100]}{suffix}")
        print()
# 5. Error trend analysis
def analyze_error_trends(project_name: str, days: int = 7):
    """Count errored runs per day and per error category over the last `days` days.

    Returns:
        defaultdict mapping date -> defaultdict(category -> count).
    """
    start_time = datetime.now() - timedelta(days=days)
    error_runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        filter='eq(status, "error")',
        limit=10000,
    )
    daily_errors = defaultdict(lambda: defaultdict(int))
    for run in error_runs:
        daily_errors[run.start_time.date()][classify_error(run.error or "")] += 1
    return daily_errors
trends = analyze_error_trends("my_project", days=7)
print("\\n错误趋势:")
for date in sorted(trends.keys()):
total = sum(trends[date].values())
print(f"{date}: {total}个错误")
for err_type, count in sorted(trends[date].items(), key=lambda x: x[1], reverse=True):
print(f" - {err_type}: {count}")
# 6. Generate an error report
def generate_error_report(project_name: str):
    """Classify a project's failed runs and write a JSON report to disk.

    The file is named ``error_report_<YYYYmmdd_HHMMSS>.json`` in the current
    directory and holds, per category, the count and up to 3 examples.

    Args:
        project_name: LangSmith project to scan (at most 1000 error runs).

    Returns:
        The report dict that was written.
    """
    import json
    from collections import defaultdict
    from datetime import datetime

    error_runs = list(client.list_runs(
        project_name=project_name,
        filter='eq(status, "error")',
        limit=1000,
    ))

    classified = defaultdict(list)
    for run in error_runs:
        err_type = classify_error(run.error or "")
        classified[err_type].append({
            "id": str(run.id),
            "name": run.name,
            "error": run.error,
            "time": run.start_time.isoformat(),
        })

    # One timestamp for both the report body and the file name.
    generated_at = datetime.now()
    report = {
        "project": project_name,
        "generated_at": generated_at.isoformat(),
        "total_errors": len(error_runs),
        "error_types": {
            err_type: {"count": len(items), "examples": items[:3]}
            for err_type, items in classified.items()
        },
    }

    filename = f"error_report_{generated_at.strftime('%Y%m%d_%H%M%S')}.json"
    # ensure_ascii=False writes raw non-ASCII text, so the encoding must be explicit
    # instead of depending on the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    print(f"错误报告已生成:{filename}")
    return report


generate_error_report("my_project")
---
3 数据集管理
3.1 创建数据集
01.数据集定义
a.创建数据集
a.功能说明
数据集是LangSmith中组织测试数据的容器,包含输入-输出对作为测试样本。使用create_dataset()创建数据集,指定名称和描述。数据集用于批量评估、回归测试、模型对比等场景。支持多种数据格式,如文本、JSON、结构化数据。合理组织数据集是系统化测试的基础。
b.代码示例
---
from langsmith import Client
# create_dataset expects the DataType enum (it sends data_type.value), not a raw string.
from langsmith.schemas import DataType

client = Client()
# 1. Create a dataset.
dataset = client.create_dataset(
    dataset_name="customer_qa_test",
    description="客服问答测试数据集",
    data_type=DataType.kv,  # key-value data type
)
print(f"数据集已创建:{dataset.id}")
print(f"名称:{dataset.name}")
print(f"描述:{dataset.description}")
# 2. Check whether the dataset exists
def get_or_create_dataset(dataset_name: str, description: str = ""):
    """Return the dataset named *dataset_name*, creating it if it does not exist.

    Args:
        dataset_name: Name of the dataset to read or create.
        description: Description used only when the dataset is created.

    Returns:
        The existing or newly created dataset.

    Raises:
        Any non-"not found" error from the API (auth, network, ...) propagates
        instead of being hidden behind a create attempt.
    """
    from langsmith.utils import LangSmithNotFoundError

    try:
        dataset = client.read_dataset(dataset_name=dataset_name)
    except LangSmithNotFoundError:
        dataset = client.create_dataset(
            dataset_name=dataset_name,
            description=description,
        )
        print(f"数据集已创建:{dataset.name}")
        return dataset
    print(f"数据集已存在:{dataset.name}")
    return dataset


# Usage
dataset = get_or_create_dataset(
    "qa_dataset",
    "问答系统测试集"
)
# 3. Add one example: an input together with its expected output.
example = client.create_example(
    inputs={"question": "如何退货?"},
    outputs={"answer": "请在订单页面点击退货按钮,填写退货原因即可。"},
    dataset_id=dataset.id,
)
print(f"样本已添加:{example.id}")
# 4. Add several examples at once.
examples_data = [
    {
        "inputs": {"question": "配送需要多久?"},
        "outputs": {"answer": "一般3-5个工作日送达。"}
    },
    {
        "inputs": {"question": "可以开发票吗?"},
        "outputs": {"answer": "可以,下单时选择开具发票。"}
    },
    {
        "inputs": {"question": "支持货到付款吗?"},
        "outputs": {"answer": "支持货到付款,配送时支付即可。"}
    },
    {
        "inputs": {"question": "如何联系客服?"},
        "outputs": {"answer": "可拨打400-xxx-xxxx或在线咨询。"}
    }
]
# create_examples uploads the whole batch in one request instead of one request per example.
client.create_examples(
    inputs=[item["inputs"] for item in examples_data],
    outputs=[item["outputs"] for item in examples_data],
    dataset_id=dataset.id,
)
print(f"已添加{len(examples_data)}个样本")
# 5. 从CSV批量导入
import pandas as pd
# Read the CSV file; expected columns: question,answer
df = pd.read_csv("qa_data.csv")
for row_index, row in df.iterrows():
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": row["question"]},
        outputs={"answer": row["answer"]},
        # Keep the originating row index so examples can be traced back to the file.
        metadata={"source": "csv", "row": int(row_index)},
    )
print(f"从CSV导入{len(df)}个样本")
# 6. Build a dataset from production traces
def create_dataset_from_production(
    source_project: str,
    dataset_name: str,
    sample_size: int = 100
):
    """Copy successful production runs into a dataset as examples.

    Args:
        source_project: Project whose runs are sampled.
        dataset_name: Target dataset (created if missing).
        sample_size: Maximum number of runs fetched.

    Returns:
        The target dataset.
    """
    # Root runs only: child runs (the LLM/tool calls inside a chain) have their
    # own differently-shaped inputs/outputs and would pollute the dataset.
    runs = client.list_runs(
        project_name=source_project,
        filter='eq(status, "success")',
        is_root=True,
        limit=sample_size,
    )
    dataset = get_or_create_dataset(
        dataset_name,
        f"从{source_project}生产数据采样"
    )

    count = 0
    for run in runs:
        # Runs without recorded inputs or outputs cannot form an example.
        if not (run.inputs and run.outputs):
            continue
        client.create_example(
            dataset_id=dataset.id,
            inputs=run.inputs,
            outputs=run.outputs,
            metadata={
                "source_run_id": str(run.id),
                "source_project": source_project,
                "timestamp": run.start_time.isoformat(),
            },
        )
        count += 1
    print(f"从生产数据创建{count}个测试样本")
    return dataset


# Usage
test_dataset = create_dataset_from_production(
    "production",
    "prod_sample_test",
    sample_size=50
)
# 7. Attach descriptive metadata to an example.
example_metadata = {
    "category": "退货",
    "difficulty": "easy",
    "language": "zh-CN",
    "created_by": "张三",
    "tags": ["basic", "common"],
}
client.create_example(
    dataset_id=dataset.id,
    inputs={"question": "测试问题"},
    outputs={"answer": "测试答案"},
    metadata=example_metadata,
)
---
b.数据格式
a.功能说明
LangSmith数据集支持三种数据类型(data_type)。KV格式(key-value)存储任意键值形式的输入输出对,最常用,也可以存放嵌套的结构化数据。LLM格式存储单轮文本生成样本,输入和输出都是字符串。Chat格式存储消息列表,适合多轮对话。应根据应用场景选择合适的格式,数据格式会影响评估方式和结果展示。
b.代码示例
---
from langsmith import Client
from langchain.schema import HumanMessage, AIMessage, SystemMessage
# create_dataset expects the DataType enum (it sends data_type.value), not a raw string.
from langsmith.schemas import DataType

client = Client()

# 1. KV format (the default): arbitrary key-value inputs and outputs.
kv_dataset = client.create_dataset(
    dataset_name="kv_dataset",
    description="Key-Value格式数据集",
    data_type=DataType.kv,
)
client.create_example(
    dataset_id=kv_dataset.id,
    inputs={"question": "什么是AI?"},
    outputs={"answer": "AI是人工智能"}
)

# 2. LLM format: a single text input and a single text output.
llm_dataset = client.create_dataset(
    dataset_name="llm_dataset",
    description="LLM文本生成数据集",
    data_type=DataType.llm,
)
client.create_example(
    dataset_id=llm_dataset.id,
    inputs={"input": "写一首诗"},
    outputs={"output": "春眠不觉晓,处处闻啼鸟..."}
)

# 3. Chat format: a list of messages (multi-turn conversation).
chat_dataset = client.create_dataset(
    dataset_name="chat_dataset",
    description="对话数据集",
    data_type=DataType.chat,
)
# NOTE(review): chat datasets may require a specific schema (e.g. inputs under an
# "input" key, output message under "output"); confirm against the LangSmith
# dataset-type docs before relying on the "messages"/"response" keys below.
client.create_example(
    dataset_id=chat_dataset.id,
    inputs={
        "messages": [
            {"role": "system", "content": "你是AI助手"},
            {"role": "user", "content": "你好"},
            {"role": "assistant", "content": "你好!有什么可以帮您?"},
            {"role": "user", "content": "天气怎么样?"}
        ]
    },
    outputs={
        "response": "今天天气晴朗,适合外出。"
    }
)
# 4. Structured data: nested objects in both inputs and outputs.
structured_dataset = client.create_dataset(
    dataset_name="structured_dataset",
    description="结构化数据集"
)

structured_inputs = {
    "user_profile": {"id": "123", "name": "张三", "level": "VIP"},
    "query": "我的订单状态",
    "context": {"session_id": "abc", "timestamp": "2024-01-01T10:00:00"},
}
structured_outputs = {
    "response": "您的订单已发货",
    "order_info": {"order_id": "ORD001", "status": "shipped"},
    "confidence": 0.95,
}
client.create_example(
    dataset_id=structured_dataset.id,
    inputs=structured_inputs,
    outputs=structured_outputs,
)
# 5. Multilingual dataset: one greeting pair per language, tagged with its locale.
multilang_dataset = client.create_dataset(
    dataset_name="multilang_qa",
    description="多语言问答"
)

languages = {
    "zh-CN": ("你好", "你好!很高兴见到你"),
    "en-US": ("Hello", "Hello! Nice to meet you"),
    "ja-JP": ("こんにちは", "こんにちは!お会いできて嬉しいです")
}
for locale_code, qa_pair in languages.items():
    question, answer = qa_pair
    client.create_example(
        dataset_id=multilang_dataset.id,
        inputs={"question": question},
        outputs={"answer": answer},
        metadata={"language": locale_code},
    )
# 6. Annotated dataset: labelled outputs plus annotator provenance in metadata.
annotated_dataset = client.create_dataset(
    dataset_name="annotated_dataset",
    description="带标注的数据集"
)

annotation = {
    "sentiment": "positive",
    "score": 0.95,
    "aspects": {"quality": "good", "value": "excellent"},
}
provenance = {"annotator": "专家A", "confidence": "high", "verified": True}
client.create_example(
    dataset_id=annotated_dataset.id,
    inputs={"text": "这个产品太棒了!"},
    outputs=annotation,
    metadata=provenance,
)
---
02.数据采集
a.手动录入
a.功能说明
通过Web界面或API手动录入测试数据。适用于精心设计的测试用例、边界情况、特殊场景等。支持在线编辑,方便快速修改。可以添加元数据、标签等辅助信息。手动录入质量高,针对性强,但效率较低,适合小规模数据集。
b.代码示例
---
from langsmith import Client
client = Client()

# 1. Hand-written edge-case examples.
dataset = client.read_dataset(dataset_name="my_dataset")

long_text = "问题" * 1000
# (question, expected answer, edge-case type), added in this order.
edge_cases = [
    ("", "请输入您的问题", "empty_input"),
    (long_text, "问题过长,请简化", "long_input"),
    ("订单号:#@$%^&*()", "订单号格式错误", "special_chars"),
]
for question, answer, case_type in edge_cases:
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": question},
        outputs={"answer": answer},
        metadata={"category": "edge_case", "type": case_type},
    )
# 2. Interactive entry
def interactive_add_examples(dataset_id: str):
    """Add examples typed at the terminal until the user quits.

    Typing ``quit`` (any case, surrounding spaces ignored) or sending EOF
    (Ctrl-D / Ctrl-Z) ends the session. An empty category is not stored.

    Args:
        dataset_id: Dataset that receives the examples.
    """
    print("添加测试样本(输入'quit'退出)")
    while True:
        try:
            question = input("\n问题: ")
            if question.strip().lower() == "quit":
                break
            answer = input("期望答案: ")
            category = input("分类(可选): ")
        except EOFError:
            # End of input (e.g. piped file or Ctrl-D) ends the session cleanly.
            break

        metadata = {"category": category} if category else {}
        client.create_example(
            dataset_id=dataset_id,
            inputs={"question": question},
            outputs={"answer": answer},
            metadata=metadata,
        )
        print("✓ 已添加")


# Example usage (interactive, so left disabled):
# interactive_add_examples(dataset.id)
# 3. Generate examples from question/answer templates.
templates = [
    {
        "question_template": "如何{action}?",
        "answer_template": "请在{location}进行{action}操作。",
        "variants": [
            {"action": "退货", "location": "订单页面"},
            {"action": "修改地址", "location": "个人中心"},
            {"action": "取消订单", "location": "订单详情"}
        ]
    }
]
for template in templates:
    q_tpl = template["question_template"]
    a_tpl = template["answer_template"]
    for variant in template["variants"]:
        question = q_tpl.format(**variant)
        answer = a_tpl.format(**variant)
        client.create_example(
            dataset_id=dataset.id,
            inputs={"question": question},
            outputs={"answer": answer},
            metadata={"generated": True, "template": "action"},
        )
# 4. Import from Notion/Confluence
def import_from_docs(dataset_id: str, doc_url: str):
    """Import ``Q:``/``A:`` pairs found in a document into a dataset.

    NOTE(review): ``fetch_document`` is not defined in this snippet; it stands
    for whatever API returns the document's raw text (the section assumes one
    exists). Provide it before calling this function.

    Args:
        dataset_id: Dataset that receives the examples.
        doc_url: URL of the source document, also stored in metadata.
    """
    raw_text = fetch_document(doc_url)
    qa_pairs = parse_qa_from_markdown(raw_text)
    for pair in qa_pairs:
        client.create_example(
            dataset_id=dataset_id,
            inputs={"question": pair["q"]},
            outputs={"answer": pair["a"]},
            metadata={"source": "docs", "url": doc_url},
        )
    print(f"从文档导入{len(qa_pairs)}个QA")
def parse_qa_from_markdown(content: str):
    """Extract question/answer pairs written as ``Q: ...`` / ``A: ...`` lines.

    A pair starts at a line beginning with ``Q:`` and is followed by a line
    beginning with ``A:``. The answer may span several lines and runs until
    the next line starting with ``Q:`` or the end of the text. Windows
    (CRLF) line endings are accepted.

    Args:
        content: Markdown/plain text to scan.

    Returns:
        A list of ``{"q": question, "a": answer}`` dicts with surrounding
        whitespace stripped; empty when nothing matches.
    """
    import re

    # Normalise CRLF so the "\n" anchors below also match Windows files.
    text = content.replace("\r\n", "\n")
    # \n must be a real newline escape here: in a raw string, "\\n" would match a
    # literal backslash followed by "n" and never match real text.
    pattern = re.compile(
        r"^Q:[ \t]*(.*?)\nA:[ \t]*(.*?)(?=\nQ:|\Z)",
        re.DOTALL | re.MULTILINE,
    )
    return [{"q": q.strip(), "a": a.strip()} for q, a in pattern.findall(text)]
---
b.自动采集
a.功能说明
从生产环境自动采集数据构建测试集。采样真实用户请求和响应,确保测试数据的真实性。设置采样率和过滤规则,选择高质量样本。自动去重、清洗、标注。定期更新数据集,保持测试数据的时效性。自动采集降低维护成本,提升数据集覆盖度。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
import random
client = Client()


# 1. Sample production data
def sample_production_data(
    source_project: str,
    dataset_name: str,
    sample_rate: float = 0.1,
    max_samples: int = 100
):
    """Randomly sample successful root runs from the last 7 days into a dataset.

    Args:
        source_project: Project whose runs are sampled.
        dataset_name: Target dataset (created if missing).
        sample_rate: Probability of keeping each run; must be in (0, 1].
        max_samples: Stop after this many examples were added.

    Returns:
        The target dataset.

    Raises:
        ValueError: If sample_rate is outside (0, 1] (0 would divide by zero).
    """
    if not 0 < sample_rate <= 1:
        raise ValueError(f"sample_rate must be in (0, 1], got {sample_rate}")

    # Fetch about max_samples / sample_rate runs so the expected sample size is
    # max_samples. Root runs only, so nested LLM/tool calls are not sampled.
    runs = client.list_runs(
        project_name=source_project,
        start_time=datetime.now() - timedelta(days=7),
        filter='eq(status, "success")',
        is_root=True,
        limit=int(max_samples / sample_rate),
    )
    # ".1%" renders 0.1 as "10.0%" without float noise such as 7.000000000000001.
    dataset = get_or_create_dataset(
        dataset_name,
        f"生产数据采样({sample_rate:.1%})"
    )

    sampled = 0
    for run in runs:
        # Check the cap first so max_samples=0 really adds nothing.
        if sampled >= max_samples:
            break
        if random.random() < sample_rate and run.inputs and run.outputs:
            client.create_example(
                dataset_id=dataset.id,
                inputs=run.inputs,
                outputs=run.outputs,
                metadata={
                    "source_run": str(run.id),
                    "sampled_at": datetime.now().isoformat(),
                },
            )
            sampled += 1
    print(f"采样完成:{sampled}个样本")
    return dataset


# Run the sampling
sampled_dataset = sample_production_data(
    "production",
    "prod_test_set",
    sample_rate=0.1,
    max_samples=100
)
# 2. Smart sampling (diversity)
def smart_sample(
    source_project: str,
    dataset_name: str,
    categories: list,
    samples_per_category: int = 10
):
    """Build a dataset with up to the same number of examples per tag category.

    For each category, up to ``2 * samples_per_category`` root runs carrying
    that tag are fetched and ``samples_per_category`` of them are drawn at
    random. Runs without inputs or outputs are skipped, so a category can end
    up with fewer examples.

    Args:
        source_project: Project whose runs are sampled.
        dataset_name: Target dataset (created if missing).
        categories: Run tags to sample from.
        samples_per_category: Maximum examples per category.

    Returns:
        The target dataset.
    """
    dataset = get_or_create_dataset(dataset_name, "智能采样数据集")
    added_per_category = {}
    for category in categories:
        # Root runs only: tags are inherited by child runs, which would otherwise
        # fill the quota with nested LLM/tool calls.
        runs_list = list(client.list_runs(
            project_name=source_project,
            filter=f'has(tags, "{category}")',
            is_root=True,
            limit=samples_per_category * 2,
        ))
        sampled_runs = random.sample(
            runs_list,
            min(samples_per_category, len(runs_list))
        )

        added = 0
        for run in sampled_runs:
            if run.inputs and run.outputs:
                client.create_example(
                    dataset_id=dataset.id,
                    inputs=run.inputs,
                    outputs=run.outputs,
                    metadata={"category": category},
                )
                added += 1
        added_per_category[category] = added

    # Report the actual counts; a category may have had fewer usable runs.
    print(f"智能采样完成,每类最多{samples_per_category}个:{added_per_category}")
    return dataset


# Usage
smart_sample(
    "production",
    "balanced_test_set",
    categories=["咨询", "投诉", "建议", "订单查询"],
    samples_per_category=25
)
# 3. Deduplication
def deduplicate_dataset(dataset_name: str):
    """Delete examples whose inputs duplicate an earlier example's inputs.

    The first occurrence (in ``list_examples`` order) is kept. Inputs are
    compared via a hash of their canonical JSON form, so key order does not
    matter.

    Args:
        dataset_name: Dataset to deduplicate in place.
    """
    import hashlib
    import json

    dataset = client.read_dataset(dataset_name=dataset_name)
    examples = list(client.list_examples(dataset_id=dataset.id))

    seen = set()
    duplicates = []
    for ex in examples:
        # sort_keys gives a canonical form; default=str keeps values that JSON
        # cannot encode natively (e.g. datetimes) from raising.
        canonical = json.dumps(ex.inputs, sort_keys=True, ensure_ascii=False, default=str)
        fingerprint = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        if fingerprint in seen:
            duplicates.append(ex.id)
        else:
            seen.add(fingerprint)

    for dup_id in duplicates:
        client.delete_example(dup_id)
    print(f"删除{len(duplicates)}个重复样本")
# 4. Data cleaning
def clean_dataset(dataset_name: str):
    """Delete examples that are empty, too short, or whose outputs mention "error".

    An example is removed when its inputs or outputs are empty, when its
    string ``question`` input is shorter than 3 characters, or when the
    lower-cased text of its outputs contains "error".

    Args:
        dataset_name: Dataset to clean in place.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    examples = list(client.list_examples(dataset_id=dataset.id))

    def _is_invalid(ex) -> bool:
        # Empty data
        if not ex.inputs or not ex.outputs:
            return True
        # Too-short question
        question = ex.inputs.get("question")
        if isinstance(question, str) and len(question) < 3:
            return True
        # Outputs that look like an error
        return "error" in str(ex.outputs).lower()

    removed = 0
    for ex in examples:
        if _is_invalid(ex):
            client.delete_example(ex.id)
            removed += 1
    print(f"清洗完成,删除{removed}个无效样本")
# 5. 定时自动采集
import schedule
import time
def scheduled_sampling():
    """Scheduler job: sample 5% (max 50) of recent production runs into "daily_sample"."""
    print(f"执行定时采集:{datetime.now()}")
    sample_production_data(
        "production", "daily_sample", sample_rate=0.05, max_samples=50
    )


# Register the job for 02:00 every day.
schedule.every().day.at("02:00").do(scheduled_sampling)
# Start the (blocking) scheduler loop when running as a service:
# while True:
#     schedule.run_pending()
#     time.sleep(60)
---
3.2 数据上传
01.批量上传
a.CSV导入
a.功能说明
从CSV文件批量导入数据到LangSmith数据集。CSV格式简单,易于编辑和维护。使用pandas读取CSV,遍历行创建样本。支持自定义列映射,灵活适配不同格式。可以添加额外的元数据列。CSV导入适合大规模数据集的初始创建和更新。
b.代码示例
---
from langsmith import Client
import pandas as pd
client = Client()

# 1. Example CSV file (qa_data.csv):
# question,answer,category,difficulty
# 如何退货?,在订单页面申请退货,售后,easy
# 配送多久?,3-5个工作日,物流,easy
# 发票问题,下单时选择开具发票,财务,medium

# 2. Read the CSV.
df = pd.read_csv("qa_data.csv")
print(f"读取{len(df)}行数据")

# 3. Create the dataset.
dataset = client.create_dataset(
    dataset_name="qa_from_csv",
    description="从CSV导入的问答数据"
)

# 4. Import rows. row_number is the 1-based position in the file, which stays
# correct even when the DataFrame index is not a default 0..n-1 range.
for row_number, (_, row) in enumerate(df.iterrows(), start=1):
    category = row.get("category", "")
    difficulty = row.get("difficulty", "")
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": row["question"]},
        outputs={"answer": row["answer"]},
        metadata={
            # Empty CSV cells are read as NaN, which is not valid JSON; store "".
            "category": "" if pd.isna(category) else category,
            "difficulty": "" if pd.isna(difficulty) else difficulty,
            "row_number": row_number,
        },
    )
    # Report progress every 10 rows.
    if row_number % 10 == 0:
        print(f"已导入{row_number}行")
print(f"\n导入完成:{len(df)}个样本")
# 5. Import with validation
def import_csv_with_validation(csv_file: str, dataset_name: str):
    """Import a question/answer CSV, skipping rows with a blank question or answer.

    Question and answer are stripped of surrounding whitespace. Every other
    non-empty column is stored in the example metadata as a string.

    Args:
        csv_file: Path to a CSV with at least ``question`` and ``answer`` columns.
        dataset_name: Target dataset (created if missing).
    """
    df = pd.read_csv(csv_file)
    dataset = get_or_create_dataset(dataset_name, f"从{csv_file}导入")

    valid_count = 0
    invalid_count = 0
    for row_number, (_, row) in enumerate(df.iterrows(), start=1):
        # Test NaN first: blank CSV cells arrive as NaN, which is truthy.
        question = "" if pd.isna(row["question"]) else str(row["question"]).strip()
        answer = "" if pd.isna(row["answer"]) else str(row["answer"]).strip()
        if not question:
            print(f"跳过第{row_number}行:问题为空")
            invalid_count += 1
            continue
        if not answer:
            print(f"跳过第{row_number}行:答案为空")
            invalid_count += 1
            continue

        client.create_example(
            dataset_id=dataset.id,
            inputs={"question": question},
            outputs={"answer": answer},
            metadata={
                k: str(v) for k, v in row.items()
                if k not in ["question", "answer"] and not pd.isna(v)
            },
        )
        valid_count += 1

    total = valid_count + invalid_count
    print("\n导入结果:")
    print(f" 有效:{valid_count}")
    print(f" 无效:{invalid_count}")
    # An empty CSV would otherwise divide by zero.
    success_rate = valid_count / total if total else 0.0
    print(f" 成功率:{success_rate:.1%}")


import_csv_with_validation("qa_data.csv", "validated_qa_dataset")
# 6. Incremental import
def incremental_csv_import(csv_file: str, dataset_name: str):
    """Import only CSV rows whose question is not already in the dataset.

    Questions are compared after stripping whitespace. A question repeated
    inside the CSV itself is imported once. Rows with a blank question or
    answer are skipped.

    Args:
        csv_file: Path to a CSV with ``question`` and ``answer`` columns.
        dataset_name: Target dataset (created if missing).
    """
    df = pd.read_csv(csv_file)
    dataset = get_or_create_dataset(dataset_name)

    existing_questions = set()
    for ex in client.list_examples(dataset_id=dataset.id):
        existing = (ex.inputs or {}).get("question")
        if isinstance(existing, str):
            existing_questions.add(existing.strip())

    new_count = 0
    for _, row in df.iterrows():
        # Blank cells are NaN; str(NaN) would otherwise import the text "nan".
        if pd.isna(row["question"]) or pd.isna(row["answer"]):
            continue
        question = str(row["question"]).strip()
        if not question or question in existing_questions:
            continue
        client.create_example(
            dataset_id=dataset.id,
            inputs={"question": question},
            outputs={"answer": str(row["answer"]).strip()},
        )
        # Remember it so a later duplicate row in the same CSV is skipped.
        existing_questions.add(question)
        new_count += 1
    print(f"增量导入{new_count}个新样本")


incremental_csv_import("qa_data_new.csv", "qa_dataset")
---
b.JSON导入
a.功能说明
从JSON文件导入复杂结构化数据。JSON支持嵌套对象、数组等复杂类型,适合多字段、多层级的数据。使用json模块解析文件,映射到LangSmith样本格式。支持批量导入多个JSON对象。JSON导入适合从API、数据库、其他系统导出的数据。
b.代码示例
---
from langsmith import Client
import json
client = Client()

# 1. Expected layout of qa_data.json: a list of objects, each with "inputs",
#    "outputs" and optional "metadata", for example:
# [
#   {
#     "inputs": {
#       "question": "如何退货?",
#       "user_context": {"user_id": "123", "order_id": "ORD001"}
#     },
#     "outputs": {"answer": "请在订单页面申请退货", "confidence": 0.95},
#     "metadata": {"category": "售后", "timestamp": "2024-01-01T10:00:00"}
#   },
#   ...
# ]

# 2. Load the file and create one example per object.
with open("qa_data.json", "r", encoding="utf-8") as f:
    data = json.load(f)

dataset = get_or_create_dataset("qa_from_json", "从JSON导入")
for record in data:
    client.create_example(
        dataset_id=dataset.id,
        inputs=record["inputs"],
        outputs=record["outputs"],
        metadata=record.get("metadata", {}),
    )
print(f"导入{len(data)}个样本")
# 3. JSONL import: one JSON object per line
def import_jsonl(jsonl_file: str, dataset_name: str):
    """Import a JSONL file into a dataset, skipping blank lines.

    Each line must hold an object with "inputs" and "outputs" keys and an
    optional "metadata" key.

    Args:
        jsonl_file: Path to the UTF-8 JSONL file.
        dataset_name: Target dataset (created if missing).
    """
    dataset = get_or_create_dataset(dataset_name)
    count = 0
    with open(jsonl_file, "r", encoding="utf-8") as f:
        for raw_line in f:
            if not raw_line.strip():
                continue
            record = json.loads(raw_line)
            client.create_example(
                dataset_id=dataset.id,
                inputs=record["inputs"],
                outputs=record["outputs"],
                metadata=record.get("metadata", {}),
            )
            count += 1
    print(f"从JSONL导入{count}个样本")


import_jsonl("qa_data.jsonl", "qa_jsonl_dataset")
# 4. Import from an API
def import_from_api(api_url: str, dataset_name: str, timeout: float = 30):
    """Fetch ``{"items": [{"inputs": ..., "outputs": ...}, ...]}`` from an API and import it.

    Args:
        api_url: Endpoint returning the JSON payload.
        dataset_name: Target dataset (created if missing).
        timeout: Seconds to wait for the HTTP response (prevents hanging forever).

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    import requests
    from datetime import datetime  # not imported by this snippet's header

    response = requests.get(api_url, timeout=timeout)
    # Fail loudly on HTTP errors instead of parsing an error page as data.
    response.raise_for_status()
    data = response.json()

    dataset = get_or_create_dataset(dataset_name, f"从{api_url}导入")
    items = data["items"]
    imported_at = datetime.now().isoformat()
    for item in items:
        client.create_example(
            dataset_id=dataset.id,
            inputs=item["inputs"],
            outputs=item["outputs"],
            metadata={"source": "api", "imported_at": imported_at},
        )
    print(f"从API导入{len(items)}个样本")


# import_from_api("https://api.example.com/qa_data", "api_dataset")
# 5. Import from a database
def import_from_database(dataset_name: str, dsn=None):
    """Import verified Q&A rows from a PostgreSQL table into a dataset.

    Args:
        dataset_name: Target dataset (created if missing).
        dsn: PostgreSQL connection string. When omitted, the QA_DATABASE_URL
            environment variable is used, falling back to the placeholder
            DSN, so real credentials need not be hard-coded.
    """
    import os
    from contextlib import closing

    import psycopg2

    dsn = dsn or os.environ.get(
        "QA_DATABASE_URL", "postgresql://user:pass@localhost/mydb"
    )
    # closing() guarantees the connection is released even if the query fails;
    # it is closed before the slow LangSmith uploads start.
    with closing(psycopg2.connect(dsn)) as conn, conn.cursor() as cursor:
        cursor.execute("""
            SELECT question, answer, category
            FROM qa_table
            WHERE verified = true
        """)
        rows = cursor.fetchall()

    dataset = get_or_create_dataset(dataset_name, "从数据库导入")
    for question, answer, category in rows:
        client.create_example(
            dataset_id=dataset.id,
            inputs={"question": question},
            outputs={"answer": answer},
            metadata={"category": category, "source": "database"},
        )
    print(f"从数据库导入{len(rows)}个样本")


# import_from_database("db_qa_dataset")
# 6. Optimized bulk import
def optimized_bulk_import(data: list, dataset_name: str, batch_size: int = 100):
    """Import examples in batches, sending one create_examples request per batch.

    Args:
        data: Items shaped ``{"inputs": {...}, "outputs": {...}, "metadata": {...}?}``.
        dataset_name: Target dataset (created if missing).
        batch_size: Examples per request.
    """
    dataset = get_or_create_dataset(dataset_name)
    total = len(data)
    imported = 0
    for start in range(0, total, batch_size):
        batch = data[start:start + batch_size]
        # One API call per batch; the previous per-item loop made batching pointless.
        client.create_examples(
            inputs=[item["inputs"] for item in batch],
            outputs=[item["outputs"] for item in batch],
            metadata=[item.get("metadata", {}) for item in batch],
            dataset_id=dataset.id,
        )
        imported += len(batch)
        print(f"进度:{imported}/{total} ({imported/total*100:.1f}%)")
    print(f"\n批量导入完成:{imported}个样本")


# Large-scale import. Placeholder records: `{...}` would be a set containing
# Ellipsis, which cannot be serialized, so use real dicts.
large_data = [
    {"inputs": {"question": f"问题{i}"}, "outputs": {"answer": f"答案{i}"}}
    for i in range(1000)
]
optimized_bulk_import(large_data, "large_dataset", batch_size=50)
---
02.数据导出
a.导出格式
a.功能说明
LangSmith支持将数据集导出为多种格式,便于离线分析、备份、迁移等。导出为CSV适合表格软件查看。导出为JSON保留完整结构。导出为JSONL适合大数据处理。通过API或Web界面导出。导出包含输入、输出、元数据等完整信息。
b.代码示例
---
from langsmith import Client
import json
import pandas as pd
client = Client()


# 1. Export as JSON
def export_dataset_json(dataset_name: str, output_file: str):
    """Write every example of a dataset to *output_file* as one UTF-8 JSON array.

    Each entry holds the example id, inputs, outputs, metadata and ISO-8601
    creation time.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    records = [
        {
            "id": str(ex.id),
            "inputs": ex.inputs,
            "outputs": ex.outputs,
            "metadata": ex.metadata,
            "created_at": ex.created_at.isoformat(),
        }
        for ex in client.list_examples(dataset_id=dataset.id)
    ]
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2, ensure_ascii=False)
    print(f"已导出{len(records)}个样本到{output_file}")


export_dataset_json("my_dataset", "export.json")
# 2. Export as CSV
def export_dataset_csv(dataset_name: str, output_file: str):
    """Write a dataset's examples to a CSV readable by spreadsheet tools.

    Columns: id, question, answer, created_at, plus one ``meta_<key>`` column
    per metadata key. The file is written with a UTF-8 BOM (utf-8-sig) so
    Excel detects the encoding.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    rows = []
    for ex in client.list_examples(dataset_id=dataset.id):
        # Inputs/outputs may be None (e.g. examples without a reference output).
        inputs = ex.inputs or {}
        outputs = ex.outputs or {}
        row = {
            "id": str(ex.id),
            "question": inputs.get("question", ""),
            "answer": outputs.get("answer", ""),
            "created_at": ex.created_at.isoformat(),
        }
        for key, value in (ex.metadata or {}).items():
            row[f"meta_{key}"] = value
        rows.append(row)

    df = pd.DataFrame(rows)
    df.to_csv(output_file, index=False, encoding="utf-8-sig")
    print(f"已导出{len(df)}个样本到{output_file}")


export_dataset_csv("my_dataset", "export.csv")
# 3. Export as JSONL
def export_dataset_jsonl(dataset_name: str, output_file: str):
    """Write a dataset's examples to a JSONL file, one JSON object per line.

    Each object holds inputs, outputs and metadata.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    count = 0
    with open(output_file, "w", encoding="utf-8") as f:
        for ex in client.list_examples(dataset_id=dataset.id):
            item = {
                "inputs": ex.inputs,
                "outputs": ex.outputs,
                "metadata": ex.metadata,
            }
            # A real newline terminates each record; "\\n" would write a literal
            # backslash-n and put everything on one line.
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
            count += 1
    print(f"已导出{count}个样本到{output_file}")


export_dataset_jsonl("my_dataset", "export.jsonl")
# 4. Selective export
def export_filtered_examples(
    dataset_name: str,
    output_file: str,
    filter_func
):
    """Export only the examples for which ``filter_func(example)`` is truthy.

    Args:
        dataset_name: Dataset to read.
        output_file: Destination JSON file (UTF-8, a JSON array).
        filter_func: Predicate taking a LangSmith example.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    filtered = [
        ex for ex in client.list_examples(dataset_id=dataset.id) if filter_func(ex)
    ]
    data = [
        {"inputs": ex.inputs, "outputs": ex.outputs, "metadata": ex.metadata}
        for ex in filtered
    ]
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"导出{len(filtered)}个过滤后的样本")


# Export one category only. Metadata can be None, so guard before .get().
export_filtered_examples(
    "my_dataset",
    "售后_samples.json",
    lambda ex: (ex.metadata or {}).get("category") == "售后"
)
# 5. Export evaluation results
def export_evaluation_results(experiment_name: str, output_file: str):
    """Export one row per evaluated example of an experiment to CSV.

    Args:
        experiment_name: Experiment (tracing project) to read, at most 1000 runs.
        output_file: Destination CSV (UTF-8 with BOM).
    """
    # Root runs only: each evaluated example produces one root run; its nested
    # LLM/tool child runs would otherwise appear as extra rows.
    runs = client.list_runs(
        project_name=experiment_name,
        is_root=True,
        limit=1000,
    )
    results = [
        {
            "run_id": str(run.id),
            "inputs": run.inputs,
            "outputs": run.outputs,
            "feedback": run.feedback_stats,
            "latency": run.latency,
            "tokens": run.total_tokens,
        }
        for run in runs
    ]
    df = pd.DataFrame(results)
    df.to_csv(output_file, index=False, encoding="utf-8-sig")
    print(f"导出评估结果:{len(results)}条")


export_evaluation_results("my_experiment", "evaluation_results.csv")
# 6. Back up a whole dataset
def backup_dataset(dataset_name: str, backup_dir: str = "./backups"):
    """Export a dataset to ``<backup_dir>/<name>_<YYYYmmdd_HHMMSS>.json``.

    The backup directory is created if it does not exist.
    """
    import os
    from datetime import datetime

    os.makedirs(backup_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"{backup_dir}/{dataset_name}_{stamp}.json"
    export_dataset_json(dataset_name, backup_file)
    print(f"备份完成:{backup_file}")


# Periodic backup (e.g. call from a scheduler)
backup_dataset("important_dataset")
---
3.3 数据版本
01.版本管理
a.创建版本
a.功能说明
数据集支持版本管理,记录数据的演化历史。创建版本快照保存当前数据集状态。版本间可以对比差异,回退到历史版本。适用于数据集持续更新、A/B测试、版本对比等场景。版本管理确保数据可追溯,支持实验复现。
b.代码示例
---
from langsmith import Client
from datetime import datetime
client = Client()

# NOTE(review): `create_dataset_version` / `list_dataset_versions` as called in this
# section may not match your langsmith SDK. Recent SDKs version datasets
# automatically on every change and name versions with `update_dataset_tag(...)`.
# Confirm against your SDK before use.

# 1. Snapshot the dataset's current state as a version.
dataset = client.read_dataset(dataset_name="my_dataset")
version = client.create_dataset_version(
    dataset_id=dataset.id,
    version="v1.0",
    description="初始版本",
    tags=["baseline", "verified"]
)
print(f"版本已创建:{version.version}")

# 2. Change the data, then snapshot a new version.
client.create_example(
    dataset_id=dataset.id,
    inputs={"question": "新问题"},
    outputs={"answer": "新答案"}
)
# The description now matches the single example added above.
version_v2 = client.create_dataset_version(
    dataset_id=dataset.id,
    version="v1.1",
    description="添加1个新样本",
    tags=["updated"]
)

# 3. List all versions.
versions = client.list_dataset_versions(dataset_id=dataset.id)
print("\n数据集版本历史:")
for ver in versions:
    print(f" {ver.version}: {ver.description}")
    print(f" 创建于:{ver.created_at}")
    print(f" 标签:{ver.tags}")
    print()

# 4. Read the examples of a specific version. Examples belong to the dataset,
# not to a version object, so request them `as_of` the version tag.
v1_examples = list(client.list_examples(dataset_id=dataset.id, as_of="v1.0"))
print(f"V1.0样本数:{len(v1_examples)}")
# 5. Compare two versions
def compare_dataset_versions(dataset_name: str, ver1: str, ver2: str):
    """Print how many examples were added, removed, kept and modified between versions.

    Examples are matched by id. An example counts as modified when its inputs
    or its outputs differ.

    Args:
        dataset_name: Dataset to compare.
        ver1: Older version tag, passed to list_examples(as_of=...).
        ver2: Newer version tag.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    # Fetch each version's examples from the dataset `as_of` the tag; listing by
    # a version object's id would return the same (current) data for both sides.
    v1_examples = {ex.id: ex for ex in client.list_examples(dataset_id=dataset.id, as_of=ver1)}
    v2_examples = {ex.id: ex for ex in client.list_examples(dataset_id=dataset.id, as_of=ver2)}

    added = v2_examples.keys() - v1_examples.keys()
    removed = v1_examples.keys() - v2_examples.keys()
    common = v1_examples.keys() & v2_examples.keys()

    print(f"\n版本对比 ({ver1} vs {ver2}):")
    print(f" 新增样本:{len(added)}")
    print(f" 删除样本:{len(removed)}")
    print(f" 保留样本:{len(common)}")

    modified = sum(
        1 for ex_id in common
        if (v1_examples[ex_id].inputs, v1_examples[ex_id].outputs)
        != (v2_examples[ex_id].inputs, v2_examples[ex_id].outputs)
    )
    print(f" 修改样本:{modified}")


compare_dataset_versions("my_dataset", "v1.0", "v1.1")
# 6. Roll back to a version
def rollback_dataset(dataset_name: str, target_version: str):
    """Replace the dataset's current examples with those of *target_version*.

    The target examples are read before anything is deleted, so a failed or
    wrong lookup cannot leave the dataset empty. Restored examples receive
    new ids.

    Args:
        dataset_name: Dataset to roll back in place.
        target_version: Version tag, passed to list_examples(as_of=...).
    """
    dataset = client.read_dataset(dataset_name=dataset_name)

    # Snapshot the target first. Deleting first and then reading would lose data,
    # since the target listing could fail or see the already-emptied dataset.
    target_examples = list(client.list_examples(dataset_id=dataset.id, as_of=target_version))
    current_examples = list(client.list_examples(dataset_id=dataset.id))

    for ex in current_examples:
        client.delete_example(ex.id)
    for ex in target_examples:
        client.create_example(
            dataset_id=dataset.id,
            inputs=ex.inputs,
            outputs=ex.outputs,
            metadata=ex.metadata,
        )
    print(f"已回退到版本{target_version},恢复{len(target_examples)}个样本")


# Use with care: rollback deletes the current data.
# rollback_dataset("my_dataset", "v1.0")
# 7. Version naming convention
class DatasetVersionManager:
    """Create dataset versions named ``v<major>.<minor>`` with automatic numbering."""

    def __init__(self, client, dataset_name: str):
        self.client = client
        self.dataset = client.read_dataset(dataset_name=dataset_name)

    @staticmethod
    def _next_version(existing, major: bool) -> str:
        """Return the version after the highest ``vX.Y`` name in *existing*.

        Names that do not look like ``vX.Y`` are ignored. Comparison is
        numeric, so v1.10 > v1.9. With no usable names, "v1.0" is returned.
        """
        parsed = []
        for name in existing:
            major_str, sep, minor_str = name.lstrip("v").partition(".")
            if sep and major_str.isdigit() and minor_str.isdigit():
                parsed.append((int(major_str), int(minor_str)))
        if not parsed:
            return "v1.0"
        latest_major, latest_minor = max(parsed)
        if major:
            return f"v{latest_major + 1}.0"
        return f"v{latest_major}.{latest_minor + 1}"

    def _existing_versions(self):
        """Return the names of all versions of this dataset."""
        return [
            ver.version
            for ver in self.client.list_dataset_versions(dataset_id=self.dataset.id)
        ]

    def create_version(self, description: str, tags: list = None):
        """Create the next minor version (vX.Y -> vX.Y+1)."""
        # Take the max over all versions; list order is not guaranteed newest-first.
        new_version = self._next_version(self._existing_versions(), major=False)
        version = self.client.create_dataset_version(
            dataset_id=self.dataset.id,
            version=new_version,
            description=description,
            tags=tags or [],
        )
        print(f"创建版本:{new_version}")
        return version

    def create_major_version(self, description: str, tags: list = None):
        """Create the next major version (vX.Y -> vX+1.0); tags default to ["major_release"]."""
        new_version = self._next_version(self._existing_versions(), major=True)
        return self.client.create_dataset_version(
            dataset_id=self.dataset.id,
            version=new_version,
            description=description,
            tags=tags if tags is not None else ["major_release"],
        )
# Use the version manager.
manager = DatasetVersionManager(client, "my_dataset")
# Minor version
manager.create_version("修复5个样本的答案", tags=["bugfix"])
# Major version. create_major_version tags it "major_release" itself; it takes
# no tags argument, so passing tags= would raise a TypeError.
manager.create_major_version("重大更新:重构所有样本")
---
b.版本对比
a.功能说明
对比不同版本的数据集,识别变化内容。查看新增、删除、修改的样本。分析版本间的质量变化。使用不同版本进行评估对比,验证改进效果。版本对比帮助理解数据集演化,评估变更影响,支持版本选择决策。
b.代码示例
---
from langsmith import Client
from difflib import unified_diff
import json
client = Client()


# 1. Compare two versions in detail
def diff_dataset_versions(dataset_name: str, ver1: str, ver2: str):
    """Build a detailed added/removed/modified report between two dataset versions.

    Examples are matched by id. An example is "modified" when its inputs or
    outputs differ. Entries are sorted by id for stable output.

    Args:
        dataset_name: Dataset to compare.
        ver1: Older version tag, passed to list_examples(as_of=...).
        ver2: Newer version tag.

    Returns:
        Dict with keys version_1, version_2, added, removed, modified.
    """
    dataset = client.read_dataset(dataset_name=dataset_name)
    # Examples are read from the dataset `as_of` each tag; listing by a version
    # object's id would return identical data for both sides.
    v1_examples = {str(ex.id): ex for ex in client.list_examples(dataset_id=dataset.id, as_of=ver1)}
    v2_examples = {str(ex.id): ex for ex in client.list_examples(dataset_id=dataset.id, as_of=ver2)}
    v1_ids, v2_ids = set(v1_examples), set(v2_examples)

    report = {
        "version_1": ver1,
        "version_2": ver2,
        "added": [
            {"id": ex_id, "inputs": v2_examples[ex_id].inputs, "outputs": v2_examples[ex_id].outputs}
            for ex_id in sorted(v2_ids - v1_ids)
        ],
        "removed": [
            {"id": ex_id, "inputs": v1_examples[ex_id].inputs}
            for ex_id in sorted(v1_ids - v2_ids)
        ],
        "modified": [],
    }
    for ex_id in sorted(v1_ids & v2_ids):
        old, new = v1_examples[ex_id], v2_examples[ex_id]
        if old.inputs != new.inputs or old.outputs != new.outputs:
            report["modified"].append({
                "id": ex_id,
                "inputs": new.inputs,
                "old_inputs": old.inputs,
                "old_output": old.outputs,
                "new_output": new.outputs,
            })
    return report


diff_report = diff_dataset_versions("my_dataset", "v1.0", "v1.1")
print("\n版本差异报告:")
print(f" 新增:{len(diff_report['added'])}个")
print(f" 删除:{len(diff_report['removed'])}个")
print(f" 修改:{len(diff_report['modified'])}个")

# Show details of a few modified examples.
if diff_report["modified"]:
    print("\n修改样本示例:")
    for item in diff_report["modified"][:3]:
        print(f" 问题:{item['inputs']}")
        print(f" 旧答案:{item['old_output']}")
        print(f" 新答案:{item['new_output']}")
        print()
# 2. Text diff
def text_diff(text1: str, text2: str):
    """Return a unified diff (``v1`` -> ``v2``) of two texts, or "" if they are equal.

    Lines are split without line endings and re-joined with "\\n". With
    ``keepends=True``, a final line lacking a newline would be glued to the
    next diff line (e.g. "-b+c").
    """
    diff_lines = unified_diff(
        text1.splitlines(),
        text2.splitlines(),
        fromfile="v1",
        tofile="v2",
        lineterm="",
    )
    return "\n".join(diff_lines)
# Show a textual diff of each modified example's outputs.
for item in diff_report["modified"]:
    old_text = json.dumps(item["old_output"], ensure_ascii=False, indent=2)
    new_text = json.dumps(item["new_output"], ensure_ascii=False, indent=2)
    diff_text = text_diff(old_text, new_text)
    if diff_text:
        print(f"\n样本{item['id']}的差异:")
        print(diff_text)
# 3. Quality comparison
def compare_quality(dataset_name: str, ver1: str, ver2: str, target=None, evaluators=None):
    """Evaluate the same target against two versions of a dataset and print both results.

    Args:
        dataset_name: Dataset to evaluate on.
        ver1: First version tag, passed to list_examples(as_of=...).
        ver2: Second version tag.
        target: Callable ``inputs -> {"answer": ...}`` under test. Defaults to a
            placeholder that always answers "测试".
        evaluators: Evaluators to run. Defaults to an exact match on "answer".
    """
    from langsmith.evaluation import evaluate

    def placeholder_app(inputs):
        return {"answer": "测试"}

    def exact_match(run, example):
        predicted = (run.outputs or {}).get("answer")
        expected = (example.outputs or {}).get("answer")
        return {"key": "exact_match", "score": float(predicted == expected)}

    app = target or placeholder_app
    evaluator_list = evaluators if evaluators is not None else [exact_match]

    def _run(version):
        # `data` takes a dataset name or an iterable of examples; a (name, version)
        # tuple is neither, so pass the examples as of the version tag.
        return evaluate(
            app,
            data=client.list_examples(dataset_name=dataset_name, as_of=version),
            evaluators=evaluator_list,
            experiment_prefix=f"{dataset_name}_{version}",
        )

    results_v1 = _run(ver1)
    results_v2 = _run(ver2)
    print("\n质量对比:")
    print(f" {ver1}: {results_v1}")
    print(f" {ver2}: {results_v2}")


# compare_quality("my_dataset", "v1.0", "v1.1")
---
4 测试评估
4.1 评估器
01.内置评估器
a.准确性评估
a.功能说明
内置准确性评估器对比预测输出与期望输出,判断答案正确性。支持精确匹配、模糊匹配、语义相似度等多种模式。使用LLM作为评判器,理解答案的语义。适用于问答、分类、生成等任务。准确性评估是最基础也是最重要的评估指标。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate, LangChainStringEvaluator
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
client = Client()

# 1. Application under evaluation: a deterministic (temperature 0) model behind
#    a "answer concisely" prompt.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
prompt = ChatPromptTemplate.from_template("简洁回答:{question}")


def qa_app(inputs: dict) -> dict:
    """Answer ``inputs["question"]`` and return ``{"answer": <model text>}``."""
    response = (prompt | llm).invoke(inputs)
    return {"answer": response.content}
# 2. Correctness evaluator: an LLM grades each answer against the reference answer.
accuracy_evaluator = LangChainStringEvaluator(
    "qa",  # question-answering correctness
    # config is forwarded to LangChain's load_evaluator, whose grader-model
    # argument is named "llm"; "eval_llm" is not a recognised option.
    config={
        "llm": ChatOpenAI(model="gpt-4", temperature=0)
    }
)

# 3. Run the evaluation.
results = evaluate(
    qa_app,
    data="qa_dataset",
    evaluators=[accuracy_evaluator],
    experiment_prefix="accuracy_test"
)
print(f"评估结果:{results}")
# 4. Reference-free criteria evaluators.
# "relevance", "conciseness" and "harmfulness" are criteria, not evaluator types,
# so each uses the "criteria" evaluator with a {criterion_name: question} mapping.
# The criterion name becomes the feedback key.

# Relevance
relevance_evaluator = LangChainStringEvaluator(
    "criteria",
    config={
        "criteria": {"relevance": "答案是否与问题相关?"}
    }
)
# Conciseness
conciseness_evaluator = LangChainStringEvaluator(
    "criteria",
    config={
        "criteria": {"conciseness": "答案是否简洁明了?"}
    }
)
# Harmful content (score 1 means the answer IS harmful)
harmfulness_evaluator = LangChainStringEvaluator(
    "criteria",
    config={
        "criteria": {"harmfulness": "答案是否包含有害内容?"}
    }
)
# 5. Run several evaluators in one experiment.
results = evaluate(
    qa_app,
    data="qa_dataset",
    evaluators=[
        accuracy_evaluator,
        relevance_evaluator,
        conciseness_evaluator,
        harmfulness_evaluator
    ],
    experiment_prefix="comprehensive_eval"
)

# ExperimentResults cannot be indexed by metric name. Average the per-example
# feedback columns of its DataFrame instead (requires pandas).
# NOTE(review): assumes columns are named "feedback.<key>", with key "correctness"
# for the qa evaluator and the criterion name for the others; confirm via
# results.to_pandas().columns.
scores_df = results.to_pandas()


def _mean_feedback(key):
    """Return the mean score of feedback *key*, or None if that column is absent."""
    column = f"feedback.{key}"
    return scores_df[column].mean() if column in scores_df.columns else None


print("\n综合评估:")
for label, key in (("准确性", "correctness"), ("相关性", "relevance"), ("简洁性", "conciseness")):
    score = _mean_feedback(key)
    if score is not None:
        print(f" {label}:{score:.2%}")
harmful_rate = _mean_feedback("harmfulness")
if harmful_rate is not None:
    # The harmfulness criterion scores 1 for harmful answers, so safety is the complement.
    print(f" 安全性:{1 - harmful_rate:.2%}")
# 6. Custom criteria graded against the reference answer ("labeled_criteria").
custom_criteria = {
    "completeness": "答案是否完整回答了问题的所有方面?",
    "clarity": "答案表述是否清晰易懂?",
    "politeness": "答案语气是否礼貌友好?",
}
custom_evaluator = LangChainStringEvaluator(
    "labeled_criteria",
    config={"criteria": custom_criteria},
)

results = evaluate(
    qa_app,
    data="customer_service_qa",
    evaluators=[custom_evaluator],
    experiment_prefix="custom_criteria_eval",
)
---
b.相似度评估
a.功能说明
相似度评估器计算预测输出与期望输出的相似度。常用算法包括字符串匹配相似度(如difflib的SequenceMatcher)、语义嵌入相似度,以及BLEU、ROUGE等n-gram重叠指标。适用于文本生成、摘要、翻译等任务。相似度评估比精确匹配更灵活,允许表述不同但语义相同的答案通过。
b.代码示例
---
from langsmith.evaluation import evaluate
from difflib import SequenceMatcher
import numpy as np
# 1. 字符串相似度评估器
def string_similarity_evaluator(run, example):
    """Character-level similarity between the predicted and reference answers.

    Uses ``difflib.SequenceMatcher.ratio()`` (Ratcliff/Obershelp matching, not
    edit distance), which yields 0.0-1.0. A missing "answer" key counts as "".
    """
    matcher = SequenceMatcher(
        None,
        run.outputs.get("answer", ""),
        example.outputs.get("answer", ""),
    )
    ratio = matcher.ratio()
    return {
        "key": "string_similarity",
        "score": ratio,
        "comment": f"相似度:{ratio:.2%}",
    }
# 2. 词向量相似度
def embedding_similarity_evaluator(run, example):
    """Semantic similarity as the cosine of the OpenAI embeddings of both answers.

    Empty answers are not sent to the embedding API: two empty answers score
    1.0 and exactly one empty answer scores 0.0. The cosine is computed with
    numpy (no sklearn dependency); a zero-norm vector scores 0.0.
    """
    prediction = run.outputs.get("answer", "")
    reference = example.outputs.get("answer", "")
    if not prediction or not reference:
        score = 1.0 if prediction == reference else 0.0
        return {
            "key": "embedding_similarity",
            "score": score,
            "comment": f"语义相似度:{score:.2%}",
        }
    from langchain.embeddings import OpenAIEmbeddings

    embeddings = OpenAIEmbeddings()
    pred_vec = np.asarray(embeddings.embed_query(prediction), dtype=float)
    ref_vec = np.asarray(embeddings.embed_query(reference), dtype=float)
    norm_product = np.linalg.norm(pred_vec) * np.linalg.norm(ref_vec)
    similarity = float(np.dot(pred_vec, ref_vec) / norm_product) if norm_product else 0.0
    return {
        "key": "embedding_similarity",
        "score": similarity,
        "comment": f"语义相似度:{similarity:.2%}",
    }
# 3. BLEU评分(机器翻译常用)
def bleu_evaluator(run, example):
    """Sentence-level BLEU of the predicted answer against the reference.

    Tokenization is whitespace-based. Text without spaces (e.g. Chinese)
    becomes a single token, so pre-segment it for meaningful scores.
    NLTK smoothing (method1) keeps short answers that lack higher-order
    n-gram overlap from collapsing to 0 with a warning. An empty prediction
    or reference scores 0.0 without calling NLTK.
    """
    prediction = run.outputs.get("answer", "").split()
    reference = example.outputs.get("answer", "").split()
    if not prediction or not reference:
        score = 0.0
    else:
        from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu

        score = sentence_bleu(
            [reference],
            prediction,
            smoothing_function=SmoothingFunction().method1,
        )
    return {
        "key": "bleu_score",
        "score": score,
        "comment": f"BLEU:{score:.2%}",
    }
# 4. ROUGE评分(摘要评估常用)
def rouge_evaluator(run, example):
    """ROUGE-L F1 between the predicted and reference answers.

    ``Rouge.get_scores`` raises ValueError on an empty hypothesis or
    reference. Such cases score 0.0 here instead of aborting the evaluator.
    Like BLEU, the underlying tokenization is whitespace-based.
    """
    prediction = run.outputs.get("answer", "")
    reference = example.outputs.get("answer", "")
    if not prediction.strip() or not reference.strip():
        score = 0.0
    else:
        from rouge import Rouge

        scores = Rouge().get_scores(prediction, reference)[0]
        score = scores["rouge-l"]["f"]  # ROUGE-L F1
    return {
        "key": "rouge_score",
        "score": score,
        "comment": f"ROUGE-L:{score:.2%}",
    }
# 5. 综合相似度评估
def combined_similarity_evaluator(run, example):
    """Average of character-level and word-level similarity (50/50).

    Character level: ``SequenceMatcher.ratio()``. Word level: Jaccard overlap
    of lower-cased whitespace tokens (0.0 when both token sets are empty).
    """
    prediction = run.outputs.get("answer", "")
    reference = example.outputs.get("answer", "")
    char_ratio = SequenceMatcher(None, prediction, reference).ratio()

    pred_tokens = set(prediction.lower().split())
    ref_tokens = set(reference.lower().split())
    union = pred_tokens | ref_tokens
    jaccard = len(pred_tokens & ref_tokens) / len(union) if union else 0.0

    blended = 0.5 * char_ratio + 0.5 * jaccard
    return {
        "key": "combined_similarity",
        "score": blended,
        "comment": f"综合相似度:{blended:.2%}",
    }
# 6. Run the cheap string, embedding and combined similarity evaluators
#    (BLEU/ROUGE above need extra packages and are left out here).
results = evaluate(
    qa_app,
    data="qa_dataset",
    experiment_prefix="similarity_eval",
    evaluators=[
        string_similarity_evaluator,
        embedding_similarity_evaluator,
        combined_similarity_evaluator,
    ],
)
---
02.自定义评估器
a.规则评估
a.功能说明
自定义规则评估器基于业务逻辑判断输出质量。检查输出格式、长度、关键词、结构等。实现特定领域的验证规则。适用于有明确规范要求的场景,如客服回复规范、报告格式等。规则评估快速、确定,但缺乏灵活性。
b.代码示例
---
def rule_based_evaluator(run, example):
    """Score an answer against fixed customer-service rules, clamped to [0, 1].

    Starts at 1.0 and, in order, subtracts 0.3 if shorter than 10 chars,
    0.2 if longer than 500, 0.2 if no polite word, and 0.5 if a forbidden
    phrase appears. It then adds 0.1 if "400" or "客服" (a contact hint)
    appears. The comment lists the rules that failed.
    """
    answer = run.outputs.get("answer", "")
    length = len(answer)
    checks = [
        (length < 10, 0.3, "答案过短"),
        (length > 500, 0.2, "答案过长"),  # mutually exclusive with the line above
        (not any(w in answer for w in ("请", "谢谢", "您好", "感谢")), 0.2, "缺少礼貌用语"),
        (any(w in answer for w in ("不知道", "不清楚", "无法回答")), 0.5, "包含禁用词"),
    ]
    score = 1.0
    issues = []
    for failed, penalty, label in checks:
        if failed:
            score -= penalty
            issues.append(label)
    if "400" in answer or "客服" in answer:
        score += 0.1  # bonus for giving a contact channel
    score = max(0.0, min(1.0, score))
    summary = ", ".join(issues) if issues else "无"
    return {
        "key": "rule_compliance",
        "score": score,
        "comment": f"规则评分:{score:.2%}。问题:{summary}",
    }
# 客服回复规范检查
def customer_service_evaluator(run, example):
    """Check a reply against the customer-service template; score in [0, 1].

    Deducts from 100 points: 10 for no greeting at the start, 5 for no
    closing punctuation or thanks at the end, 20 for no solution keyword,
    and 15 for a negative phrase. Returns points / 100.
    """
    answer = run.outputs.get("answer", "")
    penalties = (
        (not answer.startswith(("您好", "你好", "感谢")), 10),
        (not answer.endswith(("。", "!", "谢谢")), 5),
        (not any(kw in answer for kw in ("可以", "请", "方法", "步骤")), 20),
        (any(kw in answer for kw in ("不行", "不可以", "无法", "没有办法")), 15),
    )
    points = 100.0 - sum(cost for hit, cost in penalties if hit)
    return {
        "key": "cs_compliance",
        "score": max(0, min(100, points)) / 100,
    }
# JSON格式验证
def json_format_evaluator(run, example):
    """Validate that ``json_output`` is a JSON object with status/message/data.

    Scores:
    - 1.0: a JSON object containing all required fields.
    - 0.5: a JSON object missing some of them.
    - 0.0: unparseable or missing output (None counts as missing), or a
      JSON value that is not an object. Without this check, a JSON string
      like "status message data" would pass the field test by substring
      membership.
    """
    import json

    output = run.outputs.get("json_output", "")
    try:
        data = json.loads(output)
    except (json.JSONDecodeError, TypeError) as e:
        return {"key": "json_format", "score": 0.0, "comment": f"JSON解析失败:{e}"}

    if not isinstance(data, dict):
        score = 0.0
        comment = f"JSON顶层不是对象:{type(data).__name__}"
    else:
        required_fields = ["status", "message", "data"]
        missing = [f for f in required_fields if f not in data]
        if missing:
            score = 0.5
            comment = f"缺少字段:{missing}"
        else:
            score = 1.0
            comment = "JSON格式正确"
    return {
        "key": "json_format",
        "score": score,
        "comment": comment,
    }
# Run the two rule-based evaluators on the customer-service dataset.
results = evaluate(
    qa_app,
    data="customer_service_qa",
    experiment_prefix="rule_eval",
    evaluators=[rule_based_evaluator, customer_service_evaluator],
)
---
b.LLM评估
a.功能说明
使用LLM作为评估器,利用其语言理解能力判断输出质量。定义评估标准和提示词,让LLM打分或判断。适用于主观性强、需要语义理解的评估,如创意性、流畅性、专业性等。LLM评估灵活强大,但成本较高、速度较慢。
b.代码示例
---
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
# 1. LLM评估器
def llm_quality_evaluator(run, example):
    """Ask GPT-4 to grade the answer 0-100 against the reference; score in [0, 1].

    The grade is read from a "分数:<n>" line in the reply. ASCII or full-width
    colons and surrounding spaces are accepted. The value is clamped to
    [0, 1]. If no grade can be parsed, the neutral fallback 0.5 is used. The
    full judge reply is returned as the comment.
    """
    import re

    prediction = run.outputs.get("answer", "")
    reference = example.outputs.get("answer", "")
    question = example.inputs.get("question", "")
    eval_llm = ChatOpenAI(model="gpt-4", temperature=0)
    eval_prompt = ChatPromptTemplate.from_template('''
你是一个专业的答案质量评估专家。请评估以下答案的质量。
问题:{question}
参考答案:{reference}
待评估答案:{prediction}
评估维度:
1. 准确性:答案是否正确
2. 完整性:是否回答了问题的所有方面
3. 清晰度:表述是否清晰易懂
4. 专业性:是否专业规范
请给出0-100的总分,并简要说明理由。
输出格式:
分数:XX
理由:XXX
''')
    result = (eval_prompt | eval_llm).invoke({
        "question": question,
        "reference": reference,
        "prediction": prediction,
    })
    content = result.content
    match = re.search(r"分数\s*[::]\s*(\d+(?:\.\d+)?)", content)
    if match:
        score = min(max(float(match.group(1)) / 100, 0.0), 1.0)
    else:
        score = 0.5  # unparseable reply: neutral fallback
    return {
        "key": "llm_quality",
        "score": score,
        "comment": content,
    }
# 2. 维度评估
def llm_dimensional_evaluator(run, example):
    """Grade the answer 0-10 on four dimensions with GPT-4; score is their mean / 10.

    Each reply is parsed for its first integer, so "8", "8分" and "8/10" all
    work. The value is capped at 10. A reply without any number falls back
    to 0.5 for that dimension.
    """
    import re

    prediction = run.outputs.get("answer", "")
    eval_llm = ChatOpenAI(model="gpt-4", temperature=0)
    dimensions = {
        "accuracy": "准确性(0-10)",
        "relevance": "相关性(0-10)",
        "clarity": "清晰度(0-10)",
        "completeness": "完整性(0-10)",
    }
    eval_prompt = ChatPromptTemplate.from_template('''
评估以下答案:{answer}
维度:{dimension}
只输出分数(0-10的整数):
''')
    scores = {}
    for dim, desc in dimensions.items():
        result = eval_llm.invoke(
            eval_prompt.format(answer=prediction, dimension=desc)
        )
        match = re.search(r"\d+", result.content)
        scores[dim] = min(int(match.group()), 10) / 10 if match else 0.5
    overall_score = sum(scores.values()) / len(scores)
    return {
        "key": "llm_dimensional",
        "score": overall_score,
        "comment": f"维度评分:{scores}",
    }
# 3. 对比评估
def llm_comparison_evaluator(run, example):
    """Ask GPT-4 whether the prediction (B) beats the reference answer (A).

    Score mapping: "B更好" -> 1.0, "相当" -> 0.8, anything else (including
    "A更好" or an unparseable reply) -> 0.6. The prompt must be piped into
    the model: invoking the chat model with a raw dict does not apply the
    template. Note that the fixed A/B order can introduce a position bias.
    """
    prediction = run.outputs.get("answer", "")
    reference = example.outputs.get("answer", "")
    question = example.inputs.get("question", "")
    eval_llm = ChatOpenAI(model="gpt-4", temperature=0)
    eval_prompt = ChatPromptTemplate.from_template('''
问题:{question}
答案A:{reference}
答案B:{prediction}
请对比两个答案,判断哪个更好。
只回答:A更好 / B更好 / 相当
''')
    result = (eval_prompt | eval_llm).invoke({
        "question": question,
        "reference": reference,
        "prediction": prediction,
    })
    response = result.content.strip()
    if "B更好" in response:
        score = 1.0
    elif "相当" in response:
        score = 0.8
    else:
        score = 0.6
    return {
        "key": "llm_comparison",
        "score": score,
        "comment": response,
    }
# Run the three LLM-as-judge evaluators together. Each judge call is a slow,
# rate-limited API request, so only two examples are evaluated at a time.
results = evaluate(
    qa_app,
    data="qa_dataset",
    experiment_prefix="llm_eval",
    max_concurrency=2,
    evaluators=[
        llm_quality_evaluator,
        llm_dimensional_evaluator,
        llm_comparison_evaluator,
    ],
)
---
4.2 评估指标
01.系统指标
a.准确率
a.功能说明
准确率(Accuracy)是评估分类和问答任务最基本的指标,表示预测正确的样本占总样本的比例。计算公式:Accuracy = 正确数 / 总数。适用于类别平衡的场景。对于不平衡数据,准确率可能产生误导,需要结合其他指标(精确率、召回率等)。准确率直观易懂,是评估的首选指标。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate
# LangSmith API client; used below (client.list_runs) to read back experiment runs.
client = Client()
# 1. 计算准确率
def accuracy_evaluator(run, example):
    """Exact-match accuracy after trimming whitespace and lower-casing (1.0 / 0.0)."""
    def _normalize(outputs):
        return outputs.get("answer", "").strip().lower()

    matched = _normalize(run.outputs) == _normalize(example.outputs)
    return {"key": "accuracy", "score": 1.0 if matched else 0.0}
# 2. Evaluate and compute the accuracy rate.
results = evaluate(
    qa_app,
    data="qa_dataset",
    evaluators=[accuracy_evaluator],
    experiment_prefix="accuracy_test",
)
# Count directly from the experiment's result rows. Reading them back with
# client.list_runs(project_name="accuracy_test") does not work reliably:
# - the experiment project is the prefix plus a generated suffix;
# - list_runs also returns nested child runs;
# - feedback_stats holds per-key aggregates rather than a bare score.
correct = 0
total = 0
for row in results:
    for res in row["evaluation_results"]["results"]:
        if res.key == "accuracy" and res.score is not None:
            total += 1
            correct += int(res.score == 1.0)
accuracy_rate = correct / total if total > 0 else 0
print(f"准确率:{accuracy_rate:.2%} ({correct}/{total})")
# 3. 分类任务准确率
def classification_accuracy(run, example):
    """1.0 when the predicted "class" equals the labelled one, else 0.0 (missing = "")."""
    hit = run.outputs.get("class", "") == example.outputs.get("class", "")
    return {"key": "classification_accuracy", "score": 1.0 if hit else 0.0}
# 4. Top-K准确率
def topk_accuracy_evaluator(run, example, k=3):
    """Top-K accuracy: 1.0 if the reference answer is among the first ``k`` predictions."""
    top_k = run.outputs.get("top_predictions", [])[:k]
    found = example.outputs.get("answer", "") in top_k
    return {"key": f"top{k}_accuracy", "score": 1.0 if found else 0.0}
# 5. 模糊匹配准确率
def fuzzy_accuracy_evaluator(run, example, threshold=0.8):
    """Fuzzy accuracy: 1.0 when SequenceMatcher similarity reaches ``threshold``."""
    from difflib import SequenceMatcher

    ratio = SequenceMatcher(
        None, run.outputs.get("answer", ""), example.outputs.get("answer", "")
    ).ratio()
    passed = ratio >= threshold
    return {
        "key": "fuzzy_accuracy",
        "score": 1.0 if passed else 0.0,
        "comment": f"相似度:{ratio:.2%}",
    }
# 6. 分组准确率
def grouped_accuracy(project_name: str):
    """Print per-category accuracy for the root runs of an experiment project.

    The category is read from the run's metadata (``extra["metadata"]``),
    falling back to a top-level ``extra["category"]``, then "未分类". The
    accuracy value may be a bare score or an aggregate dict with an "avg"
    entry; both are handled. Runs without accuracy feedback are skipped
    rather than counted as wrong.
    """
    from collections import defaultdict

    def _accuracy(run):
        stats = (run.feedback_stats or {}).get("accuracy")
        return stats.get("avg") if isinstance(stats, dict) else stats

    grouped = defaultdict(lambda: {"correct": 0, "total": 0})
    for run in client.list_runs(project_name=project_name, is_root=True, limit=1000):
        accuracy = _accuracy(run)
        if accuracy is None:
            continue
        extra = run.extra or {}
        category = (extra.get("metadata") or {}).get("category", extra.get("category", "未分类"))
        grouped[category]["total"] += 1
        if accuracy == 1.0:
            grouped[category]["correct"] += 1

    print("\n分组准确率:")
    for category, stats in grouped.items():
        rate = stats["correct"] / stats["total"] if stats["total"] > 0 else 0
        print(f" {category}: {rate:.2%} ({stats['correct']}/{stats['total']})")


# Use the real experiment name (prefix + generated suffix), not the prefix.
grouped_accuracy(results.experiment_name)
---
b.精确率召回率
a.功能说明
精确率(Precision)表示预测为正例中真正正例的比例,召回率(Recall)表示真正正例中被预测为正例的比例。F1分数是精确率和召回率的调和平均,平衡两者。适用于信息检索、推荐、检测等任务。精确率关注预测质量,召回率关注覆盖度,需要根据业务需求权衡。
b.代码示例
---
def precision_recall_evaluator(run, example):
    """Set-based precision, recall and F1 of predicted vs. reference "items"; score = F1.

    If both sets are empty, predicting nothing was correct, so
    P = R = F1 = 1.0. If only one side is empty, the metric that divides
    by it is 0.0.
    """
    predicted_items = set(run.outputs.get("items", []))
    reference_items = set(example.outputs.get("items", []))
    if not predicted_items and not reference_items:
        precision = recall = 1.0
    else:
        true_positives = len(predicted_items & reference_items)
        precision = true_positives / len(predicted_items) if predicted_items else 0.0
        recall = true_positives / len(reference_items) if reference_items else 0.0
    # F1 = harmonic mean of precision and recall.
    if precision + recall > 0:
        f1 = 2 * (precision * recall) / (precision + recall)
    else:
        f1 = 0.0
    return {
        "key": "precision_recall",
        "score": f1,
        "comment": f"Precision: {precision:.2%}, Recall: {recall:.2%}, F1: {f1:.2%}",
    }
# 混淆矩阵
def confusion_matrix_evaluator(run, example):
    """Classify one binary prediction as TP/TN/FP/FN; score 1.0 when correct.

    Only the labels "positive" and "negative" are valid. Any other predicted
    or actual label (including a missing one) is reported as "INVALID" with
    score 0.0 instead of being miscounted as a false negative.
    """
    predicted = run.outputs.get("label", "")
    actual = example.outputs.get("label", "")
    valid_labels = {"positive", "negative"}
    if predicted not in valid_labels or actual not in valid_labels:
        result_type = "INVALID"
    elif predicted == actual:
        result_type = "TP" if predicted == "positive" else "TN"
    else:
        result_type = "FP" if predicted == "positive" else "FN"
    return {
        "key": "confusion_matrix",
        "score": 1.0 if result_type in ("TP", "TN") else 0.0,
        "comment": result_type,
    }
---
02.业务指标
a.用户满意度
a.功能说明
用户满意度是最直观的业务指标,反映用户对输出的满意程度。可以通过人工评分、点赞/踩、星级评价等方式收集。需要设计合理的满意度评分标准。定期分析满意度趋势,识别问题。用户满意度是产品成功的核心指标。
b.代码示例
---
# 1. 人工评分
def collect_human_feedback(run_id: str, score: float, comment: str = ""):
    """Attach a human "user_satisfaction" rating to a LangSmith run.

    Args:
        run_id: ID of the run being rated.
        score: rating in [0, 1].
        comment: optional free-text remark.

    Raises:
        ValueError: if ``score`` is outside [0, 1] or NaN. The check runs
            before anything is sent to LangSmith.
    """
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be within [0, 1], got {score!r}")
    client.create_feedback(
        run_id=run_id,
        key="user_satisfaction",
        score=score,
        comment=comment,
    )
# Example: rate the 10 most recent production runs.
# NOTE: the fixed 0.85 is a placeholder for a real human rating — running this
# as-is writes synthetic satisfaction feedback onto production runs.
runs = client.list_runs(project_name="production", limit=10)
for run in runs:
    score = 0.85
    collect_human_feedback(
        run_id=str(run.id),
        score=score,
        comment="答案准确但不够详细",
    )
# 2. 星级评价
def star_rating_evaluator(run_id: str, stars: int):
    """Record a 1-5 star rating as "star_rating" feedback with score stars / 5.

    Raises:
        ValueError: if ``stars`` is outside 1..5 (checked before sending).
    """
    if not 1 <= stars <= 5:
        raise ValueError(f"stars must be between 1 and 5, got {stars!r}")
    score = stars / 5.0  # map 1..5 stars onto 0.2..1.0
    client.create_feedback(
        run_id=run_id,
        key="star_rating",
        score=score,
        comment=f"{stars}星",
    )
# 3. 点赞/踩
def thumbs_feedback(run_id: str, is_positive: bool):
    """Record a thumbs-up (1.0, 👍) or thumbs-down (0.0, 👎) as "thumbs" feedback."""
    score, emoji = (1.0, "👍") if is_positive else (0.0, "👎")
    client.create_feedback(run_id=run_id, key="thumbs", score=score, comment=emoji)
# 4. 满意度统计
def analyze_satisfaction(project_name: str):
    """Print mean, median and satisfied share (>0.7) of "user_satisfaction" feedback.

    The feedback value for the key may be a bare number or an aggregate
    dict with an "avg" entry; both are handled. Runs without satisfaction
    feedback are skipped. If there is no data, a notice is printed instead
    of the statistics.
    """
    import statistics

    satisfaction_scores = []
    for run in client.list_runs(project_name=project_name, limit=1000):
        stats = (run.feedback_stats or {}).get("user_satisfaction")
        value = stats.get("avg") if isinstance(stats, dict) else stats
        if value is not None:
            satisfaction_scores.append(value)

    if not satisfaction_scores:
        print("\n用户满意度分析:无数据")
        return
    avg_sat = statistics.mean(satisfaction_scores)
    median_sat = statistics.median(satisfaction_scores)
    # Satisfaction rate: share of ratings above 0.7.
    satisfied_count = sum(1 for s in satisfaction_scores if s > 0.7)
    satisfaction_rate = satisfied_count / len(satisfaction_scores)
    print("\n用户满意度分析:")
    print(f" 平均满意度:{avg_sat:.2%}")
    print(f" 中位数:{median_sat:.2%}")
    print(f" 满意率:{satisfaction_rate:.2%}")
    print(f" 样本数:{len(satisfaction_scores)}")


analyze_satisfaction("production")
# 5. 满意度趋势
def satisfaction_trend(project_name: str, days: int = 7):
    """Print the daily mean "user_satisfaction" score for the last ``days`` days.

    The window start is computed in UTC (timezone-aware) so it lines up with
    LangSmith's run timestamps regardless of the local timezone. Feedback
    values may be bare numbers or aggregate dicts with an "avg" entry.
    """
    from collections import defaultdict
    from datetime import datetime, timedelta, timezone

    start_time = datetime.now(timezone.utc) - timedelta(days=days)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        limit=10000,
    )
    daily_scores = defaultdict(list)
    for run in runs:
        stats = (run.feedback_stats or {}).get("user_satisfaction")
        value = stats.get("avg") if isinstance(stats, dict) else stats
        if value is not None:
            daily_scores[run.start_time.date()].append(value)

    print("\n满意度趋势:")
    for date in sorted(daily_scores):
        scores = daily_scores[date]
        avg = sum(scores) / len(scores)
        print(f" {date}: {avg:.2%} ({len(scores)}个样本)")


satisfaction_trend("production", days=7)
---
b.解决率
a.功能说明
解决率表示问题被成功解决的比例,是客服、技术支持等场景的关键指标。可以通过后续追问、工单状态、用户反馈等判断是否解决。需要定义"解决"的标准(如无后续咨询、用户确认满意等)。解决率反映系统的实际效用,是业务价值的直接体现。
b.代码示例
---
# 1. 解决率评估器
def resolution_evaluator(run, example):
    """Mark a conversation resolved if the user confirmed it or there was no follow-up.

    Reads ``user_confirmed_resolved`` and ``has_followup`` from ``run.extra``
    (both default to False).
    """
    extra = run.extra
    resolved = extra.get("user_confirmed_resolved", False) or not extra.get("has_followup", False)
    if resolved:
        return {"key": "resolution_rate", "score": 1.0, "comment": "已解决"}
    return {"key": "resolution_rate", "score": 0.0, "comment": "未解决"}
# 2. 首次解决率
def first_contact_resolution(run, example):
    """First-contact resolution (FCR): 1.0 only if resolved within a single interaction.

    Reads ``interaction_count`` (default 1) and ``is_resolved`` (default
    False) from ``run.extra``.
    """
    meta = run.extra
    solved_first_time = meta.get("interaction_count", 1) == 1 and meta.get("is_resolved", False)
    return {
        "key": "first_contact_resolution",
        "score": 1.0 if solved_first_time else 0.0,
    }
# 3. 解决率统计
def calculate_resolution_rate(project_name: str):
    """Print and return the share of runs whose "resolution_rate" feedback equals 1.0.

    Only runs that carry resolution feedback are counted. The value may be a
    bare score or an aggregate dict with an "avg" entry. Returns 0 when no
    run has resolution feedback.
    """
    total = 0
    resolved = 0
    for run in client.list_runs(project_name=project_name, limit=1000):
        stats = (run.feedback_stats or {}).get("resolution_rate")
        value = stats.get("avg") if isinstance(stats, dict) else stats
        if value is None:
            continue
        total += 1
        if value == 1.0:
            resolved += 1
    resolution_rate = resolved / total if total > 0 else 0
    print(f"\n解决率:{resolution_rate:.2%} ({resolved}/{total})")
    return resolution_rate


calculate_resolution_rate("customer_service")
---
4.3 批量评估
01.评估运行
a.执行评估
a.功能说明
批量评估在数据集上批量运行应用,自动应用评估器打分。使用evaluate()函数执行评估,指定应用、数据集、评估器列表。自动记录每个样本的运行结果和评估分数。生成评估报告,汇总统计信息。批量评估是系统化测试的核心,支持快速验证和对比。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
client = Client()

# 1. Application under test: GPT-3.5 answering concisely.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
prompt = ChatPromptTemplate.from_template("简洁回答:{question}")


def qa_app(inputs: dict) -> dict:
    """Return ``{"answer": ...}`` for one example by running prompt | llm on ``inputs``."""
    chain = prompt | llm
    message = chain.invoke(inputs)
    answer_text = message.content
    return {"answer": answer_text}
# 2. 定义评估器
def accuracy_evaluator(run, example):
    """Case-sensitive exact match of the trimmed answers (1.0 / 0.0)."""
    got, expected = (
        outputs.get("answer", "").strip() for outputs in (run.outputs, example.outputs)
    )
    return {"key": "accuracy", "score": 1.0 if got == expected else 0.0}
def length_evaluator(run, example):
    """Score answer length: 1.0 within 20-200 chars.

    Below 20 the score ramps linearly (n / 20). Above 200 it decays
    linearly, reaching 0 at 400 chars and beyond.
    """
    n = len(run.outputs.get("answer", ""))
    if n < 20:
        score = n / 20
    elif n <= 200:
        score = 1.0
    else:
        score = max(0, 1 - (n - 200) / 200)
    return {"key": "length_appropriateness", "score": score}
# 3. Run the evaluation.
results = evaluate(
    qa_app,
    data="qa_dataset",
    evaluators=[accuracy_evaluator, length_evaluator],
    experiment_prefix="qa_eval_v1",
    description="GPT-3.5评估测试",
    max_concurrency=5,  # examples evaluated in parallel
)

# evaluate() returns ExperimentResults: iterate its rows (one per example,
# each with the run, the dataset example and the evaluator results) instead
# of indexing it by metric name. Materialize once so it can be reused below.
from collections import defaultdict

rows = list(results)
metric_scores = defaultdict(list)
for row in rows:
    for res in row["evaluation_results"]["results"]:
        if res.score is not None:
            metric_scores[res.key].append(res.score)
metric_means = {key: sum(vals) / len(vals) for key, vals in metric_scores.items()}

print("\n评估完成:")
print(f" 实验名称:{results.experiment_name}")
print(f" 运行数量:{len(rows)}")
print(f" 准确率:{metric_means.get('accuracy', 0.0):.2%}")
print(f" 长度适当性:{metric_means.get('length_appropriateness', 0.0):.2%}")

# 4. Per-sample details.
print("\n样本结果:")
for i, row in enumerate(rows[:5], start=1):
    run = row["run"]
    scores = {res.key: res.score for res in row["evaluation_results"]["results"]}
    print(f"\n样本 {i}:")
    print(f" 输入:{run.inputs}")
    print(f" 输出:{run.outputs}")
    print(f" 评分:{scores}")

# 5. Failure analysis: examples whose accuracy score is 0.
failed_rows = [
    row
    for row in rows
    if any(res.key == "accuracy" and res.score == 0 for res in row["evaluation_results"]["results"])
]
print(f"\n失败样本数:{len(failed_rows)}")
if failed_rows:
    print("\n失败样本示例:")
    for row in failed_rows[:3]:
        print(f" 输入:{row['run'].inputs}")
        print(f" 预测:{row['run'].outputs}")
        print(f" 期望:{row['example'].outputs}")
        print()
# 6. 导出评估结果
def export_evaluation_results(experiment_name: str):
    """Write one CSV row per root run of an experiment to "<experiment_name>_results.csv".

    Args:
        experiment_name: full experiment (project) name, e.g. ``results.experiment_name``.

    Missing inputs, outputs or feedback are tolerated: scores default to 0.
    Feedback values may be bare scores or aggregate dicts with an "avg" entry.
    """
    import pandas as pd

    def _score(run, key):
        stats = (run.feedback_stats or {}).get(key)
        value = stats.get("avg") if isinstance(stats, dict) else stats
        return 0 if value is None else value

    rows = []
    for run in client.list_runs(project_name=experiment_name, is_root=True, limit=1000):
        inputs = run.inputs or {}
        outputs = run.outputs or {}
        rows.append({
            "run_id": str(run.id),
            "question": inputs.get("question", ""),
            "predicted": outputs.get("answer", ""),
            "accuracy": _score(run, "accuracy"),
            "length_score": _score(run, "length_appropriateness"),
            # Run.latency is in seconds; convert so the column matches its name.
            "latency_ms": run.latency * 1000 if run.latency is not None else None,
            "total_tokens": run.total_tokens,
        })
    df = pd.DataFrame(rows)
    # utf-8-sig adds a BOM so Excel opens the Chinese text correctly.
    df.to_csv(f"{experiment_name}_results.csv", index=False, encoding="utf-8-sig")
    print(f"结果已导出到:{experiment_name}_results.csv")


export_evaluation_results(results.experiment_name)
---
b.并发控制
a.功能说明
批量评估支持并发执行,提升评估速度。通过max_concurrency参数控制并发数。需要考虑API限流、系统负载等因素。过高并发可能触发限流或影响稳定性。建议根据评估器类型调整并发度(LLM评估器并发度较低,规则评估器可以较高)。合理的并发控制平衡速度和稳定性。
b.代码示例
---
# 1. Match concurrency to evaluator cost.
# Rule-based evaluators are local string checks: run many examples at once.
rule_results = evaluate(
    qa_app,
    data="qa_dataset",
    experiment_prefix="rule_eval",
    evaluators=[rule_based_evaluator],
    max_concurrency=20,
)
# LLM-as-judge evaluators make slow, rate-limited API calls: keep it low.
llm_results = evaluate(
    qa_app,
    data="qa_dataset",
    experiment_prefix="llm_eval",
    evaluators=[llm_quality_evaluator],
    max_concurrency=2,
)
# 2. 分批评估
def batched_evaluation(app, dataset_name: str, batch_size: int = 50):
    """Evaluate a large dataset in batches through temporary per-batch datasets.

    Each batch is copied into "<dataset_name>_batch_<n>_temp", evaluated with
    ``accuracy_evaluator``, and the temporary dataset is always deleted
    afterwards, even if copying or evaluation fails.

    Raises:
        ValueError: if ``batch_size`` is not positive (checked before any API call).
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size!r}")
    dataset = client.read_dataset(dataset_name=dataset_name)
    examples = list(client.list_examples(dataset_id=dataset.id))
    total_batches = (len(examples) + batch_size - 1) // batch_size
    print(f"总样本数:{len(examples)},分{total_batches}批处理")

    for batch_num, start in enumerate(range(0, len(examples), batch_size), start=1):
        batch = examples[start:start + batch_size]
        print(f"\n处理第{batch_num}/{total_batches}批...")
        temp_dataset = client.create_dataset(
            dataset_name=f"{dataset_name}_batch_{batch_num}_temp"
        )
        try:
            for ex in batch:
                client.create_example(
                    dataset_id=temp_dataset.id,
                    inputs=ex.inputs,
                    outputs=ex.outputs,
                )
            results = evaluate(
                app,
                data=temp_dataset.name,
                evaluators=[accuracy_evaluator],
                experiment_prefix=f"batch_{batch_num}",
            )
            # ExperimentResults is not indexable by metric: average the rows.
            scores = [
                res.score
                for row in results
                for res in row["evaluation_results"]["results"]
                if res.key == "accuracy" and res.score is not None
            ]
            accuracy = sum(scores) / len(scores) if scores else 0.0
            print(f"第{batch_num}批完成,准确率:{accuracy:.2%}")
        finally:
            # Never leak the temporary dataset, even when the batch fails.
            client.delete_dataset(dataset_id=temp_dataset.id)


# batched_evaluation(qa_app, "large_dataset", batch_size=100)
# 3. 限流处理
import time
from functools import wraps
def rate_limited_evaluator(calls_per_minute: int = 60):
    """Return a decorator that throttles calls to at most ``calls_per_minute``.

    Start times of successive calls are spaced at least
    ``60 / calls_per_minute`` seconds apart on a monotonic clock. The limiter
    state is shared by every function wrapped by the same returned decorator
    and is guarded by a lock. This matters because evaluate() invokes
    evaluators from several worker threads when max_concurrency > 1.

    Raises:
        ValueError: if ``calls_per_minute`` is not positive.
    """
    import threading

    if calls_per_minute <= 0:
        raise ValueError(f"calls_per_minute must be positive, got {calls_per_minute!r}")
    min_interval = 60.0 / calls_per_minute
    lock = threading.Lock()
    next_slot = 0.0  # monotonic time at which the next call may start

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal next_slot
            # Reserve a start slot under the lock; sleep outside it so
            # waiting threads do not serialize on the bookkeeping.
            with lock:
                now = time.monotonic()
                start_at = max(now, next_slot)
                next_slot = start_at + min_interval
            if start_at > now:
                time.sleep(start_at - now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
@rate_limited_evaluator(calls_per_minute=30)
def rate_limited_llm_evaluator(run, example):
    """``llm_quality_evaluator`` throttled to at most 30 judge calls per minute."""
    verdict = llm_quality_evaluator(run, example)
    return verdict
---
02.评估报告
a.统计分析
a.功能说明
评估完成后生成统计报告,汇总关键指标。计算平均值、中位数、标准差、百分位数等统计量。按类别、难度、标签等维度分组分析。生成图表可视化结果。统计分析帮助全面理解评估结果,发现规律和问题。
b.代码示例
---
import pandas as pd
import statistics
def generate_evaluation_report(experiment_name: str, price_per_1k_tokens: float = 0.002):
    """Build, print and save a text report for one experiment.

    Args:
        experiment_name: full experiment (project) name.
        price_per_1k_tokens: USD per 1,000 tokens for the cost estimate
            (default 0.002, the previously hard-coded value).

    Returns:
        A DataFrame with one row per root run and columns accuracy, latency
        (milliseconds), tokens and category; None if the project has no runs.
        The report is also written to "<experiment_name>_report.txt".
    """
    runs = list(client.list_runs(project_name=experiment_name, is_root=True, limit=1000))
    if not runs:
        print("无评估数据")
        return None

    def _accuracy(run):
        stats = (run.feedback_stats or {}).get("accuracy")
        value = stats.get("avg") if isinstance(stats, dict) else stats
        return 0 if value is None else value

    def _category(run):
        extra = run.extra or {}
        return (extra.get("metadata") or {}).get("category", extra.get("category", "未分类"))

    df = pd.DataFrame([
        {
            "accuracy": _accuracy(run),
            # Run.latency is in seconds; the report prints milliseconds.
            "latency": (run.latency or 0) * 1000,
            "tokens": run.total_tokens or 0,
            "category": _category(run),
        }
        for run in runs
    ])

    report = f"""
评估报告:{experiment_name}
{'='*60}
1. 总体统计
样本数:{len(df)}
准确率:{df['accuracy'].mean():.2%}
中位数:{df['accuracy'].median():.2%}
标准差:{df['accuracy'].std():.4f}
2. 性能统计
平均延迟:{df['latency'].mean():.0f}ms
P50延迟:{df['latency'].quantile(0.5):.0f}ms
P95延迟:{df['latency'].quantile(0.95):.0f}ms
3. Token统计
总Token:{df['tokens'].sum():,}
平均Token:{df['tokens'].mean():.0f}
估算成本:${df['tokens'].sum() * price_per_1k_tokens / 1000:.2f}
4. 分类统计
"""
    for category in df["category"].unique():
        cat_df = df[df["category"] == category]
        report += f" {category}:准确率{cat_df['accuracy'].mean():.2%},样本数{len(cat_df)}\n"
    print(report)
    with open(f"{experiment_name}_report.txt", "w", encoding="utf-8") as f:
        f.write(report)
    return df


# The "qa_eval_v1" experiment run earlier; its project name carries a generated suffix.
df = generate_evaluation_report(results.experiment_name)
# 可视化
import matplotlib.pyplot as plt
# Plots. generate_evaluation_report() returns None when there is no data.
# NOTE: Chinese labels need a CJK-capable font (plt.rcParams["font.sans-serif"]),
# otherwise they render as empty boxes.
if df is None or df.empty:
    print("无评估数据,跳过可视化")
else:
    # Accuracy distribution
    plt.figure(figsize=(10, 6))
    plt.hist(df["accuracy"], bins=20, edgecolor="black")
    plt.xlabel("准确率")
    plt.ylabel("样本数")
    plt.title("准确率分布")
    plt.savefig("accuracy_distribution.png")
    plt.close()  # release the figure; pyplot keeps open figures alive

    # Per-category comparison
    category_stats = df.groupby("category")["accuracy"].mean()
    plt.figure(figsize=(10, 6))
    category_stats.plot(kind="bar")
    plt.xlabel("类别")
    plt.ylabel("平均准确率")
    plt.title("各类别准确率")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig("category_comparison.png")
    plt.close()
---
b.对比分析
a.功能说明
对比不同实验的评估结果,识别性能差异。对比不同模型、提示词、参数配置的效果。并排展示关键指标,计算差异百分比。识别改进和退化的样本。对比分析指导版本选择和优化方向。
b.代码示例
---
def compare_experiments(exp1: str, exp2: str):
    """Print aggregate and per-example comparisons of two experiments.

    ``exp1`` and ``exp2`` are full experiment (project) names.

    Aggregates are mean accuracy, mean latency in ms and total tokens, each
    with a relative change ("n/a" when the baseline is 0).

    Per-example changes pair root runs by dataset example
    (``reference_example_id``) rather than by list order, which list_runs
    does not guarantee to match across experiments.
    """
    runs1 = list(client.list_runs(project_name=exp1, is_root=True, limit=1000))
    runs2 = list(client.list_runs(project_name=exp2, is_root=True, limit=1000))

    def accuracy_of(run):
        stats = (run.feedback_stats or {}).get("accuracy")
        value = stats.get("avg") if isinstance(stats, dict) else stats
        return 0 if value is None else value

    def calc_metrics(runs):
        accuracies = [accuracy_of(r) for r in runs if r.feedback_stats]
        latencies = [r.latency * 1000 for r in runs if r.latency]  # seconds -> ms
        tokens = [r.total_tokens for r in runs if r.total_tokens]
        return {
            "accuracy": statistics.mean(accuracies) if accuracies else 0,
            "latency": statistics.mean(latencies) if latencies else 0,
            "tokens": sum(tokens),
        }

    def change(old, new):
        return f"{(new - old) / old * 100:+.1f}%" if old else "n/a"

    metrics1 = calc_metrics(runs1)
    metrics2 = calc_metrics(runs2)
    print(f"""
实验对比:{exp1} vs {exp2}
{'='*70}
指标 {exp1:<20} {exp2:<20} 变化
{'-'*70}
准确率 {metrics1['accuracy']:<20.2%} {metrics2['accuracy']:<20.2%} {change(metrics1['accuracy'], metrics2['accuracy'])}
平均延迟 {metrics1['latency']:<20.0f}ms {metrics2['latency']:<20.0f}ms {change(metrics1['latency'], metrics2['latency'])}
总Token {metrics1['tokens']:<20,} {metrics2['tokens']:<20,} {change(metrics1['tokens'], metrics2['tokens'])}
""")

    print("\n样本级差异分析:")
    runs2_by_example = {r.reference_example_id: r for r in runs2 if r.reference_example_id is not None}
    pairs = [
        (r1, runs2_by_example[r1.reference_example_id])
        for r1 in runs1
        if r1.reference_example_id in runs2_by_example
    ]
    total = len(pairs)
    if total == 0:
        print(" 无可配对的样本(两个实验需基于同一数据集)")
        return
    improved = sum(1 for r1, r2 in pairs if accuracy_of(r2) > accuracy_of(r1))
    regressed = sum(1 for r1, r2 in pairs if accuracy_of(r2) < accuracy_of(r1))
    unchanged = total - improved - regressed
    print(f" 改进样本:{improved} ({improved/total:.1%})")
    print(f" 退化样本:{regressed} ({regressed/total:.1%})")
    print(f" 持平样本:{unchanged} ({unchanged/total:.1%})")


# Pass full experiment names (prefix + generated suffix), e.g. results.experiment_name.
compare_experiments("qa_eval_v1", "qa_eval_v2")
---
4.4 A/B测试
01.实验设计
a.对照组设置
a.功能说明
A/B测试通过对照实验对比不同版本的效果。设置对照组(A)和实验组(B),使用相同数据集评估。对照组通常是当前版本,实验组是改进版本。确保实验条件一致,只改变一个变量。A/B测试提供科学的版本对比,支持数据驱动决策。
b.代码示例
---
from langsmith import Client
from langsmith.evaluation import evaluate
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
client = Client()


def _ask(model_name: str, template: str, inputs: dict) -> dict:
    """Answer ``inputs`` with a fresh ChatOpenAI(model_name, temperature=0.7) and ``template``."""
    llm = ChatOpenAI(model=model_name, temperature=0.7)
    chain = ChatPromptTemplate.from_template(template) | llm
    return {"answer": chain.invoke(inputs).content}


# NOTE: A and B differ in BOTH model and prompt, so any measured difference
# cannot be attributed to either change alone.
def app_version_a(inputs: dict) -> dict:
    """Version A (control): GPT-3.5 with the plain "回答问题" prompt."""
    return _ask("gpt-3.5-turbo", "回答问题:{question}", inputs)


def app_version_b(inputs: dict) -> dict:
    """Version B (treatment): GPT-4 with the "简洁回答问题" prompt."""
    return _ask("gpt-4", "简洁回答问题:{question}", inputs)
# 3. 评估器
def accuracy_evaluator(run, example):
    """Exact-match accuracy, ignoring surrounding whitespace and case (1.0 / 0.0)."""
    def clean(text):
        return text.strip().lower()

    same = clean(run.outputs.get("answer", "")) == clean(example.outputs.get("answer", ""))
    return {"key": "accuracy", "score": 1.0 if same else 0.0}
# 4. Run the A/B test: both versions on the same dataset.
dataset_name = "qa_test_set"

results_a = evaluate(
    app_version_a,
    data=dataset_name,
    evaluators=[accuracy_evaluator],
    experiment_prefix="ab_test_version_a",
    description="A/B测试 - 版本A(GPT-3.5)",
)
# ExperimentResults cannot be indexed by metric name: average the rows.
scores_a = [
    res.score
    for row in results_a
    for res in row["evaluation_results"]["results"]
    if res.key == "accuracy" and res.score is not None
]
accuracy_a = sum(scores_a) / len(scores_a) if scores_a else 0.0
print("版本A评估完成:")
print(f" 准确率:{accuracy_a:.2%}")

results_b = evaluate(
    app_version_b,
    data=dataset_name,
    evaluators=[accuracy_evaluator],
    experiment_prefix="ab_test_version_b",
    description="A/B测试 - 版本B(GPT-4)",
)
scores_b = [
    res.score
    for row in results_b
    for res in row["evaluation_results"]["results"]
    if res.key == "accuracy" and res.score is not None
]
accuracy_b = sum(scores_b) / len(scores_b) if scores_b else 0.0
print("\n版本B评估完成:")
print(f" 准确率:{accuracy_b:.2%}")

# 5. Side-by-side comparison (difference in percentage points).
print(f"""
A/B测试结果对比:
{'='*50}
指标 版本A 版本B 差异
{'-'*50}
准确率 {accuracy_a:.2%} {accuracy_b:.2%} {accuracy_b - accuracy_a:+.2%}
""")
# 6. 统计显著性检验
def statistical_significance_test(exp_a: str, exp_b: str, alpha: float = 0.05):
    """Welch's t-test on per-run accuracy of two experiments; prints and returns (t, p).

    Args:
        exp_a, exp_b: full experiment (project) names.
        alpha: significance level (default 0.05).

    Welch's variant is used because the two versions need not have equal
    variance. Each group needs at least 2 scored runs; otherwise nothing is
    tested and None is returned. If both groups are constant, the p-value is
    NaN and is reported as undeterminable.
    """
    import math
    from scipy import stats

    def _accuracies(project):
        values = []
        for run in client.list_runs(project_name=project, is_root=True, limit=1000):
            raw = (run.feedback_stats or {}).get("accuracy")
            value = raw.get("avg") if isinstance(raw, dict) else raw
            if value is not None:
                values.append(value)
        return values

    scores_a = _accuracies(exp_a)
    scores_b = _accuracies(exp_b)
    if len(scores_a) < 2 or len(scores_b) < 2:
        print("\n统计显著性检验:样本不足(每组至少需要2个),无法检验")
        return None

    t_stat, p_value = stats.ttest_ind(scores_a, scores_b, equal_var=False)
    print("\n统计显著性检验:")
    print(f" t统计量:{t_stat:.4f}")
    print(f" p值:{p_value:.4f}")
    if math.isnan(p_value):
        print(" 结论:两组得分均无方差,无法计算p值")
    elif p_value < alpha:
        print(f" 结论:差异显著(p < {alpha})")
    else:
        print(f" 结论:差异不显著(p >= {alpha})")
    return t_stat, p_value


# Use the real experiment names created above (prefix + generated suffix).
statistical_significance_test(
    results_a.experiment_name,
    results_b.experiment_name,
)
---
b.多变量测试
a.功能说明
多变量测试同时测试多个变量的组合效果。例如测试不同模型、提示词、温度参数的组合。设计正交实验减少测试次数。分析每个变量的独立影响和交互作用。多变量测试更全面但成本更高,需要合理设计实验方案。
b.代码示例
---
# 1. Variables under test.
models = ["gpt-3.5-turbo", "gpt-4"]
temperatures = [0.3, 0.7]
prompts = [
    "回答问题:{question}",
    "简洁回答问题:{question}",
]
# 2. Full factorial design: every (model, temperature, prompt) combination.
from itertools import product

combinations = list(product(models, temperatures, prompts))
print(f"总共{len(combinations)}种组合:")
# The loop variable is `template`, not `prompt`: rebinding the module-level
# `prompt` (the ChatPromptTemplate qa_app relies on) to a plain string would
# break qa_app for any later call.
for i, (model, temp, template) in enumerate(combinations, start=1):
    print(f" 组合{i}: {model}, temp={temp}, prompt='{template[:20]}...'")
# 3. 定义可配置的应用
def configurable_app(model: str, temperature: float, prompt_template: str):
    """Build an ``app(inputs) -> {"answer": ...}`` bound to one variable combination.

    A fresh ChatOpenAI(model, temperature) and prompt are constructed on every
    call of the returned function.
    """
    def app(inputs: dict) -> dict:
        llm = ChatOpenAI(model=model, temperature=temperature)
        pipeline = ChatPromptTemplate.from_template(prompt_template) | llm
        return {"answer": pipeline.invoke(inputs).content}

    return app
# 4. Evaluate every combination.
results = []
for i, (model, temp, prompt_template) in enumerate(combinations):
    print(f"\n测试组合{i+1}/{len(combinations)}...")
    app = configurable_app(model, temp, prompt_template)
    eval_result = evaluate(
        app,
        data="qa_test_set",
        evaluators=[accuracy_evaluator],
        experiment_prefix=f"multivar_test_{i+1}",
        description=f"Model:{model}, Temp:{temp}, Prompt:{prompt_template[:30]}",
    )
    # ExperimentResults cannot be indexed by metric name: average the rows.
    accuracy_scores = [
        res.score
        for row in eval_result
        for res in row["evaluation_results"]["results"]
        if res.key == "accuracy" and res.score is not None
    ]
    results.append({
        "combination": i + 1,
        "model": model,
        "temperature": temp,
        "prompt": prompt_template[:30],
        "accuracy": sum(accuracy_scores) / len(accuracy_scores) if accuracy_scores else 0.0,
    })
# 5. Summarize the results.
import pandas as pd

df = pd.DataFrame(results)
print("\n多变量测试结果:")
print(df.to_string(index=False))

if df.empty:
    print("\n无测试结果")
else:
    # Best combination
    best = df.loc[df["accuracy"].idxmax()]
    print("\n最佳组合:")
    print(f" 模型:{best['model']}")
    print(f" 温度:{best['temperature']}")
    print(f" 提示词:{best['prompt']}")
    print(f" 准确率:{best['accuracy']:.2%}")

    # 6. Main effect of each variable: mean accuracy per level, including
    #    the prompt, which is also a tested variable.
    print("\n变量影响分析:")
    for column, label in (("model", "模型"), ("temperature", "温度"), ("prompt", "提示词")):
        print(f" {label}平均准确率:")
        for level, acc in df.groupby(column)["accuracy"].mean().items():
            print(f"  {level}: {acc:.2%}")
---
02.实验管理
a.实验追踪
a.功能说明
追踪所有A/B测试实验的配置、结果、决策。记录实验目的、假设、变量、数据集。保存评估结果和分析报告。追踪实验状态(进行中、已完成、已采纳等)。实验追踪建立知识库,避免重复实验,支持经验积累。
b.代码示例
---
import json
from datetime import datetime
class ExperimentTracker:
    """Persist A/B experiment records (config, results, decision) to a JSON file."""

    def __init__(self, log_file: str = "experiments_log.json"):
        # Path of the JSON log; loaded eagerly, rewritten after every change.
        self.log_file = log_file
        self.experiments = self._load_log()

    def _load_log(self):
        """Return the saved experiment list, or [] if the log file does not exist yet."""
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def _save_log(self):
        """Write all experiments back to the log file.

        The data is written to a temporary file which then replaces the log
        via os.replace, so a crash mid-write cannot truncate the existing log.
        """
        import os

        tmp_path = f"{self.log_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.experiments, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, self.log_file)

    def start_experiment(
        self,
        name: str,
        hypothesis: str,
        variables: dict,
        dataset: str
    ):
        """Record a new in-progress experiment and return its integer id.

        Ids are one greater than the largest id on file, so they stay unique
        even if the log was edited by hand.
        """
        next_id = max((exp["id"] for exp in self.experiments), default=0) + 1
        experiment = {
            "id": next_id,
            "name": name,
            "hypothesis": hypothesis,
            "variables": variables,
            "dataset": dataset,
            "status": "in_progress",
            "started_at": datetime.now().isoformat(),
            "results": None,
            "decision": None
        }
        self.experiments.append(experiment)
        self._save_log()
        print(f"实验已开始:{name} (ID: {experiment['id']})")
        return experiment["id"]

    def complete_experiment(
        self,
        experiment_id: int,
        results: dict,
        decision: str
    ):
        """Mark an experiment completed and store its results and decision.

        Raises:
            ValueError: if no experiment has ``experiment_id``. Previously this
                silently reported success without changing anything.
        """
        for exp in self.experiments:
            if exp["id"] == experiment_id:
                exp["status"] = "completed"
                exp["results"] = results
                exp["decision"] = decision
                exp["completed_at"] = datetime.now().isoformat()
                break
        else:
            raise ValueError(f"未找到实验:{experiment_id}")
        self._save_log()
        print(f"实验{experiment_id}已完成")

    def list_experiments(self, status: str = None):
        """Print experiments, optionally only those whose status equals ``status``."""
        filtered = self.experiments
        if status:
            filtered = [e for e in filtered if e["status"] == status]
        print(f"\n实验列表({len(filtered)}个):")
        for exp in filtered:
            print(f" [{exp['id']}] {exp['name']}")
            print(f" 状态:{exp['status']}")
            print(f" 假设:{exp['hypothesis']}")
            if exp.get("results"):
                print(f" 结果:{exp['results']}")
            print()
# Example: track a GPT-3.5 vs GPT-4 experiment (writes experiments_log.json).
tracker = ExperimentTracker()
exp_id = tracker.start_experiment(
    name="GPT-4 vs GPT-3.5对比",
    hypothesis="GPT-4能提升10%准确率",
    variables={"model": ["gpt-3.5-turbo", "gpt-4"]},
    dataset="qa_test_set",
)
# ... run the experiment ...
# 0.75 -> 0.82 is +7 percentage points (about +9.3% relative), so the
# decision text states that explicitly instead of "7%".
tracker.complete_experiment(
    exp_id,
    results={"accuracy_a": 0.75, "accuracy_b": 0.82},
    decision="采纳版本B(GPT-4),准确率提升7个百分点(相对提升约9.3%)",
)
# Show the experiment history.
tracker.list_experiments()
---
b.决策支持
a.功能说明
基于A/B测试结果做出版本选择决策。综合考虑准确率、成本、延迟等多个维度。计算性价比,权衡精度和成本。考虑业务需求和约束条件。生成决策建议和理由。决策支持将实验结果转化为行动,指导版本发布。
b.代码示例
---
class ABTestDecision:
    """Decision support for an A/B test via a weighted score per version.

    Each results dict may contain ``accuracy`` (fraction), ``cost`` and
    ``latency`` (ms); missing keys count as 0.
    """

    def __init__(self, results_a: dict, results_b: dict):
        self.results_a = results_a
        self.results_b = results_b

    def calculate_score(
        self,
        version: str,
        accuracy_weight: float = 0.5,
        cost_weight: float = 0.3,
        latency_weight: float = 0.2,
        cost_cap: float = 100.0,
        latency_cap: float = 5000.0
    ):
        """Return the weighted score of version ``"A"`` (any other value means B).

        Cost and latency are scaled by ``cost_cap`` / ``latency_cap`` and
        inverted (lower is better). Values at or above the cap score 0. The
        caps were previously hard-coded as 100 and 5000.
        """
        results = self.results_a if version == "A" else self.results_b
        accuracy_score = results.get("accuracy", 0)
        cost_score = 1 - min(results.get("cost", 0) / cost_cap, 1)
        latency_score = 1 - min(results.get("latency", 0) / latency_cap, 1)
        return (
            accuracy_score * accuracy_weight
            + cost_score * cost_weight
            + latency_score * latency_weight
        )

    def recommend(self, **score_kwargs):
        """Print a comparison of both versions and return ``"A"`` or ``"B"``.

        ``score_kwargs`` (weights / caps) are forwarded to ``calculate_score``
        for both versions, so custom weights affect the recommendation.
        """
        score_a = self.calculate_score("A", **score_kwargs)
        score_b = self.calculate_score("B", **score_kwargs)
        print("\n决策分析:")
        print("=" * 60)
        for label, score, results in (
            ("A", score_a, self.results_a),
            ("B", score_b, self.results_b),
        ):
            print(f"版本{label}综合得分:{score:.3f}")
            print(f" 准确率:{results.get('accuracy', 0):.2%}")
            print(f" 成本:${results.get('cost', 0):.2f}")
            print(f" 延迟:{results.get('latency', 0):.0f}ms")
            print()
        if score_b > score_a:
            print("推荐:版本B")
            if score_a > 0:
                improvement = (score_b - score_a) / score_a * 100
                print(f"理由:综合得分提升{improvement:.1f}%")
            else:
                # A relative improvement is undefined when A scores 0.
                print(f"理由:综合得分由{score_a:.3f}提升至{score_b:.3f}")
            return "B"
        print("推荐:版本A")
        print("理由:版本B未带来显著改进,保持当前版本降低风险")
        return "A"
# Demo: feed each version's measured metrics into the decision helper.
decision = ABTestDecision(
    results_a={"accuracy": 0.75, "cost": 10.0, "latency": 1500},
    results_b={"accuracy": 0.82, "cost": 30.0, "latency": 2000},
)
recommended = decision.recommend()
---
5 监控告警
5.1 生产监控
01.实时追踪
a.生产追踪配置
a.功能说明
生产环境启用LangSmith追踪,监控真实用户请求。设置环境变量启用追踪,指定生产项目。配置采样率降低追踪开销。确保追踪不影响用户体验。实时追踪提供生产环境的完整可观测性,快速发现和定位问题。注意:通过修改环境变量开关追踪是进程级设置,在多线程或并发服务中会影响同时处理的其他请求,示例仅用于演示采样思路。
b.代码示例
---
import os
from langsmith import Client
from langchain.chat_models import ChatOpenAI
# 1. Production configuration: send traces to the "production" LangSmith project.
import warnings

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production"
# os.environ only accepts str values. Assigning os.getenv(...) directly raised
# TypeError when LANGSMITH_PROD_KEY was unset, so copy the key only if present.
_prod_key = os.getenv("LANGSMITH_PROD_KEY")
if _prod_key:
    os.environ["LANGCHAIN_API_KEY"] = _prod_key
else:
    warnings.warn("LANGSMITH_PROD_KEY is not set; LangSmith tracing is disabled")
    os.environ["LANGCHAIN_TRACING_V2"] = "false"
# 2. 采样配置
import random
def should_trace() -> bool:
    """Sampling decision: keep roughly 10% of calls."""
    sample_rate = 0.1
    return random.random() < sample_rate


# Conditional tracing. NOTE(review): this runs once, at import time, so it
# switches tracing on or off for the whole process rather than per request.
os.environ["LANGCHAIN_TRACING_V2"] = "true" if should_trace() else "false"
# 3. 动态采样率
class AdaptiveSampler:
    """Sampler that traces more when the observed error rate rises.

    Sampling tiers (defaults):
    - error rate above ``high_error_threshold`` (10%): trace everything;
    - above ``elevated_error_threshold`` (5%): trace with ``elevated_rate`` (50%);
    - otherwise: trace with ``base_rate``.
    """

    def __init__(
        self,
        base_rate: float = 0.1,
        high_error_threshold: float = 0.1,
        elevated_error_threshold: float = 0.05,
        elevated_rate: float = 0.5
    ):
        self.base_rate = base_rate
        self.high_error_threshold = high_error_threshold
        self.elevated_error_threshold = elevated_error_threshold
        self.elevated_rate = elevated_rate
        self.error_count = 0
        self.total_count = 0

    def record_error(self) -> None:
        """Count a failure for a request already counted by ``should_trace``.

        Calling ``should_trace(is_error=True)`` after a failure counts the same
        request twice in ``total_count``, which understates the error rate.
        """
        self.error_count += 1

    def should_trace(self, is_error: bool = False) -> bool:
        """Count one request (and optionally an error); return whether to trace it."""
        self.total_count += 1
        if is_error:
            self.error_count += 1
        error_rate = self.error_count / self.total_count
        if error_rate > self.high_error_threshold:
            return True
        if error_rate > self.elevated_error_threshold:
            return random.random() < self.elevated_rate
        return random.random() < self.base_rate


sampler = AdaptiveSampler(base_rate=0.1)
# 4. 请求处理
def handle_request(user_input: str):
    """Handle one user request, deciding per call whether to trace it.

    Uses the module-level ``sampler`` and ``ChatOpenAI``. Returns the model's
    text content; exceptions are counted as errors and re-raised.
    """
    # NOTE(review): os.environ is process-global, so toggling it per request is
    # not safe when requests are handled concurrently.
    os.environ["LANGCHAIN_TRACING_V2"] = "true" if sampler.should_trace() else "false"
    try:
        llm = ChatOpenAI(model="gpt-4")
        response = llm.invoke(user_input)
        return response.content
    except Exception:
        # should_trace() above already counted this request; count only the
        # error. Calling should_trace(is_error=True) here, as before, counted
        # the request a second time and halved the apparent error rate.
        sampler.error_count += 1
        raise
# 5. 带元数据的追踪
from langsmith import traceable
from langsmith.run_helpers import get_current_run_tree
@traceable(name="production_request")
def handle_with_metadata(user_id: str, request: str):
    """Handle a request inside a traced run, attaching production metadata."""
    # datetime is not imported anywhere else in this snippet.
    from datetime import datetime

    run = get_current_run_tree()
    if run:
        # Merge into extra["metadata"] rather than replacing run.extra
        # wholesale, which would drop anything already stored on the run.
        extra = run.extra if run.extra is not None else {}
        extra.setdefault("metadata", {}).update({
            "environment": "production",
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
            "request_length": len(request)
        })
        run.extra = extra
    return handle_request(request)
# 6. 关键路径追踪
def should_trace_critical(
    request_type: str,
    critical_types=("payment", "order", "security")
) -> bool:
    """Always trace critical request types; sample everything else.

    Args:
        request_type: Category of the incoming request.
        critical_types: Types traced unconditionally (previously hard-coded).

    Returns:
        True if the request should be traced.
    """
    if request_type in critical_types:
        return True
    return sampler.should_trace()


# Usage
request_type = "payment"
# Set the flag either way. Previously a non-critical, unsampled request kept
# whatever value an earlier request had left behind.
os.environ["LANGCHAIN_TRACING_V2"] = "true" if should_trace_critical(request_type) else "false"
# 7. 信创环境生产追踪
# 达梦数据库存储追踪数据
import dmPython
class DmProductionTracer:
    """Store production trace records in a DM (Dameng) database table."""

    def __init__(self, connection_string: str):
        self.conn = dmPython.connect(connection_string)
        self._init_tables()

    def _init_tables(self):
        """Create the ``production_traces`` table if it does not exist."""
        cursor = self.conn.cursor()
        try:
            cursor.execute("""
CREATE TABLE IF NOT EXISTS production_traces (
trace_id VARCHAR(100) PRIMARY KEY,
user_id VARCHAR(50),
request_data TEXT,
response_data TEXT,
latency_ms INTEGER,
status VARCHAR(20),
error_msg TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
            self.conn.commit()
        finally:
            # Cursors were previously never closed.
            cursor.close()

    def log_trace(self, trace_data: dict):
        """Insert one trace record.

        ``trace_id`` is required; the other fields default to empty/0/"success".
        On failure the transaction is rolled back and the error re-raised.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute("""
INSERT INTO production_traces
(trace_id, user_id, request_data, response_data,
latency_ms, status, error_msg)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
                trace_data["trace_id"],
                trace_data.get("user_id", ""),
                trace_data.get("request", ""),
                trace_data.get("response", ""),
                trace_data.get("latency", 0),
                trace_data.get("status", "success"),
                trace_data.get("error", "")
            ))
            self.conn.commit()
        except Exception:
            # Do not leave a half-open transaction on the shared connection.
            self.conn.rollback()
            raise
        finally:
            cursor.close()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()
# dm_tracer = DmProductionTracer("dm://SYSDBA:SYSDBA@localhost:5236/PROD")
---
b.实时监控面板
a.功能说明
实时监控面板展示生产系统的运行状态。显示请求量、成功率、错误率、延迟等关键指标。实时刷新数据,快速发现异常。支持时间范围选择和过滤。监控面板是运维团队的重要工具,提供系统健康度的直观视图。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
import time
# Shared LangSmith client used by the monitoring functions in this snippet.
client: Client = Client()
# 1. 实时监控数据获取
def get_realtime_metrics(project_name: str, minutes: int = 5, root_only: bool = True):
    """Collect request, latency and token metrics for the last ``minutes`` minutes.

    Args:
        project_name: LangSmith project to query.
        minutes: Size of the look-back window.
        root_only: Count only root runs (one per request). Without this, child
            runs (LLM calls, tools, ...) were counted as extra requests.

    Returns:
        dict with total_requests, success_count, error_count, latencies (ms),
        tokens, success_rate, error_rate, avg_latency (ms), p95_latency (ms).
    """
    import statistics
    from datetime import timezone

    # LangSmith run timestamps are UTC. A naive local datetime.now() shifts the
    # window by the local UTC offset (e.g. 8 hours in UTC+8).
    start_time = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=1000
    )
    metrics = {
        "total_requests": 0,
        "success_count": 0,
        "error_count": 0,
        "latencies": [],
        "tokens": 0
    }
    for run in runs:
        metrics["total_requests"] += 1
        if run.status == "success":
            metrics["success_count"] += 1
        elif run.status == "error":
            metrics["error_count"] += 1
        if run.latency:
            # Run.latency is in seconds; everything here is reported in ms.
            metrics["latencies"].append(run.latency * 1000)
        if run.total_tokens:
            metrics["tokens"] += run.total_tokens

    total = metrics["total_requests"]
    metrics["success_rate"] = metrics["success_count"] / total if total else 0
    metrics["error_rate"] = metrics["error_count"] / total if total else 0

    latencies = metrics["latencies"]
    if latencies:
        metrics["avg_latency"] = statistics.mean(latencies)
        # statistics.quantiles needs at least two points before Python 3.13.
        metrics["p95_latency"] = (
            statistics.quantiles(latencies, n=20)[18] if len(latencies) > 1 else latencies[0]
        )
    else:
        metrics["avg_latency"] = 0
        metrics["p95_latency"] = 0
    return metrics
# 2. 打印监控面板
def print_monitoring_dashboard(project_name: str, minutes: int = 5):
    """Print a text dashboard for the last ``minutes`` minutes of a project."""
    metrics = get_realtime_metrics(project_name, minutes=minutes)
    rule = "=" * 60
    # A real newline: the old "\\n" printed a literal backslash-n.
    print("\n" + rule)
    print(f"生产监控面板 - {project_name}")
    print(f"时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(rule)
    print()
    print(f"📊 请求统计(最近{minutes}分钟)")
    print(f" 总请求数:{metrics['total_requests']}")
    print(f" 成功数:{metrics['success_count']}")
    print(f" 错误数:{metrics['error_count']}")
    print()
    print(f"✅ 成功率:{metrics['success_rate']:.2%}")
    print(f"❌ 错误率:{metrics['error_rate']:.2%}")
    print()
    print("⏱️ 延迟")
    print(f" 平均:{metrics['avg_latency']:.0f}ms")
    print(f" P95:{metrics['p95_latency']:.0f}ms")
    print()
    print(f"🔢 Token使用:{metrics['tokens']:,}")
    print(rule)
# 3. 持续监控
def continuous_monitoring(project_name: str, interval: int = 10):
    """Redraw the dashboard every ``interval`` seconds until Ctrl+C."""
    try:
        while True:
            print_monitoring_dashboard(project_name)
            time.sleep(interval)
    except KeyboardInterrupt:
        # A real newline: the old "\\n" printed a literal backslash-n.
        print("\n监控已停止")
# 运行持续监控
# continuous_monitoring("production", interval=10)
# 4. Web监控面板(Flask)
from flask import Flask, jsonify, render_template_string
# Minimal Flask app: a JSON metrics endpoint plus a self-refreshing page.
app = Flask(__name__)


@app.route("/api/metrics")
def api_metrics():
    """Return the last 5 minutes of "production" metrics as JSON."""
    return jsonify(get_realtime_metrics("production", minutes=5))


@app.route("/")
def dashboard():
    """Render the dashboard page; its script polls /api/metrics every 5 s."""
    page = """
<!DOCTYPE html>
<html>
<head>
<title>生产监控面板</title>
<style>
body { font-family: Arial; margin: 20px; }
.metric { display: inline-block; margin: 10px;
padding: 20px; border: 1px solid #ddd;
border-radius: 5px; min-width: 200px; }
.metric-value { font-size: 32px; font-weight: bold; }
.metric-label { color: #666; }
</style>
<script>
function updateMetrics() {
fetch('/api/metrics')
.then(r => r.json())
.then(data => {
document.getElementById('total').innerText = data.total_requests;
document.getElementById('success_rate').innerText =
(data.success_rate * 100).toFixed(1) + '%';
document.getElementById('error_rate').innerText =
(data.error_rate * 100).toFixed(1) + '%';
document.getElementById('latency').innerText =
data.avg_latency.toFixed(0) + 'ms';
});
}
setInterval(updateMetrics, 5000);
updateMetrics();
</script>
</head>
<body>
<h1>生产监控面板</h1>
<div class="metric">
<div class="metric-label">总请求数</div>
<div class="metric-value" id="total">-</div>
</div>
<div class="metric">
<div class="metric-label">成功率</div>
<div class="metric-value" id="success_rate">-</div>
</div>
<div class="metric">
<div class="metric-label">错误率</div>
<div class="metric-value" id="error_rate">-</div>
</div>
<div class="metric">
<div class="metric-label">平均延迟</div>
<div class="metric-value" id="latency">-</div>
</div>
</body>
</html>
"""
    return render_template_string(page)
# 启动Web监控
# app.run(host='0.0.0.0', port=5000)
---
02.日志收集
a.结构化日志
a.功能说明
收集生产环境的结构化日志,便于查询和分析。记录请求ID、用户ID、时间戳、输入输出、状态、耗时等信息。使用JSON格式存储日志。集成日志系统(如ELK、Loki等)。结构化日志是问题排查和数据分析的基础。
b.代码示例
---
import logging
import json
from datetime import datetime
# 1. 配置结构化日志
class StructuredLogger:
    """Write one JSON object per line through the shared "production" logger.

    NOTE: every instance uses the same logger name, so a record is written to
    every file that any instance has attached.
    """

    def __init__(self, log_file: str = "production.log"):
        import os

        self.logger = logging.getLogger("production")
        self.logger.setLevel(logging.INFO)
        # getLogger returns the same object every time, so adding a handler on
        # each instantiation duplicated every line. Reuse an existing
        # FileHandler for the same path instead.
        path = os.path.abspath(log_file)
        already_attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == path
            for h in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file, encoding="utf-8")
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)

    def log_request(
        self,
        request_id: str,
        user_id: str,
        input_data: str,
        output_data: str,
        status: str,
        latency_ms: int,
        error: str = None
    ):
        """Log one request as a JSON line (non-ASCII text kept readable)."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "request_id": request_id,
            "user_id": user_id,
            "input": input_data,
            "output": output_data,
            "status": status,
            "latency_ms": latency_ms,
            "error": error
        }
        self.logger.info(json.dumps(log_entry, ensure_ascii=False))


logger = StructuredLogger()
# 2. 使用结构化日志
import uuid
import time
def process_with_logging(user_id: str, user_input: str):
    """Run ``handle_request`` and write a structured success or error entry.

    Returns the handler's result. Any exception is logged, then re-raised.
    """
    request_id = str(uuid.uuid4())
    # perf_counter is monotonic. time.time() follows the wall clock and can
    # jump (NTP/DST changes), producing wrong or negative latencies.
    started = time.perf_counter()

    def _elapsed_ms() -> int:
        # Milliseconds since the request started.
        return int((time.perf_counter() - started) * 1000)

    try:
        result = handle_request(user_input)
    except Exception as e:
        logger.log_request(
            request_id=request_id,
            user_id=user_id,
            input_data=user_input,
            output_data="",
            status="error",
            latency_ms=_elapsed_ms(),
            error=str(e)
        )
        raise
    logger.log_request(
        request_id=request_id,
        user_id=user_id,
        input_data=user_input,
        output_data=result,
        status="success",
        latency_ms=_elapsed_ms()
    )
    return result
# 3. 日志查询
def query_logs(
    log_file: str = "production.log",
    status: str = None,
    user_id: str = None,
    start_time: datetime = None
):
    """Return JSON-lines log entries matching every given filter.

    Args:
        log_file: Path to the JSON-lines log.
        status / user_id: Exact-match filters (ignored when falsy).
        start_time: Keep entries whose ISO ``timestamp`` is >= this value.
            Compare like with like: logs written by ``datetime.now()`` are naive.

    Returns:
        List of matching dicts. A missing file gives an empty list. Lines that
        are not JSON objects, or (when filtering by time) lack a parseable
        timestamp, are skipped instead of raising.
    """
    results = []
    try:
        with open(log_file, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    log_entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(log_entry, dict):
                    continue
                if status and log_entry.get("status") != status:
                    continue
                if user_id and log_entry.get("user_id") != user_id:
                    continue
                if start_time:
                    try:
                        log_time = datetime.fromisoformat(log_entry["timestamp"])
                    except (KeyError, TypeError, ValueError):
                        continue
                    if log_time < start_time:
                        continue
                results.append(log_entry)
    except FileNotFoundError:
        # No log written yet: nothing matches.
        return []
    return results


# Query error logs
errors = query_logs(status="error")
print(f"错误日志数:{len(errors)}")
# 4. 日志聚合分析
def analyze_logs(log_file: str = "production.log"):
    """Print summary statistics for a JSON-lines request log and return them.

    Returns:
        dict with total, success, error, latencies (list) and users (set).
        A missing file yields all-zero stats. Lines that are not JSON, or lack
        status / latency_ms / user_id, are skipped entirely.
    """
    import statistics

    stats = {
        "total": 0,
        "success": 0,
        "error": 0,
        "latencies": [],
        "users": set()
    }
    try:
        with open(log_file, "r", encoding="utf-8") as f:
            for line in f:
                # Validate before counting; the old bare except could leave
                # half-counted lines and hid unrelated bugs.
                try:
                    log = json.loads(line)
                    status = log["status"]
                    latency = log["latency_ms"]
                    user = log["user_id"]
                except (json.JSONDecodeError, KeyError, TypeError):
                    continue
                stats["total"] += 1
                if status == "success":
                    stats["success"] += 1
                else:
                    stats["error"] += 1
                stats["latencies"].append(latency)
                stats["users"].add(user)
    except FileNotFoundError:
        pass  # no log yet: report empty stats

    # A real newline: the old "\\n" printed a literal backslash-n.
    print("\n日志分析:")
    print(f" 总请求:{stats['total']}")
    if stats["total"]:
        print(f" 成功率:{stats['success'] / stats['total']:.2%}")
    else:
        print(" 成功率:N/A")
    print(f" 活跃用户:{len(stats['users'])}")
    if stats["latencies"]:
        print(f" 平均延迟:{statistics.mean(stats['latencies']):.0f}ms")
    return stats


analyze_logs()
---
5.2 性能指标
01.延迟监控
a.延迟分析
a.功能说明
监控系统延迟,确保响应时间满足SLA要求。追踪平均延迟、P50/P95/P99延迟等分位数指标。分析延迟分布,识别异常峰值。对比不同时段、不同类型请求的延迟。延迟监控是用户体验的关键指标,直接影响满意度。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
import statistics
# Shared LangSmith client used by the latency functions in this snippet.
client: Client = Client()
# 1. 获取延迟数据
def get_latency_metrics(project_name: str, hours: int = 1, root_only: bool = True):
    """Latency statistics (milliseconds) for runs from the last ``hours`` hours.

    Args:
        project_name: LangSmith project to query.
        hours: Look-back window.
        root_only: Use only root runs, i.e. end-to-end request latency.

    Returns:
        dict with count, min, max, mean, median, stdev, p50, p95, p99,
        or None when no run has a latency.
    """
    from datetime import timezone

    # Run timestamps are UTC; a naive local now() would shift the window.
    start_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=10000
    )
    # Run.latency is in seconds; convert so all outputs are in ms.
    latencies = sorted(run.latency * 1000 for run in runs if run.latency)
    if not latencies:
        return None
    n = len(latencies)
    return {
        "count": n,
        "min": latencies[0],
        "max": latencies[-1],
        "mean": statistics.mean(latencies),
        "median": statistics.median(latencies),
        "stdev": statistics.stdev(latencies) if n > 1 else 0,
        "p50": latencies[n // 2],
        "p95": latencies[int(n * 0.95)],
        "p99": latencies[int(n * 0.99)]
    }


metrics = get_latency_metrics("production", hours=1)
if metrics is None:
    print("\n延迟指标(最近1小时):无数据")
else:
    print("\n延迟指标(最近1小时):")
    print(f" 样本数:{metrics['count']}")
    print(f" 平均:{metrics['mean']:.0f}ms")
    print(f" 中位数:{metrics['median']:.0f}ms")
    print(f" P95:{metrics['p95']:.0f}ms")
    print(f" P99:{metrics['p99']:.0f}ms")
    print(f" 最大:{metrics['max']:.0f}ms")
# 2. 延迟趋势
def latency_trend(project_name: str, days: int = 7, root_only: bool = True):
    """Print daily average / P95 / P99 latency (ms) for the last ``days`` days."""
    from collections import defaultdict
    from datetime import timezone

    start_time = datetime.now(timezone.utc) - timedelta(days=days)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=50000
    )
    # Group by the date of each run's start_time as returned by the API.
    daily_latencies = defaultdict(list)
    for run in runs:
        if run.latency:
            # Run.latency is in seconds; report ms.
            daily_latencies[run.start_time.date()].append(run.latency * 1000)

    print(f"\n延迟趋势(最近{days}天):")
    print(f"{'日期':<12} {'平均':>8} {'P95':>8} {'P99':>8}")
    print("-" * 40)
    for date in sorted(daily_latencies):
        latencies = sorted(daily_latencies[date])
        n = len(latencies)
        avg = statistics.mean(latencies)
        p95 = latencies[int(n * 0.95)]
        p99 = latencies[int(n * 0.99)]
        print(f"{date} {avg:>7.0f}ms {p95:>7.0f}ms {p99:>7.0f}ms")


latency_trend("production", days=7)
# 3. 慢请求分析
def analyze_slow_requests(project_name: str, threshold_ms: int = 3000):
    """Print a summary of runs slower than ``threshold_ms`` (at most 100 fetched)."""
    # Run latency is measured in seconds and the LangSmith filter syntax takes
    # durations such as "3s". The old bare 3000 was not milliseconds.
    threshold_s = threshold_ms / 1000
    runs = client.list_runs(
        project_name=project_name,
        filter=f'gt(latency, "{threshold_s:g}s")',
        limit=100
    )
    slow_requests = list(runs)
    print(f"\n慢请求分析(>{threshold_ms}ms):")
    print(f" 数量:{len(slow_requests)}")
    if slow_requests:
        print("\n示例:")
        for i, run in enumerate(slow_requests[:5], start=1):
            latency_ms = (run.latency or 0) * 1000
            print(f" {i}. 延迟:{latency_ms:.0f}ms")
            print(f" 输入:{str(run.inputs)[:50]}...")
            print(f" Token:{run.total_tokens}")
            print()


analyze_slow_requests("production", threshold_ms=3000)
# 4. SLA合规性
def check_sla_compliance(project_name: str, sla_p95_ms: int = 2000):
    """Check whether the last hour's P95 latency meets the SLA.

    Returns:
        True/False for compliant/non-compliant, or None when there is no data
        (previously always None).
    """
    metrics = get_latency_metrics(project_name, hours=1)
    if not metrics:
        print("无数据")
        return None
    p95_latency = metrics['p95']
    is_compliant = p95_latency <= sla_p95_ms
    print("\nSLA合规性检查:")
    print(f" SLA要求:P95 <= {sla_p95_ms}ms")
    print(f" 实际P95:{p95_latency:.0f}ms")
    print(f" 状态:{'✅ 合规' if is_compliant else '❌ 不合规'}")
    if not is_compliant:
        overage = p95_latency - sla_p95_ms
        print(f" 超出:{overage:.0f}ms ({overage / sla_p95_ms:.1%})")
    return is_compliant


check_sla_compliance("production", sla_p95_ms=2000)
---
b.吞吐量监控
a.功能说明
监控系统吞吐量,衡量处理能力。统计每秒/每分钟/每小时的请求数(QPS/QPM/QPH)。分析吞吐量变化趋势,识别流量高峰和低谷。评估系统负载,判断是否需要扩容。吞吐量监控是容量规划的依据。
b.代码示例
---
def get_throughput_metrics(
    project_name: str,
    minutes: int = 5,
    root_only: bool = True,
    limit: int = 10000
):
    """Request throughput over the last ``minutes`` minutes.

    Returns:
        dict with total_requests, time_window_minutes, qps and qpm.

    Raises:
        ValueError: if ``minutes`` is not positive.
    """
    from datetime import timezone

    if minutes <= 0:
        raise ValueError("minutes must be positive")
    start_time = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=limit
    )
    total_requests = sum(1 for _ in runs)
    if total_requests >= limit:
        # The query hit its cap, so the rates below are only lower bounds.
        print(f"警告:请求数达到查询上限{limit},吞吐量可能被低估")
    return {
        "total_requests": total_requests,
        "time_window_minutes": minutes,
        "qps": total_requests / (minutes * 60),
        "qpm": total_requests / minutes
    }


throughput = get_throughput_metrics("production", minutes=5)
print(f"\n吞吐量指标(最近{throughput['time_window_minutes']}分钟):")
print(f" 总请求数:{throughput['total_requests']}")
print(f" QPS:{throughput['qps']:.2f}")
print(f" QPM:{throughput['qpm']:.2f}")
# 实时吞吐量趋势
def throughput_trend(project_name: str, hours: int = 24, root_only: bool = True):
    """Print request count and average QPS per hour for the last ``hours`` hours.

    QPS is count / 3600, so the first and the current (partial) hours are
    understated.
    """
    from collections import Counter
    from datetime import timezone

    start_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=50000
    )
    hourly_counts = Counter(
        run.start_time.replace(minute=0, second=0, microsecond=0) for run in runs
    )
    print(f"\n吞吐量趋势(最近{hours}小时):")
    print(f"{'时间':<20} {'请求数':>10} {'QPS':>10}")
    print("-" * 45)
    for hour in sorted(hourly_counts):
        count = hourly_counts[hour]
        print(f"{hour.strftime('%Y-%m-%d %H:%M')} {count:>10} {count / 3600:>10.2f}")


throughput_trend("production", hours=24)
---
02.错误监控
a.错误率追踪
a.功能说明
追踪系统错误率,确保稳定性。计算总体错误率和各类型错误占比。监控错误率趋势,及时发现异常波动。分析错误模式,识别共性问题。错误率是系统可靠性的核心指标。
b.代码示例
---
def get_error_metrics(project_name: str, hours: int = 1, root_only: bool = True):
    """Error count, error rate and per-category breakdown for the last ``hours`` hours.

    Returns:
        dict with total, errors, error_rate and error_types
        ({category: count}, categories from ``classify_error``).
    """
    from datetime import timezone

    start_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        is_root=True if root_only else None,
        limit=10000
    )
    total = 0
    errors = 0
    error_types = {}
    for run in runs:
        total += 1
        if run.status != "error":
            continue
        errors += 1
        error_type = classify_error(run.error or "Unknown")
        error_types[error_type] = error_types.get(error_type, 0) + 1
    return {
        "total": total,
        "errors": errors,
        "error_rate": errors / total if total > 0 else 0,
        "error_types": error_types
    }
def classify_error(error_msg: str) -> str:
    """Map an error message to a coarse category label (first match wins).

    Matching is case-insensitive. It also covers "timed out" and
    "RateLimitError"/"rate_limit" spellings, which the old substring checks
    ("timeout", "rate limit") missed.
    """
    text = (error_msg or "").lower()
    rules = (
        ("超时", ("timeout", "timed out")),
        ("限流", ("rate limit", "ratelimit", "rate_limit", "too many requests")),
        ("连接错误", ("connection",)),
        ("验证错误", ("validation",)),
    )
    for label, keywords in rules:
        if any(keyword in text for keyword in keywords):
            return label
    return "其他"
# Report the last hour's error metrics, most frequent error type first.
error_metrics = get_error_metrics("production", hours=1)
print("\n错误指标(最近1小时):")
print(f" 总请求:{error_metrics['total']}")
print(f" 错误数:{error_metrics['errors']}")
print(f" 错误率:{error_metrics['error_rate']:.2%}")
print("\n错误类型分布:")
for err_type, count in sorted(error_metrics['error_types'].items(),
                              key=lambda x: x[1], reverse=True):
    # error_types is only non-empty when errors > 0, so this cannot divide by 0.
    percentage = count / error_metrics['errors'] * 100
    print(f" {err_type}: {count} ({percentage:.1f}%)")
# 错误率告警
def check_error_rate_alert(project_name: str, threshold: float = 0.05):
    """Compare the last hour's error rate with ``threshold``.

    Returns:
        {"alert": False}, or {"alert": True, "error_rate": ..., "threshold": ...}.
    """
    metrics = get_error_metrics(project_name, hours=1)
    error_rate = metrics['error_rate']
    if error_rate <= threshold:
        print(f"\n✅ 错误率正常({error_rate:.2%})")
        return {"alert": False}
    print("\n🚨 错误率告警!")
    print(f" 当前错误率:{error_rate:.2%}")
    print(f" 阈值:{threshold:.2%}")
    print(f" 超出:{(error_rate - threshold):.2%}")
    return {
        "alert": True,
        "error_rate": error_rate,
        "threshold": threshold
    }


check_error_rate_alert("production", threshold=0.05)
---
5.3 异常告警
01.告警规则
a.阈值告警
a.功能说明
设置指标阈值触发告警,及时发现异常。配置错误率、延迟、吞吐量等指标的告警阈值。支持多级告警(警告、严重、紧急)。避免告警疲劳,合理设置阈值和频率。阈值告警是主动监控的关键机制。
b.代码示例
---
from datetime import datetime, timedelta
from langsmith import Client
# LangSmith client instance for this snippet.
client: Client = Client()
# 1. 告警规则配置
class AlertRule:
    """Threshold alert rule on a single metric.

    comparison: "gt", "lt", "eq" (float-tolerant), or the newly added
    "gte" / "lte". severity: a label such as warning / critical / emergency.
    """

    COMPARISONS = ("gt", "lt", "eq", "gte", "lte")

    def __init__(
        self,
        name: str,
        metric: str,
        threshold: float,
        comparison: str = "gt",  # gt, lt, eq, gte, lte
        severity: str = "warning"  # warning, critical, emergency
    ):
        # Reject typos up front. An unknown comparison used to make check()
        # silently return False forever, so the alert could never fire.
        if comparison not in self.COMPARISONS:
            raise ValueError(f"unsupported comparison: {comparison!r}")
        self.name = name
        self.metric = metric
        self.threshold = threshold
        self.comparison = comparison
        self.severity = severity

    def check(self, value: float) -> bool:
        """Return True when ``value`` triggers this rule."""
        import math

        if self.comparison == "gt":
            return value > self.threshold
        if self.comparison == "lt":
            return value < self.threshold
        if self.comparison == "gte":
            return value >= self.threshold
        if self.comparison == "lte":
            return value <= self.threshold
        if self.comparison == "eq":
            # Computed metrics (rates, averages) rarely compare exactly equal.
            return math.isclose(value, self.threshold)
        return False
# 2. 定义告警规则
# Default alert rules: high error rate, high P95 latency, low throughput.
alert_rules = [
    AlertRule(
        name="高错误率",
        metric="error_rate",
        threshold=0.05,
        comparison="gt",
        severity="critical"
    ),
    AlertRule(
        name="高延迟",
        metric="p95_latency",
        threshold=3000,
        comparison="gt",
        severity="warning"
    ),
    AlertRule(
        name="低吞吐量",
        metric="qps",
        threshold=1.0,
        comparison="lt",
        severity="warning"
    )
]
# 3. 检查告警
def check_alerts(project_name: str, rules: list, minutes: int = 5):
    """Evaluate ``rules`` against metrics from the last ``minutes`` minutes.

    Returns:
        List of triggered-alert dicts (rule, metric, value, threshold,
        severity, timestamp). Rules whose metric is unavailable are skipped.
    """
    metrics = get_realtime_metrics(project_name, minutes=minutes)
    # get_realtime_metrics does not report throughput, so a rule on "qps" was
    # silently never evaluated. Derive it from the request count.
    metrics.setdefault("qps", metrics.get("total_requests", 0) / (minutes * 60))

    triggered_alerts = []
    for rule in rules:
        metric_value = metrics.get(rule.metric)
        if metric_value is None or not rule.check(metric_value):
            continue
        triggered_alerts.append({
            "rule": rule.name,
            "metric": rule.metric,
            "value": metric_value,
            "threshold": rule.threshold,
            "severity": rule.severity,
            "timestamp": datetime.now()
        })
    return triggered_alerts
# Run the checks and print any triggered alerts.
alerts = check_alerts("production", alert_rules)
if alerts:
    print(f"\n🚨 触发{len(alerts)}个告警:")
    for alert in alerts:
        print(f" [{alert['severity'].upper()}] {alert['rule']}")
        print(f" 指标:{alert['metric']}")
        print(f" 当前值:{alert['value']}")
        print(f" 阈值:{alert['threshold']}")
        print()
else:
    print("\n✅ 无告警")
# 4. 告警频率控制
class AlertThrottler:
    """Suppress repeats of the same alert within a cooldown window."""

    def __init__(self, cooldown_minutes: int = 15):
        self.cooldown_minutes = cooldown_minutes
        self.last_alert_time = {}  # alert name -> time it was last allowed

    def should_alert(self, alert_name: str, now=None) -> bool:
        """Return True, and restart the cooldown, if ``alert_name`` may fire now.

        Args:
            alert_name: Key identifying the alert (e.g. the rule name).
            now: Current time; defaults to ``datetime.now()`` (injectable for
                tests and replays).
        """
        now = datetime.now() if now is None else now
        last_time = self.last_alert_time.get(alert_name)
        if last_time is not None and now - last_time < timedelta(minutes=self.cooldown_minutes):
            return False  # still cooling down
        self.last_alert_time[alert_name] = now
        return True


throttler = AlertThrottler(cooldown_minutes=15)
# Dispatch alerts, skipping those still inside their cooldown window.
for alert in alerts:
    if throttler.should_alert(alert['rule']):
        print(f"发送告警:{alert['rule']}")
        # send_alert(alert)
    else:
        print(f"跳过告警(冷却中):{alert['rule']}")
# 5. 告警聚合
def aggregate_alerts(alerts: list) -> dict:
    """Group alerts by their ``severity`` value, keeping input order in each group."""
    from collections import defaultdict

    by_severity = defaultdict(list)
    for alert in alerts:
        by_severity[alert['severity']].append(alert)
    return dict(by_severity)
# Print how many alerts fired per severity level.
aggregated = aggregate_alerts(alerts)
print("\n告警聚合:")
for severity, alert_list in aggregated.items():
    print(f" {severity.upper()}: {len(alert_list)}个")
---
b.异常检测
a.功能说明
基于历史数据自动检测异常,无需手动设置固定阈值。本示例使用统计方法(3-sigma规则):以最近7天的数据计算均值和标准差,将最近1小时内偏离均值超过3个标准差的运行标记为异常。趋势异常、突变、周期性异常等需要结合时间序列方法(如滑动窗口、季节性分解)另行实现。适用于动态变化的指标。异常检测提供智能告警,减少配置维护。
b.代码示例
---
import statistics
from datetime import datetime, timedelta
def detect_anomalies(project_name: str, metric: str = "latency", sigma: float = 3.0):
    """Flag runs from the last hour whose ``metric`` lies outside mean ± sigma·stdev.

    The baseline is up to 50000 runs from the last 7 days (this window also
    contains the last hour). ``metric`` is "latency" (``run.latency``) or
    "tokens" (``run.total_tokens``); any other value yields no data.
    ``sigma`` was previously fixed at 3.

    Returns:
        List of dicts with run_id, value, mean, stdev and z_score. Empty when
        there are fewer than 30 baseline values.
    """
    from datetime import timezone

    def _metric_value(run):
        # Falsy values (None / 0) are ignored, as before.
        if metric == "latency":
            return run.latency
        if metric == "tokens":
            return run.total_tokens
        return None

    # Run timestamps are UTC; a naive local now() would shift both windows.
    now = datetime.now(timezone.utc)
    history = client.list_runs(
        project_name=project_name,
        start_time=now - timedelta(days=7),
        limit=50000
    )
    values = [v for v in (_metric_value(run) for run in history) if v]
    if len(values) < 30:
        print("数据不足,无法检测异常")
        return []

    mean = statistics.mean(values)
    stdev = statistics.stdev(values)
    lower_bound = mean - sigma * stdev
    upper_bound = mean + sigma * stdev

    recent_runs = client.list_runs(
        project_name=project_name,
        start_time=now - timedelta(hours=1),
        limit=1000
    )
    anomalies = []
    for run in recent_runs:
        value = _metric_value(run)
        if value and not (lower_bound <= value <= upper_bound):
            anomalies.append({
                "run_id": str(run.id),
                "value": value,
                "mean": mean,
                "stdev": stdev,
                "z_score": (value - mean) / stdev if stdev > 0 else 0
            })
    return anomalies
# Print up to five latency anomalies.
anomalies = detect_anomalies("production", metric="latency")
if anomalies:
    print(f"\n检测到{len(anomalies)}个异常:")
    for anom in anomalies[:5]:
        print(f" 运行:{anom['run_id']}")
        print(f" 值:{anom['value']:.0f} (平均:{anom['mean']:.0f}, Z分数:{anom['z_score']:.2f})")
        print()
else:
    print("\n无异常检测到")
---
02.通知渠道
a.Slack集成
a.功能说明
通过Slack发送告警通知,快速响应问题。配置Slack Webhook实现消息推送。支持富文本格式、按钮、表情等。可以@相关人员。Slack集成是团队协作的常用方式。本节同时给出基于SMTP的邮件告警示例。
b.代码示例
---
import requests
import json
def send_slack_alert(webhook_url: str, alert: dict, timeout: float = 10.0):
    """Post an alert to a Slack incoming webhook using Block Kit formatting.

    Args:
        webhook_url: Slack incoming-webhook URL.
        alert: Dict with rule, severity, timestamp (datetime), metric, value,
            threshold.
        timeout: Seconds to wait for Slack. Without one, a stalled request
            could block the monitoring loop indefinitely.

    Returns:
        True if Slack answered HTTP 200, otherwise False (previously None).
        Network errors are reported instead of propagating to the caller.
    """
    severity_emoji = {
        "warning": "⚠️",
        "critical": "🔴",
        "emergency": "🚨"
    }
    emoji = severity_emoji.get(alert['severity'], "ℹ️")
    message = {
        "text": f"{emoji} 生产告警",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} {alert['rule']}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*级别:*\n{alert['severity'].upper()}"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*时间:*\n{alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*指标:*\n{alert['metric']}"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*当前值:*\n{alert['value']}"
                    }
                ]
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": f"阈值:{alert['threshold']}"
                    }
                ]
            }
        ]
    }
    try:
        # json= serialises the body and sets Content-Type: application/json.
        response = requests.post(webhook_url, json=message, timeout=timeout)
    except requests.RequestException as exc:
        print(f"✗ Slack发送失败:{exc}")
        return False
    if response.status_code == 200:
        print(f"✓ Slack告警已发送:{alert['rule']}")
        return True
    print(f"✗ Slack发送失败:{response.status_code}")
    return False
# 使用
# SLACK_WEBHOOK = \"https://hooks.slack.com/services/YOUR/WEBHOOK/URL\"
# for alert in alerts:
# send_slack_alert(SLACK_WEBHOOK, alert)
# 邮件告警
def send_email_alert(smtp_config: dict, alert: dict, timeout: float = 10.0):
    """Send an alert as a plain-text UTF-8 email over SMTP with STARTTLS.

    Args:
        smtp_config: Keys host, port, user, password, from, to. ``to`` may be
            a string or a list of addresses.
        alert: Dict with rule, severity, timestamp, metric, value, threshold.
        timeout: SMTP socket timeout in seconds (previously none, so an
            unreachable server could hang the caller).
    """
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart

    recipients = smtp_config['to']
    msg = MIMEMultipart()
    msg['From'] = smtp_config['from']
    msg['To'] = recipients if isinstance(recipients, str) else ", ".join(recipients)
    msg['Subject'] = f"[{alert['severity'].upper()}] {alert['rule']}"
    body = f"""
生产告警
告警规则:{alert['rule']}
严重级别:{alert['severity'].upper()}
触发时间:{alert['timestamp']}
指标:{alert['metric']}
当前值:{alert['value']}
阈值:{alert['threshold']}
请及时处理。
"""
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    with smtplib.SMTP(smtp_config['host'], smtp_config['port'], timeout=timeout) as server:
        server.starttls()
        server.login(smtp_config['user'], smtp_config['password'])
        server.send_message(msg)
    print(f"✓ 邮件告警已发送:{alert['rule']}")
# 使用
# smtp_config = {
# \"host\": \"smtp.gmail.com\",
# \"port\": 587,
# \"user\": \"[email protected]\",
# \"password\": \"your_password\",
# \"from\": \"[email protected]\",
# \"to\": \"[email protected]\"
# }
# send_email_alert(smtp_config, alert)
---
5.4 成本分析
01.Token消耗
a.成本统计
a.功能说明
统计Token消耗,计算LLM API使用成本。追踪每天/每周/每月的Token使用量。分不同模型计算成本(GPT-4成本高于GPT-3.5)。分析成本趋势,预测未来支出。成本统计是预算管理的基础。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
# Shared LangSmith client used by the cost-analysis functions in this snippet.
client: Client = Client()
# 1. 计算成本
def calculate_cost(
    project_name: str,
    days: int = 1,
    model_prices: dict = None,
    default_model: str = "gpt-3.5-turbo"
):
    """Estimate LLM spend for the last ``days`` days from token counts.

    Only LLM runs are counted. Parent (chain) runs can also carry aggregated
    token totals, so counting every run would bill the same tokens twice.

    Args:
        project_name: LangSmith project to query.
        days: Look-back window.
        model_prices: {model: {"input": $/1K, "output": $/1K}}. The default
            table holds example prices.
        default_model: Used when a run's model is unknown; must be a key of
            ``model_prices``.

    Returns:
        dict with total_cost, total_tokens, model_costs
        ({model: {cost, tokens, requests}}) and days.
    """
    from datetime import timezone

    if model_prices is None:
        model_prices = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "claude-2": {"input": 0.008, "output": 0.024}
        }
    start_time = datetime.now(timezone.utc) - timedelta(days=days)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        run_type="llm",
        limit=50000
    )

    total_cost = 0.0
    model_costs = {}
    total_tokens = 0
    for run in runs:
        if not run.total_tokens:
            continue
        # run.extra can be None. The model name may live in several places;
        # "ls_model_name" is presumably set by recent LangChain versions.
        extra = run.extra or {}
        invocation = extra.get("invocation_params") or {}
        metadata = extra.get("metadata") or {}
        model = (
            extra.get("model")
            or invocation.get("model_name")
            or invocation.get("model")
            or metadata.get("ls_model_name")
            or default_model
        )
        if model not in model_prices:
            model = default_model
        prices = model_prices[model]

        run_cost = (
            (run.prompt_tokens or 0) / 1000 * prices["input"]
            + (run.completion_tokens or 0) / 1000 * prices["output"]
        )
        total_cost += run_cost
        total_tokens += run.total_tokens

        per_model = model_costs.setdefault(model, {"cost": 0, "tokens": 0, "requests": 0})
        per_model["cost"] += run_cost
        per_model["tokens"] += run.total_tokens
        per_model["requests"] += 1

    return {
        "total_cost": total_cost,
        "total_tokens": total_tokens,
        "model_costs": model_costs,
        "days": days
    }
# Print the last day's cost, overall and per model.
cost_data = calculate_cost("production", days=1)
print(f"\n成本统计(最近{cost_data['days']}天):")
print(f" 总成本:${cost_data['total_cost']:.2f}")
print(f" 总Token:{cost_data['total_tokens']:,}")
print("\n按模型分解:")
for model, stats in cost_data['model_costs'].items():
    # Guard: the share is undefined when the total cost is 0.
    share = stats['cost'] / cost_data['total_cost'] * 100 if cost_data['total_cost'] else 0.0
    print(f" {model}:")
    print(f" 成本:${stats['cost']:.2f} ({share:.1f}%)")
    print(f" Token:{stats['tokens']:,}")
    print(f" 请求数:{stats['requests']}")
    print()
# 2. 成本趋势
def cost_trend(
    project_name: str,
    days: int = 30,
    model_prices: dict = None,
    default_model: str = "gpt-3.5-turbo"
):
    """Print estimated daily LLM cost and token usage for the last ``days`` days.

    Only LLM runs are counted, so tokens reported on parent runs are not
    billed twice. ``default_model`` must be a key of ``model_prices``.
    """
    from collections import defaultdict
    from datetime import timezone

    if model_prices is None:
        model_prices = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}
        }
    start_time = datetime.now(timezone.utc) - timedelta(days=days)
    runs = client.list_runs(
        project_name=project_name,
        start_time=start_time,
        run_type="llm",
        limit=100000
    )
    daily_costs = defaultdict(float)
    daily_tokens = defaultdict(int)
    for run in runs:
        if not run.total_tokens:
            continue
        # run.extra can be None; look for the model name in the usual places.
        extra = run.extra or {}
        invocation = extra.get("invocation_params") or {}
        model = extra.get("model") or invocation.get("model_name") or invocation.get("model")
        prices = model_prices.get(model) or model_prices[default_model]
        date = run.start_time.date()
        daily_costs[date] += (
            (run.prompt_tokens or 0) / 1000 * prices["input"]
            + (run.completion_tokens or 0) / 1000 * prices["output"]
        )
        daily_tokens[date] += run.total_tokens

    print(f"\n成本趋势(最近{days}天):")
    print(f"{'日期':<12} {'成本':>10} {'Token':>12}")
    print("-" * 38)
    for date in sorted(daily_costs):
        print(f"{date} ${daily_costs[date]:>9.2f} {daily_tokens[date]:>11,}")


cost_trend("production", days=7)
# 3. 成本预测
def predict_monthly_cost(project_name: str, window_days: int = 7, month_days: int = 30):
    """Project monthly cost from the daily average over the last ``window_days`` days.

    Both the window (previously fixed at 7) and the month length (30) are
    parameters. Returns the projected monthly cost.
    """
    recent_cost = calculate_cost(project_name, days=window_days)
    daily_avg_cost = recent_cost['total_cost'] / window_days
    monthly_projection = daily_avg_cost * month_days
    print("\n月度成本预测:")
    print(f" 最近{window_days}天日均:${daily_avg_cost:.2f}")
    print(f" 预测月度成本:${monthly_projection:.2f}")
    return monthly_projection


predicted = predict_monthly_cost("production")
# 4. 成本告警
def check_cost_alert(project_name: str, budget: float, month_days: int = 30):
    """Compare the last 24 hours' cost with a monthly budget split evenly per day.

    Returns True when the day's cost exceeds ``budget / month_days``.
    """
    current_cost = calculate_cost(project_name, days=1)['total_cost']
    daily_budget = budget / month_days
    if current_cost <= daily_budget:
        print(f"\n✅ 成本正常(${current_cost:.2f} / ${daily_budget:.2f})")
        return False
    print("\n🚨 成本超预算!")
    print(f" 当前成本:${current_cost:.2f}")
    print(f" 每日预算:${daily_budget:.2f}")
    print(f" 超出:${current_cost - daily_budget:.2f}")
    return True


check_cost_alert("production", budget=1000.0)
---
b.成本优化
a.功能说明
分析成本构成,识别优化机会。对比不同模型的成本效益。优化Prompt减少Token使用。缓存频繁请求降低API调用。合理选择模型(简单任务用小模型)。成本优化在保证质量前提下降低支出。
b.代码示例
---
# 1. 模型成本对比
def compare_model_costs(project_a: str, project_b: str, days: int = 7):
    """Print the cost of two projects (e.g. one per model) over the last ``days`` days."""
    cost_a = calculate_cost(project_a, days=days)['total_cost']
    cost_b = calculate_cost(project_b, days=days)['total_cost']
    print(f"\n模型成本对比({days}天):")
    print(f" 项目A:${cost_a:.2f}")
    print(f" 项目B:${cost_b:.2f}")
    print(f" 差异:${abs(cost_a - cost_b):.2f}")
    if cost_b < cost_a:
        # cost_a > cost_b >= 0 here, so the percentage is well defined.
        savings = cost_a - cost_b
        print(f" 节省:${savings:.2f} ({savings / cost_a * 100:.1f}%)")
# compare_model_costs("prod_gpt4", "prod_gpt35")
# 2. Token使用优化
def analyze_token_usage(project_name: str, threshold: int = 2000, limit: int = 1000):
    """Report runs (among ``limit`` fetched) that used more than ``threshold`` tokens.

    Returns:
        The high-usage runs as dicts (run_id, tokens, input_length,
        output_length), highest token count first (previously None).
    """
    runs = client.list_runs(
        project_name=project_name,
        limit=limit
    )
    high_token_runs = sorted(
        (
            {
                "run_id": str(run.id),
                "tokens": run.total_tokens,
                "input_length": len(str(run.inputs)),
                "output_length": len(str(run.outputs))
            }
            for run in runs
            if run.total_tokens and run.total_tokens > threshold
        ),
        key=lambda r: r['tokens'],
        reverse=True
    )
    if high_token_runs:
        print(f"\n高Token使用分析(>{threshold}):")
        print(f" 数量:{len(high_token_runs)}")
        print("\n前5个:")
        for i, run in enumerate(high_token_runs[:5], start=1):
            print(f" {i}. Token:{run['tokens']}, 输入长度:{run['input_length']}")
    return high_token_runs


analyze_token_usage("production")
# 3. 缓存策略
class ResponseCache:
    """In-memory response cache keyed by an MD5 of the JSON-serialised inputs.

    ``max_size`` (optional) bounds the cache with least-recently-used
    eviction. The default None keeps the original unbounded behaviour.
    """

    def __init__(self, max_size: int = None):
        from collections import OrderedDict

        self.cache = OrderedDict()  # key -> cached output, oldest first
        self.max_size = max_size
        self.hits = 0
        self.misses = 0

    def get_cache_key(self, inputs: dict) -> str:
        """Return a stable key for ``inputs`` (dict key order does not matter)."""
        import hashlib
        import json

        key_str = json.dumps(inputs, sort_keys=True)
        # MD5 is used only as a fast fingerprint here, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

    def get(self, inputs: dict):
        """Return the cached output for ``inputs``, or None on a miss."""
        key = self.get_cache_key(inputs)
        if key in self.cache:
            self.hits += 1
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        self.misses += 1
        return None

    def set(self, inputs: dict, output):
        """Store ``output`` for ``inputs``, evicting the oldest entries if bounded."""
        key = self.get_cache_key(inputs)
        self.cache[key] = output
        self.cache.move_to_end(key)
        if self.max_size is not None:
            while len(self.cache) > self.max_size:
                self.cache.popitem(last=False)

    def stats(self):
        """Return hits, misses and hit_rate (0 when nothing was looked up)."""
        total = self.hits + self.misses
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0
        }


cache = ResponseCache()
# 使用缓存
def cached_query(inputs: dict):
\"\"\"带缓存的查询\"\"\"
# 检查缓存
cached = cache.get(inputs)
if cached:
return cached
# 缓存未命中,调用API
result = qa_app(inputs)
# 存入缓存
cache.set(inputs, result)
return result
# 缓存统计
stats = cache.stats()
print(f\"\\n缓存统计:\")
print(f\" 命中:{stats['hits']}\")
print(f\" 未命中:{stats['misses']}\")
print(f\" 命中率:{stats['hit_rate']:.2%}\")
# 4. 成本优化建议
def cost_optimization_suggestions(project_name: str):
    """Print cost-saving suggestions for *project_name* based on the last 7 days.

    Uses ``calculate_cost`` (defined elsewhere) for spend/token totals and the
    module-level ``cache`` for the hit rate. Ratios are computed only when their
    denominators are positive, so a project with no spend or no requests does not raise
    ZeroDivisionError.
    """
    cost_data = calculate_cost(project_name, days=7)
    total_cost = cost_data['total_cost']
    model_costs = cost_data['model_costs']
    print("\n💡 成本优化建议:")

    # 1. Model mix: flag heavy GPT-4 usage.
    for model, model_stats in model_costs.items():
        if model != "gpt-4" or total_cost <= 0:
            continue
        cost_pct = model_stats['cost'] / total_cost * 100
        if cost_pct > 70:
            print(f" 1. GPT-4占比{cost_pct:.1f}%,考虑将简单任务迁移到GPT-3.5")
            # Rough estimate. It presumably assumes ~30% of GPT-4 spend is movable and that
            # GPT-3.5 costs ~5% of GPT-4 — TODO confirm these ratios against current pricing.
            potential_savings = model_stats['cost'] * 0.3 * (1 - 0.05)
            print(f" 预计可节省:${potential_savings:.2f}/周")

    # 2. Cache effectiveness.
    cache_stats = cache.stats()
    if cache_stats['hit_rate'] < 0.2:
        print(f" 2. 缓存命中率仅{cache_stats['hit_rate']:.1%},优化缓存策略")

    # 3. Average tokens per request.
    total_requests = sum(s['requests'] for s in model_costs.values())
    if total_requests > 0:
        avg_tokens = cost_data['total_tokens'] / total_requests
        if avg_tokens > 1000:
            print(f" 3. 平均Token使用{avg_tokens:.0f},优化Prompt减少Token")
    print()


cost_optimization_suggestions("production")
---
6 协作功能
6.1 团队管理
01.用户管理
a.成员邀请
a.功能说明
通过LangSmith Web界面邀请团队成员,协同工作。设置不同角色和权限(管理员、开发者、查看者)。成员可以访问共享的项目、数据集、Prompt等资源。支持团队级别的数据隔离和访问控制。团队管理是多人协作的基础,确保资源共享和安全。
b.代码示例
---
# 团队管理主要通过Web界面完成
# 访问 https://smith.langchain.com
# 1. 邀请成员步骤
# - 点击右上角组织设置
# - 选择"团队成员"
# - 点击"邀请成员"
# - 输入邮箱地址
# - 选择角色:
# - 管理员(Admin):完全访问权限
# - 开发者(Developer):可读写项目和数据
# - 查看者(Viewer):只读权限
# - 发送邀请
# 2. 角色权限
# 管理员:
# - 管理团队成员
# - 创建/删除项目
# - 管理数据集
# - 配置组织设置
# - 查看所有追踪数据
# 开发者:
# - 创建项目
# - 上传数据集
# - 运行评估
# - 查看追踪数据
# - 添加反馈
# 查看者:
# - 查看项目列表
# - 查看追踪数据
# - 查看评估结果
# - 不能修改数据
# 3. 通过API查询团队信息
from langsmith import Client
client = Client()

# Current-user details (email, role) are shown in the web UI.
# NOTE(review): the calls below were left commented out in the original; confirm your
# langsmith SDK version actually provides such an API before enabling them.
# user_info = client.get_current_user()
# print(f"当前用户:{user_info['email']}")
# print(f"角色:{user_info['role']}")

# 4. Project access control
# Traces go to the project named by LANGCHAIN_PROJECT; project visibility itself is
# configured in the web UI.
import os
os.environ["LANGCHAIN_PROJECT"] = "team_project"
# Every team member can read this project's traces.

# 5. Dataset sharing
# Datasets are visible to the team by default. Reuse an existing dataset so that
# re-running this script does not fail on a duplicate dataset name.
if client.has_dataset(dataset_name="team_shared_dataset"):
    dataset = client.read_dataset(dataset_name="team_shared_dataset")
else:
    dataset = client.create_dataset(
        dataset_name="team_shared_dataset",
        description="团队共享测试数据集",
    )
# 团队成员可以:
# - 查看数据集
# - 添加样本
# - 使用数据集进行评估
# 6. 审计日志
# 管理员可以查看团队活动日志
# - 成员加入/离开
# - 项目创建/删除
# - 数据集修改
# - 权限变更
---
b.权限控制
a.功能说明
细粒度的权限控制确保数据安全和合规。按项目分配权限,不同项目可以有不同成员。敏感项目限制访问范围。审计操作日志,追踪谁做了什么。权限控制在保证协作的同时保护敏感数据。
b.代码示例
---
# 1. 项目级权限
# 在Web界面配置:
# - 进入项目设置
# - 选择"访问控制"
# - 添加/移除成员
# - 设置成员权限
# 2. 数据集权限
# 数据集可以设置为:
# - 公开:所有团队成员可见
# - 私有:只有创建者和管理员可见
# - 共享:指定成员可见
# 3. API Key权限
# 不同API Key可以有不同权限
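# - 生产Key:只读权限
# - 开发Key:读写权限
# - 管理Key:完全权限
# NOTE: uploading traces is a write operation, so a strictly read-only key cannot be used
# by a production app that has tracing enabled; give production a narrowly scoped key
# that can still write runs.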
# 设置环境变量使用不同Key
import os
# 生产环境:只读Key
# Pick the key for the current environment: read-only key in production, dev key elsewhere.
_api_key_var = "LANGSMITH_READONLY_KEY" if os.getenv("ENV") == "production" else "LANGSMITH_DEV_KEY"
_api_key = os.getenv(_api_key_var)
if not _api_key:
    # os.environ only accepts str. Assigning the None returned for a missing variable
    # raises an opaque TypeError, so fail with a message naming the missing variable.
    raise RuntimeError(f"环境变量{_api_key_var}未设置")
os.environ["LANGCHAIN_API_KEY"] = _api_key
# 4. IP白名单
# 在组织设置中配置IP白名单
# 只允许特定IP访问
# 5. 数据导出限制
# 控制谁可以导出数据
# - 管理员:可以导出所有数据
# - 开发者:可以导出自己的项目数据
# - 查看者:不能导出
# 6. 合规性
# - SOC 2认证
# - GDPR合规
# - 数据加密(传输和存储)
# - 定期安全审计
---
02.协作工作流
a.代码审查
a.功能说明
团队协作开发LLM应用,需要代码审查流程。共享追踪链接,团队成员查看执行详情。在追踪上添加评论,讨论问题和改进。标记问题运行,跟踪修复进度。代码审查提升代码质量,促进知识分享。
b.代码示例
---
from langsmith import Client
client = Client()

# 1. Share a trace link
# Fetch the most recent run of the project.
runs = list(client.list_runs(
    project_name="my_project",
    limit=1,
))
if runs:
    run = runs[0]
    run_id = str(run.id)
    # Ask the SDK for the run's UI URL. A hand-built ".../o/{run_id}" path puts the run id
    # where the organization id belongs and does not open the trace.
    # (client.share_run(run_id) creates a public link for people outside the org.)
    share_url = client.get_run_url(run=run, project_name="my_project")
    print(f"\n分享链接:{share_url}")
    print("团队成员可以查看完整执行详情")
# 2. 添加反馈/评论
# 在特定运行上添加反馈
def add_review_feedback(run_id: str, reviewer: str, comment: str, score: float):
    """Record a "code_review" feedback entry on a run.

    The comment is stored as ``"[<reviewer>] <comment>"`` so the reviewer is visible
    in the LangSmith UI. Prints a confirmation once the feedback is created.
    """
    signed_comment = f"[{reviewer}] {comment}"
    client.create_feedback(
        run_id=run_id,
        key="code_review",
        score=score,
        comment=signed_comment,
    )
    print("✓ 反馈已添加")
# 使用
# add_review_feedback(
# run_id=\"xxx\",
# reviewer=\"张三\",
# comment=\"提示词需要优化,输出不够简洁\",
# score=0.6
# )
# 3. 标记问题运行
def flag_problematic_run(run_id: str, issue: str):
    """Mark a run as problematic with an "issue" feedback entry scored 0.0."""
    note = f"问题:{issue}"
    client.create_feedback(run_id=run_id, key="issue", score=0.0, comment=note)
# 4. 查询待审查的运行
# 查询最近的运行,按需审查
# Fetch the latest runs and show which ones already carry "code_review" feedback.
recent_runs = client.list_runs(project_name="my_project", limit=20)
print("\n待审查运行:")
for position, run in enumerate(recent_runs, start=1):
    print(f"{position}. ID: {run.id}")
    print(f" 状态:{run.status}")
    print(f" 时间:{run.start_time}")
    already_reviewed = bool(run.feedback_stats) and "code_review" in run.feedback_stats
    print(" ✓ 已审查" if already_reviewed else " ⏳ 待审查")
    print()
# 5. 审查工作流
class ReviewWorkflow:
    """Track review state of runs as "review_status" feedback.

    Scores encode the state: 0.0 = awaiting review, 0.5 = changes requested,
    1.0 = approved.
    """

    def __init__(self, client):
        self.client = client

    def _record(self, run_id: str, score: float, comment: str):
        """Write one "review_status" feedback entry for *run_id*."""
        self.client.create_feedback(
            run_id=run_id,
            key="review_status",
            score=score,
            comment=comment,
        )

    def submit_for_review(self, run_id: str, assignee: str):
        """Mark *run_id* as awaiting review by *assignee*."""
        self._record(run_id, 0.0, f"待审查,分配给:{assignee}")

    def approve(self, run_id: str, reviewer: str):
        """Mark *run_id* as approved by *reviewer*."""
        self._record(run_id, 1.0, f"已批准 by {reviewer}")

    def request_changes(self, run_id: str, reviewer: str, changes: str):
        """Record that *reviewer* requests *changes* on *run_id*."""
        self._record(run_id, 0.5, f"需要修改 by {reviewer}: {changes}")
# Shared review workflow bound to the module-level LangSmith client.
workflow = ReviewWorkflow(client)
# 使用工作流
# workflow.submit_for_review(run_id, assignee=\"李四\")
# workflow.request_changes(run_id, \"李四\", \"提示词需要更具体\")
# workflow.approve(run_id, \"李四\")
---
b.知识共享
a.功能说明
团队共享成功的Prompt、评估方法、最佳实践。使用Prompt Hub管理Prompt模板库。共享数据集和评估器配置。文档化经验教训和调试技巧。知识共享避免重复工作,加速新成员上手,提升团队整体能力。
b.代码示例
---
# 1. Prompt Hub使用(见6.2节详细内容)
# 团队共享Prompt模板
# 2. 共享评估器
# 创建可复用的评估器库
# evaluators.py
\"\"\"
团队共享评估器库
\"\"\"
def accuracy_evaluator(run, example):
    """Exact-match accuracy evaluator.

    Compares the run's ``outputs["answer"]`` with the example's reference
    ``outputs["answer"]`` after stripping whitespace.

    Missing ``outputs`` (None), a missing "answer" key and an explicit ``None`` answer
    are all treated as the empty string. Previously these raised AttributeError and
    aborted the whole evaluation.

    Returns:
        ``{"key": "accuracy", "score": 1.0 or 0.0}``.
    """
    def _answer(outputs):
        value = (outputs or {}).get("answer")
        return "" if value is None else str(value).strip()

    prediction = _answer(run.outputs)
    reference = _answer(example.outputs)
    return {
        "key": "accuracy",
        "score": 1.0 if prediction == reference else 0.0,
    }
def length_evaluator(run, example, *, min_length: int = 20, max_length: int = 200):
    """Score how well the answer length fits the ``[min_length, max_length]`` range.

    - Inside the range: 1.0.
    - Shorter: ``length / min_length`` (linear ramp up from 0).
    - Longer: ``1 - (length - max_length) / max_length``, floored at 0.

    The bounds are keyword-only, so LangSmith still sees the standard ``(run, example)``
    evaluator signature. They default to the previously hard-coded 20/200 and can be fed
    from ``evaluators_config.json``. A missing ``outputs`` or answer counts as length 0.

    Raises:
        ValueError: if ``max_length`` is not positive or ``min_length > max_length``.
    """
    if max_length <= 0 or min_length > max_length:
        raise ValueError("require 0 < max_length and min_length <= max_length")
    answer = (run.outputs or {}).get("answer", "")
    if answer is None:
        answer = ""
    length = len(answer)
    if min_length <= length <= max_length:
        score = 1.0
    elif length < min_length:
        # Only reachable when min_length > 0, so the division is safe.
        score = length / min_length
    else:
        score = max(0, 1 - (length - max_length) / max_length)
    return {
        "key": "length",
        "score": score,
    }
# 团队成员可以导入使用
# from evaluators import accuracy_evaluator, length_evaluator
# 3. 共享数据集模板
# datasets.py
def create_qa_dataset(client, name: str, examples: list):
    """Create a standard QA dataset named *name* and fill it with *examples*.

    Each item of *examples* is a dict with keys ``"q"`` (question), ``"a"`` (answer)
    and an optional ``"meta"`` dict stored as example metadata.

    Returns:
        The dataset object returned by ``client.create_dataset``.
    """
    dataset = client.create_dataset(dataset_name=name, description="标准问答数据集")
    for item in examples:
        question, answer = item["q"], item["a"]
        client.create_example(
            dataset_id=dataset.id,
            inputs={"question": question},
            outputs={"answer": answer},
            metadata=item.get("meta", {}),
        )
    return dataset
# 4. 最佳实践文档
# 在团队Wiki或Notion记录:
# - 常见问题解决方案
# - Prompt工程技巧
# - 评估标准和流程
# - 性能优化经验
# 5. 团队培训
# 定期分享会,展示:
# - 优秀的实现案例
# - 调试技巧
# - 新功能使用
# - 性能优化方法
# 6. 内部工具库
# 构建团队共享的工具函数
# utils.py
import json
def export_run_details(client, run_id: str, output_file: str):
    """Export a run's id, inputs, outputs, latency, token count and feedback to JSON.

    The file is written as UTF-8 explicitly. With ``ensure_ascii=False``, Chinese text
    would otherwise be encoded with the platform default, which fails on e.g. GBK/cp1252
    systems. Values JSON cannot encode natively (UUIDs, datetimes inside run
    inputs/outputs) are written with ``str()`` instead of aborting the export.
    """
    run = client.read_run(run_id)
    details = {
        "id": str(run.id),
        "inputs": run.inputs,
        "outputs": run.outputs,
        "latency": run.latency,
        "tokens": run.total_tokens,
        "feedback": run.feedback_stats,
    }
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(details, f, indent=2, ensure_ascii=False, default=str)
def batch_evaluate(client, app, dataset_name: str, evaluators: list):
    """Run the team's standard evaluation of *app* on *dataset_name*.

    The experiment prefix is ``"<dataset_name>_eval"``. *client* is accepted for
    signature compatibility with the other shared helpers; it is not used here.

    Returns:
        Whatever ``langsmith.evaluation.evaluate`` returns.
    """
    from langsmith.evaluation import evaluate

    prefix = f"{dataset_name}_eval"
    return evaluate(
        app,
        data=dataset_name,
        evaluators=evaluators,
        experiment_prefix=prefix,
    )
# 团队成员可以复用这些工具
---
6.2 Prompt Hub
01.Prompt管理
a.创建Prompt
a.功能说明
Prompt Hub是LangSmith的Prompt模板管理中心,团队共享和版本化Prompt。通过Web界面或API创建Prompt模板。支持变量占位符,实现参数化。添加描述和标签方便检索。Prompt Hub集中管理提示词,避免散落在代码各处,便于维护和复用。
b.代码示例
---
from langsmith import Client
from langchain.prompts import ChatPromptTemplate
client = Client()

# 1. Creating a prompt in the web UI:
#    open https://smith.langchain.com/prompts, click "创建Prompt", write the prompt using
#    {variable} placeholders, add a description and tags, then save.

# 2. Pushing a prompt through the API
# Build the template: a system message with {role}/{skill} and the user's {question}.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "你是一个专业的{role},擅长{skill}"),
        ("human", "{question}"),
    ]
)

# Publish it to the Hub.
# NOTE(review): `tags` label the prompt as a whole, not this particular commit.
client.push_prompt(
    "customer_service_prompt",
    object=prompt,
    tags=["customer_service", "v1.0"],
)
print("✓ Prompt已推送到Hub")
# 3. 从Hub拉取Prompt
# 获取最新版本
# Pull the latest version of the prompt from the Hub.
prompt = client.pull_prompt("customer_service_prompt")

# Compose it with a chat model and run one request.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm
result = chain.invoke(
    {
        "role": "客服代表",
        "skill": "解决用户问题",
        "question": "如何退货?",
    }
)
print(f"结果:{result.content}")
# 4. 获取特定版本
# 拉取指定版本的Prompt
# Client.pull_prompt has no `version=` keyword (passing one raises TypeError). A specific
# version is selected in the identifier itself: "<name>:<commit hash or commit tag>".
# NOTE(review): this assumes commits were tagged "v1.0"/"v2.0" in the LangSmith UI; the
# `tags=` argument of push_prompt labels the whole prompt, not a commit. Otherwise use the
# commit hashes (shown in the UI and in the URL returned by push_prompt).
prompt_v1 = client.pull_prompt("customer_service_prompt:v1.0")
prompt_v2 = client.pull_prompt("customer_service_prompt:v2.0")
# 5. 列出所有Prompt
# prompts = client.list_prompts()
# for p in prompts:
# print(f\"{p.name}: {p.description}\")
# 6. 更新Prompt
# 修改Prompt后重新推送
# Build the revised template. The original source had an unterminated string literal on
# the last message ("...回答\"), which made the whole example a syntax error.
updated_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的{role},专业{skill}"),
    ("human", "用户问题:{question}"),
    ("human", "请用简洁的语言回答"),
])
# Pushing to the same name creates a new commit; earlier commits remain retrievable.
client.push_prompt(
    "customer_service_prompt",
    object=updated_prompt,
    tags=["customer_service", "v2.0", "improved"],
)
print("✓ Prompt已更新")
---
b.Prompt版本
a.功能说明
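Prompt Hub支持版本管理,追踪Prompt演化历史。每次推送创建新版本(一次提交),旧版本保留。每个版本由提交哈希(commit hash)标识,可通过`名称:提交哈希`(或在Web界面为该提交设置的标签)拉取指定版本;`push_prompt`的`tags`参数标记的是整个Prompt,而非某个具体版本。可以回退到任意历史版本,并对比不同版本的效果。版本管理支持安全的Prompt迭代和实验。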
b.代码示例
---
# 1. 版本命名
# 推送时指定版本标签
# Baseline commit of "qa_prompt".
client.push_prompt("qa_prompt", object=prompt, tags=["v1.0", "baseline"])

# Follow-up commit with the improved template (improved_prompt is defined elsewhere).
# NOTE(review): push_prompt `tags` label the prompt, not the individual commit.
client.push_prompt("qa_prompt", object=improved_prompt, tags=["v1.1", "improved"])
# 2. 查看版本历史
# 在Web界面可以看到所有版本
# 包括:版本号、创建时间、创建者、标签
# 3. 版本对比
# 拉取两个版本
# Pull two versions. pull_prompt has no `version=` keyword; the version goes into the
# identifier as "<name>:<commit hash or commit tag>".
# NOTE(review): assumes the commits were tagged "v1.0"/"v1.1" in the LangSmith UI.
v1 = client.pull_prompt("qa_prompt:v1.0")
v2 = client.pull_prompt("qa_prompt:v1.1")

# Run both versions on the same input.
test_input = {
    "question": "什么是机器学习?"
}
llm = ChatOpenAI(model="gpt-4")
result_v1 = (v1 | llm).invoke(test_input)
result_v2 = (v2 | llm).invoke(test_input)
print("V1输出:", result_v1.content)
print("V2输出:", result_v2.content)
# 4. A/B测试不同版本
from langsmith.evaluation import evaluate

# Pulled prompts are cached so each version is fetched from the Hub once, not per example.
_prompt_cache = {}


def _get_prompt(identifier: str):
    """Return the Hub prompt for *identifier* ("<name>:<commit or tag>"), pulling it once."""
    if identifier not in _prompt_cache:
        _prompt_cache[identifier] = client.pull_prompt(identifier)
    return _prompt_cache[identifier]


def _answer_with(identifier: str, inputs):
    """Run the prompt *identifier* + the module-level llm on *inputs*."""
    result = (_get_prompt(identifier) | llm).invoke(inputs)
    return {"answer": result.content}


def app_v1(inputs):
    """Target function for version v1.0 of qa_prompt."""
    return _answer_with("qa_prompt:v1.0", inputs)


def app_v2(inputs):
    """Target function for version v1.1 of qa_prompt."""
    return _answer_with("qa_prompt:v1.1", inputs)


def _mean_score(results, key: str) -> float:
    """Average the *key* feedback scores of an ExperimentResults (0.0 if none).

    evaluate() returns an ExperimentResults object, not a dict, so the metric has to be
    aggregated from its rows.
    """
    scores = [
        res.score
        for row in results
        for res in row["evaluation_results"]["results"]
        if res.key == key and res.score is not None
    ]
    return sum(scores) / len(scores) if scores else 0.0


# Evaluate both versions. An evaluator must be passed or no accuracy is computed.
# NOTE(review): assumes accuracy_evaluator from the team's shared evaluator library is in scope.
results_v1 = evaluate(app_v1, data="qa_dataset", evaluators=[accuracy_evaluator],
                      experiment_prefix="prompt_v1")
results_v2 = evaluate(app_v2, data="qa_dataset", evaluators=[accuracy_evaluator],
                      experiment_prefix="prompt_v2")
print(f"V1准确率:{_mean_score(results_v1, 'accuracy'):.2%}")
print(f"V2准确率:{_mean_score(results_v2, 'accuracy'):.2%}")

# 5. Rolling back
# If the new version performs worse, keep serving the stable one in production.
stable_prompt = client.pull_prompt("qa_prompt:v1.0")
production_chain = stable_prompt | llm
---
02.Prompt优化
a.迭代优化
a.功能说明
基于评估结果和用户反馈持续优化Prompt。分析失败案例,识别Prompt不足。实验不同表述、结构、示例。使用A/B测试验证改进效果。优化是渐进过程,需要持续迭代。
b.代码示例
---
# 1. 分析失败案例
def analyze_failed_cases(project_name: str, threshold: float = 0.5, limit: int = 100):
    """Collect and print runs whose average "accuracy" feedback is below *threshold*.

    ``run.feedback_stats`` maps each feedback key to an aggregate dict such as
    ``{"n": 2, "avg": 0.25, ...}``. Comparing that dict directly with a number raised
    TypeError, so the ``avg`` field is used; a plain number is accepted as well. Runs with
    no accuracy feedback are not treated as failures.

    Returns:
        The list of failed-case dicts (input, output, expected placeholder).
    """
    runs = client.list_runs(project_name=project_name, limit=limit)
    failed_cases = []
    for run in runs:
        if not run.feedback_stats:
            continue
        stat = run.feedback_stats.get("accuracy")
        accuracy = stat.get("avg") if isinstance(stat, dict) else stat
        if accuracy is not None and accuracy < threshold:
            failed_cases.append({
                "input": run.inputs,
                "output": run.outputs,
                "expected": "...",  # fetch from the reference dataset if needed
            })
    print(f"\n失败案例分析({len(failed_cases)}个):")
    for i, case in enumerate(failed_cases[:5]):
        print(f"\n案例{i+1}:")
        print(f" 输入:{case['input']}")
        print(f" 实际输出:{case['output']}")
        print(f" 期望输出:{case['expected']}")
    return failed_cases


analyze_failed_cases("production")
# 2. Prompt变体实验
# 原始Prompt
# Baseline: bare instruction.
prompt_v1 = ChatPromptTemplate.from_template("回答问题:{question}")

# Variant 2: adds role and answer requirements.
prompt_v2 = ChatPromptTemplate.from_template(
"""作为专业客服,请回答用户问题。
问题:{question}
要求:回答准确、简洁、友好。"""
)

# Variant 3: adds a worked example (few-shot).
prompt_v3 = ChatPromptTemplate.from_template(
"""作为专业客服,请回答用户问题。
示例:
Q: 如何退货?
A: 请在订单页面点击"申请退货",选择退货原因后提交即可。
现在请回答:
Q: {question}
A:"""
)

# Push every variant as a new commit of "qa_prompt".
# NOTE(review): push_prompt `tags` label the prompt, not the commit, so pulling
# "qa_prompt:experiment_vN" only works if those commit tags are also set in the UI.
for i, prompt in enumerate((prompt_v1, prompt_v2, prompt_v3), start=1):
    client.push_prompt("qa_prompt", object=prompt, tags=[f"experiment_v{i}"])
# 3. 批量测试
def test_prompt_variants(prompt_names: list, test_inputs: list):
    """Run every Hub prompt in *prompt_names* on every input in *test_inputs*.

    Returns:
        Dict mapping each prompt name to the list of GPT-4 answer texts, in input order.
    """
    model = ChatOpenAI(model="gpt-4")
    outputs_by_prompt = {}
    for name in prompt_names:
        pipeline = client.pull_prompt(name) | model
        outputs_by_prompt[name] = [pipeline.invoke(item).content for item in test_inputs]
    return outputs_by_prompt
# Sample customer questions shared by all variants.
test_inputs = [
    {"question": "如何退货?"},
    {"question": "配送时间?"},
    {"question": "可以开发票吗?"},
]
results = test_prompt_variants(
    ["qa_prompt:experiment_v1", "qa_prompt:experiment_v2", "qa_prompt:experiment_v3"],
    test_inputs,
)

# Print the first 50 characters of each answer per variant.
for prompt_name, outputs in results.items():
    print(f"\n{prompt_name}:")
    for i, output in enumerate(outputs):
        print(f" {i+1}. {output[:50]}...")
# 4. 自动优化
# 使用LLM优化Prompt
def optimize_prompt_with_llm(original_prompt: str, failed_examples: list):
    """Ask GPT-4 to rewrite a prompt so it handles the given failure cases better.

    Args:
        original_prompt: The prompt text. A prompt-template object is also accepted and
            rendered with its ``pretty_repr()`` when available, rather than as a repr.
        failed_examples: Failure cases (e.g. dicts from analyze_failed_cases). Each one is
            listed on its own numbered line instead of dumping the raw Python list.

    Returns:
        The optimizer model's suggested prompt text.
    """
    if isinstance(original_prompt, str):
        prompt_text = original_prompt
    elif hasattr(original_prompt, "pretty_repr"):
        prompt_text = original_prompt.pretty_repr()
    else:
        prompt_text = str(original_prompt)
    examples_text = "\n".join(
        f"{i}. {example}" for i, example in enumerate(failed_examples or [], start=1)
    )
    optimizer_llm = ChatOpenAI(model="gpt-4")
    optimization_prompt = f"""
你是一个Prompt工程专家。请优化以下Prompt,使其在这些失败案例上表现更好。
原始Prompt:
{prompt_text}
失败案例:
{examples_text}
请提供优化后的Prompt:
"""
    result = optimizer_llm.invoke(optimization_prompt)
    return result.content
# optimized = optimize_prompt_with_llm(prompt_v1, failed_cases)
# print(f\"优化建议:{optimized}\")
---
6.3 版本控制
01.Git集成
a.代码版本管理
a.功能说明
将LangSmith配置与代码版本控制系统(Git)集成,追踪代码和配置的同步变化。代码仓库记录应用逻辑,LangSmith记录运行数据。通过标签关联代码版本和运行记录。代码审查流程包括评估结果检查。Git集成实现端到端的可追溯性。
b.代码示例
---
import os
import subprocess
from langsmith import Client
from langsmith import traceable
# Shared LangSmith client used by the examples in this section.
client = Client()
# 1. 获取Git版本信息
def get_git_info():
    """Return the current Git commit hash, branch name and last commit message.

    Falls back to ``{"commit": "unknown", "branch": "unknown", "message": ""}`` when git
    is not installed or the working directory is not a repository with commits. Only
    those failures are caught; the previous bare ``except:`` also swallowed
    KeyboardInterrupt/SystemExit. git's own stderr is discarded so the fallback path
    does not print "fatal: ..." noise.
    """
    def _git(*args: str) -> str:
        raw = subprocess.check_output(["git", *args], stderr=subprocess.DEVNULL)
        # errors="replace": a non-UTF-8 commit message must not break the lookup.
        return raw.decode("utf-8", errors="replace").strip()

    try:
        return {
            "commit": _git("rev-parse", "HEAD"),
            "branch": _git("rev-parse", "--abbrev-ref", "HEAD"),
            "message": _git("log", "-1", "--pretty=%B"),
        }
    except (subprocess.CalledProcessError, OSError):
        return {"commit": "unknown", "branch": "unknown", "message": ""}
# 2. 追踪时包含Git信息
@traceable(name="versioned_operation")
def versioned_function(data: str):
    """Process *data* and tag the current trace with the Git commit/branch/message.

    The Git info is merged into the run's metadata via ``RunTree.add_metadata``.
    Assigning ``run.extra = {...}`` replaced the whole extra dict, discarding what
    LangSmith had already recorded there, and did not place the values under
    ``metadata`` where the UI shows them.
    """
    from langsmith.run_helpers import get_current_run_tree

    git_info = get_git_info()
    run = get_current_run_tree()
    if run:
        run.add_metadata({
            "git_commit": git_info["commit"],
            "git_branch": git_info["branch"],
            "git_message": git_info["message"],
        })
    return process_data(data)
# 3. CI/CD集成
# 在CI pipeline中运行评估
# .github/workflows/test.yml
\"\"\"
name: Evaluate on Commit
on: [push, pull_request]
jobs:
evaluate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install langsmith langchain
- name: Run Evaluation
env:
LANGCHAIN_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
run: |
python evaluate.py
- name: Check Results
run: |
python check_evaluation_results.py
\"\"\"
# evaluate.py
def run_ci_evaluation(threshold: float = 0.8):
    """Run the regression evaluation in CI and exit non-zero when accuracy < *threshold*.

    The experiment is prefixed with the first 8 characters of the commit hash so results
    can be traced back to the code version. ``evaluate`` returns an ExperimentResults
    object, not a dict, so accuracy is averaged from the per-example "accuracy" scores.
    ``sys.exit`` is used because the ``exit`` builtin is only injected by the ``site``
    module and is not guaranteed to exist.
    """
    import sys
    from langsmith.evaluation import evaluate

    git_info = get_git_info()
    results = evaluate(
        my_app,
        data="regression_test_set",
        evaluators=[accuracy_evaluator],
        experiment_prefix=f"ci_{git_info['commit'][:8]}",
    )
    scores = [
        res.score
        for row in results
        for res in row["evaluation_results"]["results"]
        if res.key == "accuracy" and res.score is not None
    ]
    accuracy = sum(scores) / len(scores) if scores else 0.0
    if accuracy < threshold:
        print(f"❌ 评估失败:准确率{accuracy:.2%} < {threshold:.0%}")
        sys.exit(1)
    print(f"✅ 评估通过:准确率{accuracy:.2%}")
    sys.exit(0)
# 4. 标签关联
# 发布时添加Git标签
# git tag v1.0.0
# git push origin v1.0.0
# 在LangSmith中使用标签
# Route traces to the "production" project.
os.environ["LANGCHAIN_PROJECT"] = "production"


@traceable(tags=["v1.0.0", "release"])
def production_handler(request):
    """Handle *request*; every trace is tagged with the release version "v1.0.0"."""
    return handle_request(request)


# Count the runs recorded for the v1.0.0 release tag.
runs = client.list_runs(
    project_name="production",
    filter='has(tags, "v1.0.0")',
)
print(f"版本v1.0.0的运行数:{sum(1 for _ in runs)}")
# 5. 版本对比
def compare_code_versions(tag1: str, tag2: str, project_name: str = "production", limit: int = 1000):
    """Compare average latency and success rate of runs tagged *tag1* vs *tag2*.

    Runs are materialized into lists before computing metrics. ``client.list_runs``
    returns a one-shot iterator, so the old code consumed it while collecting latencies
    and then counted 0 successes and 0 total (success rate always 0%).
    ``Run.latency`` is in seconds, so it is converted to milliseconds for display.

    Returns:
        ``(metrics1, metrics2)`` dicts with ``avg_latency`` (seconds) and ``success_rate``.
    """
    import statistics

    def _fetch(tag: str) -> list:
        return list(client.list_runs(
            project_name=project_name,
            filter=f'has(tags, "{tag}")',
            limit=limit,
        ))

    def _calc_metrics(runs: list) -> dict:
        latencies = [r.latency for r in runs if r.latency]
        successes = sum(1 for r in runs if r.status == "success")
        return {
            "avg_latency": statistics.mean(latencies) if latencies else 0,
            "success_rate": successes / len(runs) if runs else 0,
        }

    metrics1 = _calc_metrics(_fetch(tag1))
    metrics2 = _calc_metrics(_fetch(tag2))
    print(f"\n版本对比:{tag1} vs {tag2}")
    print(f" 延迟:{metrics1['avg_latency'] * 1000:.0f}ms -> {metrics2['avg_latency'] * 1000:.0f}ms")
    print(f" 成功率:{metrics1['success_rate']:.2%} -> {metrics2['success_rate']:.2%}")
    return metrics1, metrics2
# compare_code_versions(\"v1.0.0\", \"v1.1.0\")
---
b.配置管理
a.功能说明
版本化管理LangSmith配置(项目名称、采样率、评估器等)。使用配置文件存储设置。不同环境使用不同配置文件。配置变更与代码变更一起追踪。配置管理确保环境一致性,支持快速切换和回滚。
b.代码示例
---
import yaml
import json
# 1. 配置文件
# langsmith_config.yaml
\"\"\"
development:
api_key: dev_key
project: dev_project
sampling_rate: 1.0
staging:
api_key: staging_key
project: staging_project
sampling_rate: 0.5
production:
api_key: prod_key
project: prod_project
sampling_rate: 0.1
\"\"\"
# 2. 加载配置
class LangSmithConfig:
    """Per-environment LangSmith settings loaded from a YAML file.

    The file maps environment names (e.g. development/staging/production) to
    ``api_key``, ``project`` and ``sampling_rate``.
    """

    def __init__(self, config_file: str = "langsmith_config.yaml"):
        # Read as UTF-8 so configs containing Chinese comments load on any platform.
        with open(config_file, "r", encoding="utf-8") as f:
            # An empty file makes safe_load return None; treat it as "no environments".
            self.config = yaml.safe_load(f) or {}
        # Default until an environment is loaded.
        self.sampling_rate = 1.0

    def load_env_config(self, env: str):
        """Export *env*'s settings as LangSmith environment variables.

        Sets LANGCHAIN_API_KEY, LANGCHAIN_PROJECT and LANGCHAIN_TRACING_SAMPLING_RATE.
        Previously the sampling rate was stored but never handed to the SDK.

        Raises:
            ValueError: if *env* is not defined in the config file. Previously an unknown
                name silently exported empty credentials.
        """
        if env not in self.config:
            raise ValueError(f"未知环境:{env}")
        env_config = self.config[env] or {}
        os.environ["LANGCHAIN_API_KEY"] = str(env_config.get("api_key", ""))
        os.environ["LANGCHAIN_PROJECT"] = str(env_config.get("project", ""))
        self.sampling_rate = env_config.get("sampling_rate", 1.0)
        os.environ["LANGCHAIN_TRACING_SAMPLING_RATE"] = str(self.sampling_rate)
        print(f"✓ 已加载{env}环境配置")
# 使用配置
# Load the YAML config, then activate the environment named by APP_ENV (default: development).
config = LangSmithConfig()
env = os.getenv("APP_ENV", "development")
config.load_env_config(env)
# 3. 评估器配置
# evaluators_config.json
\"\"\"
{
\"qa_evaluators\": [
{
\"name\": \"accuracy\",
\"type\": \"custom\",
\"config\": {}
},
{
\"name\": \"length\",
\"type\": \"custom\",
\"config\": {
\"min_length\": 20,
\"max_length\": 200
}
}
]
}
\"\"\"
# 加载评估器配置
def load_evaluators(config_file: str):
    """Build evaluators from the ``"qa_evaluators"`` list of a JSON config file.

    Each entry is handed to ``create_evaluator`` (defined elsewhere). The file is read as
    UTF-8 so configs containing non-ASCII text load regardless of the platform's default
    encoding.

    Raises:
        KeyError: if the file has no ``"qa_evaluators"`` key.
    """
    with open(config_file, "r", encoding="utf-8") as f:
        config = json.load(f)
    return [create_evaluator(eval_config) for eval_config in config["qa_evaluators"]]
# evaluators = load_evaluators(\"evaluators_config.json\")
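# 4. 版本化配置
# 将配置文件提交到Git
# git add langsmith_config.yaml
# git commit -m "Update LangSmith config"
# WARNING: never commit real API keys. Keep the api_key entries in this file as
# placeholders and supply the real keys via environment variables or a secrets manager
# (see the environment-variable override step below).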
# 5. 配置验证
def validate_config(config: dict):
    """Ensure development, staging and production each define a non-empty api_key and project.

    Environments are checked in that order, and within each, api_key before project.

    Raises:
        ValueError: naming the first ``<env>.<field>`` that is missing or empty.
    """
    for env_name in ("development", "staging", "production"):
        section = config.get(env_name, {})
        missing = next((field for field in ("api_key", "project") if not section.get(field)), None)
        if missing is not None:
            raise ValueError(f"配置错误:{env_name}.{missing}缺失")
    print("✓ 配置验证通过")
# 加载并验证
# Load (as UTF-8) and validate the configuration file.
with open("langsmith_config.yaml", encoding="utf-8") as f:
    config_data = yaml.safe_load(f)
validate_config(config_data)

# 6. Environment-variable overrides
# LANGSMITH_* variables take precedence over the file. LANGCHAIN_PROJECT cannot serve as
# the override source: the config loader has already overwritten it with the file's
# value, so a user-supplied project would be ignored. LANGSMITH_PROJECT mirrors the
# LANGSMITH_API_KEY override.
api_key = os.getenv("LANGSMITH_API_KEY") or config_data[env]["api_key"]
project = os.getenv("LANGSMITH_PROJECT") or config_data[env]["project"]
os.environ["LANGCHAIN_API_KEY"] = api_key
os.environ["LANGCHAIN_PROJECT"] = project
---
7 最佳实践
7.1 调试技巧
01.问题定位
a.追踪分析
a.功能说明
使用LangSmith追踪快速定位问题根源。查看完整调用链,识别失败节点。检查输入输出,发现数据异常。分析延迟分布,定位性能瓶颈。对比成功和失败案例,找出差异。追踪分析是调试的起点,提供问题的完整上下文。
b.代码示例
---
from langsmith import Client
from datetime import datetime, timedelta
client = Client()

# 1. Find recent error runs.
# Materialize the result: list_runs returns a one-shot iterator. The old code printed
# from it and then called list(error_runs)[0] on the exhausted iterator (IndexError).
error_runs = list(client.list_runs(
    project_name="production",
    filter='eq(status, "error")',
    limit=10,
))
print("最近的错误运行:")
for run in error_runs:
    print(f"\nID: {run.id}")
    print(f"时间:{run.start_time}")
    print(f"错误:{run.error}")
    print(f"输入:{run.inputs}")


# 2. Drill into a single run
def analyze_failed_run(run_id: str):
    """Print type, name, status, inputs, outputs, error text and extra info of a run.

    The ``error`` field holds the recorded exception text, which is where any captured
    traceback appears; there is no separate stack-trace field to print.
    """
    run = client.read_run(run_id)
    print(f"\n失败运行分析:{run_id}")
    print("=" * 60)
    print(f"\n运行类型:{run.run_type}")
    print(f"名称:{run.name}")
    print(f"状态:{run.status}")
    print("\n输入:")
    print(run.inputs)
    print("\n输出:")
    print(run.outputs)
    print("\n错误信息:")
    print(run.error)
    print("\n元数据:")
    print(run.extra)


# Analyze the first error run, if there is one.
if error_runs:
    analyze_failed_run(str(error_runs[0].id))
# 3. 对比成功和失败
def compare_success_failure(project_name: str):
    """Print input size, latency and tokens of recent successful vs failed runs.

    ``Run.latency`` is reported in seconds, so it is converted to milliseconds before
    printing with an "ms" suffix. The old output was off by a factor of 1000.
    """
    success_runs = list(client.list_runs(
        project_name=project_name,
        filter='eq(status, "success")',
        limit=10,
    ))
    failure_runs = list(client.list_runs(
        project_name=project_name,
        filter='eq(status, "error")',
        limit=10,
    ))
    print("\n成功 vs 失败对比:")
    print("\n成功案例特征:")
    for run in success_runs[:3]:
        print(f" 输入长度:{len(str(run.inputs))}")
        latency = "N/A" if run.latency is None else f"{run.latency * 1000:.0f}ms"
        print(f" 延迟:{latency}")
        print(f" Token:{run.total_tokens}")
        print()
    print("\n失败案例特征:")
    for run in failure_runs[:3]:
        print(f" 输入长度:{len(str(run.inputs))}")
        print(f" 错误类型:{run.error[:50] if run.error else 'N/A'}...")
        print()


compare_success_failure("production")
# 4. 时间线分析
def timeline_analysis(run_id: str):
    """Print a run's start/end time, total duration and each child step's duration.

    The run is read with ``load_child_runs=True`` so nested steps are included.
    Durations come from ``Run.latency`` (seconds) and are printed in milliseconds.
    """
    def _ms(seconds):
        return "N/A" if seconds is None else f"{seconds * 1000:.0f}ms"

    run = client.read_run(run_id, load_child_runs=True)
    print("\n时间线分析:")
    print(f"开始:{run.start_time}")
    print(f"结束:{run.end_time}")
    print(f"总耗时:{_ms(run.latency)}")
    # Per-step durations (empty when the run has no children).
    for child in run.child_runs or []:
        print(f" - {child.name}: {_ms(child.latency)}")
# 5. 输入模式分析
def analyze_input_patterns(project_name: str):
    """Summarize inputs of up to 100 failed runs: count, mean size, special-character share."""
    failed = client.list_runs(
        project_name=project_name,
        filter='eq(status, "error")',
        limit=100,
    )
    error_inputs = [run.inputs for run in failed if run.inputs]
    print("\n失败输入模式分析:")
    print(f"样本数:{len(error_inputs)}")

    # Common traits: serialized input length ...
    sizes = [len(str(item)) for item in error_inputs]
    if sizes:
        import statistics
        print(f"平均输入长度:{statistics.mean(sizes):.0f}字符")

    # ... and presence of special characters.
    markers = ['@', '#', '$', '%']
    with_markers = sum(1 for item in error_inputs if any(m in str(item) for m in markers))
    print(f"包含特殊字符:{with_markers}/{len(error_inputs)}")


analyze_input_patterns("production")
---
b.断点调试
a.功能说明
在关键位置添加追踪点,记录中间状态。使用@traceable装饰器标记调试函数。记录变量值、执行路径、条件分支。对比预期和实际值,发现偏差。断点调试帮助理解执行流程,定位逻辑错误。
b.代码示例
---
from langsmith import traceable
from langsmith.run_helpers import get_current_run_tree
# 1. 添加调试追踪点
@traceable(name="preprocess_step")
def preprocess(data: str):
    """Strip and lowercase *data*, recording before/after lengths on the current trace.

    Debug values go through ``RunTree.add_metadata``, which merges them into the run's
    metadata. Assigning ``run.extra = {...}`` discarded whatever the run already stored
    in ``extra``.
    """
    run = get_current_run_tree()
    if run:
        run.add_metadata({"original_length": len(data)})
    cleaned = data.strip().lower()
    if run:
        run.add_metadata({
            "cleaned_length": len(cleaned),
            "modification": "stripped and lowercased",
        })
    return cleaned
@traceable(name="validate_step")
def validate(data: str):
    """Ensure 0 < len(data) < 1000, recording the check on the current trace.

    The outcome is merged into the run's metadata via ``add_metadata``, without
    overwriting ``run.extra``, before any failure is raised.

    Raises:
        ValueError: if the length is outside the allowed range.
    """
    run = get_current_run_tree()
    is_valid = 0 < len(data) < 1000
    if run:
        run.add_metadata({
            "is_valid": is_valid,
            "data_length": len(data),
            "validation_rules": "0 < length < 1000",
        })
    if not is_valid:
        raise ValueError(f"验证失败:长度{len(data)}")
    return data
@traceable(name="main_pipeline")
def pipeline(raw_data: str):
    """Run preprocess -> validate -> process_data; each step appears as a child trace."""
    return process_data(validate(preprocess(raw_data)))
# 执行后在LangSmith查看每个步骤的详细信息
# 2. 条件断点
@traceable(name="conditional_debug")
def process_with_conditional_debug(data: str, debug: bool = False):
    """Run expensive_operation on *data*; when *debug* is set, attach a data snapshot.

    The snapshot is merged into the run's metadata with ``add_metadata`` instead of
    replacing ``run.extra``.
    """
    if debug:
        run = get_current_run_tree()
        if run:
            run.add_metadata({
                "debug_mode": True,
                "data_sample": data[:100],
                "data_type": type(data).__name__,
                "data_repr": repr(data),
            })
    return expensive_operation(data)


# Enable debugging for this call.
result = process_with_conditional_debug("test", debug=True)
# 3. 异常捕获
@traceable(name="error_tracking")
def risky_operation(data: dict):
    """Transform ``data["key"]``; on failure, attach error context to the trace and re-raise.

    Context goes into the run's metadata via ``add_metadata`` rather than replacing
    ``run.extra``. Exceptions always propagate unchanged.
    """
    run = get_current_run_tree()
    try:
        return transform(data["key"])
    except KeyError as e:
        if run:
            run.add_metadata({
                "error_type": "KeyError",
                "missing_key": str(e),
                "available_keys": list(data.keys()),
            })
        raise
    except Exception as e:
        if run:
            run.add_metadata({
                "error_type": type(e).__name__,
                "error_message": str(e),
                "data_snapshot": str(data)[:200],
            })
        raise
# 4. 性能断点
import time
@traceable(name="performance_check")
def check_performance(data: str):
    """Run slow_function on *data* and record its wall time (ms) on the current trace.

    Timing uses ``time.perf_counter``, which is monotonic and high-resolution. Wall-clock
    ``time.time`` can jump when the system clock is adjusted. Results are merged into the
    run's metadata via ``add_metadata``; calls slower than 1000 ms are flagged.
    """
    run = get_current_run_tree()
    start = time.perf_counter()
    result = slow_function(data)
    elapsed = (time.perf_counter() - start) * 1000
    if run:
        run.add_metadata({
            "elapsed_ms": elapsed,
            "is_slow": elapsed > 1000,
            "data_size": len(data),
        })
    return result
---
02.常见问题
a.错误处理
a.功能说明
识别和处理LLM应用的常见错误类型。超时错误、限流错误、解析错误、验证错误等。实现重试机制和降级策略。优雅处理异常,避免系统崩溃。错误处理提升系统健壮性,改善用户体验。
b.代码示例
---
import time
from functools import wraps
# 1. 重试装饰器
def retry_on_error(max_retries: int = 3, backoff: float = 1.0, retry_on: tuple = (Exception,)):
    """Decorator that retries a call with exponential backoff.

    Args:
        max_retries: Total number of attempts (must be >= 1). With 0 the old version
            never called the function and silently returned None.
        backoff: Base delay in seconds; attempt *k* (0-based) waits ``backoff * 2**k``.
        retry_on: Exception types worth retrying (default: any Exception). Other
            exceptions, such as validation errors, propagate immediately.

    Raises:
        ValueError: at decoration time if ``max_retries < 1``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries - 1:
                        print("重试失败,已达最大次数")
                        raise
                    wait_time = backoff * (2 ** attempt)
                    print(f"重试 {attempt+1}/{max_retries},等待{wait_time}秒...")
                    time.sleep(wait_time)
        return wrapper
    return decorator
# 使用
@retry_on_error(max_retries=3, backoff=1.0)
@traceable(name="api_call_with_retry")
def call_llm_api(prompt: str):
    """Invoke the module-level llm with *prompt*, retrying up to 3 times on failure.

    Because retry wraps traceable, each attempt is recorded as its own trace.
    """
    return llm.invoke(prompt)
# 2. 超时处理
from functools import wraps
import signal
class OperationTimeoutError(TimeoutError):
    """Raised when a @timeout-decorated call exceeds its time budget.

    It subclasses the built-in TimeoutError, so ``except TimeoutError`` still catches it.
    The old local ``class TimeoutError(Exception)`` shadowed the builtin, which meant
    ``except TimeoutError`` later in the module no longer caught real socket/OS timeouts.
    """


def timeout(seconds: int):
    """Decorator that aborts the call with OperationTimeoutError after *seconds* seconds.

    Implemented with SIGALRM, so it only works on Unix and only in the main thread. Where
    SIGALRM is unavailable (e.g. Windows) the function runs without a limit. The previous
    SIGALRM handler is restored afterwards, so nested or other alarm users are not clobbered.
    ``seconds=0`` disables the limit, since ``signal.alarm(0)`` cancels alarms.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not hasattr(signal, "SIGALRM"):
                return func(*args, **kwargs)

            def handler(signum, frame):
                raise OperationTimeoutError(f"操作超时({seconds}秒)")

            previous_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous_handler)
        return wrapper
    return decorator
@timeout(seconds=10)
@traceable(name="operation_with_timeout")
def long_running_operation(data: str):
    """Run process_data on *data*, aborted by the @timeout decorator after 10 seconds."""
    return process_data(data)
# 3. 限流处理
class RateLimitHandler:
    """Sliding-window limiter: at most ``max_requests`` grants per ``window`` seconds.

    ``requests`` holds the ``time.time()`` timestamps of grants still inside the window.
    """

    def __init__(self, max_requests: int = 60, window: int = 60):
        self.max_requests = max_requests
        self.window = window
        self.requests = []

    def can_proceed(self) -> bool:
        """Grant a slot if the window has capacity; a grant records the current time."""
        now = time.time()
        # Keep only grants that are still inside the window.
        self.requests = [stamp for stamp in self.requests if now - stamp < self.window]
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True

    def wait_if_needed(self):
        """Block, polling once per second, until a slot is granted."""
        while True:
            if self.can_proceed():
                return
            time.sleep(1)
# Shared limiter: at most 10 calls per 60 seconds.
rate_limiter = RateLimitHandler(max_requests=10, window=60)


@traceable(name="rate_limited_call")
def rate_limited_function(data: str):
    """Wait for a rate-limit slot, then call call_api with *data*."""
    rate_limiter.wait_if_needed()
    return call_api(data)
# 4. 降级策略
@traceable(name="operation_with_fallback")
def operation_with_fallback(data: str):
    """Try GPT-4, then GPT-3.5, then the cached/default response, in that order.

    Each failure is printed before the next fallback is tried. Errors from the last
    fallback (get_cached_response) propagate.
    """
    try:
        return call_gpt4(data)
    except Exception as primary_error:
        print(f"主方法失败,使用降级方案:{primary_error}")
    try:
        return call_gpt35(data)
    except Exception as e2:
        print(f"降级方案也失败,使用本地缓存:{e2}")
    return get_cached_response(data)
# 5. 验证错误处理
def validate_input(data: dict):
    """Check that *data* has a str "question" (<= 1000 chars) and a "context" field.

    Raises:
        ValueError: a required field is missing (checked in order question, context),
            or the question is longer than 1000 characters.
        TypeError: "question" is not a str.
    """
    missing = next((name for name in ("question", "context") if name not in data), None)
    if missing is not None:
        raise ValueError(f"缺少必需字段:{missing}")
    question = data["question"]
    if not isinstance(question, str):
        raise TypeError("question必须是字符串")
    if len(question) > 1000:
        raise ValueError(f"question过长({len(question)}字符)")
@traceable(name="validated_operation")
def safe_operation(data: dict):
    """Validate *data*, then process it.

    Returns:
        ``{"error": ..., "status": "invalid_input"}`` when validation fails, otherwise
        ``{"result": ..., "status": "success"}``.
    """
    try:
        validate_input(data)
    except (ValueError, TypeError) as e:
        print(f"输入验证失败:{e}")
        return {"error": str(e), "status": "invalid_input"}
    return {"result": process(data), "status": "success"}
---
7.2 性能优化
01.延迟优化
a.并发处理
a.功能说明
使用并发提升处理速度,降低整体延迟。并行调用多个LLM请求。使用异步I/O避免阻塞。批量处理减少往返次数。并发处理在保证正确性前提下大幅提升性能。
b.代码示例
---
import asyncio
from langchain.chat_models import ChatOpenAI
# 1. 异步LLM调用
async def async_llm_call(prompt: str):
    """Send *prompt* to GPT-4 via the async API and return the reply text."""
    model = ChatOpenAI(model="gpt-4")
    reply = await model.ainvoke(prompt)
    return reply.content
# 2. 并发批量处理
async def batch_process(prompts: list):
    """Run async_llm_call for all *prompts* concurrently; results keep input order."""
    return await asyncio.gather(*(async_llm_call(p) for p in prompts))
# 使用
# Demo: three prompts processed concurrently, printing the first 50 characters of each.
prompts = ["介绍Python", "介绍JavaScript", "介绍Rust"]
results = asyncio.run(batch_process(prompts))
for i, result in enumerate(results):
    print(f"{i+1}. {result[:50]}...")
# 3. 限制并发数
from asyncio import Semaphore
async def limited_concurrent_process(prompts: list, max_concurrent: int = 5):
    """Like batch_process, but at most *max_concurrent* LLM calls run at once."""
    gate = Semaphore(max_concurrent)

    async def guarded(prompt):
        async with gate:
            return await async_llm_call(prompt)

    return await asyncio.gather(*[guarded(p) for p in prompts])


# Process 100 requests with at most 5 in flight.
large_prompts = [f"问题{i}" for i in range(100)]
results = asyncio.run(limited_concurrent_process(large_prompts, max_concurrent=5))
# 4. 流式并发
async def streaming_process(prompts: list):
    """Start all LLM calls at once and yield each answer as soon as it is ready.

    Results arrive in completion order, not input order. The old version awaited the
    calls one by one, which was neither concurrent nor faster than sequential code. If
    the consumer stops early, the calls still pending are cancelled.
    """
    tasks = [asyncio.create_task(async_llm_call(p)) for p in prompts]
    try:
        for finished in asyncio.as_completed(tasks):
            yield await finished
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


async def consume_stream():
    """Print each streamed answer (first 30 characters) as it arrives."""
    async for result in streaming_process(prompts):
        print(f"收到结果:{result[:30]}...")
# asyncio.run(consume_stream())
# 5. 性能对比
import time
def sequential_process(prompts: list):
    """Baseline: call GPT-4 for each prompt one after another; returns reply texts in order."""
    model = ChatOpenAI(model="gpt-4")
    return [model.invoke(p).content for p in prompts]
# 对比
# Benchmark: the same 10 prompts, sequential vs concurrent.
test_prompts = [f"测试{i}" for i in range(10)]

start = time.time()
sequential_results = sequential_process(test_prompts)
sequential_time = time.time() - start

start = time.time()
concurrent_results = asyncio.run(batch_process(test_prompts))
concurrent_time = time.time() - start

print("\n性能对比:")
print(f"顺序处理:{sequential_time:.2f}秒")
print(f"并发处理:{concurrent_time:.2f}秒")
print(f"加速比:{sequential_time/concurrent_time:.2f}x")
---
b.缓存策略
a.功能说明
缓存频繁请求的结果,避免重复API调用。实现多级缓存(内存、Redis、数据库)。设置合理的过期时间。缓存命中率监控和优化。缓存策略显著降低延迟和成本。
b.代码示例
---
import hashlib
import json
# 1. 内存缓存
class MemoryCache:
    """Plain dict cache keyed by the MD5 digest of the JSON-encoded (sorted-key) inputs."""

    def __init__(self):
        self.cache = {}

    def get_key(self, inputs: dict) -> str:
        """Return the canonical key for *inputs* (key order does not matter)."""
        payload = json.dumps(inputs, sort_keys=True).encode()
        return hashlib.md5(payload).hexdigest()

    def get(self, inputs: dict):
        """Return the cached output for *inputs*, or None when absent."""
        return self.cache.get(self.get_key(inputs))

    def set(self, inputs: dict, output):
        """Cache *output* for *inputs*."""
        self.cache[self.get_key(inputs)] = output
cache = MemoryCache()


def cached_llm_call(inputs: dict):
    """Answer ``inputs["question"]`` with GPT-4, reusing cached answers.

    Only ``None`` counts as a miss, so a cached empty-string answer is still served from
    the cache instead of triggering another paid API call.
    """
    cached = cache.get(inputs)
    if cached is not None:
        print("✓ 缓存命中")
        return cached
    print("✗ 缓存未命中,调用API")
    llm = ChatOpenAI(model="gpt-4")
    result = llm.invoke(inputs["question"])
    cache.set(inputs, result.content)
    return result.content
# 2. Redis缓存
# import redis
# class RedisCache:
#     """Redis缓存"""
#
#     def __init__(self, host: str = "localhost", port: int = 6379):
#         self.redis = redis.Redis(host=host, port=port, db=0)
#
#     def get_key(self, inputs: dict) -> str:
#         """生成缓存键(与MemoryCache相同的规则)"""
#         key_str = json.dumps(inputs, sort_keys=True)
#         return hashlib.md5(key_str.encode()).hexdigest()
#
#     def get(self, inputs: dict):
#         key = self.get_key(inputs)
#         cached = self.redis.get(key)
#         if cached:
#             return json.loads(cached)
#         return None
#
#     def set(self, inputs: dict, output, ttl: int = 3600):
#         """设置缓存(带过期时间)"""
#         key = self.get_key(inputs)
#         self.redis.setex(key, ttl, json.dumps(output))
# 3. LRU缓存
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_function(question: str) -> str:
    """Return GPT-4's answer to *question*, memoizing up to 100 distinct questions."""
    return ChatOpenAI(model="gpt-4").invoke(question).content
# 4. 缓存统计
class CacheWithStats(MemoryCache):
    """MemoryCache that counts hits and misses on every lookup."""

    def __init__(self):
        super().__init__()
        self.hits = 0
        self.misses = 0

    def get(self, inputs: dict):
        """Look up *inputs*; a None result counts as a miss."""
        found = super().get(inputs)
        if found is None:
            self.misses += 1
        else:
            self.hits += 1
        return found

    def stats(self):
        """Return hit/miss counts and the hit rate (0 before any lookup)."""
        lookups = self.hits + self.misses
        rate = self.hits / lookups if lookups > 0 else 0
        return {"hits": self.hits, "misses": self.misses, "hit_rate": rate}
cache_with_stats = CacheWithStats()
# Report the counters once the cache has been exercised.
stats = cache_with_stats.stats()
for report_line in (
    "\n缓存统计:",
    f" 命中:{stats['hits']}",
    f" 未命中:{stats['misses']}",
    f" 命中率:{stats['hit_rate']:.2%}",
):
    print(report_line)
---
02.资源优化
a.批量处理
a.功能说明
批量处理多个请求,提升吞吐量。合并多个小请求为一个大请求。使用批量API降低开销。平衡批次大小和延迟。批量处理适用于非实时场景,显著提升效率。
b.代码示例
---
# 1. 批量LLM调用
def batch_llm_call(prompts: list, batch_size: int = 10):
    """Run *prompts* through GPT-4 in fixed-size batches.

    Args:
        prompts: Prompt strings; answers are returned in the same order.
        batch_size: Number of prompts sent per ``llm.batch`` call; must be >= 1.

    Returns:
        List of reply texts, one per prompt.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size < 1:
        # A step of 0 makes range() raise a confusing error, and a negative
        # step would silently process nothing.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    llm = ChatOpenAI(model="gpt-4")
    results = []
    for start in range(0, len(prompts), batch_size):
        batch = prompts[start:start + batch_size]
        # A chat model's generate() expects a list of *message lists*, not
        # plain strings. Runnable.batch() accepts strings and returns one
        # AIMessage per input, in input order.
        replies = llm.batch(batch)
        results.extend(reply.content for reply in replies)
    return results
# 2. 批量评估
from langsmith.evaluation import evaluate
def batch_evaluate(dataset_name: str, batch_size: int = 50):
    """Evaluate a large dataset in chunks of *batch_size* examples.

    Each chunk is copied into a temporary dataset and evaluated with
    ``my_app`` and ``accuracy_evaluator``. The temporary dataset is deleted
    afterwards, even if copying or evaluation raises.

    Args:
        dataset_name: Name of the existing LangSmith dataset to evaluate.
        batch_size: Examples per chunk; must be >= 1.

    Returns:
        List with one ``evaluate(...)`` result per chunk.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    import uuid

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    dataset = client.read_dataset(dataset_name=dataset_name)
    examples = list(client.list_examples(dataset_id=dataset.id))
    total = len(examples)
    batches = (total + batch_size - 1) // batch_size
    print(f"总样本:{total},分{batches}批处理")
    # A per-call suffix keeps temp dataset names unique, so a leftover from an
    # earlier crash (or a concurrent run) cannot cause a name conflict.
    run_tag = uuid.uuid4().hex[:8]
    all_results = []
    for i in range(0, total, batch_size):
        batch = examples[i:i + batch_size]
        print(f"\n处理批次{i // batch_size + 1}/{batches}...")
        temp_dataset = client.create_dataset(
            dataset_name=f"temp_batch_{run_tag}_{i}")
        try:
            for ex in batch:
                client.create_example(
                    dataset_id=temp_dataset.id,
                    inputs=ex.inputs,
                    outputs=ex.outputs,
                )
            results = evaluate(
                my_app,
                data=temp_dataset.name,
                evaluators=[accuracy_evaluator],
            )
            all_results.append(results)
        finally:
            # Always clean up; otherwise a failed batch leaves an orphan dataset.
            # NOTE(review): deleting the dataset may also hide the experiment
            # in the LangSmith UI — confirm this is acceptable.
            client.delete_dataset(dataset_id=temp_dataset.id)
    return all_results
# 3. 批量写入
def batch_write_examples(dataset_id: str, examples: list, batch_size: int = 100):
    """Write *examples* to a dataset with one bulk request per batch.

    Args:
        dataset_id: Target LangSmith dataset id.
        examples: Dicts with an ``"inputs"`` mapping and an optional
            ``"outputs"`` mapping.
        batch_size: Examples per ``create_examples`` request; must be >= 1.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    total = len(examples)
    for i in range(0, total, batch_size):
        batch = examples[i:i + batch_size]
        # One API request per batch instead of one per example.
        client.create_examples(
            inputs=[ex["inputs"] for ex in batch],
            outputs=[ex.get("outputs") for ex in batch],
            dataset_id=dataset_id,
        )
        print(f"已写入{min(i + batch_size, total)}/{total}")
---
7.3 成本控制
01.模型选择
a.分层策略
a.功能说明
根据任务复杂度选择合适的模型,平衡成本和质量。简单任务用小模型(GPT-3.5),复杂任务用大模型(GPT-4)。实现动态路由,自动选择模型。监控每个模型的成本效益比。分层策略在保证质量前提下显著降低成本。
b.代码示例
---
from langchain.chat_models import ChatOpenAI
# 1. 任务复杂度分类
# Keywords that signal a reasoning-heavy question (analysis, comparison, "why"/"how").
_COMPLEX_KEYWORDS = (
    "分析", "比较", "评估", "解释", "推理",
    "为什么", "如何", "原理",
)


def classify_complexity(
    question: str,
    simple_max_len: int = 20,
    complex_keywords: tuple = _COMPLEX_KEYWORDS,
) -> str:
    """Classify *question* as "simple", "medium" or "complex" with simple rules.

    Args:
        question: User question text.
        simple_max_len: Questions shorter than this (in characters) that contain
            no complex keyword are "simple".
        complex_keywords: Substrings that mark a question as "complex".

    Returns:
        "complex" if any keyword occurs, else "simple" if the question is
        short, else "medium".
    """
    # Keywords take priority over length: a short "为什么..." question still
    # needs the stronger model.
    if any(kw in question for kw in complex_keywords):
        return "complex"
    if len(question) < simple_max_len:
        return "simple"
    return "medium"
# 2. 分层路由
def route_to_model(question: str) -> str:
    """Answer *question* with a model picked by ``classify_complexity``.

    complex -> gpt-4; simple and everything else (medium) -> gpt-3.5-turbo.
    The routing decision is printed before the model is called.
    """
    complexity = classify_complexity(question)
    model = "gpt-4" if complexity == "complex" else "gpt-3.5-turbo"
    if complexity == "simple":
        print(f"✓ 简单任务,使用{model}")
    elif complexity == "complex":
        print(f"✓ 复杂任务,使用{model}")
    else:
        print(f"✓ 中等任务,使用{model}")
    answer = ChatOpenAI(model=model).invoke(question)
    return answer.content
# 使用
# Route one clearly simple and one clearly complex question.
simple_q = "今天星期几?"
complex_q = "请分析并比较深度学习和传统机器学习的优缺点"
answer1, answer2 = (route_to_model(q) for q in (simple_q, complex_q))
# 3. 成本对比
def calculate_cost_savings(
    total_requests: int = 10000,
    simple_ratio: float = 0.6,
    gpt4_cost_per_request: float = 0.01,
    gpt35_cost_per_request: float = 0.001,
):
    """Compare the cost of sending every request to GPT-4 with tiered routing.

    The defaults reproduce the illustrative scenario: 10,000 requests, 60%
    simple, and assumed prices of $0.01 per GPT-4 call and $0.001 per
    GPT-3.5 call.

    Args:
        total_requests: Number of requests in the period.
        simple_ratio: Fraction routed to GPT-3.5; the rest goes to GPT-4.
        gpt4_cost_per_request: Assumed USD cost of one GPT-4 request.
        gpt35_cost_per_request: Assumed USD cost of one GPT-3.5 request.

    Returns:
        Dict with ``gpt4_cost``, ``stratified_cost``, ``savings`` and
        ``savings_pct``.

    Raises:
        ValueError: If ``simple_ratio`` is outside [0, 1].
    """
    if not 0 <= simple_ratio <= 1:
        raise ValueError(f"simple_ratio must be within [0, 1], got {simple_ratio}")
    # Derive the complex share so the two ratios always sum to 1.
    complex_ratio = 1 - simple_ratio
    # Baseline: every request goes to GPT-4.
    gpt4_cost = total_requests * gpt4_cost_per_request
    # Tiered strategy.
    simple_requests = total_requests * simple_ratio
    complex_requests = total_requests * complex_ratio
    stratified_cost = (
        simple_requests * gpt35_cost_per_request
        + complex_requests * gpt4_cost_per_request
    )
    savings = gpt4_cost - stratified_cost
    # Guard against zero requests or a zero price, which would divide by zero.
    savings_pct = savings / gpt4_cost * 100 if gpt4_cost else 0.0
    print(f"\n成本对比:")
    print(f" 全部GPT-4:${gpt4_cost:.2f}")
    print(f" 分层策略:${stratified_cost:.2f}")
    print(f" 节省:${savings:.2f} ({savings_pct:.1f}%)")
    return {
        "gpt4_cost": gpt4_cost,
        "stratified_cost": stratified_cost,
        "savings": savings,
        "savings_pct": savings_pct,
    }


calculate_cost_savings()
# 4. 质量监控
from langsmith import Client
client = Client()


def _model_runs_filter(model: str) -> str:
    """Build a LangSmith run filter matching runs whose metadata ``model`` equals *model*."""
    # NOTE(review): assumes the app records the model name under the metadata
    # key "model" (e.g. config={"metadata": {"model": ...}}) — confirm.
    return f'and(eq(metadata_key, "model"), eq(metadata_value, "{model}"))'


def _mean_feedback(runs, key: str = "accuracy") -> float:
    """Average the per-run mean score for feedback *key*; 0 when no run has it."""
    import statistics

    scores = []
    for run in runs:
        # feedback_stats maps each feedback key to an aggregate dict
        # (e.g. {"n": 3, "avg": 0.8, ...}), not to a bare number.
        stat = (run.feedback_stats or {}).get(key)
        if stat and stat.get("avg") is not None:
            scores.append(stat["avg"])
    # Runs without this feedback are skipped instead of being counted as 0.
    return statistics.mean(scores) if scores else 0


def monitor_model_quality(project_name: str = "production", limit: int = 100):
    """Print, and return, the mean accuracy feedback for GPT-3.5 and GPT-4 runs.

    Args:
        project_name: LangSmith project to query.
        limit: Maximum number of runs fetched per model.

    Returns:
        Dict mapping model name to its mean accuracy (0 when no feedback exists).
    """
    gpt35_runs = client.list_runs(
        project_name=project_name,
        filter=_model_runs_filter("gpt-3.5-turbo"),
        limit=limit,
    )
    gpt4_runs = client.list_runs(
        project_name=project_name,
        filter=_model_runs_filter("gpt-4"),
        limit=limit,
    )
    gpt35_quality = _mean_feedback(gpt35_runs)
    gpt4_quality = _mean_feedback(gpt4_runs)
    print(f"\n质量监控:")
    print(f" GPT-3.5准确率:{gpt35_quality:.2%}")
    print(f" GPT-4准确率:{gpt4_quality:.2%}")
    return {"gpt-3.5-turbo": gpt35_quality, "gpt-4": gpt4_quality}


monitor_model_quality()
# 5. 自适应调整
class AdaptiveRouter:
    """Pick a model per question, using recent quality scores for medium tasks.

    Simple questions always go to gpt-3.5-turbo and complex ones to gpt-4
    (according to ``classify_complexity``). Medium questions go to
    gpt-3.5-turbo only while its rolling average quality is at least
    ``quality_threshold``.
    """

    def __init__(self, quality_threshold: float = 0.85, history_size: int = 100):
        """Create a router.

        Args:
            quality_threshold: Minimum average gpt-3.5-turbo quality for it to
                keep handling medium tasks.
            history_size: Number of most recent scores kept per model.

        Raises:
            ValueError: If ``history_size`` is not positive.
        """
        from collections import deque

        if history_size < 1:
            raise ValueError(f"history_size must be >= 1, got {history_size}")
        self.quality_threshold = quality_threshold
        self.history_size = history_size
        # deque(maxlen=...) drops the oldest score automatically in O(1),
        # replacing manual list.pop(0) trimming.
        self.performance = {
            "gpt-3.5-turbo": deque(maxlen=history_size),
            "gpt-4": deque(maxlen=history_size),
        }

    def route(self, question: str) -> str:
        """Return the model name to use for *question*."""
        complexity = classify_complexity(question)
        if complexity == "simple":
            return "gpt-3.5-turbo"
        if complexity == "complex":
            return "gpt-4"
        # Medium: trust the cheaper model only while its recent quality holds up.
        if self._avg_quality("gpt-3.5-turbo") >= self.quality_threshold:
            return "gpt-3.5-turbo"
        return "gpt-4"

    def _avg_quality(self, model: str) -> float:
        """Mean recorded quality for *model*; 1.0 (optimistic) when nothing is recorded."""
        import statistics

        scores = self.performance.get(model)
        if not scores:
            return 1.0
        return statistics.mean(scores)

    def record_result(self, model: str, quality: float):
        """Record a quality score for *model*, keeping only the newest ``history_size``."""
        from collections import deque

        history = self.performance.get(model)
        if history is None:
            # Models not listed up front get their own bounded history
            # instead of raising KeyError.
            history = self.performance[model] = deque(maxlen=self.history_size)
        history.append(quality)


router = AdaptiveRouter()
---
b.Prompt优化
a.功能说明
优化Prompt降低Token消耗。精简提示词,去除冗余。使用缩写和简洁表述。避免重复信息。合理使用系统提示词。Prompt优化在保持效果前提下降低成本。
b.代码示例
---
# 1. Token计数
def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Return how many tokens *text* encodes to for *model*.

    If tiktoken does not recognize the model name, the ``cl100k_base``
    encoding is used instead of raising KeyError. Counts for such models
    are therefore an approximation.
    """
    import tiktoken

    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown or custom model name: fall back to a general-purpose encoding.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
# 2. Prompt comparison: the same instructions written verbosely vs. concisely.
verbose_prompt = """
你是一个非常专业和经验丰富的客户服务代表。
你的任务是以友好、专业和有帮助的方式回答用户的问题。
请确保你的回答准确、清晰、简洁。
如果你不确定答案,请诚实地告诉用户。
请使用礼貌的语言,保持专业的态度。
用户问题:{question}
请提供详细的回答:
"""
# Concise version carrying the same essential instructions.
concise_prompt = """
你是专业客服,请准确、简洁回答。
问题:{question}
"""
verbose_tokens, concise_tokens = (
    count_tokens(p) for p in (verbose_prompt, concise_prompt)
)
saved_tokens = verbose_tokens - concise_tokens
saved_pct = (1 - concise_tokens / verbose_tokens) * 100
print(f"冗长版本:{verbose_tokens} tokens")
print(f"精简版本:{concise_tokens} tokens")
print(f"节省:{saved_tokens} tokens ({saved_pct:.1f}%)")
# 3. 模板优化
# Default phrase substitutions used by optimize_template, applied in this order.
_TEMPLATE_REPLACEMENTS = {
    "请你": "请",
    "非常": "",
    "您的": "你的",
    "进行": "做",
}


def optimize_template(template: str, replacements: "dict[str, str] | None" = None) -> str:
    """Shorten a prompt template by replacing wordy phrases and squeezing whitespace.

    Args:
        template: Prompt text to optimize.
        replacements: Optional ``{old: new}`` mapping applied in insertion
            order; defaults to ``_TEMPLATE_REPLACEMENTS``.

    Returns:
        The optimized text, with runs of whitespace collapsed to a single space
        and no leading or trailing whitespace.
    """
    if replacements is None:
        replacements = _TEMPLATE_REPLACEMENTS
    optimized = template
    for old, new in replacements.items():
        optimized = optimized.replace(old, new)
    # Normalize whitespace last: deleting a word (e.g. "非常" -> "") would
    # otherwise leave a double space behind.
    return " ".join(optimized.split())
# 4. 成本估算
def estimate_cost(prompt: str, expected_output_tokens: int, model: str = "gpt-4"):
    """Estimate and print the USD cost of a single call with *prompt*.

    Input tokens are counted with ``count_tokens``; output tokens are the
    caller's estimate. Prices (per 1K tokens) are example figures: "gpt-4"
    uses $0.03 input / $0.06 output, and any other model name uses
    $0.0015 input / $0.002 output.

    Returns:
        The estimated total cost in USD.
    """
    input_tokens = count_tokens(prompt, model)
    # (input price, output price) per 1K tokens.
    if model == "gpt-4":
        price_in, price_out = 0.03, 0.06
    else:
        price_in, price_out = 0.0015, 0.002
    input_cost = (input_tokens / 1000) * price_in
    output_cost = (expected_output_tokens / 1000) * price_out
    total_cost = input_cost + output_cost
    for summary_line in (
        f"\n成本估算:",
        f" 输入:{input_tokens} tokens, ${input_cost:.4f}",
        f" 输出(预估):{expected_output_tokens} tokens, ${output_cost:.4f}",
        f" 总计:${total_cost:.4f}",
    ):
        print(summary_line)
    return total_cost


estimate_cost(verbose_prompt, expected_output_tokens=100)
# 5. 批量优化
def batch_optimize_prompts(prompts: list):
    """Run ``optimize_template`` over *prompts* and print the token savings.

    Args:
        prompts: Prompt strings to optimize.

    Returns:
        The optimized prompts, in input order.
    """
    optimized = []
    total_savings = 0
    for prompt in prompts:
        optimized_prompt = optimize_template(prompt)
        total_savings += count_tokens(prompt) - count_tokens(optimized_prompt)
        optimized.append(optimized_prompt)
    # An empty input list would otherwise raise ZeroDivisionError here.
    avg_savings = total_savings / len(prompts) if prompts else 0.0
    print(f"\n批量优化结果:")
    print(f" 总Prompt数:{len(prompts)}")
    print(f" 总节省Token:{total_savings}")
    print(f" 平均节省:{avg_savings:.1f} tokens/prompt")
    return optimized
# 6. 成本预算控制
class CostController:
    """Enforce a daily spending cap on LLM calls.

    Spending is tracked per calendar day (local date from ``date.today()``).
    The running total resets automatically the first time the controller is
    used on a new day.
    """

    def __init__(self, daily_budget: float):
        """Create a controller allowing at most *daily_budget* USD per day."""
        from datetime import date

        self.daily_budget = daily_budget
        self.daily_spent = 0.0
        # The day the current daily_spent total belongs to.
        self.current_day = date.today()

    def _roll_over(self):
        """Reset ``daily_spent`` if the calendar day has changed."""
        from datetime import date

        today = date.today()
        if today != self.current_day:
            self.current_day = today
            self.daily_spent = 0.0

    def can_proceed(self, estimated_cost: float) -> bool:
        """Return True if spending *estimated_cost* stays within today's budget."""
        self._roll_over()
        if self.daily_spent + estimated_cost > self.daily_budget:
            print(f"❌ 超出每日预算(${self.daily_budget})")
            return False
        return True

    def record_cost(self, cost: float):
        """Add *cost* to today's total and print the remaining budget."""
        self._roll_over()
        self.daily_spent += cost
        remaining = self.daily_budget - self.daily_spent
        print(f"已使用${self.daily_spent:.2f} / ${self.daily_budget:.2f},剩余${remaining:.2f}")


cost_controller = CostController(daily_budget=100.0)
# Pre-flight budget check: record the spend only when the call is allowed.
estimated = estimate_cost(concise_prompt, 50)
within_budget = cost_controller.can_proceed(estimated)
if within_budget:
    # The actual model call would go here, e.g. result = llm.invoke(...)
    cost_controller.record_cost(estimated)
---