1 框架介绍

1.1 核心理念

01.极简主义
    a.最小实现
        a.功能说明
            PocketFlow是100行代码的极简LLM框架。只包含核心Graph抽象和状态管理。没有复杂依赖和封装。专注于清晰和可理解性。适合学习和快速原型开发。
        b.代码示例
            ---
            # 1. PocketFlow核心理念

            """
            PocketFlow: 极简的LLM应用框架

            核心特点:
            - 100行左右的核心代码
            - 基于有向图(Graph)的工作流
            - 简单的状态管理
            - 易于理解和扩展
            - 无复杂依赖
            """

            # 2. 核心抽象:Graph

            class Graph:
                """工作流图"""
                def __init__(self):
                    self.nodes = {}      # 节点字典
                    self.edges = {}      # 边字典
                    self.state = {}      # 全局状态

                def add_node(self, name, func):
                    """添加节点"""
                    self.nodes[name] = func

                def add_edge(self, from_node, to_node, condition=None):
                    """添加边"""
                    if from_node not in self.edges:
                        self.edges[from_node] = []
                    self.edges[from_node].append((to_node, condition))

                def run(self, start_node, initial_state=None):
                    """执行工作流"""
                    if initial_state:
                        self.state.update(initial_state)

                    current = start_node

                    while current:
                        # 执行当前节点
                        print(f"执行节点: {current}")
                        result = self.nodes[current](self.state)

                        # 更新状态
                        if isinstance(result, dict):
                            self.state.update(result)

                        # 找到下一个节点
                        current = self._next_node(current)

                    return self.state

                def _next_node(self, current):
                    """确定下一个节点"""
                    if current not in self.edges:
                        return None

                    for next_node, condition in self.edges[current]:
                        if condition is None or condition(self.state):
                            return next_node

                    return None

            # 3. 简单示例

            def node_a(state):
                """节点A:问候"""
                print(f"你好,{state.get('name', '访客')}!")
                return {"greeted": True}

            def node_b(state):
                """节点B:处理"""
                print("正在处理...")
                return {"processed": True}

            def node_c(state):
                """节点C:完成"""
                print("完成!")
                return {"completed": True}

            # 构建工作流
            graph = Graph()
            graph.add_node("start", node_a)
            graph.add_node("process", node_b)
            graph.add_node("end", node_c)

            graph.add_edge("start", "process")
            graph.add_edge("process", "end")

            # 执行
            result = graph.run("start", {"name": "张三"})
            print(f"最终状态: {result}")

            # 输出:
            # 执行节点: start
            # 你好,张三!
            # 执行节点: process
            # 正在处理...
            # 执行节点: end
            # 完成!
            # 最终状态: {'name': '张三', 'greeted': True, 'processed': True, 'completed': True}

            print("✓ PocketFlow核心理念示例完成")

            # 4. 与复杂框架对比

            """
            LangChain/LangGraph:
            - 数千行代码
            - 丰富的内置组件
            - 复杂的抽象层
            - 适合生产环境

            PocketFlow:
            - 100行代码
            - 只有核心抽象
            - 简单直观
            - 适合学习和原型
            """

            # 5. 核心优势

            """
            优势:
            1. 代码少,易理解
            2. 无黑盒,完全可控
            3. 快速上手
            4. 灵活扩展
            5. 适合教学

            适用场景:
            - 学习LLM应用开发
            - 快速原型验证
            - 小型工具开发
            - 理解Graph工作流原理
            """

            print("✓ 核心理念完成")
            ---
    b.Graph思维
        a.功能说明
            将LLM应用建模为有向图。节点表示处理步骤。边表示控制流。状态在节点间传递。Graph思维清晰直观。
        b.代码示例
            ---
            # Graph思维模式

            # 1. 传统流程 vs Graph流程

            # 传统流程(线性)
            def traditional_flow(input_text):
                result1 = step1(input_text)
                result2 = step2(result1)
                result3 = step3(result2)
                return result3

            # Graph流程(图结构)
            graph = Graph()
            graph.add_node("step1", step1_func)
            graph.add_node("step2", step2_func)
            graph.add_node("step3", step3_func)

            graph.add_edge("step1", "step2")
            graph.add_edge("step2", "step3")

            result = graph.run("step1", {"input": input_text})

            # 2. 条件分支

            def check_condition(state):
                return state.get("score", 0) > 0.5

            graph.add_edge("analyze", "path_a", condition=check_condition)
            graph.add_edge("analyze", "path_b", condition=lambda s: not check_condition(s))

            # 3. 循环处理

            def need_retry(state):
                return state.get("retry_count", 0) < 3 and not state.get("success")

            graph.add_edge("process", "validate")
            graph.add_edge("validate", "process", condition=need_retry)
            graph.add_edge("validate", "complete", condition=lambda s: not need_retry(s))

            # 4. 并行处理(通过状态传递实现)

            def split_tasks(state):
                """分割任务"""
                return {"tasks": state["input"].split(",")}

            def process_task(state):
                """处理单个任务"""
                results = []
                for task in state["tasks"]:
                    results.append(f"处理: {task}")
                return {"results": results}

            graph.add_node("split", split_tasks)
            graph.add_node("process", process_task)
            graph.add_edge("split", "process")

            print("✓ Graph思维完成")
            ---

1.2 100行实现

