1 框架介绍
1.1 核心理念
01.极简主义
a.最小实现
a.功能说明
PocketFlow是100行代码的极简LLM框架。只包含核心Graph抽象和状态管理。没有复杂依赖和封装。专注于清晰和可理解性。适合学习和快速原型开发。
b.代码示例
---
# 1. PocketFlow核心理念
"""
PocketFlow: 极简的LLM应用框架
核心特点:
- 100行左右的核心代码
- 基于有向图(Graph)的工作流
- 简单的状态管理
- 易于理解和扩展
- 无复杂依赖
"""
# 2. 核心抽象:Graph
class Graph:
"""工作流图"""
def __init__(self):
self.nodes = {} # 节点字典
self.edges = {} # 边字典
self.state = {} # 全局状态
def add_node(self, name, func):
"""添加节点"""
self.nodes[name] = func
def add_edge(self, from_node, to_node, condition=None):
"""添加边"""
if from_node not in self.edges:
self.edges[from_node] = []
self.edges[from_node].append((to_node, condition))
def run(self, start_node, initial_state=None):
"""执行工作流"""
if initial_state:
self.state.update(initial_state)
current = start_node
while current:
# 执行当前节点
print(f"执行节点: {current}")
result = self.nodes[current](self.state)
# 更新状态
if isinstance(result, dict):
self.state.update(result)
# 找到下一个节点
current = self._next_node(current)
return self.state
def _next_node(self, current):
"""确定下一个节点"""
if current not in self.edges:
return None
for next_node, condition in self.edges[current]:
if condition is None or condition(self.state):
return next_node
return None
# 3. 简单示例
def node_a(state):
"""节点A:问候"""
print(f"你好,{state.get('name', '访客')}!")
return {"greeted": True}
def node_b(state):
"""节点B:处理"""
print("正在处理...")
return {"processed": True}
def node_c(state):
"""节点C:完成"""
print("完成!")
return {"completed": True}
# 构建工作流
graph = Graph()
graph.add_node("start", node_a)
graph.add_node("process", node_b)
graph.add_node("end", node_c)
graph.add_edge("start", "process")
graph.add_edge("process", "end")
# 执行
result = graph.run("start", {"name": "张三"})
print(f"最终状态: {result}")
# 输出:
# 执行节点: start
# 你好,张三!
# 执行节点: process
# 正在处理...
# 执行节点: end
# 完成!
# 最终状态: {'name': '张三', 'greeted': True, 'processed': True, 'completed': True}
print("✓ PocketFlow核心理念示例完成")
# 4. 与复杂框架对比
"""
LangChain/LangGraph:
- 数千行代码
- 丰富的内置组件
- 复杂的抽象层
- 适合生产环境
PocketFlow:
- 100行代码
- 只有核心抽象
- 简单直观
- 适合学习和原型
"""
# 5. 核心优势
"""
优势:
1. 代码少,易理解
2. 无黑盒,完全可控
3. 快速上手
4. 灵活扩展
5. 适合教学
适用场景:
- 学习LLM应用开发
- 快速原型验证
- 小型工具开发
- 理解Graph工作流原理
"""
print("✓ 核心理念完成")
---
b.Graph思维
a.功能说明
将LLM应用建模为有向图。节点表示处理步骤。边表示控制流。状态在节点间传递。Graph思维清晰直观。
b.代码示例
---
# Graph思维模式
# 1. 传统流程 vs Graph流程
# 传统流程(线性)
def traditional_flow(input_text):
result1 = step1(input_text)
result2 = step2(result1)
result3 = step3(result2)
return result3
# Graph流程(图结构)
graph = Graph()
graph.add_node("step1", step1_func)
graph.add_node("step2", step2_func)
graph.add_node("step3", step3_func)
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
result = graph.run("step1", {"input": input_text})
# 2. 条件分支
def check_condition(state):
return state.get("score", 0) > 0.5
graph.add_edge("analyze", "path_a", condition=check_condition)
graph.add_edge("analyze", "path_b", condition=lambda s: not check_condition(s))
# 3. 循环处理
def need_retry(state):
return state.get("retry_count", 0) < 3 and not state.get("success")
graph.add_edge("process", "validate")
graph.add_edge("validate", "process", condition=need_retry)
graph.add_edge("validate", "complete", condition=lambda s: not need_retry(s))
# 4. 并行处理(通过状态传递实现)
def split_tasks(state):
"""分割任务"""
return {"tasks": state["input"].split(",")}
def process_task(state):
"""处理单个任务"""
results = []
for task in state["tasks"]:
results.append(f"处理: {task}")
return {"results": results}
graph.add_node("split", split_tasks)
graph.add_node("process", process_task)
graph.add_edge("split", "process")
print("✓ Graph思维完成")
---
1.2 100行实现
01.核心代码
a.完整实现
a.功能说明
PocketFlow的核心实现仅需100行左右代码。包含Graph类、节点管理、边连接和状态执行。代码简洁清晰易懂。是学习Graph工作流的最佳示例。
b.代码示例
---
# PocketFlow完整实现(约100行)
from typing import Dict, List, Callable, Optional, Any, Tuple
class PocketFlow:
"""极简LLM工作流框架"""
def __init__(self):
"""初始化"""
self.nodes: Dict[str, Callable] = {}
self.edges: Dict[str, List[Tuple[str, Optional[Callable]]]] = {}
self.state: Dict[str, Any] = {}
def add_node(self, name: str, func: Callable) -> 'PocketFlow':
"""
添加节点
Args:
name: 节点名称
func: 节点处理函数,接收state,返回dict更新状态
"""
self.nodes[name] = func
return self # 支持链式调用
def add_edge(
self,
from_node: str,
to_node: str,
condition: Optional[Callable[[Dict], bool]] = None
) -> 'PocketFlow':
"""
添加边
Args:
from_node: 起始节点
to_node: 目标节点
condition: 条件函数,接收state,返回bool
"""
if from_node not in self.edges:
self.edges[from_node] = []
self.edges[from_node].append((to_node, condition))
return self
def run(
self,
start_node: str,
initial_state: Optional[Dict] = None,
verbose: bool = False
) -> Dict:
"""
执行工作流
Args:
start_node: 起始节点
initial_state: 初始状态
verbose: 是否打印执行日志
Returns:
最终状态
"""
# 初始化状态
if initial_state:
self.state = initial_state.copy()
else:
self.state = {}
current = start_node
step_count = 0
max_steps = 100 # 防止无限循环
while current and step_count < max_steps:
if verbose:
print(f"Step {step_count + 1}: 执行节点 '{current}'")
# 检查节点是否存在
if current not in self.nodes:
raise ValueError(f"节点 '{current}' 不存在")
# 执行节点函数
result = self.nodes[current](self.state)
# 更新状态
if isinstance(result, dict):
self.state.update(result)
# 确定下一个节点
current = self._next_node(current, verbose)
step_count += 1
if step_count >= max_steps:
raise RuntimeError(f"超过最大步数限制 {max_steps}")
if verbose:
print(f"工作流完成,共执行 {step_count} 步")
return self.state
def _next_node(self, current: str, verbose: bool = False) -> Optional[str]:
"""确定下一个节点"""
if current not in self.edges:
return None
for next_node, condition in self.edges[current]:
if condition is None:
# 无条件边
return next_node
elif condition(self.state):
# 条件满足
if verbose:
print(f" → 条件满足,前往 '{next_node}'")
return next_node
return None
def visualize(self) -> str:
"""可视化工作流(简单文本表示)"""
lines = ["工作流图:"]
lines.append("=" * 40)
for node in self.nodes:
lines.append(f"节点: {node}")
if node in self.edges:
for next_node, condition in self.edges[node]:
cond_str = " (有条件)" if condition else ""
lines.append(f" → {next_node}{cond_str}")
return "\n".join(lines)
# 使用示例
if __name__ == "__main__":
# 创建工作流
flow = PocketFlow()
# 定义节点函数
def greet(state):
print(f"欢迎,{state.get('name', '访客')}!")
return {"greeted": True}
def process(state):
print("处理中...")
return {"processed": True}
def finish(state):
print("完成!")
return {"completed": True}
# 构建图
flow.add_node("start", greet) \
.add_node("process", process) \
.add_node("end", finish) \
.add_edge("start", "process") \
.add_edge("process", "end")
# 可视化
print(flow.visualize())
print()
# 执行
result = flow.run("start", {"name": "Alice"}, verbose=True)
print(f"\n最终状态: {result}")
print("✓ 100行实现完成")
---
1.3 设计哲学
01.设计原则
a.简单优于复杂
a.功能说明
遵循Python之禅的设计理念。简单的实现更容易理解和维护。避免过度设计和不必要的抽象。专注于核心功能。
b.代码示例
---
# 设计哲学:简单优于复杂
# 1. 最小化依赖
"""
PocketFlow只依赖Python标准库:
- typing: 类型提示
- 没有外部依赖
对比其他框架:
- LangChain: 50+ 依赖包
- LangGraph: 20+ 依赖包
- PocketFlow: 0 外部依赖
"""
# 2. 显式优于隐式
# 不好的设计(隐式)
class ImplicitGraph:
def __init__(self):
self._magic_state = {} # 隐藏的状态
def process(self, data):
# 隐式修改状态
self._magic_state["data"] = data
return self._auto_route() # 自动路由
# 好的设计(显式)
class ExplicitGraph:
def __init__(self):
self.state = {} # 明确的状态
def process(self, state):
# 显式传递和返回状态
new_state = {"data": state["input"]}
return new_state
def run(self, start, state):
# 显式的执行流程
current = start
while current:
state = self.nodes[current](state)
current = self._next(current)
return state
# 3. 扁平优于嵌套
# 不好的设计(过度嵌套)
class NestedWorkflow:
class Node:
class Edge:
class Condition:
pass
# 好的设计(扁平)
class FlatWorkflow:
nodes = {}
edges = {}
# 简单的字典结构
# 4. 可读性至上
# 不好的代码
g=Graph();g.n("a",f1);g.e("a","b");g.r("a",{"x":1})
# 好的代码
graph = Graph()
graph.add_node("start", process_func)
graph.add_edge("start", "end")
result = graph.run("start", {"input": data})
# 5. 约定优于配置
"""
默认行为简单合理:
- 节点函数接收state,返回dict
- 边默认无条件执行
- 状态自动合并更新
- 无需复杂配置文件
"""
def simple_node(state):
# 约定:接收state
result = process(state["input"])
# 约定:返回dict更新状态
return {"output": result}
# 6. 渐进式复杂度
# 最简单:顺序执行
flow.add_edge("a", "b").add_edge("b", "c")
# 添加条件:分支
flow.add_edge("a", "b", condition=lambda s: s["x"] > 0)
flow.add_edge("a", "c", condition=lambda s: s["x"] <= 0)
# 添加循环:重复
flow.add_edge("process", "check")
flow.add_edge("check", "process", condition=need_retry)
flow.add_edge("check", "done", condition=lambda s: not need_retry(s))
print("✓ 设计原则示例完成")
---
b.教学友好
a.功能说明
代码结构清晰便于学习。每个概念都有对应的实现。适合理解Graph工作流原理。是优秀的教学材料。
b.代码示例
---
# 教学友好设计
# 1. 渐进式学习路径
# 第一步:理解节点
def my_first_node(state):
"""节点就是一个函数"""
print("这是一个节点")
return {"step": 1}
# 第二步:理解边
graph = PocketFlow()
graph.add_node("node1", my_first_node)
graph.add_node("node2", lambda s: {"step": 2})
graph.add_edge("node1", "node2") # 边连接节点
# 第三步:理解状态
result = graph.run("node1", {"input": "data"})
# 状态在节点间传递和累积
# 第四步:理解条件
graph.add_edge("check", "path_a", condition=lambda s: s["x"] > 0)
# 2. 清晰的命名
"""
好的命名:
- add_node: 添加节点(动词+名词)
- add_edge: 添加边(动词+名词)
- run: 运行(简单动词)
避免:
- n: 太简短
- register_processing_unit: 太复杂
- do_the_thing: 不清晰
"""
# 3. 完整的示例
"""
每个概念都有完整的示例代码:
- 节点定义
- 边连接
- 条件判断
- 状态管理
- 完整工作流
"""
# 4. 逐步增加复杂度
# 示例1:最简单
flow = PocketFlow()
flow.add_node("start", lambda s: {"done": True})
result = flow.run("start")
# 示例2:多个节点
flow.add_node("end", lambda s: {"completed": True})
flow.add_edge("start", "end")
# 示例3:添加条件
flow.add_edge("start", "path_a", condition=lambda s: s.get("x", 0) > 0)
flow.add_edge("start", "path_b", condition=lambda s: s.get("x", 0) <= 0)
# 5. 可视化帮助理解
print(flow.visualize())
# 工作流图:
# ========================================
# 节点: start
# → path_a (有条件)
# → path_b (有条件)
# 节点: path_a
# 节点: path_b
print("✓ 教学友好设计完成")
---
2 Graph抽象
2.1 节点定义
01.节点函数
a.函数签名
a.功能说明
节点是处理单元,接收state字典,返回更新字典。函数签名简单统一。支持同步和异步。节点是工作流的基本构建块。
b.代码示例
---
# 节点定义
# 1. 基础节点
def simple_node(state):
"""最简单的节点"""
print("执行节点")
return {"processed": True}
# 2. 有输入输出的节点
def process_data(state):
"""处理数据节点"""
input_data = state.get("input")
result = input_data.upper() # 处理逻辑
return {"output": result}
# 3. 有副作用的节点
def save_to_db(state):
"""保存到数据库"""
data = state.get("data")
# db.save(data) # 副作用
return {"saved": True}
# 4. LLM节点
def llm_call(state):
"""调用LLM"""
prompt = state.get("prompt")
# response = llm.generate(prompt)
response = "LLM响应"
return {"llm_response": response}
# 5. 条件节点
def decision_node(state):
"""决策节点"""
score = state.get("score", 0)
return {
"decision": "approve" if score > 0.5 else "reject",
"checked": True
}
print("✓ 节点定义完成")
---
2.2 边连接
01.控制流
a.边的类型
a.功能说明
边定义节点间的连接关系。支持无条件边和条件边。实现顺序、分支和循环。边控制工作流的执行路径。
b.代码示例
---
# 边连接
flow = PocketFlow()
# 1. 无条件边(顺序)
flow.add_edge("step1", "step2")
flow.add_edge("step2", "step3")
# 2. 条件边(分支)
flow.add_edge("check", "path_a", condition=lambda s: s["score"] > 0.5)
flow.add_edge("check", "path_b", condition=lambda s: s["score"] <= 0.5)
# 3. 循环边
def need_retry(state):
return state.get("retry", 0) < 3 and not state.get("success")
flow.add_edge("process", "validate")
flow.add_edge("validate", "process", condition=need_retry)
flow.add_edge("validate", "done", condition=lambda s: not need_retry(s))
# 4. 复杂条件
def complex_condition(state):
return (
state.get("score", 0) > 0.7 and
state.get("confidence", 0) > 0.8 and
state.get("validated", False)
)
flow.add_edge("analyze", "approve", condition=complex_condition)
print("✓ 边连接完成")
---
2.3 状态管理
01.状态传递
a.状态字典
a.功能说明
状态是全局字典,在节点间传递。节点返回dict更新状态。状态累积所有节点的输出。状态是数据流的载体。
b.代码示例
---
# 状态管理
# 1. 初始状态
initial_state = {
"input": "用户输入",
"user_id": "123"
}
# 2. 节点更新状态
def node1(state):
return {"step1_done": True, "data": "处理后数据"}
def node2(state):
# 可以访问之前的状态
previous_data = state.get("data")
return {"step2_done": True, "final": previous_data + "_final"}
# 3. 状态累积
flow = PocketFlow()
flow.add_node("n1", node1).add_node("n2", node2)
flow.add_edge("n1", "n2")
result = flow.run("n1", initial_state)
# result = {
# "input": "用户输入",
# "user_id": "123",
# "step1_done": True,
# "data": "处理后数据",
# "step2_done": True,
# "final": "处理后数据_final"
# }
print(f"最终状态: {result}")
print("✓ 状态管理完成")
---
3 基础示例
3.1 Chat对话
01.对话实现
a.简单聊天机器人
a.功能说明
使用PocketFlow实现聊天机器人。维护对话历史。调用LLM生成响应。演示基础应用。
b.代码示例
---
# 使用PocketFlow实现Chat
from litellm import completion
def add_user_message(state):
"""添加用户消息"""
message = state.get("user_input")
history = state.get("history", [])
history.append({"role": "user", "content": message})
return {"history": history}
def call_llm(state):
"""调用LLM"""
history = state.get("history", [])
response = completion(
model="gpt-3.5-turbo",
messages=history
)
ai_message = response.choices[0].message.content
history.append({"role": "assistant", "content": ai_message})
return {
"history": history,
"response": ai_message
}
def output_response(state):
"""输出响应"""
print(f"AI: {state['response']}")
return {}
# 构建聊天流程
chat_flow = PocketFlow()
chat_flow.add_node("add_message", add_user_message)
chat_flow.add_node("llm", call_llm)
chat_flow.add_node("output", output_response)
chat_flow.add_edge("add_message", "llm")
chat_flow.add_edge("llm", "output")
# 使用
result = chat_flow.run("add_message", {"user_input": "你好"})
print("✓ Chat对话完成")
---
3.2 Workflow工作流
01.业务流程
a.审批工作流
a.功能说明
实现多步骤业务流程。支持条件判断和人工审批。演示工作流自动化。
b.代码示例
---
# 审批工作流
def submit_request(state):
"""提交申请"""
print(f"提交申请: {state['request']}")
return {"status": "submitted"}
def auto_check(state):
"""自动检查"""
amount = state.get("amount", 0)
return {"auto_approved": amount < 1000}
def manual_review(state):
"""人工审核"""
print("需要人工审核")
return {"reviewed": True, "approved": True}
def approve(state):
"""批准"""
print("申请已批准")
return {"status": "approved"}
def reject(state):
"""拒绝"""
print("申请被拒绝")
return {"status": "rejected"}
# 构建工作流
workflow = PocketFlow()
workflow.add_node("submit", submit_request)
workflow.add_node("check", auto_check)
workflow.add_node("review", manual_review)
workflow.add_node("approve", approve)
workflow.add_node("reject", reject)
workflow.add_edge("submit", "check")
workflow.add_edge("check", "approve", condition=lambda s: s.get("auto_approved"))
workflow.add_edge("check", "review", condition=lambda s: not s.get("auto_approved"))
workflow.add_edge("review", "approve", condition=lambda s: s.get("approved"))
workflow.add_edge("review", "reject", condition=lambda s: not s.get("approved"))
# 执行
result = workflow.run("submit", {
"request": "采购申请",
"amount": 5000
}, verbose=True)
print(f"最终状态: {result['status']}")
print("✓ Workflow工作流完成")
---
3.3 Agent代理
01.Agent实现
a.ReAct模式
a.功能说明
实现ReAct Agent模式。支持工具调用和推理循环。演示Agent能力。
b.代码示例
---
# Agent实现
def think(state):
"""思考下一步"""
question = state.get("question")
# 使用LLM决定下一步
thought = f"需要查询知识库回答: {question}"
return {"thought": thought, "action": "search"}
def search_tool(state):
"""搜索工具"""
# 模拟搜索
results = ["结果1", "结果2"]
return {"search_results": results}
def answer(state):
"""生成答案"""
results = state.get("search_results", [])
answer = f"根据搜索结果: {results}"
return {"answer": answer}
# 构建Agent
agent = PocketFlow()
agent.add_node("think", think)
agent.add_node("search", search_tool)
agent.add_node("answer", answer)
agent.add_edge("think", "search")
agent.add_edge("search", "answer")
result = agent.run("think", {"question": "什么是AI?"})
print(f"答案: {result['answer']}")
print("✓ Agent代理完成")
---
3.4 RAG检索
01.RAG实现
a.检索增强
a.功能说明
实现简单的RAG流程。包括检索、上下文构建和生成。演示RAG核心逻辑。
b.代码示例
---
# RAG实现
def retrieve(state):
"""检索相关文档"""
query = state.get("query")
# 模拟向量检索
docs = ["文档1内容", "文档2内容"]
return {"retrieved_docs": docs}
def build_context(state):
"""构建上下文"""
docs = state.get("retrieved_docs", [])
context = "\n".join(docs)
return {"context": context}
def generate_answer(state):
"""生成答案"""
context = state.get("context")
query = state.get("query")
prompt = f"上下文:\n{context}\n\n问题: {query}\n\n回答:"
# 调用LLM
from litellm import completion
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return {"answer": response.choices[0].message.content}
# 构建RAG流程
rag = PocketFlow()
rag.add_node("retrieve", retrieve)
rag.add_node("build_context", build_context)
rag.add_node("generate", generate_answer)
rag.add_edge("retrieve", "build_context")
rag.add_edge("build_context", "generate")
result = rag.run("retrieve", {"query": "什么是Python?"})
print(f"答案: {result['answer']}")
print("✓ RAG检索完成")
---
4 高级模式
4.1 Multi-Agent
01.多Agent协作
a.Agent组合
a.功能说明
实现多个Agent协作完成复杂任务。每个Agent负责特定子任务。通过状态传递协调。
b.代码示例
---
# Multi-Agent实现
def research_agent(state):
"""研究Agent"""
topic = state.get("topic")
# 模拟研究
findings = f"{topic}的研究结果"
return {"research": findings}
def writer_agent(state):
"""写作Agent"""
research = state.get("research")
# 模拟写作
article = f"基于{research}的文章"
return {"article": article}
def reviewer_agent(state):
"""审核Agent"""
article = state.get("article")
# 模拟审核
return {"reviewed": True, "approved": True}
# 构建Multi-Agent流程
multi_agent = PocketFlow()
multi_agent.add_node("research", research_agent)
multi_agent.add_node("writer", writer_agent)
multi_agent.add_node("reviewer", reviewer_agent)
multi_agent.add_edge("research", "writer")
multi_agent.add_edge("writer", "reviewer")
result = multi_agent.run("research", {"topic": "AI技术"})
print(f"最终结果: {result}")
print("✓ Multi-Agent完成")
---
4.2 Supervisor监督
01.监督模式
a.中央协调
a.功能说明
Supervisor节点协调多个Worker。根据任务分配给不同Worker。汇总Worker结果。
b.代码示例
---
# Supervisor模式
def supervisor(state):
"""监督者决定任务分配"""
task_type = state.get("task_type")
if task_type == "translate":
return {"route": "translator"}
elif task_type == "summarize":
return {"route": "summarizer"}
else:
return {"route": "general"}
def translator(state):
"""翻译Worker"""
return {"result": "翻译结果"}
def summarizer(state):
"""摘要Worker"""
return {"result": "摘要结果"}
def general_worker(state):
"""通用Worker"""
return {"result": "通用结果"}
# 构建Supervisor流程
supervisor_flow = PocketFlow()
supervisor_flow.add_node("supervisor", supervisor)
supervisor_flow.add_node("translator", translator)
supervisor_flow.add_node("summarizer", summarizer)
supervisor_flow.add_node("general", general_worker)
supervisor_flow.add_edge("supervisor", "translator",
condition=lambda s: s.get("route") == "translator")
supervisor_flow.add_edge("supervisor", "summarizer",
condition=lambda s: s.get("route") == "summarizer")
supervisor_flow.add_edge("supervisor", "general",
condition=lambda s: s.get("route") == "general")
result = supervisor_flow.run("supervisor", {"task_type": "translate"})
print(f"结果: {result['result']}")
print("✓ Supervisor监督完成")
---
4.3 Map-Reduce
01.并行处理
a.Map阶段
a.功能说明
将任务拆分为多个子任务并行处理。汇总所有子任务结果。实现数据并行。
b.代码示例
---
# Map-Reduce模式
def split_tasks(state):
"""Map: 拆分任务"""
items = state.get("items", [])
return {"tasks": items, "results": []}
def process_batch(state):
"""处理批次"""
tasks = state.get("tasks", [])
results = []
# 模拟批量处理
for task in tasks:
result = f"处理: {task}"
results.append(result)
return {"results": results}
def reduce_results(state):
"""Reduce: 汇总结果"""
results = state.get("results", [])
summary = f"共处理{len(results)}个任务"
return {"summary": summary}
# 构建Map-Reduce流程
mapreduce = PocketFlow()
mapreduce.add_node("split", split_tasks)
mapreduce.add_node("process", process_batch)
mapreduce.add_node("reduce", reduce_results)
mapreduce.add_edge("split", "process")
mapreduce.add_edge("process", "reduce")
result = mapreduce.run("split", {
"items": ["任务1", "任务2", "任务3"]
})
print(f"汇总: {result['summary']}")
print("✓ Map-Reduce完成")
---
4.4 Streaming流式
01.流式处理
a.实时输出
a.功能说明
支持流式输出生成内容。实时更新状态。提升用户体验。
b.代码示例
---
# 流式处理
def stream_llm(state):
"""流式调用LLM"""
from litellm import completion
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": state["prompt"]}],
stream=True
)
full_text = ""
print("AI: ", end="", flush=True)
for chunk in response:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
print(token, end="", flush=True)
full_text += token
print()
return {"response": full_text}
# 构建流式流程
stream_flow = PocketFlow()
stream_flow.add_node("stream", stream_llm)
result = stream_flow.run("stream", {"prompt": "讲个故事"})
print("✓ Streaming流式完成")
---
5 与其他框架对比
5.1 vs LangChain
01.对比分析
a.复杂度对比
a.功能说明
PocketFlow极简,LangChain功能丰富。PocketFlow适合学习和原型,LangChain适合生产。根据需求选择框架。
b.代码示例
---
# PocketFlow vs LangChain
# PocketFlow实现(简单)
flow = PocketFlow()
flow.add_node("llm", lambda s: {"response": "AI响应"})
result = flow.run("llm", {"input": "问题"})
# LangChain实现(完整)
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import ChatPromptTemplate
llm = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages([
("system", "你是助手"),
("user", "{input}")
])
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(input="问题")
"""
对比:
PocketFlow:
- 代码: 10行
- 依赖: 0
- 学习曲线: 平缓
- 适用: 原型/学习
LangChain:
- 代码: 10行(但底层数千行)
- 依赖: 50+
- 学习曲线: 陡峭
- 适用: 生产环境
"""
print("✓ vs LangChain完成")
---
5.2 vs LangGraph
01.Graph对比
a.设计差异
a.功能说明
PocketFlow和LangGraph都基于Graph抽象。LangGraph功能更强大,PocketFlow更简洁。选择取决于需求复杂度。
b.代码示例
---
# PocketFlow vs LangGraph
# PocketFlow(100行)
flow = PocketFlow()
flow.add_node("start", func1)
flow.add_edge("start", "end")
result = flow.run("start")
# LangGraph(功能丰富)
from langgraph.graph import StateGraph
graph = StateGraph()
graph.add_node("start", func1)
graph.add_edge("start", "end")
app = graph.compile()
result = app.invoke(state)
"""
相似点:
- 都基于Graph抽象
- 都有节点和边
- 都管理状态
差异点:
PocketFlow:
- 100行实现
- 简单直接
- 适合学习
LangGraph:
- 完整功能
- 类型检查
- Checkpointing
- Human-in-loop
- 适合生产
"""
print("✓ vs LangGraph完成")
---
5.3 适用场景
01.使用建议
a.选择指南
a.功能说明
根据项目特点选择合适框架。原型和学习选PocketFlow。生产环境选LangChain/LangGraph。
b.代码示例
---
# 选择框架指南
"""
选择PocketFlow的场景:
1. 学习LLM应用开发原理
2. 快速验证想法
3. 简单的内部工具
4. 理解Graph工作流
5. 不想引入复杂依赖
选择LangChain的场景:
1. 生产级应用
2. 需要丰富的内置组件
3. 多种数据源集成
4. 复杂的Chain组合
5. 完整的生态支持
选择LangGraph的场景:
1. 复杂的Agent系统
2. 需要状态持久化
3. Human-in-the-loop
4. 多Agent协作
5. 生产级Graph应用
"""
# 渐进式迁移路径
"""
阶段1: PocketFlow原型
- 快速验证想法
- 理解核心逻辑
阶段2: 功能扩展
- 添加错误处理
- 增加监控日志
阶段3: 迁移到LangGraph
- 保持Graph结构
- 使用生产级功能
- 添加持久化
"""
print("✓ 适用场景完成")
---
6 实战案例
6.1 快速原型
01.原型开发
a.快速验证
a.功能说明
使用PocketFlow快速验证AI应用想法。无需复杂配置。几十行代码实现完整流程。
b.代码示例
---
# 快速原型示例
from litellm import completion
def validate_input(state):
"""验证输入"""
user_input = state.get("input")
if not user_input:
return {"error": "输入为空"}
return {"validated": True}
def generate_response(state):
"""生成响应"""
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": state["input"]}]
)
return {"response": response.choices[0].message.content}
def format_output(state):
"""格式化输出"""
return {"final": f"答案: {state['response']}"}
# 构建原型
prototype = PocketFlow()
prototype.add_node("validate", validate_input)
prototype.add_node("generate", generate_response)
prototype.add_node("format", format_output)
prototype.add_edge("validate", "generate",
condition=lambda s: s.get("validated"))
prototype.add_edge("generate", "format")
# 测试
result = prototype.run("validate", {"input": "什么是AI?"})
print(result["final"])
print("✓ 快速原型完成")
---
6.2 客服助手
01.客服系统
a.智能客服
a.功能说明
实现简单的智能客服系统。支持意图识别和问题路由。演示完整应用。
b.代码示例
---
# 客服助手
def classify_intent(state):
"""意图分类"""
question = state.get("question")
# 简单规则分类
if "退货" in question or "退款" in question:
return {"intent": "refund"}
elif "物流" in question or "快递" in question:
return {"intent": "logistics"}
else:
return {"intent": "general"}
def handle_refund(state):
"""处理退货"""
return {"answer": "退货流程:1. 申请退货 2. 寄回商品 3. 审核退款"}
def handle_logistics(state):
"""处理物流"""
return {"answer": "请提供订单号,我帮您查询物流信息"}
def handle_general(state):
"""通用处理"""
from litellm import completion
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": state["question"]}]
)
return {"answer": response.choices[0].message.content}
# 构建客服流程
customer_service = PocketFlow()
customer_service.add_node("classify", classify_intent)
customer_service.add_node("refund", handle_refund)
customer_service.add_node("logistics", handle_logistics)
customer_service.add_node("general", handle_general)
customer_service.add_edge("classify", "refund",
condition=lambda s: s["intent"] == "refund")
customer_service.add_edge("classify", "logistics",
condition=lambda s: s["intent"] == "logistics")
customer_service.add_edge("classify", "general",
condition=lambda s: s["intent"] == "general")
# 测试
result = customer_service.run("classify", {"question": "如何退货?"})
print(f"答案: {result['answer']}")
print("✓ 客服助手完成")
---
6.3 文档处理
01.文档流水线
a.处理流程
a.功能说明
实现文档处理流水线。包括加载、分析、摘要和存储。演示完整数据处理。
b.代码示例
---
# 文档处理流水线
def load_document(state):
"""加载文档"""
filepath = state.get("filepath")
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
return {"content": content}
def analyze_document(state):
"""分析文档"""
content = state.get("content")
word_count = len(content.split())
return {
"word_count": word_count,
"length": len(content)
}
def summarize_document(state):
"""生成摘要"""
content = state.get("content")
from litellm import completion
response = completion(
model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"请总结以下内容:\n{content[:1000]}"
}]
)
return {"summary": response.choices[0].message.content}
def save_results(state):
"""保存结果"""
summary = state.get("summary")
# 保存到数据库或文件
print(f"保存摘要: {summary}")
return {"saved": True}
# 构建处理流程
doc_pipeline = PocketFlow()
doc_pipeline.add_node("load", load_document)
doc_pipeline.add_node("analyze", analyze_document)
doc_pipeline.add_node("summarize", summarize_document)
doc_pipeline.add_node("save", save_results)
doc_pipeline.add_edge("load", "analyze")
doc_pipeline.add_edge("analyze", "summarize")
doc_pipeline.add_edge("summarize", "save")
# 执行(假设文件存在)
# result = doc_pipeline.run("load", {"filepath": "./test.txt"})
print("✓ 文档处理完成")
---
7 扩展开发
7.1 自定义节点
01.扩展节点
a.高级节点
a.功能说明
实现带错误处理、重试、日志的高级节点。封装常用功能模式。提升代码复用性。
b.代码示例
---
# 自定义高级节点
# 1. 带重试的节点
def with_retry(func, max_retries=3):
"""添加重试功能的节点装饰器"""
def wrapped(state):
for attempt in range(max_retries):
try:
return func(state)
except Exception as e:
print(f"尝试 {attempt + 1} 失败: {e}")
if attempt == max_retries - 1:
return {"error": str(e)}
return {}
return wrapped
# 使用
@with_retry
def risky_operation(state):
"""可能失败的操作"""
import random
if random.random() < 0.5:
raise Exception("随机失败")
return {"success": True}
# 2. 带日志的节点
def with_logging(func):
"""添加日志的节点装饰器"""
def wrapped(state):
print(f"[LOG] 执行: {func.__name__}")
print(f"[LOG] 输入状态: {state}")
result = func(state)
print(f"[LOG] 输出: {result}")
return result
return wrapped
@with_logging
def process_data(state):
return {"processed": True}
# 3. 带计时的节点
import time
def with_timing(func):
"""添加计时的节点装饰器"""
def wrapped(state):
start = time.time()
result = func(state)
duration = time.time() - start
print(f"节点耗时: {duration:.2f}秒")
return {**result, "duration": duration}
return wrapped
@with_timing
def slow_operation(state):
time.sleep(1)
return {"done": True}
print("✓ 自定义节点完成")
---
7.2 工具集成
01.外部工具
a.工具调用
a.功能说明
集成搜索、数据库、API等外部工具。扩展PocketFlow能力。实现更复杂的应用。
b.代码示例
---
# 工具集成
# 1. 搜索工具
def search_tool(state):
"""搜索工具"""
query = state.get("query")
# 模拟搜索API
results = [f"搜索结果: {query}"]
return {"search_results": results}
# 2. 数据库工具
def db_query_tool(state):
"""数据库查询工具"""
sql = state.get("sql")
# 模拟数据库查询
# results = db.execute(sql)
results = ["数据1", "数据2"]
return {"db_results": results}
# 3. API调用工具
import requests
def api_call_tool(state):
"""API调用工具"""
endpoint = state.get("endpoint")
try:
# response = requests.get(endpoint)
# data = response.json()
data = {"api_response": "模拟数据"}
return {"api_data": data}
except Exception as e:
return {"api_error": str(e)}
# 4. 工具组合
flow = PocketFlow()
flow.add_node("search", search_tool)
flow.add_node("db_query", db_query_tool)
flow.add_node("api_call", api_call_tool)
flow.add_edge("search", "db_query")
flow.add_edge("db_query", "api_call")
print("✓ 工具集成完成")
---
7.3 源码解读
01.核心源码
a.实现细节
a.功能说明
深入解读PocketFlow的100行核心代码。理解每个方法的设计思路。掌握Graph工作流的本质。
b.代码示例
---
# PocketFlow源码解读
"""
核心数据结构:
1. nodes字典: {节点名: 处理函数}
- 存储所有节点
- O(1)查找节点
2. edges字典: {起始节点: [(目标节点, 条件函数), ...]}
- 存储节点连接关系
- 一个节点可以有多条出边
- 条件函数决定是否走这条边
3. state字典: {键: 值}
- 全局状态
- 节点间共享数据
- 累积所有节点的输出
"""
# 1. add_node方法
def add_node(self, name: str, func: Callable):
"""
添加节点到nodes字典
设计决策:
- 使用字典存储,快速查找
- 节点名作为key,必须唯一
- func可以是任意callable
- 返回self支持链式调用
"""
self.nodes[name] = func
return self
# 2. add_edge方法
def add_edge(self, from_node: str, to_node: str, condition=None):
"""
添加边到edges字典
设计决策:
- 使用列表存储多条出边
- condition为None表示无条件边
- 边是(目标节点, 条件)元组
- 返回self支持链式调用
"""
if from_node not in self.edges:
self.edges[from_node] = []
self.edges[from_node].append((to_node, condition))
return self
# 3. run方法(核心执行逻辑)
def run(self, start_node: str, initial_state=None, verbose=False):
"""
执行工作流
算法:
1. 初始化状态
2. 从起始节点开始
3. while循环执行:
a. 执行当前节点函数
b. 更新状态
c. 确定下一个节点
4. 返回最终状态
设计决策:
- while循环而不是递归(避免栈溢出)
- max_steps防止无限循环
- 状态累积而不是替换
"""
if initial_state:
self.state = initial_state.copy()
else:
self.state = {}
current = start_node
step_count = 0
max_steps = 100
while current and step_count < max_steps:
# 执行节点
result = self.nodes[current](self.state)
# 更新状态(合并dict)
if isinstance(result, dict):
self.state.update(result)
# 下一个节点
current = self._next_node(current)
step_count += 1
return self.state
# 4. _next_node方法(路由逻辑)
def _next_node(self, current: str):
"""
确定下一个节点
算法:
1. 获取当前节点的所有出边
2. 遍历每条边
3. 如果边无条件或条件满足,返回目标节点
4. 否则返回None(结束)
设计决策:
- 返回第一个满足条件的边
- 边的顺序很重要
- 无出边返回None终止
"""
if current not in self.edges:
return None
for next_node, condition in self.edges[current]:
if condition is None or condition(self.state):
return next_node
return None
"""
总结:
- 约100行实现完整Graph框架
- 核心是nodes和edges两个字典
- while循环驱动执行
- 状态在节点间传递
- 条件函数控制路由
扩展方向:
1. 添加异步支持
2. 添加并行执行
3. 添加状态持久化
4. 添加可视化
5. 添加调试工具
"""
print("✓ 源码解读完成")
---