01.核心代码
    a.完整实现
        a.功能说明
            PocketFlow的核心实现仅需100行左右代码。包含Graph类、节点管理、边连接和状态执行。代码简洁清晰易懂。是学习Graph工作流的最佳示例。
        b.代码示例
            ---
            # PocketFlow完整实现(约100行)

            from typing import Dict, List, Callable, Optional, Any, Tuple

            class PocketFlow:
                """极简LLM工作流框架"""

                def __init__(self):
                    """初始化"""
                    self.nodes: Dict[str, Callable] = {}
                    self.edges: Dict[str, List[Tuple[str, Optional[Callable]]]] = {}
                    self.state: Dict[str, Any] = {}

                def add_node(self, name: str, func: Callable) -> 'PocketFlow':
                    """
                    添加节点

                    Args:
                        name: 节点名称
                        func: 节点处理函数,接收state,返回dict更新状态
                    """
                    self.nodes[name] = func
                    return self  # 支持链式调用

                def add_edge(
                    self,
                    from_node: str,
                    to_node: str,
                    condition: Optional[Callable[[Dict], bool]] = None
                ) -> 'PocketFlow':
                    """
                    添加边

                    Args:
                        from_node: 起始节点
                        to_node: 目标节点
                        condition: 条件函数,接收state,返回bool
                    """
                    if from_node not in self.edges:
                        self.edges[from_node] = []
                    self.edges[from_node].append((to_node, condition))
                    return self

                def run(
                    self,
                    start_node: str,
                    initial_state: Optional[Dict] = None,
                    verbose: bool = False
                ) -> Dict:
                    """
                    执行工作流

                    Args:
                        start_node: 起始节点
                        initial_state: 初始状态
                        verbose: 是否打印执行日志

                    Returns:
                        最终状态
                    """
                    # 初始化状态
                    if initial_state:
                        self.state = initial_state.copy()
                    else:
                        self.state = {}

                    current = start_node
                    step_count = 0
                    max_steps = 100  # 防止无限循环

                    while current and step_count < max_steps:
                        if verbose:
                            print(f"Step {step_count + 1}: 执行节点 '{current}'")

                        # 检查节点是否存在
                        if current not in self.nodes:
                            raise ValueError(f"节点 '{current}' 不存在")

                        # 执行节点函数
                        result = self.nodes[current](self.state)

                        # 更新状态
                        if isinstance(result, dict):
                            self.state.update(result)

                        # 确定下一个节点
                        current = self._next_node(current, verbose)
                        step_count += 1

                    if step_count >= max_steps:
                        raise RuntimeError(f"超过最大步数限制 {max_steps}")

                    if verbose:
                        print(f"工作流完成,共执行 {step_count} 步")

                    return self.state

                def _next_node(self, current: str, verbose: bool = False) -> Optional[str]:
                    """确定下一个节点"""
                    if current not in self.edges:
                        return None

                    for next_node, condition in self.edges[current]:
                        if condition is None:
                            # 无条件边
                            return next_node
                        elif condition(self.state):
                            # 条件满足
                            if verbose:
                                print(f"  → 条件满足,前往 '{next_node}'")
                            return next_node

                    return None

                def visualize(self) -> str:
                    """可视化工作流(简单文本表示)"""
                    lines = ["工作流图:"]
                    lines.append("=" * 40)

                    for node in self.nodes:
                        lines.append(f"节点: {node}")
                        if node in self.edges:
                            for next_node, condition in self.edges[node]:
                                cond_str = " (有条件)" if condition else ""
                                lines.append(f"  → {next_node}{cond_str}")

                    return "\n".join(lines)

            # 使用示例

            if __name__ == "__main__":
                # 创建工作流
                flow = PocketFlow()

                # 定义节点函数
                def greet(state):
                    print(f"欢迎,{state.get('name', '访客')}!")
                    return {"greeted": True}

                def process(state):
                    print("处理中...")
                    return {"processed": True}

                def finish(state):
                    print("完成!")
                    return {"completed": True}

                # 构建图
                flow.add_node("start", greet) \
                    .add_node("process", process) \
                    .add_node("end", finish) \
                    .add_edge("start", "process") \
                    .add_edge("process", "end")

                # 可视化
                print(flow.visualize())
                print()

                # 执行
                result = flow.run("start", {"name": "Alice"}, verbose=True)
                print(f"\n最终状态: {result}")

            print("✓ 100行实现完成")
            ---

1.3 设计哲学

01.设计原则
    a.简单优于复杂
        a.功能说明
            遵循Python之禅的设计理念。简单的实现更容易理解和维护。避免过度设计和不必要的抽象。专注于核心功能。
        b.代码示例
            ---
            # 设计哲学:简单优于复杂

            # 1. 最小化依赖

            """
            PocketFlow只依赖Python标准库:
            - typing: 类型提示
            - 没有外部依赖

            对比其他框架:
            - LangChain: 50+ 依赖包
            - LangGraph: 20+ 依赖包
            - PocketFlow: 0 外部依赖
            """

            # 2. 显式优于隐式

            # 不好的设计(隐式)
            class ImplicitGraph:
                def __init__(self):
                    self._magic_state = {}  # 隐藏的状态

                def process(self, data):
                    # 隐式修改状态
                    self._magic_state["data"] = data
                    return self._auto_route()  # 自动路由

            # 好的设计(显式)
            class ExplicitGraph:
                def __init__(self):
                    self.state = {}  # 明确的状态

                def process(self, state):
                    # 显式传递和返回状态
                    new_state = {"data": state["input"]}
                    return new_state

                def run(self, start, state):
                    # 显式的执行流程
                    current = start
                    while current:
                        state = self.nodes[current](state)
                        current = self._next(current)
                    return state

            # 3. 扁平优于嵌套

            # 不好的设计(过度嵌套)
            class NestedWorkflow:
                class Node:
                    class Edge:
                        class Condition:
                            pass

            # 好的设计(扁平)
            class FlatWorkflow:
                nodes = {}
                edges = {}
                # 简单的字典结构

            # 4. 可读性至上

            # 不好的代码
            g=Graph();g.n("a",f1);g.e("a","b");g.r("a",{"x":1})

            # 好的代码
            graph = Graph()
            graph.add_node("start", process_func)
            graph.add_edge("start", "end")
            result = graph.run("start", {"input": data})

            # 5. 约定优于配置

            """
            默认行为简单合理:
            - 节点函数接收state,返回dict
            - 边默认无条件执行
            - 状态自动合并更新
            - 无需复杂配置文件
            """

            def simple_node(state):
                # 约定:接收state
                result = process(state["input"])
                # 约定:返回dict更新状态
                return {"output": result}

            # 6. 渐进式复杂度

            # 最简单:顺序执行
            flow.add_edge("a", "b").add_edge("b", "c")

            # 添加条件:分支
            flow.add_edge("a", "b", condition=lambda s: s["x"] > 0)
            flow.add_edge("a", "c", condition=lambda s: s["x"] <= 0)

            # 添加循环:重复
            flow.add_edge("process", "check")
            flow.add_edge("check", "process", condition=need_retry)
            flow.add_edge("check", "done", condition=lambda s: not need_retry(s))

            print("✓ 设计原则示例完成")
            ---
    b.教学友好
        a.功能说明
            代码结构清晰便于学习。每个概念都有对应的实现。适合理解Graph工作流原理。是优秀的教学材料。
        b.代码示例
            ---
            # 教学友好设计

            # 1. 渐进式学习路径

            # 第一步:理解节点
            def my_first_node(state):
                """节点就是一个函数"""
                print("这是一个节点")
                return {"step": 1}

            # 第二步:理解边
            graph = PocketFlow()
            graph.add_node("node1", my_first_node)
            graph.add_node("node2", lambda s: {"step": 2})
            graph.add_edge("node1", "node2")  # 边连接节点

            # 第三步:理解状态
            result = graph.run("node1", {"input": "data"})
            # 状态在节点间传递和累积

            # 第四步:理解条件
            graph.add_edge("check", "path_a", condition=lambda s: s["x"] > 0)

            # 2. 清晰的命名

            """
            好的命名:
            - add_node: 添加节点(动词+名词)
            - add_edge: 添加边(动词+名词)
            - run: 运行(简单动词)

            避免:
            - n: 太简短
            - register_processing_unit: 太复杂
            - do_the_thing: 不清晰
            """

            # 3. 完整的示例

            """
            每个概念都有完整的示例代码:
            - 节点定义
            - 边连接
            - 条件判断
            - 状态管理
            - 完整工作流
            """

            # 4. 逐步增加复杂度

            # 示例1:最简单
            flow = PocketFlow()
            flow.add_node("start", lambda s: {"done": True})
            result = flow.run("start")

            # 示例2:多个节点
            flow.add_node("end", lambda s: {"completed": True})
            flow.add_edge("start", "end")

            # 示例3:添加条件
            flow.add_edge("start", "path_a", condition=lambda s: s.get("x", 0) > 0)
            flow.add_edge("start", "path_b", condition=lambda s: s.get("x", 0) <= 0)

            # 5. 可视化帮助理解

            print(flow.visualize())
            # 工作流图:
            # ========================================
            # 节点: start
            #   → path_a (有条件)
            #   → path_b (有条件)
            # 节点: path_a
            # 节点: path_b

            print("✓ 教学友好设计完成")
            ---

2 Graph抽象

2.1 节点定义

01.节点函数
    a.函数签名
        a.功能说明
            节点是处理单元,接收state字典,返回更新字典。函数签名简单统一。支持同步和异步。节点是工作流的基本构建块。
        b.代码示例
            ---
            # 节点定义

            # 1. 基础节点

            def simple_node(state):
                """最简单的节点"""
                print("执行节点")
                return {"processed": True}

            # 2. 有输入输出的节点

            def process_data(state):
                """处理数据节点"""
                input_data = state.get("input")
                result = input_data.upper()  # 处理逻辑
                return {"output": result}

            # 3. 有副作用的节点

            def save_to_db(state):
                """保存到数据库"""
                data = state.get("data")
                # db.save(data)  # 副作用
                return {"saved": True}

            # 4. LLM节点

            def llm_call(state):
                """调用LLM"""
                prompt = state.get("prompt")
                # response = llm.generate(prompt)
                response = "LLM响应"
                return {"llm_response": response}

            # 5. 条件节点

            def decision_node(state):
                """决策节点"""
                score = state.get("score", 0)
                return {
                    "decision": "approve" if score > 0.5 else "reject",
                    "checked": True
                }

            print("✓ 节点定义完成")
            ---

2.2 边连接

01.控制流
    a.边的类型
        a.功能说明
            边定义节点间的连接关系。支持无条件边和条件边。实现顺序、分支和循环。边控制工作流的执行路径。
        b.代码示例
            ---
            # 边连接

            flow = PocketFlow()

            # 1. 无条件边(顺序)
            flow.add_edge("step1", "step2")
            flow.add_edge("step2", "step3")

            # 2. 条件边(分支)
            flow.add_edge("check", "path_a", condition=lambda s: s["score"] > 0.5)
            flow.add_edge("check", "path_b", condition=lambda s: s["score"] <= 0.5)

            # 3. 循环边
            def need_retry(state):
                return state.get("retry", 0) < 3 and not state.get("success")

            flow.add_edge("process", "validate")
            flow.add_edge("validate", "process", condition=need_retry)
            flow.add_edge("validate", "done", condition=lambda s: not need_retry(s))

            # 4. 复杂条件
            def complex_condition(state):
                return (
                    state.get("score", 0) > 0.7 and
                    state.get("confidence", 0) > 0.8 and
                    state.get("validated", False)
                )

            flow.add_edge("analyze", "approve", condition=complex_condition)

            print("✓ 边连接完成")
            ---

2.3 状态管理

01.状态传递
    a.状态字典
        a.功能说明
            状态是全局字典,在节点间传递。节点返回dict更新状态。状态累积所有节点的输出。状态是数据流的载体。
        b.代码示例
            ---
            # 状态管理

            # 1. 初始状态
            initial_state = {
                "input": "用户输入",
                "user_id": "123"
            }

            # 2. 节点更新状态
            def node1(state):
                return {"step1_done": True, "data": "处理后数据"}

            def node2(state):
                # 可以访问之前的状态
                previous_data = state.get("data")
                return {"step2_done": True, "final": previous_data + "_final"}

            # 3. 状态累积
            flow = PocketFlow()
            flow.add_node("n1", node1).add_node("n2", node2)
            flow.add_edge("n1", "n2")

            result = flow.run("n1", initial_state)
            # result = {
            #     "input": "用户输入",
            #     "user_id": "123",
            #     "step1_done": True,
            #     "data": "处理后数据",
            #     "step2_done": True,
            #     "final": "处理后数据_final"
            # }

            print(f"最终状态: {result}")
            print("✓ 状态管理完成")
            ---

3 基础示例

3.1 Chat对话

01.对话实现
    a.简单聊天机器人
        a.功能说明
            使用PocketFlow实现聊天机器人。维护对话历史。调用LLM生成响应。演示基础应用。
        b.代码示例
            ---
            # 使用PocketFlow实现Chat

            from litellm import completion

            def add_user_message(state):
                """添加用户消息"""
                message = state.get("user_input")
                history = state.get("history", [])
                history.append({"role": "user", "content": message})
                return {"history": history}

            def call_llm(state):
                """调用LLM"""
                history = state.get("history", [])

                response = completion(
                    model="gpt-3.5-turbo",
                    messages=history
                )

                ai_message = response.choices[0].message.content
                history.append({"role": "assistant", "content": ai_message})

                return {
                    "history": history,
                    "response": ai_message
                }

            def output_response(state):
                """输出响应"""
                print(f"AI: {state['response']}")
                return {}

            # 构建聊天流程
            chat_flow = PocketFlow()
            chat_flow.add_node("add_message", add_user_message)
            chat_flow.add_node("llm", call_llm)
            chat_flow.add_node("output", output_response)

            chat_flow.add_edge("add_message", "llm")
            chat_flow.add_edge("llm", "output")

            # 使用
            result = chat_flow.run("add_message", {"user_input": "你好"})
            print("✓ Chat对话完成")
            ---

3.2 Workflow工作流

01.业务流程
    a.审批工作流
        a.功能说明
            实现多步骤业务流程。支持条件判断和人工审批。演示工作流自动化。
        b.代码示例
            ---
            # 审批工作流

            def submit_request(state):
                """提交申请"""
                print(f"提交申请: {state['request']}")
                return {"status": "submitted"}

            def auto_check(state):
                """自动检查"""
                amount = state.get("amount", 0)
                return {"auto_approved": amount < 1000}

            def manual_review(state):
                """人工审核"""
                print("需要人工审核")
                return {"reviewed": True, "approved": True}

            def approve(state):
                """批准"""
                print("申请已批准")
                return {"status": "approved"}

            def reject(state):
                """拒绝"""
                print("申请被拒绝")
                return {"status": "rejected"}

            # 构建工作流
            workflow = PocketFlow()
            workflow.add_node("submit", submit_request)
            workflow.add_node("check", auto_check)
            workflow.add_node("review", manual_review)
            workflow.add_node("approve", approve)
            workflow.add_node("reject", reject)

            workflow.add_edge("submit", "check")
            workflow.add_edge("check", "approve", condition=lambda s: s.get("auto_approved"))
            workflow.add_edge("check", "review", condition=lambda s: not s.get("auto_approved"))
            workflow.add_edge("review", "approve", condition=lambda s: s.get("approved"))
            workflow.add_edge("review", "reject", condition=lambda s: not s.get("approved"))

            # 执行
            result = workflow.run("submit", {
                "request": "采购申请",
                "amount": 5000
            }, verbose=True)

            print(f"最终状态: {result['status']}")
            print("✓ Workflow工作流完成")
            ---

3.3 Agent代理

01.Agent实现
    a.ReAct模式
        a.功能说明
            实现ReAct Agent模式。支持工具调用和推理循环。演示Agent能力。
        b.代码示例
            ---
            # Agent实现

            def think(state):
                """思考下一步"""
                question = state.get("question")
                # 使用LLM决定下一步
                thought = f"需要查询知识库回答: {question}"
                return {"thought": thought, "action": "search"}

            def search_tool(state):
                """搜索工具"""
                # 模拟搜索
                results = ["结果1", "结果2"]
                return {"search_results": results}

            def answer(state):
                """生成答案"""
                results = state.get("search_results", [])
                answer = f"根据搜索结果: {results}"
                return {"answer": answer}

            # 构建Agent
            agent = PocketFlow()
            agent.add_node("think", think)
            agent.add_node("search", search_tool)
            agent.add_node("answer", answer)

            agent.add_edge("think", "search")
            agent.add_edge("search", "answer")

            result = agent.run("think", {"question": "什么是AI?"})
            print(f"答案: {result['answer']}")
            print("✓ Agent代理完成")
            ---

3.4 RAG检索

01.RAG实现
    a.检索增强
        a.功能说明
            实现简单的RAG流程。包括检索、上下文构建和生成。演示RAG核心逻辑。
        b.代码示例
            ---
            # RAG实现

            def retrieve(state):
                """检索相关文档"""
                query = state.get("query")
                # 模拟向量检索
                docs = ["文档1内容", "文档2内容"]
                return {"retrieved_docs": docs}

            def build_context(state):
                """构建上下文"""
                docs = state.get("retrieved_docs", [])
                context = "\n".join(docs)
                return {"context": context}

            def generate_answer(state):
                """生成答案"""
                context = state.get("context")
                query = state.get("query")

                prompt = f"上下文:\n{context}\n\n问题: {query}\n\n回答:"

                # 调用LLM
                from litellm import completion
                response = completion(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )

                return {"answer": response.choices[0].message.content}

            # 构建RAG流程
            rag = PocketFlow()
            rag.add_node("retrieve", retrieve)
            rag.add_node("build_context", build_context)
            rag.add_node("generate", generate_answer)

            rag.add_edge("retrieve", "build_context")
            rag.add_edge("build_context", "generate")

            result = rag.run("retrieve", {"query": "什么是Python?"})
            print(f"答案: {result['answer']}")
            print("✓ RAG检索完成")
            ---

4 高级模式

4.1 Multi-Agent

01.多Agent协作
    a.Agent组合
        a.功能说明
            实现多个Agent协作完成复杂任务。每个Agent负责特定子任务。通过状态传递协调。
        b.代码示例
            ---
            # Multi-Agent实现

            def research_agent(state):
                """研究Agent"""
                topic = state.get("topic")
                # 模拟研究
                findings = f"{topic}的研究结果"
                return {"research": findings}

            def writer_agent(state):
                """写作Agent"""
                research = state.get("research")
                # 模拟写作
                article = f"基于{research}的文章"
                return {"article": article}

            def reviewer_agent(state):
                """审核Agent"""
                article = state.get("article")
                # 模拟审核
                return {"reviewed": True, "approved": True}

            # 构建Multi-Agent流程
            multi_agent = PocketFlow()
            multi_agent.add_node("research", research_agent)
            multi_agent.add_node("writer", writer_agent)
            multi_agent.add_node("reviewer", reviewer_agent)

            multi_agent.add_edge("research", "writer")
            multi_agent.add_edge("writer", "reviewer")

            result = multi_agent.run("research", {"topic": "AI技术"})
            print(f"最终结果: {result}")
            print("✓ Multi-Agent完成")
            ---

4.2 Supervisor监督

01.监督模式
    a.中央协调
        a.功能说明
            Supervisor节点协调多个Worker。根据任务分配给不同Worker。汇总Worker结果。
        b.代码示例
            ---
            # Supervisor模式

            def supervisor(state):
                """监督者决定任务分配"""
                task_type = state.get("task_type")

                if task_type == "translate":
                    return {"route": "translator"}
                elif task_type == "summarize":
                    return {"route": "summarizer"}
                else:
                    return {"route": "general"}

            def translator(state):
                """翻译Worker"""
                return {"result": "翻译结果"}

            def summarizer(state):
                """摘要Worker"""
                return {"result": "摘要结果"}

            def general_worker(state):
                """通用Worker"""
                return {"result": "通用结果"}

            # 构建Supervisor流程
            supervisor_flow = PocketFlow()
            supervisor_flow.add_node("supervisor", supervisor)
            supervisor_flow.add_node("translator", translator)
            supervisor_flow.add_node("summarizer", summarizer)
            supervisor_flow.add_node("general", general_worker)

            supervisor_flow.add_edge("supervisor", "translator",
                                    condition=lambda s: s.get("route") == "translator")
            supervisor_flow.add_edge("supervisor", "summarizer",
                                    condition=lambda s: s.get("route") == "summarizer")
            supervisor_flow.add_edge("supervisor", "general",
                                    condition=lambda s: s.get("route") == "general")

            result = supervisor_flow.run("supervisor", {"task_type": "translate"})
            print(f"结果: {result['result']}")
            print("✓ Supervisor监督完成")
            ---

4.3 Map-Reduce

01.并行处理
    a.Map阶段
        a.功能说明
            将任务拆分为多个子任务并行处理。汇总所有子任务结果。实现数据并行。
        b.代码示例
            ---
            # Map-Reduce模式

            def split_tasks(state):
                """Map: 拆分任务"""
                items = state.get("items", [])
                return {"tasks": items, "results": []}

            def process_batch(state):
                """处理批次"""
                tasks = state.get("tasks", [])
                results = []

                # 模拟批量处理
                for task in tasks:
                    result = f"处理: {task}"
                    results.append(result)

                return {"results": results}

            def reduce_results(state):
                """Reduce: 汇总结果"""
                results = state.get("results", [])
                summary = f"共处理{len(results)}个任务"
                return {"summary": summary}

            # 构建Map-Reduce流程
            mapreduce = PocketFlow()
            mapreduce.add_node("split", split_tasks)
            mapreduce.add_node("process", process_batch)
            mapreduce.add_node("reduce", reduce_results)

            mapreduce.add_edge("split", "process")
            mapreduce.add_edge("process", "reduce")

            result = mapreduce.run("split", {
                "items": ["任务1", "任务2", "任务3"]
            })

            print(f"汇总: {result['summary']}")
            print("✓ Map-Reduce完成")
            ---

4.4 Streaming流式

01.流式处理
    a.实时输出
        a.功能说明
            支持流式输出生成内容。实时更新状态。提升用户体验。
        b.代码示例
            ---
            # 流式处理

            def stream_llm(state):
                """流式调用LLM"""
                from litellm import completion

                response = completion(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": state["prompt"]}],
                    stream=True
                )

                full_text = ""
                print("AI: ", end="", flush=True)

                for chunk in response:
                    if chunk.choices[0].delta.content:
                        token = chunk.choices[0].delta.content
                        print(token, end="", flush=True)
                        full_text += token

                print()
                return {"response": full_text}

            # 构建流式流程
            stream_flow = PocketFlow()
            stream_flow.add_node("stream", stream_llm)

            result = stream_flow.run("stream", {"prompt": "讲个故事"})
            print("✓ Streaming流式完成")
            ---

5 与其他框架对比

5.1 vs LangChain

01.对比分析
    a.复杂度对比
        a.功能说明
            PocketFlow极简,LangChain功能丰富。PocketFlow适合学习和原型,LangChain适合生产。根据需求选择框架。
        b.代码示例
            ---
            # PocketFlow vs LangChain

            # PocketFlow实现(简单)

            flow = PocketFlow()
            flow.add_node("llm", lambda s: {"response": "AI响应"})
            result = flow.run("llm", {"input": "问题"})

            # LangChain实现(完整)

            from langchain.chat_models import ChatOpenAI
            from langchain.chains import LLMChain
            from langchain.prompts import ChatPromptTemplate

            llm = ChatOpenAI()
            prompt = ChatPromptTemplate.from_messages([
                ("system", "你是助手"),
                ("user", "{input}")
            ])
            chain = LLMChain(llm=llm, prompt=prompt)
            result = chain.run(input="问题")

            """
            对比:
            PocketFlow:
            - 代码: 10行
            - 依赖: 0
            - 学习曲线: 平缓
            - 适用: 原型/学习

            LangChain:
            - 代码: 10行(但底层数千行)
            - 依赖: 50+
            - 学习曲线: 陡峭
            - 适用: 生产环境
            """

            print("✓ vs LangChain完成")
            ---

5.2 vs LangGraph

01.Graph对比
    a.设计差异
        a.功能说明
            PocketFlow和LangGraph都基于Graph抽象。LangGraph功能更强大,PocketFlow更简洁。选择取决于需求复杂度。
        b.代码示例
            ---
            # PocketFlow vs LangGraph

            # PocketFlow(100行)
            flow = PocketFlow()
            flow.add_node("start", func1)
            flow.add_edge("start", "end")
            result = flow.run("start")

            # LangGraph(功能丰富)
            from langgraph.graph import StateGraph

            graph = StateGraph()
            graph.add_node("start", func1)
            graph.add_edge("start", "end")
            app = graph.compile()
            result = app.invoke(state)

            """
            相似点:
            - 都基于Graph抽象
            - 都有节点和边
            - 都管理状态

            差异点:
            PocketFlow:
            - 100行实现
            - 简单直接
            - 适合学习

            LangGraph:
            - 完整功能
            - 类型检查
            - Checkpointing
            - Human-in-loop
            - 适合生产
            """

            print("✓ vs LangGraph完成")
            ---

5.3 适用场景

01.使用建议
    a.选择指南
        a.功能说明
            根据项目特点选择合适框架。原型和学习选PocketFlow。生产环境选LangChain/LangGraph。
        b.代码示例
            ---
            # 选择框架指南

            """
            选择PocketFlow的场景:
            1. 学习LLM应用开发原理
            2. 快速验证想法
            3. 简单的内部工具
            4. 理解Graph工作流
            5. 不想引入复杂依赖

            选择LangChain的场景:
            1. 生产级应用
            2. 需要丰富的内置组件
            3. 多种数据源集成
            4. 复杂的Chain组合
            5. 完整的生态支持

            选择LangGraph的场景:
            1. 复杂的Agent系统
            2. 需要状态持久化
            3. Human-in-the-loop
            4. 多Agent协作
            5. 生产级Graph应用
            """

            # 渐进式迁移路径

            """
            阶段1: PocketFlow原型
            - 快速验证想法
            - 理解核心逻辑

            阶段2: 功能扩展
            - 添加错误处理
            - 增加监控日志

            阶段3: 迁移到LangGraph
            - 保持Graph结构
            - 使用生产级功能
            - 添加持久化
            """

            print("✓ 适用场景完成")
            ---

6 实战案例

6.1 快速原型

01.原型开发
    a.快速验证
        a.功能说明
            使用PocketFlow快速验证AI应用想法。无需复杂配置。几十行代码实现完整流程。
        b.代码示例
            ---
            # 快速原型示例

            from litellm import completion

            def validate_input(state):
                """验证输入"""
                user_input = state.get("input")
                if not user_input:
                    return {"error": "输入为空"}
                return {"validated": True}

            def generate_response(state):
                """生成响应"""
                response = completion(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": state["input"]}]
                )
                return {"response": response.choices[0].message.content}

            def format_output(state):
                """格式化输出"""
                return {"final": f"答案: {state['response']}"}

            # 构建原型
            prototype = PocketFlow()
            prototype.add_node("validate", validate_input)
            prototype.add_node("generate", generate_response)
            prototype.add_node("format", format_output)

            prototype.add_edge("validate", "generate",
                             condition=lambda s: s.get("validated"))
            prototype.add_edge("generate", "format")

            # 测试
            result = prototype.run("validate", {"input": "什么是AI?"})
            print(result["final"])
            print("✓ 快速原型完成")
            ---

6.2 客服助手

01.客服系统
    a.智能客服
        a.功能说明
            实现简单的智能客服系统。支持意图识别和问题路由。演示完整应用。
        b.代码示例
            ---
            # 客服助手

            def classify_intent(state):
                """意图分类"""
                question = state.get("question")

                # 简单规则分类
                if "退货" in question or "退款" in question:
                    return {"intent": "refund"}
                elif "物流" in question or "快递" in question:
                    return {"intent": "logistics"}
                else:
                    return {"intent": "general"}

            def handle_refund(state):
                """处理退货"""
                return {"answer": "退货流程:1. 申请退货 2. 寄回商品 3. 审核退款"}

            def handle_logistics(state):
                """处理物流"""
                return {"answer": "请提供订单号,我帮您查询物流信息"}

            def handle_general(state):
                """通用处理"""
                from litellm import completion
                response = completion(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": state["question"]}]
                )
                return {"answer": response.choices[0].message.content}

            # 构建客服流程
            customer_service = PocketFlow()
            customer_service.add_node("classify", classify_intent)
            customer_service.add_node("refund", handle_refund)
            customer_service.add_node("logistics", handle_logistics)
            customer_service.add_node("general", handle_general)

            customer_service.add_edge("classify", "refund",
                                     condition=lambda s: s["intent"] == "refund")
            customer_service.add_edge("classify", "logistics",
                                     condition=lambda s: s["intent"] == "logistics")
            customer_service.add_edge("classify", "general",
                                     condition=lambda s: s["intent"] == "general")

            # 测试
            result = customer_service.run("classify", {"question": "如何退货?"})
            print(f"答案: {result['answer']}")
            print("✓ 客服助手完成")
            ---

6.3 文档处理

01.文档流水线
    a.处理流程
        a.功能说明
            实现文档处理流水线。包括加载、分析、摘要和存储。演示完整数据处理。
        b.代码示例
            ---
            # 文档处理流水线

            def load_document(state):
                """加载文档"""
                filepath = state.get("filepath")
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                return {"content": content}

            def analyze_document(state):
                """分析文档"""
                content = state.get("content")
                word_count = len(content.split())
                return {
                    "word_count": word_count,
                    "length": len(content)
                }

            def summarize_document(state):
                """生成摘要"""
                content = state.get("content")

                from litellm import completion
                response = completion(
                    model="gpt-3.5-turbo",
                    messages=[{
                        "role": "user",
                        "content": f"请总结以下内容:\n{content[:1000]}"
                    }]
                )

                return {"summary": response.choices[0].message.content}

            def save_results(state):
                """保存结果"""
                summary = state.get("summary")
                # 保存到数据库或文件
                print(f"保存摘要: {summary}")
                return {"saved": True}

            # 构建处理流程
            doc_pipeline = PocketFlow()
            doc_pipeline.add_node("load", load_document)
            doc_pipeline.add_node("analyze", analyze_document)
            doc_pipeline.add_node("summarize", summarize_document)
            doc_pipeline.add_node("save", save_results)

            doc_pipeline.add_edge("load", "analyze")
            doc_pipeline.add_edge("analyze", "summarize")
            doc_pipeline.add_edge("summarize", "save")

            # 执行(假设文件存在)
            # result = doc_pipeline.run("load", {"filepath": "./test.txt"})
            print("✓ 文档处理完成")
            ---

7 扩展开发

7.1 自定义节点

01.扩展节点
    a.高级节点
        a.功能说明
            实现带错误处理、重试、日志的高级节点。封装常用功能模式。提升代码复用性。
        b.代码示例
            ---
            # 自定义高级节点

            # 1. 带重试的节点

            def with_retry(func, max_retries=3):
                """添加重试功能的节点装饰器"""
                def wrapped(state):
                    for attempt in range(max_retries):
                        try:
                            return func(state)
                        except Exception as e:
                            print(f"尝试 {attempt + 1} 失败: {e}")
                            if attempt == max_retries - 1:
                                return {"error": str(e)}
                    return {}
                return wrapped

            # 使用
            @with_retry
            def risky_operation(state):
                """可能失败的操作"""
                import random
                if random.random() < 0.5:
                    raise Exception("随机失败")
                return {"success": True}

            # 2. 带日志的节点

            def with_logging(func):
                """添加日志的节点装饰器"""
                def wrapped(state):
                    print(f"[LOG] 执行: {func.__name__}")
                    print(f"[LOG] 输入状态: {state}")

                    result = func(state)

                    print(f"[LOG] 输出: {result}")
                    return result
                return wrapped

            @with_logging
            def process_data(state):
                return {"processed": True}

            # 3. 带计时的节点

            import time

            def with_timing(func):
                """添加计时的节点装饰器"""
                def wrapped(state):
                    start = time.time()
                    result = func(state)
                    duration = time.time() - start

                    print(f"节点耗时: {duration:.2f}秒")
                    return {**result, "duration": duration}
                return wrapped

            @with_timing
            def slow_operation(state):
                time.sleep(1)
                return {"done": True}

            print("✓ 自定义节点完成")
            ---

7.2 工具集成

01.外部工具
    a.工具调用
        a.功能说明
            集成搜索、数据库、API等外部工具。扩展PocketFlow能力。实现更复杂的应用。
        b.代码示例
            ---
            # 工具集成

            # 1. 搜索工具

            def search_tool(state):
                """搜索工具"""
                query = state.get("query")
                # 模拟搜索API
                results = [f"搜索结果: {query}"]
                return {"search_results": results}

            # 2. 数据库工具

            def db_query_tool(state):
                """数据库查询工具"""
                sql = state.get("sql")
                # 模拟数据库查询
                # results = db.execute(sql)
                results = ["数据1", "数据2"]
                return {"db_results": results}

            # 3. API调用工具

            import requests

            def api_call_tool(state):
                """API调用工具"""
                endpoint = state.get("endpoint")
                try:
                    # response = requests.get(endpoint)
                    # data = response.json()
                    data = {"api_response": "模拟数据"}
                    return {"api_data": data}
                except Exception as e:
                    return {"api_error": str(e)}

            # 4. 工具组合

            flow = PocketFlow()
            flow.add_node("search", search_tool)
            flow.add_node("db_query", db_query_tool)
            flow.add_node("api_call", api_call_tool)

            flow.add_edge("search", "db_query")
            flow.add_edge("db_query", "api_call")

            print("✓ 工具集成完成")
            ---

7.3 源码解读

01.核心源码
    a.实现细节
        a.功能说明
            深入解读PocketFlow的100行核心代码。理解每个方法的设计思路。掌握Graph工作流的本质。
        b.代码示例
            ---
            # PocketFlow源码解读

            """
            核心数据结构:

            1. nodes字典: {节点名: 处理函数}
               - 存储所有节点
               - O(1)查找节点

            2. edges字典: {起始节点: [(目标节点, 条件函数), ...]}
               - 存储节点连接关系
               - 一个节点可以有多条出边
               - 条件函数决定是否走这条边

            3. state字典: {键: 值}
               - 全局状态
               - 节点间共享数据
               - 累积所有节点的输出
            """

            # 1. add_node方法

            def add_node(self, name: str, func: Callable):
                """
                添加节点到nodes字典

                设计决策:
                - 使用字典存储,快速查找
                - 节点名作为key,必须唯一
                - func可以是任意callable
                - 返回self支持链式调用
                """
                self.nodes[name] = func
                return self

            # 2. add_edge方法

            def add_edge(self, from_node: str, to_node: str, condition=None):
                """
                添加边到edges字典

                设计决策:
                - 使用列表存储多条出边
                - condition为None表示无条件边
                - 边是(目标节点, 条件)元组
                - 返回self支持链式调用
                """
                if from_node not in self.edges:
                    self.edges[from_node] = []
                self.edges[from_node].append((to_node, condition))
                return self

            # 3. run方法(核心执行逻辑)

            def run(self, start_node: str, initial_state=None, verbose=False):
                """
                执行工作流

                算法:
                1. 初始化状态
                2. 从起始节点开始
                3. while循环执行:
                   a. 执行当前节点函数
                   b. 更新状态
                   c. 确定下一个节点
                4. 返回最终状态

                设计决策:
                - while循环而不是递归(避免栈溢出)
                - max_steps防止无限循环
                - 状态累积而不是替换
                """
                if initial_state:
                    self.state = initial_state.copy()
                else:
                    self.state = {}

                current = start_node
                step_count = 0
                max_steps = 100

                while current and step_count < max_steps:
                    # 执行节点
                    result = self.nodes[current](self.state)

                    # 更新状态(合并dict)
                    if isinstance(result, dict):
                        self.state.update(result)

                    # 下一个节点
                    current = self._next_node(current)
                    step_count += 1

                return self.state

            # 4. _next_node方法(路由逻辑)

            def _next_node(self, current: str):
                """
                确定下一个节点

                算法:
                1. 获取当前节点的所有出边
                2. 遍历每条边
                3. 如果边无条件或条件满足,返回目标节点
                4. 否则返回None(结束)

                设计决策:
                - 返回第一个满足条件的边
                - 边的顺序很重要
                - 无出边返回None终止
                """
                if current not in self.edges:
                    return None

                for next_node, condition in self.edges[current]:
                    if condition is None or condition(self.state):
                        return next_node

                return None

            """
            总结:
            - 约100行实现完整Graph框架
            - 核心是nodes和edges两个字典
            - while循环驱动执行
            - 状态在节点间传递
            - 条件函数控制路由

            扩展方向:
            1. 添加异步支持
            2. 添加并行执行
            3. 添加状态持久化
            4. 添加可视化
            5. 添加调试工具
            """

            print("✓ 源码解读完成")
            ---