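1 Deep Learning Fundamentals

1.1 Neural Network Basics

01.Perceptron
    a.Single-layer perceptron
        a.Model structure
            Input layer → weighted sum → activation function → output
        b.Mathematical form
            y = f(w^T x + b), where f is the activation function
        c.Code implementation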
            ---
            import numpy as np

            class Perceptron:
                def __init__(self, learning_rate=0.01, n_iterations=1000):
                    self.lr = learning_rate
                    self.n_iterations = n_iterations
                    self.weights = None
                    self.bias = None

                def fit(self, X, y):
                    n_samples, n_features = X.shape
                    self.weights = np.zeros(n_features)
                    self.bias = 0

                    for _ in range(self.n_iterations):
                        for idx, x_i in enumerate(X):
                            linear_output = np.dot(x_i, self.weights) + self.bias
                            y_predicted = self._activation(linear_output)

                            # Update the weights and bias (perceptron learning rule)
                            update = self.lr * (y[idx] - y_predicted)
                            self.weights += update * x_i
                            self.bias += update

                def _activation(self, x):
                    return np.where(x >= 0, 1, 0)

                def predict(self, X):
                    linear_output = np.dot(X, self.weights) + self.bias
                    return self._activation(linear_output)

            # Test
            from sklearn.datasets import make_classification
            from sklearn.model_selection import train_test_split

            X, y = make_classification(n_samples=100, n_features=2, n_redundant=0,
                                       n_informative=2, n_clusters_per_class=1,
                                       random_state=42)
            y = np.where(y == 0, 0, 1)

            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

            perceptron = Perceptron(learning_rate=0.01, n_iterations=1000)
            perceptron.fit(X_train, y_train)
            predictions = perceptron.predict(X_test)

            accuracy = np.mean(predictions == y_test)
            print(f"Perceptron accuracy: {accuracy:.4f}")
            ---
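The learning rule used in `fit` can also be traced by hand on a tiny problem. The following is a minimal sketch, assuming the AND gate as training data and a learning rate of 1 so that all arithmetic stays in exact integers; the names `step`, `train_perceptron`, and `and_gate` are illustrative and not part of the class above.

```python
# Illustrative sketch: perceptron learning rule on the AND gate (standard library only).
# lr=1 and integer inputs are assumptions made here to keep the arithmetic exact.

def step(z):
    """Step activation: 1 if z >= 0, else 0 (same rule as _activation above)."""
    return 1 if z >= 0 else 0

def train_perceptron(samples, lr=1, epochs=20):
    """Apply w += lr * (target - prediction) * x and b += lr * (target - prediction)."""
    w = [0, 0]
    b = 0
    for _ in range(epochs):
        for (x1, x2), target in samples:
            pred = step(w[0] * x1 + w[1] * x2 + b)
            update = lr * (target - pred)
            w[0] += update * x1
            w[1] += update * x2
            b += update
    return w, b

and_gate = [((0, 0), 0), ((0, 1), 0), ((1, 0), 0), ((1, 1), 1)]
w, b = train_perceptron(and_gate)
and_predictions = [step(w[0] * x1 + w[1] * x2 + b) for (x1, x2), _ in and_gate]
print(and_predictions)
```

Because the AND gate is linearly separable, the rule stops changing the weights once every sample is classified correctly.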

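02.Multilayer perceptron (MLP)
    a.Network structure
        a.Input layer
            Receives the raw features
        b.Hidden layers
            One or more hidden layers, each with multiple neurons
        c.Output layer
            Produces the final prediction
    b.Forward propagation (the code also covers backpropagation and training)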
        ---
        import numpy as np

        class MLP:
            def __init__(self, layer_sizes):
                self.layer_sizes = layer_sizes
                self.weights = []
                self.biases = []

                # Initialize weights (small random values) and biases (zeros)
                for i in range(len(layer_sizes) - 1):
                    w = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
                    b = np.zeros((1, layer_sizes[i+1]))
                    self.weights.append(w)
                    self.biases.append(b)

            def sigmoid(self, z):
                return 1 / (1 + np.exp(-np.clip(z, -500, 500)))

            def sigmoid_derivative(self, z):
                s = self.sigmoid(z)
                return s * (1 - s)

            def forward(self, X):
                self.activations = [X]
                self.z_values = []

                for i in range(len(self.weights)):
                    z = np.dot(self.activations[-1], self.weights[i]) + self.biases[i]
                    self.z_values.append(z)
                    a = self.sigmoid(z)
                    self.activations.append(a)

                return self.activations[-1]

            def backward(self, X, y, learning_rate=0.01):
                m = X.shape[0]
                deltas = [None] * len(self.weights)

                # Output-layer error (MSE loss with a sigmoid output)
                deltas[-1] = (self.activations[-1] - y) * self.sigmoid_derivative(self.z_values[-1])

                # Backpropagate the error through the hidden layers
                for i in range(len(deltas) - 2, -1, -1):
                    deltas[i] = np.dot(deltas[i+1], self.weights[i+1].T) * \
                               self.sigmoid_derivative(self.z_values[i])

                # Update weights and biases
                for i in range(len(self.weights)):
                    self.weights[i] -= learning_rate * np.dot(self.activations[i].T, deltas[i]) / m
                    self.biases[i] -= learning_rate * np.sum(deltas[i], axis=0, keepdims=True) / m

            def train(self, X, y, epochs=1000, learning_rate=0.01):
                for epoch in range(epochs):
                    self.forward(X)
                    self.backward(X, y, learning_rate)

                    if epoch % 100 == 0:
                        loss = np.mean((self.activations[-1] - y) ** 2)
                        print(f"Epoch {epoch}, Loss: {loss:.4f}")

            def predict(self, X):
                return self.forward(X)

        # Test
        X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
        y = np.array([[0], [1], [1], [0]])  # XOR problem

        mlp = MLP([2, 4, 1])
        mlp.train(X, y, epochs=5000, learning_rate=0.5)

        predictions = mlp.predict(X)
        print(f"\nXOR predictions:\n{predictions}")
        ---

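03.Activation functions
    a.Sigmoid
        a.Formula
            σ(x) = 1 / (1 + e^(-x))
        b.Properties
            Output range (0, 1); suited to binary-classification outputs
        c.Drawback
            Vanishing gradients: σ'(x) ≤ 0.25 and approaches 0 for large |x|
    b.ReLU
        a.Formula
            f(x) = max(0, x)
        b.Advantages
            Cheap to compute; mitigates vanishing gradients
        c.Variants
            Leaky ReLU, PReLU, ELU
    c.Code comparison
        ---
        import numpy as np
        import matplotlib.pyplot as plt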

        def sigmoid(x):
            return 1 / (1 + np.exp(-x))

        def tanh(x):
            return np.tanh(x)

        def relu(x):
            return np.maximum(0, x)

        def leaky_relu(x, alpha=0.01):
            return np.where(x > 0, x, alpha * x)

        # Visualization
        x = np.linspace(-5, 5, 100)

        plt.figure(figsize=(15, 10))

        # Sigmoid
        plt.subplot(2, 3, 1)
        plt.plot(x, sigmoid(x), linewidth=2)
        plt.title('Sigmoid')
        plt.grid(True)

        # Tanh
        plt.subplot(2, 3, 2)
        plt.plot(x, tanh(x), linewidth=2)
        plt.title('Tanh')
        plt.grid(True)

        # ReLU
        plt.subplot(2, 3, 3)
        plt.plot(x, relu(x), linewidth=2)
        plt.title('ReLU')
        plt.grid(True)

        # Leaky ReLU
        plt.subplot(2, 3, 4)
        plt.plot(x, leaky_relu(x), linewidth=2)
        plt.title('Leaky ReLU')
        plt.grid(True)

        # Gradient comparison
        plt.subplot(2, 3, 5)
        plt.plot(x, sigmoid(x) * (1 - sigmoid(x)), label='Sigmoid', linewidth=2)
        plt.plot(x, 1 - tanh(x)**2, label='Tanh', linewidth=2)
        plt.plot(x, (x > 0).astype(float), label='ReLU', linewidth=2)
        plt.title('Activation function gradients')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()
        ---
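The ReLU variants listed above differ only in how they treat negative inputs. The following is a rough scalar sketch using the standard library; the default `alpha` values are common choices assumed for illustration, and `prelu` takes `alpha` as an argument because in PReLU that slope is a learned parameter.

```python
import math

def leaky_relu(x, alpha=0.01):
    """Leaky ReLU: small fixed slope alpha for negative inputs."""
    return x if x > 0 else alpha * x

def prelu(x, alpha):
    """PReLU: same form as Leaky ReLU, but alpha is learned during training."""
    return x if x > 0 else alpha * x

def elu(x, alpha=1.0):
    """ELU: smooth exponential curve that saturates at -alpha for negative inputs."""
    return x if x > 0 else alpha * (math.exp(x) - 1.0)

for value in (-2.0, -0.5, 0.0, 1.5):
    print(value, leaky_relu(value), prelu(value, 0.25), elu(value))
```

All three pass positive inputs through unchanged, so they keep ReLU's non-saturating gradient there while still giving negative inputs a non-zero gradient.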

1.2 Loss Functions and Optimizers

01.Loss functions
    a.Mean squared error (MSE)
        a.Formula
            MSE = (1/n) * Σ(y - ŷ)²
        b.Use case
            Regression tasks
        c.Code implementation
            ---
            import torch
            import torch.nn as nn

            # MSE loss
            mse_loss = nn.MSELoss()

            # Example
            predictions = torch.tensor([2.5, 0.0, 2.1, 7.8])
            targets = torch.tensor([3.0, -0.5, 2.0, 8.0])

            loss = mse_loss(predictions, targets)
            print(f"MSE Loss: {loss.item():.4f}")

            # Verify by manual computation
            manual_mse = torch.mean((predictions - targets) ** 2)
            print(f"Manual MSE: {manual_mse.item():.4f}")
            ---
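    b.Cross-entropy loss
        a.Binary classification
            BCE = -[y*log(ŷ) + (1-y)*log(1-ŷ)]
        b.Multi-class classification
            CE = -Σ y_i * log(ŷ_i)
        c.Code implementation
            ---
            # Binary cross-entropy (nn.BCELoss expects probabilities in [0, 1])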
            bce_loss = nn.BCELoss()

            predictions = torch.tensor([0.9, 0.2, 0.8, 0.3])
            targets = torch.tensor([1.0, 0.0, 1.0, 0.0])

            loss = bce_loss(predictions, targets)
            print(f"BCE Loss: {loss.item():.4f}")

            # Multi-class cross-entropy
            ce_loss = nn.CrossEntropyLoss()

            # Raw logits (no softmax; CrossEntropyLoss applies log-softmax internally)
            logits = torch.randn(4, 3)  # 4 samples, 3 classes
            targets = torch.tensor([0, 2, 1, 0])

            loss = ce_loss(logits, targets)
            print(f"CE Loss: {loss.item():.4f}")
            ---
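The two formulas above can be checked by hand without PyTorch. The sketch below uses only the standard library; the helper names and the example logits `[2.0, 1.0, 0.1]` are assumptions for illustration. The multi-class version works from logits through a log-sum-exp, which is the same quantity as -log(softmax(logits)[target]).

```python
import math

def binary_cross_entropy(probs, targets):
    """Mean of -[y*log(p) + (1-y)*log(1-p)] over all samples."""
    losses = [-(t * math.log(p) + (1 - t) * math.log(1 - p))
              for p, t in zip(probs, targets)]
    return sum(losses) / len(losses)

def cross_entropy_from_logits(logits, target_index):
    """-log(softmax(logits)[target_index]), computed with a stable log-sum-exp."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target_index]

# Same probabilities and targets as the BCE example above
bce = binary_cross_entropy([0.9, 0.2, 0.8, 0.3], [1.0, 0.0, 1.0, 0.0])
# Hypothetical logits for one sample whose true class is index 0
ce = cross_entropy_from_logits([2.0, 1.0, 0.1], 0)

print(f"BCE: {bce:.4f}")
print(f"CE: {ce:.4f}")
```

For the binary case the result is about 0.2271, the mean of -log(0.9), -log(0.8), -log(0.8), and -log(0.7).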

02.Optimizers
    a.SGD
        a.Principle
            w = w - lr * ∇w
        b.Momentum
            v = β*v + ∇w
            w = w - lr * v
        c.Code implementation
            ---
            import torch.optim as optim

            # Create a simple model
            model = nn.Linear(10, 1)

            # SGD optimizer
            optimizer_sgd = optim.SGD(model.parameters(), lr=0.01)

            # SGD with momentum (shown for comparison; the loop below uses optimizer_sgd)
            optimizer_momentum = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

            # Training steps
            for epoch in range(100):
                optimizer_sgd.zero_grad()

                # Forward pass
                X = torch.randn(32, 10)
                y = torch.randn(32, 1)
                predictions = model(X)

                # Compute the loss
                loss = nn.MSELoss()(predictions, y)

                # Backward pass
                loss.backward()

                # Update the parameters
                optimizer_sgd.step()

                if epoch % 20 == 0:
                    print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
            ---
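The momentum update v = β*v + ∇w, w = w - lr * v can be followed step by step on a one-dimensional problem. This is a small sketch, assuming the toy objective f(w) = w² and the illustrative function names `grad` and `sgd_momentum`; it is not how `optim.SGD` is implemented internally.

```python
def grad(w):
    """Gradient of the toy objective f(w) = w**2."""
    return 2.0 * w

def sgd_momentum(w0, lr=0.1, beta=0.9, steps=300):
    """Run v = beta*v + grad, w = w - lr*v and return the final w."""
    w, v = w0, 0.0
    for _ in range(steps):
        g = grad(w)
        v = beta * v + g      # accumulate a decaying sum of past gradients
        w = w - lr * v        # step along the accumulated direction
    return w

w_final = sgd_momentum(5.0)
print(f"w after 300 steps: {w_final:.2e}")
```

With these settings the iterate overshoots and oscillates around the minimum at 0 before settling, which is the typical behavior of momentum on a simple quadratic.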
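    b.Adam
        a.Principle
            Combines momentum with per-parameter adaptive learning rates
            m = β₁*m + (1-β₁)*∇w
            v = β₂*v + (1-β₂)*(∇w)²
            m̂ = m / (1-β₁^t), v̂ = v / (1-β₂^t)    (bias correction at step t)
            w = w - lr * m̂ / (√v̂ + ε)
        b.Advantages
            Adaptive learning rates; typically fast convergence
        c.Code implementation
            ---
            # Adam optimizer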
            optimizer_adam = optim.Adam(model.parameters(), lr=0.001)

            # Training loop
            losses = []
            for epoch in range(100):
                optimizer_adam.zero_grad()

                X = torch.randn(32, 10)
                y = torch.randn(32, 1)
                predictions = model(X)

                loss = nn.MSELoss()(predictions, y)
                loss.backward()
                optimizer_adam.step()

                losses.append(loss.item())

            # Plot the loss curve
            import matplotlib.pyplot as plt

            plt.figure(figsize=(10, 6))
            plt.plot(losses, linewidth=2)
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.title('Training Loss with Adam')
            plt.grid(True)
            plt.show()
            ---
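To see how the moment estimates interact, the Adam update can be written out for a single scalar parameter. The sketch below assumes the toy objective f(w) = w², the commonly used defaults β₁ = 0.9, β₂ = 0.999, ε = 1e-8, and the standard bias-corrected form of the update; `adam_minimize` is an illustrative name.

```python
import math

def adam_minimize(w0, lr=0.1, beta1=0.9, beta2=0.999, eps=1e-8, steps=500):
    """Minimize f(w) = w**2 with a hand-written Adam update; returns all iterates."""
    w, m, v = w0, 0.0, 0.0
    history = [w]
    for t in range(1, steps + 1):
        g = 2.0 * w                              # gradient of w**2
        m = beta1 * m + (1 - beta1) * g          # first moment (momentum)
        v = beta2 * v + (1 - beta2) * g * g      # second moment (squared gradients)
        m_hat = m / (1 - beta1 ** t)             # bias correction
        v_hat = v / (1 - beta2 ** t)
        w = w - lr * m_hat / (math.sqrt(v_hat) + eps)
        history.append(w)
    return history

history = adam_minimize(5.0)
print(f"first step size: {history[0] - history[1]:.6f}")
print(f"final w: {history[-1]:.6f}")
```

On the first step the bias-corrected ratio m̂ / √v̂ has magnitude close to 1, so the parameter moves by roughly `lr` regardless of the gradient's scale.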
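    c.Learning-rate scheduling
        a.StepLR
            Multiplies the learning rate by gamma every step_size epochs
        b.ReduceLROnPlateau
            Lowers the learning rate when the monitored metric stops improving
        c.Code implementation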
            ---
            from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau

            # StepLR
            optimizer = optim.Adam(model.parameters(), lr=0.1)
            scheduler_step = StepLR(optimizer, step_size=30, gamma=0.1)

            for epoch in range(100):
                # Training code (placeholder loss; the model parameters receive no gradient here)
                optimizer.zero_grad()
                loss = torch.tensor(1.0, requires_grad=True)
                loss.backward()
                optimizer.step()

                # Update the learning rate
                scheduler_step.step()

                if epoch % 20 == 0:
                    current_lr = optimizer.param_groups[0]['lr']
                    print(f"Epoch {epoch}, LR: {current_lr:.6f}")

            # ReduceLROnPlateau
            optimizer = optim.Adam(model.parameters(), lr=0.1)
            scheduler_plateau = ReduceLROnPlateau(optimizer, mode='min',
                                                  factor=0.1, patience=10)

            for epoch in range(100):
                # Train and compute the validation loss (random placeholder here)
                val_loss = torch.rand(1).item()

                # Adjust the learning rate based on the validation loss
                scheduler_plateau.step(val_loss)
            ---
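The StepLR rule has a simple closed form, which makes the printed learning rates easy to predict. A minimal sketch, assuming the schedule is expressed in terms of how many times `scheduler.step()` has been called; `step_lr` is an illustrative helper, not a PyTorch function.

```python
def step_lr(base_lr, gamma, step_size, num_scheduler_steps):
    """Learning rate after num_scheduler_steps calls to scheduler.step()."""
    return base_lr * gamma ** (num_scheduler_steps // step_size)

# Same settings as the StepLR example above: lr=0.1, step_size=30, gamma=0.1
for k in (0, 29, 30, 60, 90):
    print(k, step_lr(0.1, 0.1, 30, k))
```

In the loop above the learning rate is printed after `scheduler_step.step()`, so at loop index `epoch` the scheduler has already been stepped `epoch + 1` times.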

03.Backpropagation
    a.Chain rule
        ∂L/∂w = ∂L/∂y * ∂y/∂z * ∂z/∂w
    b.Gradient computation
        ---
        # PyTorch automatic differentiation
        x = torch.tensor([2.0], requires_grad=True)
        y = x ** 2 + 3 * x + 1

        # Backward pass
        y.backward()

        print(f"x = {x.item()}")
        print(f"y = {y.item()}")
        print(f"dy/dx = {x.grad.item()}")
        print(f"Analytic value: 2*x + 3 = {2*x.item() + 3}")

        # Gradient with respect to a vector input
        x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
        y = torch.sum(x ** 2)

        y.backward()
        print(f"\nx = {x.detach()}")
        print(f"y = {y.item()}")
        print(f"dy/dx = {x.grad}")
        ---
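The chain rule ∂L/∂w = ∂L/∂y * ∂y/∂z * ∂z/∂w can also be verified by hand for a single neuron. The sketch below assumes a sigmoid neuron y = σ(w*x + b) with squared-error loss L = (y - target)², and compares the analytic product with a central finite difference; all names and input values are illustrative.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def loss(w, b, x, target):
    """Squared error of a single sigmoid neuron."""
    y = sigmoid(w * x + b)
    return (y - target) ** 2

def analytic_grad_w(w, b, x, target):
    """dL/dw as the product of the three chain-rule factors."""
    z = w * x + b
    y = sigmoid(z)
    dL_dy = 2.0 * (y - target)   # derivative of (y - target)**2
    dy_dz = y * (1.0 - y)        # derivative of the sigmoid
    dz_dw = x                    # derivative of w*x + b with respect to w
    return dL_dy * dy_dz * dz_dw

def numeric_grad_w(w, b, x, target, h=1e-6):
    """Central finite-difference approximation of dL/dw."""
    return (loss(w + h, b, x, target) - loss(w - h, b, x, target)) / (2.0 * h)

analytic = analytic_grad_w(0.5, -0.2, 1.5, 1.0)
numeric = numeric_grad_w(0.5, -0.2, 1.5, 1.0)
print(f"analytic: {analytic:.8f}, numeric: {numeric:.8f}")
```

Agreement between the two values is a quick sanity check that each factor of the chain rule was derived correctly.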
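    c.Vanishing and exploding gradients
        a.Vanishing gradients
            In deep networks, gradients shrink toward 0 as they propagate back to earlier layers
        b.Exploding gradients
            Gradients grow very large, which destabilizes training
        c.Mitigation
            ---
            # Gradient clipping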
            import torch.nn.utils as nn_utils

            model = nn.Sequential(
                nn.Linear(10, 50),
                nn.ReLU(),
                nn.Linear(50, 50),
                nn.ReLU(),
                nn.Linear(50, 1)
            )

            optimizer = optim.Adam(model.parameters(), lr=0.001)

            for epoch in range(10):
                optimizer.zero_grad()

                X = torch.randn(32, 10)
                y = torch.randn(32, 1)
                predictions = model(X)

                loss = nn.MSELoss()(predictions, y)
                loss.backward()

                # Clip the global gradient norm to max_norm
                nn_utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

                optimizer.step()

                # Inspect the gradient norm (measured after clipping)
                total_norm = 0
                for p in model.parameters():
                    if p.grad is not None:
                        param_norm = p.grad.detach().norm(2)
                        total_norm += param_norm.item() ** 2
                total_norm = total_norm ** 0.5

                print(f"Epoch {epoch}, Loss: {loss.item():.4f}, Grad Norm: {total_norm:.4f}")
            ---
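The vanishing-gradient effect is easy to reproduce numerically. The following toy sketch assumes a chain of scalar sigmoid units with unit weights and zero biases, so the gradient of the output with respect to the input is just the product of the per-layer sigmoid derivatives, each of which is at most 0.25; `chained_sigmoid_gradient` is an illustrative name.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def chained_sigmoid_gradient(x, depth):
    """d(output)/d(input) for `depth` stacked scalar sigmoids (unit weight, zero bias)."""
    a = x
    grad = 1.0
    for _ in range(depth):
        a = sigmoid(a)
        grad *= a * (1.0 - a)   # local derivative of this layer, never above 0.25
    return grad

for depth in (1, 5, 10, 20):
    print(depth, chained_sigmoid_gradient(0.5, depth))
```

The gradient shrinks roughly geometrically with depth, which is why early layers of a deep sigmoid network learn very slowly.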

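2 Convolutional Neural Networks

2.1 How Convolutional Layers Work

01.Convolution operation
    a.Basic concepts
        a.Kernel
            A small weight matrix that slides over the input
        b.Stride
            The distance the kernel moves at each step
        c.Padding
            Zeros added around the input border
    b.Mathematical form
        (f * g)(i,j) = ΣΣ f(m,n) * g(i-m, j-n)
        Note: deep learning frameworks such as PyTorch compute cross-correlation (no kernel flip): ΣΣ f(i+m, j+n) * g(m,n)
    c.Code implementation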
        ---
        import torch
        import torch.nn as nn
        import torch.nn.functional as F

        # Manual convolution (stride 1, no padding; cross-correlation, like F.conv2d)
        def manual_conv2d(input, kernel):
            batch, in_channels, h, w = input.shape
            out_channels, _, kh, kw = kernel.shape

            out_h = h - kh + 1
            out_w = w - kw + 1

            output = torch.zeros(batch, out_channels, out_h, out_w)

            for b in range(batch):
                for oc in range(out_channels):
                    for i in range(out_h):
                        for j in range(out_w):
                            for ic in range(in_channels):
                                output[b, oc, i, j] += torch.sum(
                                    input[b, ic, i:i+kh, j:j+kw] * kernel[oc, ic]
                                )

            return output

        # Test
        input = torch.randn(1, 1, 5, 5)
        kernel = torch.randn(1, 1, 3, 3)

        output_manual = manual_conv2d(input, kernel)
        output_pytorch = F.conv2d(input, kernel)

        print(f"Manual convolution output shape: {output_manual.shape}")
        print(f"PyTorch convolution output shape: {output_pytorch.shape}")
        print(f"Results close: {torch.allclose(output_manual, output_pytorch, atol=1e-6)}")
        ---

02.Convolutional layer parameters
    a.Input and output size
        output_size = floor((input_size - kernel_size + 2*padding) / stride) + 1
    b.Parameter count
        params = (kernel_h * kernel_w * in_channels + 1) * out_channels
    c.Code example
        ---
        # Create a convolutional layer
        conv1 = nn.Conv2d(in_channels=3, out_channels=64,
                         kernel_size=3, stride=1, padding=1)

        # Input image
        x = torch.randn(1, 3, 224, 224)  # batch=1, channels=3, H=224, W=224

        # Forward pass
        output = conv1(x)

        print(f"Input shape: {x.shape}")
        print(f"Output shape: {output.shape}")
        print(f"Kernel shape: {conv1.weight.shape}")
        print(f"Bias shape: {conv1.bias.shape}")

        # Compute the parameter count
        params = (3 * 3 * 3 + 1) * 64
        print(f"Computed parameters: {params}")
        print(f"Actual parameters: {sum(p.numel() for p in conv1.parameters())}")
        ---
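The two formulas above are handy as small helpers when sizing a network by hand. A minimal sketch with illustrative function names, assuming square inputs and kernels for the size formula and one bias per output channel for the parameter count:

```python
def conv_output_size(input_size, kernel_size, stride=1, padding=0):
    """floor((input - kernel + 2*padding) / stride) + 1 along one spatial dimension."""
    return (input_size - kernel_size + 2 * padding) // stride + 1

def conv_param_count(kernel_h, kernel_w, in_channels, out_channels, bias=True):
    """Weights plus an optional bias per output channel."""
    return (kernel_h * kernel_w * in_channels + (1 if bias else 0)) * out_channels

# The 3x3, stride-1, padding-1 layer from the example above keeps 224 -> 224
print(conv_output_size(224, 3, stride=1, padding=1))
# An 11x11, stride-4, padding-2 layer on a 224-pixel input
print(conv_output_size(224, 11, stride=4, padding=2))
# Parameter count of Conv2d(3, 64, kernel_size=3)
print(conv_param_count(3, 3, 3, 64))
```

The floor matters whenever the stride does not divide the padded extent evenly, as in the second call.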

03.Multi-channel convolution
    a.RGB images
        3 input channels
    b.Feature maps
        Multiple output channels
    c.Code implementation
        ---
        import matplotlib.pyplot as plt

        # Multi-channel convolution
        conv_multi = nn.Conv2d(in_channels=3, out_channels=16,
                              kernel_size=3, padding=1)

        # RGB image
        rgb_image = torch.randn(1, 3, 64, 64)

        # Convolution
        features = conv_multi(rgb_image)

        print(f"Input: {rgb_image.shape}")
        print(f"Output: {features.shape}")

        # Visualize the feature maps
        fig, axes = plt.subplots(4, 4, figsize=(12, 12))

        for i in range(16):
            ax = axes[i // 4, i % 4]
            feature_map = features[0, i].detach().numpy()
            ax.imshow(feature_map, cmap='viridis')
            ax.set_title(f'Channel {i}')
            ax.axis('off')

        plt.tight_layout()
        plt.show()
        ---

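2.2 Pooling Layers

01.Max pooling
    a.Principle
        Takes the maximum value within each window
    b.Purpose
        Reduces spatial resolution and adds robustness to small translations
    c.Code implementation
        ---
        # Max pooling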
        maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        x = torch.randn(1, 64, 56, 56)
        output = maxpool(x)

        print(f"Before pooling: {x.shape}")
        print(f"After pooling: {output.shape}")

        # Manual implementation
        def manual_maxpool2d(input, kernel_size, stride):
            batch, channels, h, w = input.shape
            kh = kw = kernel_size

            out_h = (h - kh) // stride + 1
            out_w = (w - kw) // stride + 1

            output = torch.zeros(batch, channels, out_h, out_w)

            for b in range(batch):
                for c in range(channels):
                    for i in range(out_h):
                        for j in range(out_w):
                            h_start = i * stride
                            w_start = j * stride
                            window = input[b, c, h_start:h_start+kh, w_start:w_start+kw]
                            output[b, c, i, j] = torch.max(window)

            return output

        # Test
        x_test = torch.randn(1, 1, 4, 4)
        output_manual = manual_maxpool2d(x_test, 2, 2)
        output_pytorch = nn.MaxPool2d(2, 2)(x_test)

        print(f"\nManual pooling: {output_manual.shape}")
        print(f"PyTorch pooling: {output_pytorch.shape}")
        ---

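02.Average pooling
    a.Principle
        Takes the mean value within each window
    b.Global average pooling
        Averages over the entire feature map, giving one value per channel
    c.Code implementation
        ---
        # Average pooling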
        avgpool = nn.AvgPool2d(kernel_size=2, stride=2)

        x = torch.randn(1, 64, 56, 56)
        output = avgpool(x)

        print(f"Average pooling: {x.shape} -> {output.shape}")

        # Global average pooling
        global_avgpool = nn.AdaptiveAvgPool2d((1, 1))

        x = torch.randn(1, 512, 7, 7)
        output = global_avgpool(x)

        print(f"Global average pooling: {x.shape} -> {output.shape}")

        # Flatten for classification
        output_flat = output.view(output.size(0), -1)
        print(f"Flattened: {output_flat.shape}")
        ---

2.3 Classic CNN Architectures

01.LeNet-5
    a.Structure
        Conv -> Pool -> Conv -> Pool -> FC -> FC -> Output
    b.Code implementation
        ---
        class LeNet5(nn.Module):
            def __init__(self, num_classes=10):
                super(LeNet5, self).__init__()
                self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
                self.pool = nn.MaxPool2d(2, 2)
                self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
                self.fc1 = nn.Linear(16 * 4 * 4, 120)
                self.fc2 = nn.Linear(120, 84)
                self.fc3 = nn.Linear(84, num_classes)

            def forward(self, x):
                x = self.pool(F.relu(self.conv1(x)))
                x = self.pool(F.relu(self.conv2(x)))
                x = x.view(-1, 16 * 4 * 4)
                x = F.relu(self.fc1(x))
                x = F.relu(self.fc2(x))
                x = self.fc3(x)
                return x

        # Test
        model = LeNet5()
        x = torch.randn(1, 1, 28, 28)
        output = model(x)
        print(f"LeNet-5 output: {output.shape}")

        # Count parameters
        total_params = sum(p.numel() for p in model.parameters())
        print(f"Total parameters: {total_params:,}")
        ---

02.AlexNet
    a.Innovations
        ReLU activation, Dropout, data augmentation
    b.Code implementation
        ---
        class AlexNet(nn.Module):
            def __init__(self, num_classes=1000):
                super(AlexNet, self).__init__()
                self.features = nn.Sequential(
                    nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(kernel_size=3, stride=2),
                    nn.Conv2d(64, 192, kernel_size=5, padding=2),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(kernel_size=3, stride=2),
                    nn.Conv2d(192, 384, kernel_size=3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(384, 256, kernel_size=3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(256, 256, kernel_size=3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(kernel_size=3, stride=2),
                )
                self.classifier = nn.Sequential(
                    nn.Dropout(),
                    nn.Linear(256 * 6 * 6, 4096),
                    nn.ReLU(inplace=True),
                    nn.Dropout(),
                    nn.Linear(4096, 4096),
                    nn.ReLU(inplace=True),
                    nn.Linear(4096, num_classes),
                )

            def forward(self, x):
                x = self.features(x)
                x = x.view(x.size(0), 256 * 6 * 6)
                x = self.classifier(x)
                return x

        # Test
        model = AlexNet(num_classes=10)
        x = torch.randn(1, 3, 224, 224)
        output = model(x)
        print(f"AlexNet output: {output.shape}")
        ---

03.VGG
    a.Characteristics
        Small (3x3) kernels stacked into a deep network
    b.Code implementation
        ---
        class VGG16(nn.Module):
            def __init__(self, num_classes=1000):
                super(VGG16, self).__init__()
                self.features = nn.Sequential(
                    # Block 1
                    nn.Conv2d(3, 64, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(64, 64, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 2
                    nn.Conv2d(64, 128, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 128, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 3
                    nn.Conv2d(128, 256, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(256, 256, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(256, 256, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 4
                    nn.Conv2d(256, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(512, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(512, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 5
                    nn.Conv2d(512, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(512, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(512, 512, 3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),
                )

                self.classifier = nn.Sequential(
                    nn.Linear(512 * 7 * 7, 4096),
                    nn.ReLU(inplace=True),
                    nn.Dropout(),
                    nn.Linear(4096, 4096),
                    nn.ReLU(inplace=True),
                    nn.Dropout(),
                    nn.Linear(4096, num_classes),
                )

            def forward(self, x):
                x = self.features(x)
                x = x.view(x.size(0), -1)
                x = self.classifier(x)
                return x

        # Test
        model = VGG16(num_classes=10)
        x = torch.randn(1, 3, 224, 224)
        output = model(x)
        print(f"VGG16 output: {output.shape}")

        total_params = sum(p.numel() for p in model.parameters())
        print(f"VGG16 parameters: {total_params:,}")
        ---
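One common way to motivate VGG's small kernels is to compare stacked 3x3 convolutions with a single larger kernel covering the same receptive field. The sketch below assumes stride-1 convolutions with equal input and output channel counts and ignores biases; the function names are illustrative.

```python
def stacked_receptive_field(kernel_sizes):
    """Receptive field of consecutive stride-1 convolutions."""
    rf = 1
    for k in kernel_sizes:
        rf += k - 1   # each stride-1 layer widens the field by (k - 1)
    return rf

def conv_weights(kernel_size, channels):
    """Weight count of one conv layer with `channels` inputs and outputs, no bias."""
    return kernel_size * kernel_size * channels * channels

channels = 64
print("two 3x3 layers:", stacked_receptive_field([3, 3]), 2 * conv_weights(3, channels))
print("one 5x5 layer: ", stacked_receptive_field([5]), conv_weights(5, channels))
```

Two 3x3 layers see the same 5x5 region as one 5x5 layer but use 18·C² weights instead of 25·C², and they insert an extra nonlinearity between the two layers.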

04.ResNet
    a.Residual block
        y = F(x) + x
    b.Code implementation
        ---
        class ResidualBlock(nn.Module):
            def __init__(self, in_channels, out_channels, stride=1):
                super(ResidualBlock, self).__init__()
                self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1)
                self.bn1 = nn.BatchNorm2d(out_channels)
                self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1)
                self.bn2 = nn.BatchNorm2d(out_channels)

                self.shortcut = nn.Sequential()
                if stride != 1 or in_channels != out_channels:
                    self.shortcut = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, 1, stride),
                        nn.BatchNorm2d(out_channels)
                    )

            def forward(self, x):
                out = F.relu(self.bn1(self.conv1(x)))
                out = self.bn2(self.conv2(out))
                out += self.shortcut(x)
                out = F.relu(out)
                return out

        class ResNet18(nn.Module):
            def __init__(self, num_classes=10):
                super(ResNet18, self).__init__()
                self.conv1 = nn.Conv2d(3, 64, 7, 2, 3)
                self.bn1 = nn.BatchNorm2d(64)
                self.maxpool = nn.MaxPool2d(3, 2, 1)

                self.layer1 = self._make_layer(64, 64, 2, 1)
                self.layer2 = self._make_layer(64, 128, 2, 2)
                self.layer3 = self._make_layer(128, 256, 2, 2)
                self.layer4 = self._make_layer(256, 512, 2, 2)

                self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
                self.fc = nn.Linear(512, num_classes)

            def _make_layer(self, in_channels, out_channels, num_blocks, stride):
                layers = []
                layers.append(ResidualBlock(in_channels, out_channels, stride))
                for _ in range(1, num_blocks):
                    layers.append(ResidualBlock(out_channels, out_channels, 1))
                return nn.Sequential(*layers)

            def forward(self, x):
                x = F.relu(self.bn1(self.conv1(x)))
                x = self.maxpool(x)

                x = self.layer1(x)
                x = self.layer2(x)
                x = self.layer3(x)
                x = self.layer4(x)

                x = self.avgpool(x)
                x = x.view(x.size(0), -1)
                x = self.fc(x)
                return x

        # Test
        model = ResNet18(num_classes=10)
        x = torch.randn(1, 3, 224, 224)
        output = model(x)
        print(f"ResNet18 output: {output.shape}")

        total_params = sum(p.numel() for p in model.parameters())
        print(f"ResNet18 parameters: {total_params:,}")
        ---
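The identity shortcut in y = F(x) + x changes how gradients flow: the derivative of each block is F'(x) + 1 rather than F'(x) alone. The following is a deliberately simplified scalar sketch, assuming every block's F has the same constant derivative; it illustrates the idea only and is not a model of the ResNet18 code above.

```python
def plain_chain_gradient(layer_derivative, depth):
    """Gradient through `depth` plain layers y = F(x): product of F'(x) terms."""
    return layer_derivative ** depth

def residual_chain_gradient(layer_derivative, depth):
    """Gradient through `depth` residual blocks y = F(x) + x: product of (F'(x) + 1) terms."""
    return (1.0 + layer_derivative) ** depth

# Assumed toy value: each F contributes a derivative of 0.1
for depth in (1, 10, 20):
    print(depth, plain_chain_gradient(0.1, depth), residual_chain_gradient(0.1, depth))
```

In the plain chain the gradient collapses toward 0 as depth grows, while the "+1" from the shortcut keeps a direct path for the gradient to reach early layers.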

2.4 Image Classification in Practice

01.MNIST dataset
    a.Data loading
        ---
        import torchvision
        import torchvision.transforms as transforms
        from torch.utils.data import DataLoader

        # Preprocessing (normalize with the MNIST mean and standard deviation)
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])

        # Load the data
        train_dataset = torchvision.datasets.MNIST(
            root='./data', train=True, download=True, transform=transform
        )
        test_dataset = torchvision.datasets.MNIST(
            root='./data', train=False, download=True, transform=transform
        )

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

        print(f"Training set size: {len(train_dataset)}")
        print(f"Test set size: {len(test_dataset)}")

        # Visualize samples
        import matplotlib.pyplot as plt

        examples = iter(train_loader)
        images, labels = next(examples)

        fig, axes = plt.subplots(2, 5, figsize=(12, 5))
        for i in range(10):
            ax = axes[i // 5, i % 5]
            ax.imshow(images[i].squeeze(), cmap='gray')
            ax.set_title(f'Label: {labels[i].item()}')
            ax.axis('off')
        plt.tight_layout()
        plt.show()
        ---
    b.Model training
        ---
        class SimpleCNN(nn.Module):
            def __init__(self):
                super(SimpleCNN, self).__init__()
                self.conv1 = nn.Conv2d(1, 32, 3, 1)
                self.conv2 = nn.Conv2d(32, 64, 3, 1)
                self.dropout1 = nn.Dropout2d(0.25)
                self.dropout2 = nn.Dropout(0.5)  # applied to flattened features, so plain Dropout
                self.fc1 = nn.Linear(9216, 128)
                self.fc2 = nn.Linear(128, 10)

            def forward(self, x):
                x = F.relu(self.conv1(x))
                x = F.relu(self.conv2(x))
                x = F.max_pool2d(x, 2)
                x = self.dropout1(x)
                x = torch.flatten(x, 1)
                x = F.relu(self.fc1(x))
                x = self.dropout2(x)
                x = self.fc2(x)
                return F.log_softmax(x, dim=1)

        # Training function
        def train(model, device, train_loader, optimizer, epoch):
            model.train()
            train_loss = 0
            correct = 0

            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()

                if batch_idx % 100 == 0:
                    print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} '
                          f'({100. * batch_idx / len(train_loader):.0f}%)]'
                          f'\tLoss: {loss.item():.6f}')

            train_loss /= len(train_loader)
            accuracy = 100. * correct / len(train_loader.dataset)
            return train_loss, accuracy

        # Evaluation function
        def test(model, device, test_loader):
            model.eval()
            test_loss = 0
            correct = 0

            with torch.no_grad():
                for data, target in test_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    test_loss += F.nll_loss(output, target, reduction='sum').item()
                    pred = output.argmax(dim=1, keepdim=True)
                    correct += pred.eq(target.view_as(pred)).sum().item()

            test_loss /= len(test_loader.dataset)
            accuracy = 100. * correct / len(test_loader.dataset)

            print(f'\nTest set: Average loss: {test_loss:.4f}, '
                  f'Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)\n')

            return test_loss, accuracy

        # Train the model
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = SimpleCNN().to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        train_losses, train_accs = [], []
        test_losses, test_accs = [], []

        for epoch in range(1, 6):
            train_loss, train_acc = train(model, device, train_loader, optimizer, epoch)
            test_loss, test_acc = test(model, device, test_loader)

            train_losses.append(train_loss)
            train_accs.append(train_acc)
            test_losses.append(test_loss)
            test_accs.append(test_acc)

        # Plot the training curves
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

        ax1.plot(train_losses, label='Train Loss')
        ax1.plot(test_losses, label='Test Loss')
        ax1.set_xlabel('Epoch')
        ax1.set_ylabel('Loss')
        ax1.set_title('Training and Test Loss')
        ax1.legend()
        ax1.grid(True)

        ax2.plot(train_accs, label='Train Accuracy')
        ax2.plot(test_accs, label='Test Accuracy')
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('Accuracy (%)')
        ax2.set_title('Training and Test Accuracy')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.show()
        ---

02.CIFAR-10 Dataset
    a.Loading the data
        ---
        # CIFAR-10 preprocessing: random crop and flip are applied to the training set only
        transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])

        transform_test = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])

        # Load CIFAR-10
        train_dataset = torchvision.datasets.CIFAR10(
            root='./data', train=True, download=True, transform=transform_train
        )
        test_dataset = torchvision.datasets.CIFAR10(
            root='./data', train=False, download=True, transform=transform_test
        )

        train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
        test_loader = DataLoader(test_dataset, batch_size=100, shuffle=False, num_workers=2)

        classes = ('plane', 'car', 'bird', 'cat', 'deer',
                  'dog', 'frog', 'horse', 'ship', 'truck')

        # Visualize the first 10 images of one training batch
        dataiter = iter(train_loader)
        images, labels = next(dataiter)

        fig, axes = plt.subplots(2, 5, figsize=(12, 5))
        for i in range(10):
            ax = axes[i // 5, i % 5]
            img = images[i].numpy().transpose((1, 2, 0))
            img = img * np.array([0.2023, 0.1994, 0.2010]) + np.array([0.4914, 0.4822, 0.4465])
            img = np.clip(img, 0, 1)
            ax.imshow(img)
            ax.set_title(classes[labels[i]])
            ax.axis('off')
        plt.tight_layout()
        plt.show()
        ---
    b.Training ResNet
        ---
        # Use ResNet18
        model = ResNet18(num_classes=10).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1,
                                   momentum=0.9, weight_decay=5e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

        # Training function
        def train_cifar(epoch):
            model.train()
            train_loss = 0
            correct = 0
            total = 0

            for batch_idx, (inputs, targets) in enumerate(train_loader):
                inputs, targets = inputs.to(device), targets.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

                if batch_idx % 100 == 0:
                    print(f'Epoch: {epoch} [{batch_idx}/{len(train_loader)}] '
                          f'Loss: {train_loss/(batch_idx+1):.3f} | '
                          f'Acc: {100.*correct/total:.3f}%')

            return train_loss / len(train_loader), 100. * correct / total

        # Evaluation function
        def test_cifar():
            model.eval()
            test_loss = 0
            correct = 0
            total = 0

            with torch.no_grad():
                for inputs, targets in test_loader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)

                    test_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += targets.size(0)
                    correct += predicted.eq(targets).sum().item()

            print(f'Test Loss: {test_loss/len(test_loader):.3f} | '
                  f'Test Acc: {100.*correct/total:.3f}%')

            return test_loss / len(test_loader), 100. * correct / total

        # Train for 10 epochs (T_max=200 above assumes a 200-epoch run, so the LR stays close to 0.1 here)
        for epoch in range(10):
            train_loss, train_acc = train_cifar(epoch)
            test_loss, test_acc = test_cifar()
            scheduler.step()
        ---
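The cosine schedule used above can be checked by hand. The sketch below evaluates the closed-form expression given in the PyTorch documentation for `CosineAnnealingLR`, `eta_min + (base_lr - eta_min) * (1 + cos(pi * t / T_max)) / 2`, using only the standard library. The helper name `cosine_annealing_lr` is hypothetical, and the comparison value `T_max=10` is an assumption chosen to match the 10-epoch loop, not a setting taken from the original code.

```python
import math

def cosine_annealing_lr(base_lr, step, t_max, eta_min=0.0):
    """Closed-form cosine annealing learning rate after `step` scheduler steps."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1.0 + math.cos(math.pi * step / t_max))

base_lr = 0.1
for t_max in (200, 10):
    # Learning rate seen after the 10 scheduler steps of a 10-epoch run
    lr_after_10 = cosine_annealing_lr(base_lr, 10, t_max)
    print(f"T_max={t_max}: lr after 10 steps = {lr_after_10:.5f}")
```

With `T_max` far larger than the number of epochs actually run, the learning rate barely leaves its initial value; matching `T_max` to the run length lets it anneal all the way to `eta_min`.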

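3 Recurrent Neural Networks

3.1 RNN Basics

01.How an RNN Works
    a.Recurrent structure
        h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h)
        y_t = W_hy * h_t + b_y
    b.Unrolling through time
        Unroll the recurrence into a chain of identical steps, one per time step, all sharing the same weights
    c.Code implementation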
        ---
        import torch
        import torch.nn as nn

        class SimpleRNN(nn.Module):
            def __init__(self, input_size, hidden_size, output_size):
                super(SimpleRNN, self).__init__()
                self.hidden_size = hidden_size
                self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
                # The output is read from the new hidden state: y_t = W_hy * h_t + b_y
                self.h2o = nn.Linear(hidden_size, output_size)

            def forward(self, input, hidden):
                combined = torch.cat((input, hidden), 1)
                hidden = torch.tanh(self.i2h(combined))
                output = self.h2o(hidden)
                return output, hidden

            def init_hidden(self, batch_size):
                return torch.zeros(batch_size, self.hidden_size)

        # Quick test
        rnn = SimpleRNN(input_size=10, hidden_size=20, output_size=5)
        input = torch.randn(3, 10)  # batch_size=3, input_size=10
        hidden = rnn.init_hidden(3)

        output, hidden = rnn(input, hidden)
        print(f"Output shape: {output.shape}")
        print(f"Hidden state shape: {hidden.shape}")
        ---
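Unrolling through time can be made concrete with a plain loop. The following is a minimal NumPy sketch of the two recurrence equations from item a., applied to one sequence; the sizes, the random weights, and the variable names are illustrative assumptions rather than part of the original notes.

```python
import numpy as np

rng = np.random.default_rng(0)
input_size, hidden_size, output_size, seq_len = 10, 20, 5, 7

# One set of weights, shared by every time step
W_xh = rng.normal(scale=0.1, size=(hidden_size, input_size))
W_hh = rng.normal(scale=0.1, size=(hidden_size, hidden_size))
W_hy = rng.normal(scale=0.1, size=(output_size, hidden_size))
b_h = np.zeros(hidden_size)
b_y = np.zeros(output_size)

xs = rng.normal(size=(seq_len, input_size))  # one input sequence
h = np.zeros(hidden_size)                    # initial hidden state h_0

hidden_states, outputs = [], []
for x_t in xs:
    # h_t = tanh(W_hh h_{t-1} + W_xh x_t + b_h)
    h = np.tanh(W_hh @ h + W_xh @ x_t + b_h)
    # y_t = W_hy h_t + b_y
    outputs.append(W_hy @ h + b_y)
    hidden_states.append(h)

hidden_states = np.stack(hidden_states)  # (seq_len, hidden_size)
outputs = np.stack(outputs)              # (seq_len, output_size)
print(hidden_states.shape, outputs.shape)
```

Each loop iteration corresponds to one copy of the cell in the unrolled diagram; only `h` carries information from one step to the next.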

02.PyTorch RNN
    a.nn.RNN
        ---
        # Use PyTorch's built-in RNN
        rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=2, batch_first=True)

        # Input: (batch, seq_len, input_size)
        input = torch.randn(3, 5, 10)  # 3 samples, sequence length 5, feature size 10
        h0 = torch.zeros(2, 3, 20)     # 2 layers, batch_size=3, hidden_size=20

        output, hn = rnn(input, h0)

        print(f"Output shape: {output.shape}")  # (3, 5, 20)
        print(f"Final hidden state: {hn.shape}")  # (2, 3, 20)
        ---
    b.Sequence classification
        ---
        class RNNClassifier(nn.Module):
            def __init__(self, input_size, hidden_size, num_layers, num_classes):
                super(RNNClassifier, self).__init__()
                self.hidden_size = hidden_size
                self.num_layers = num_layers
                self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
                self.fc = nn.Linear(hidden_size, num_classes)

            def forward(self, x):
                h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device)
                out, _ = self.rnn(x, h0)
                out = self.fc(out[:, -1, :])  # use the last time step
                return out

        # Quick test
        model = RNNClassifier(input_size=28, hidden_size=128, num_layers=2, num_classes=10)
        x = torch.randn(32, 28, 28)  # MNIST-sized batch: each image read as 28 rows of 28 pixels
        output = model(x)
        print(f"Classifier output: {output.shape}")
        ---

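03.The Vanishing Gradient Problem
    a.Cause
        Backpropagation through a long sequence multiplies the gradient by a similar factor at every time step, so its magnitude decays exponentially
    b.Remedies
        LSTM and GRU (gated cells that ease vanishing gradients); gradient clipping (guards against the opposite failure, exploding gradients)
    c.Visualizing the gradient
        ---
        import matplotlib.pyplot as plt

        # Simulate gradient decay: one multiplication by `weight` per time step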
        def gradient_flow(seq_length, weight=0.9):
            gradients = []
            grad = 1.0

            for t in range(seq_length):
                grad *= weight
                gradients.append(grad)

            return gradients

        # Gradient magnitude over time for different weights
        seq_len = 50
        weights = [0.5, 0.9, 0.99, 1.0, 1.1]

        plt.figure(figsize=(12, 6))
        for w in weights:
            grads = gradient_flow(seq_len, w)
            plt.plot(grads, label=f'weight={w}')

        plt.xlabel('Time Step')
        plt.ylabel('Gradient')
        plt.title('Gradient Flow in RNN')
        plt.legend()
        plt.grid(True)
        plt.yscale('log')
        plt.show()
        ---
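Gradient clipping, listed among the remedies above, is a one-line call in PyTorch. The sketch below uses `torch.nn.utils.clip_grad_norm_`, which rescales all gradients in place so that their combined L2 norm does not exceed `max_norm` and returns the norm measured before clipping. The tiny model, the random data, and the artificially inflated loss are assumptions made only to produce a large gradient.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
rnn = nn.RNN(input_size=10, hidden_size=20, batch_first=True)
head = nn.Linear(20, 1)
params = list(rnn.parameters()) + list(head.parameters())

x = torch.randn(4, 50, 10)   # batch of 4 sequences, 50 time steps each
target = torch.randn(4, 1)

out, _ = rnn(x)
# Loss scaled up on purpose so that the gradient norm is large
loss = 1000.0 * nn.functional.mse_loss(head(out[:, -1, :]), target)
loss.backward()

max_norm = 1.0
# Returns the total gradient norm *before* clipping
norm_before = torch.nn.utils.clip_grad_norm_(params, max_norm=max_norm)
norm_after = torch.sqrt(sum((p.grad ** 2).sum() for p in params))

print(f"Gradient norm before clipping: {norm_before.item():.4f}")
print(f"Gradient norm after clipping:  {norm_after.item():.4f}")
```

In a training loop the call sits between `loss.backward()` and `optimizer.step()`.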

3.2 LSTM

01.LSTM Structure
    a.Gating mechanism
        Forget gate: f_t = σ(W_f * [h_{t-1}, x_t] + b_f)
        Input gate: i_t = σ(W_i * [h_{t-1}, x_t] + b_i)
        Output gate: o_t = σ(W_o * [h_{t-1}, x_t] + b_o)
        Cell state: C_t = f_t ⊙ C_{t-1} + i_t ⊙ tanh(W_C * [h_{t-1}, x_t] + b_C)
        Hidden state: h_t = o_t ⊙ tanh(C_t)
    b.Code implementation
        ---
        class LSTMCell(nn.Module):
            def __init__(self, input_size, hidden_size):
                super(LSTMCell, self).__init__()
                self.input_size = input_size
                self.hidden_size = hidden_size

                # Weights for the three gates and the candidate cell state
                self.W_f = nn.Linear(input_size + hidden_size, hidden_size)
                self.W_i = nn.Linear(input_size + hidden_size, hidden_size)
                self.W_C = nn.Linear(input_size + hidden_size, hidden_size)
                self.W_o = nn.Linear(input_size + hidden_size, hidden_size)

            def forward(self, x, hidden):
                h_prev, C_prev = hidden

                # Concatenate the input and the previous hidden state
                combined = torch.cat((x, h_prev), dim=1)

                # Compute the gates and the candidate cell state
                f_t = torch.sigmoid(self.W_f(combined))
                i_t = torch.sigmoid(self.W_i(combined))
                C_tilde = torch.tanh(self.W_C(combined))
                o_t = torch.sigmoid(self.W_o(combined))

                # Update the cell state and the hidden state
                C_t = f_t * C_prev + i_t * C_tilde
                h_t = o_t * torch.tanh(C_t)

                return h_t, (h_t, C_t)

        # Quick test
        lstm_cell = LSTMCell(input_size=10, hidden_size=20)
        x = torch.randn(3, 10)
        h = torch.zeros(3, 20)
        C = torch.zeros(3, 20)

        h_new, (h_out, C_out) = lstm_cell(x, (h, C))
        print(f"New hidden state: {h_new.shape}")
        print(f"New cell state: {C_out.shape}")
        ---

02.PyTorch LSTM
    a.Basic usage
        ---
        # Use PyTorch's built-in LSTM
        lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2, batch_first=True)

        input = torch.randn(3, 5, 10)  # (batch, seq_len, input_size)
        h0 = torch.zeros(2, 3, 20)     # (num_layers, batch, hidden_size)
        c0 = torch.zeros(2, 3, 20)

        output, (hn, cn) = lstm(input, (h0, c0))

        print(f"Output: {output.shape}")
        print(f"Final hidden state: {hn.shape}")
        print(f"Final cell state: {cn.shape}")
        ---
    b.Bidirectional LSTM
        ---
        # Bidirectional LSTM
        bilstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2,
                        batch_first=True, bidirectional=True)

        input = torch.randn(3, 5, 10)
        h0 = torch.zeros(4, 3, 20)  # 2 layers * 2 directions
        c0 = torch.zeros(4, 3, 20)

        output, (hn, cn) = bilstm(input, (h0, c0))

        print(f"Bidirectional LSTM output: {output.shape}")  # (3, 5, 40) - 40 = 20*2
        print(f"Hidden state: {hn.shape}")  # (4, 3, 20)
        ---
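When a bidirectional LSTM feeds a classifier, it matters where each direction's final state lives. The sketch below relies on the layout described in the `nn.LSTM` documentation: `hn` is ordered by layer and then by direction, so `hn[-2]` and `hn[-1]` are the last layer's forward and backward final states. The variable names and the idea of concatenating the two into a sequence summary are illustrative assumptions.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
hidden_size = 20
bilstm = nn.LSTM(input_size=10, hidden_size=hidden_size, num_layers=2,
                 batch_first=True, bidirectional=True)

x = torch.randn(3, 5, 10)        # (batch, seq_len, input_size)
output, (hn, cn) = bilstm(x)     # initial states default to zeros

# Last layer: forward direction finished at the last time step,
# backward direction finished at the first time step.
last_forward = hn[-2]            # (batch, hidden_size)
last_backward = hn[-1]           # (batch, hidden_size)
summary = torch.cat((last_forward, last_backward), dim=1)  # (batch, 2 * hidden_size)

# Cross-check against the per-step outputs of the last layer
same_fwd = torch.allclose(output[:, -1, :hidden_size], last_forward, atol=1e-6)
same_bwd = torch.allclose(output[:, 0, hidden_size:], last_backward, atol=1e-6)
print(summary.shape, same_fwd, same_bwd)
```

One consequence: `output[:, -1, :]` in a bidirectional model pairs the complete forward pass with a backward state that has seen only the final token, so concatenating `hn[-2]` and `hn[-1]` is often the more informative summary.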

03.LSTM Applications
    a.Sequence classification
        ---
        class LSTMClassifier(nn.Module):
            def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
                super(LSTMClassifier, self).__init__()
                self.hidden_size = hidden_size
                self.num_layers = num_layers

                self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                                   batch_first=True, dropout=dropout)
                self.fc = nn.Linear(hidden_size, num_classes)

            def forward(self, x):
                h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
                c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

                out, _ = self.lstm(x, (h0, c0))
                out = self.fc(out[:, -1, :])
                return out

        # Training example
        model = LSTMClassifier(input_size=28, hidden_size=128,
                              num_layers=2, num_classes=10)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        # Simulated training on random data (one batch per epoch)
        for epoch in range(5):
            x = torch.randn(32, 28, 28)  # MNIST-sized sequences: 28 steps of 28 features
            y = torch.randint(0, 10, (32,))

            optimizer.zero_grad()
            outputs = model(x)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")
        ---

3.3 GRU

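01.GRU Structure
    a.Gating mechanism
        Reset gate: r_t = σ(W_r * [h_{t-1}, x_t])
        Update gate: z_t = σ(W_z * [h_{t-1}, x_t])
        Candidate hidden state: h̃_t = tanh(W * [r_t ⊙ h_{t-1}, x_t])
        Hidden state: h_t = (1 - z_t) ⊙ h_{t-1} + z_t ⊙ h̃_t
        (bias terms omitted for brevity)
    b.Comparison with LSTM
        Fewer parameters: three weight blocks instead of four and no separate cell state, so a GRU is usually somewhat faster to train
        Accuracy is comparable on many tasks
    c.Code implementation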
        ---
        import torch
        import torch.nn as nn

        class GRUCell(nn.Module):
            def __init__(self, input_size, hidden_size):
                super(GRUCell, self).__init__()
                self.input_size = input_size
                self.hidden_size = hidden_size

                self.W_r = nn.Linear(input_size + hidden_size, hidden_size)
                self.W_z = nn.Linear(input_size + hidden_size, hidden_size)
                self.W_h = nn.Linear(input_size + hidden_size, hidden_size)

            def forward(self, x, h_prev):
                combined = torch.cat((x, h_prev), dim=1)

                r_t = torch.sigmoid(self.W_r(combined))
                z_t = torch.sigmoid(self.W_z(combined))

                combined_r = torch.cat((x, r_t * h_prev), dim=1)
                h_tilde = torch.tanh(self.W_h(combined_r))

                h_t = (1 - z_t) * h_prev + z_t * h_tilde

                return h_t

        # Quick test
        gru_cell = GRUCell(input_size=10, hidden_size=20)
        x = torch.randn(3, 10)
        h = torch.zeros(3, 20)

        h_new = gru_cell(x, h)
        print(f"GRU output: {h_new.shape}")
        ---

02.PyTorch GRU
    a.Basic usage
        ---
        # PyTorch GRU
        gru = nn.GRU(input_size=10, hidden_size=20, num_layers=2, batch_first=True)

        input = torch.randn(3, 5, 10)
        h0 = torch.zeros(2, 3, 20)

        output, hn = gru(input, h0)

        print(f"Output: {output.shape}")
        print(f"Final hidden state: {hn.shape}")
        ---
    b.GRU classifier
        ---
        class GRUClassifier(nn.Module):
            def __init__(self, input_size, hidden_size, num_layers, num_classes):
                super(GRUClassifier, self).__init__()
                self.hidden_size = hidden_size
                self.num_layers = num_layers

                self.gru = nn.GRU(input_size, hidden_size, num_layers,
                                 batch_first=True, dropout=0.5)
                self.fc = nn.Linear(hidden_size, num_classes)

            def forward(self, x):
                h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
                out, _ = self.gru(x, h0)
                out = self.fc(out[:, -1, :])
                return out

        # Quick test
        model = GRUClassifier(input_size=28, hidden_size=128,
                             num_layers=2, num_classes=10)
        x = torch.randn(32, 28, 28)
        output = model(x)
        print(f"GRU classifier output: {output.shape}")
        ---

03.Comparing RNN Variants
    a.Performance comparison
        ---
        import time

        # Compare RNN, LSTM, and GRU
        models = {
            'RNN': nn.RNN(100, 256, 2, batch_first=True),
            'LSTM': nn.LSTM(100, 256, 2, batch_first=True),
            'GRU': nn.GRU(100, 256, 2, batch_first=True)
        }

        input = torch.randn(32, 50, 100)  # batch=32, seq_len=50, input_size=100

        for name, model in models.items():
            # Parameter count
            params = sum(p.numel() for p in model.parameters())

            # Forward-pass timing without autograd overhead;
            # the initial states default to zeros when omitted
            model.eval()
            with torch.no_grad():
                start = time.perf_counter()
                for _ in range(100):
                    output, _ = model(input)
                elapsed = time.perf_counter() - start

            print(f"{name}:")
            print(f"  Parameters: {params:,}")
            print(f"  Time for 100 forward passes: {elapsed:.4f} s")
            print(f"  Output shape: {output.shape}\n")
        ---
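The parameter counts printed by the comparison above can also be predicted in closed form. The sketch below assumes PyTorch's default layout for unidirectional `nn.RNN`, `nn.LSTM`, and `nn.GRU` with `bias=True`: each layer holds an input-to-hidden matrix, a hidden-to-hidden matrix, and two bias vectors, all multiplied by the number of gate blocks (1, 4, and 3 respectively). The helper name `recurrent_param_count` is hypothetical.

```python
def recurrent_param_count(gates, input_size, hidden_size, num_layers):
    """Parameter count of a stacked, unidirectional recurrent layer with two bias vectors."""
    total = 0
    for layer in range(num_layers):
        # Layers after the first take the previous layer's hidden state as input
        in_features = input_size if layer == 0 else hidden_size
        weights = gates * hidden_size * (in_features + hidden_size)
        biases = 2 * gates * hidden_size
        total += weights + biases
    return total

for name, gates in (("RNN", 1), ("LSTM", 4), ("GRU", 3)):
    count = recurrent_param_count(gates, input_size=100, hidden_size=256, num_layers=2)
    print(f"{name}: {count:,}")
```

At equal sizes the counts scale exactly as 1 : 4 : 3, which is the source of the "fewer parameters" claim for GRU relative to LSTM.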

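3.4 Sequence-to-Sequence Models

01.Seq2Seq Architecture
    a.Encoder-decoder
        Encoder: encodes the input sequence into a context vector
        Decoder: generates the output sequence from the context vector
    b.Code implementation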
        ---
        class Encoder(nn.Module):
            def __init__(self, input_size, hidden_size, num_layers=1):
                super(Encoder, self).__init__()
                self.hidden_size = hidden_size
                self.num_layers = num_layers
                self.embedding = nn.Embedding(input_size, hidden_size)
                self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True)

            def forward(self, x):
                embedded = self.embedding(x)
                output, hidden = self.gru(embedded)
                return output, hidden

        class Decoder(nn.Module):
            def __init__(self, hidden_size, output_size, num_layers=1):
                super(Decoder, self).__init__()
                self.hidden_size = hidden_size
                self.num_layers = num_layers
                self.embedding = nn.Embedding(output_size, hidden_size)
                self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True)
                self.fc = nn.Linear(hidden_size, output_size)

            def forward(self, x, hidden):
                embedded = self.embedding(x)
                output, hidden = self.gru(embedded, hidden)
                output = self.fc(output)
                return output, hidden

        class Seq2Seq(nn.Module):
            def __init__(self, encoder, decoder):
                super(Seq2Seq, self).__init__()
                self.encoder = encoder
                self.decoder = decoder

            def forward(self, src, trg, teacher_forcing_ratio=0.5):
                batch_size = src.size(0)
                trg_len = trg.size(1)
                trg_vocab_size = self.decoder.fc.out_features

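                # Allocate on the same device as the inputs; position 0 (<SOS>) stays all zeros
                outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=src.device)

                # Encode
                _, hidden = self.encoder(src)

                # Decode, starting from the <SOS> token
                input = trg[:, 0:1]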

                for t in range(1, trg_len):
                    output, hidden = self.decoder(input, hidden)
                    outputs[:, t:t+1] = output

                    # Teacher forcing
                    teacher_force = torch.rand(1).item() < teacher_forcing_ratio
                    top1 = output.argmax(2)
                    input = trg[:, t:t+1] if teacher_force else top1

                return outputs

        # Quick test
        INPUT_DIM = 1000
        OUTPUT_DIM = 1000
        HID_DIM = 256

        encoder = Encoder(INPUT_DIM, HID_DIM)
        decoder = Decoder(HID_DIM, OUTPUT_DIM)
        model = Seq2Seq(encoder, decoder)

        src = torch.randint(0, INPUT_DIM, (32, 10))  # batch=32, src_len=10
        trg = torch.randint(0, OUTPUT_DIM, (32, 15))  # batch=32, trg_len=15

        output = model(src, trg)
        print(f"Seq2Seq output: {output.shape}")
        ---
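Because the decoder loop above starts at `t = 1`, position 0 of the returned tensor is never filled and should be excluded from the loss. The following is a minimal sketch of that step under stated assumptions: the random `outputs` tensor stands in for `model(src, trg)`, and `PAD_IDX` is a hypothetical padding index that does not appear in the original code.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
batch_size, trg_len, vocab_size = 32, 15, 1000
PAD_IDX = 0  # hypothetical padding token index, ignored by the loss

# Stand-ins for the model output and the target batch
outputs = torch.randn(batch_size, trg_len, vocab_size)
trg = torch.randint(1, vocab_size, (batch_size, trg_len))

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Drop position 0 (<SOS>) and flatten batch and time into one axis
logits = outputs[:, 1:].reshape(-1, vocab_size)   # (batch * (trg_len - 1), vocab)
targets = trg[:, 1:].reshape(-1)                  # (batch * (trg_len - 1),)
loss = criterion(logits, targets)

print(f"logits: {logits.shape}, targets: {targets.shape}, loss: {loss.item():.4f}")
```

`nn.CrossEntropyLoss` expects class scores in the last dimension of a 2-D input, which is why the batch and time axes are merged before the call.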

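02.Attention Mechanism
    a.Principle
        Score each key against the query, normalize the scores with softmax, and take the weighted sum of the values
        Scaled dot-product form: Attention(Q, K, V) = softmax(QK^T / √d_k)V
        Additive form (used by the code in b.): score(h, e_j) = v^T tanh(W_a [h; e_j] + b_a)
    b.Code implementation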
        ---
        class Attention(nn.Module):
            def __init__(self, hidden_size):
                super(Attention, self).__init__()
                self.attn = nn.Linear(hidden_size * 2, hidden_size)
                self.v = nn.Linear(hidden_size, 1, bias=False)

            def forward(self, hidden, encoder_outputs):
                # hidden: (batch, hidden_size)
                # encoder_outputs: (batch, seq_len, hidden_size)

                batch_size = encoder_outputs.size(0)
                seq_len = encoder_outputs.size(1)

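                # Repeat the decoder hidden state once per source position
                hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)

                # Additive attention scores: v^T tanh(W_a [h; e_j] + b_a)
                energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
                attention = self.v(energy).squeeze(2)

                # Normalize over source positions
                return torch.softmax(attention, dim=1)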

        class AttentionDecoder(nn.Module):
            def __init__(self, hidden_size, output_size):
                super(AttentionDecoder, self).__init__()
                self.hidden_size = hidden_size
                self.embedding = nn.Embedding(output_size, hidden_size)
                self.attention = Attention(hidden_size)
                self.gru = nn.GRU(hidden_size * 2, hidden_size, batch_first=True)
                self.fc = nn.Linear(hidden_size, output_size)

            def forward(self, input, hidden, encoder_outputs):
                embedded = self.embedding(input)

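                # Attention weights from the top-layer hidden state: (batch, seq_len)
                attn_weights = self.attention(hidden[-1], encoder_outputs)

                # Weighted sum of encoder outputs: (batch, 1, hidden_size)
                attn_weights = attn_weights.unsqueeze(1)
                context = torch.bmm(attn_weights, encoder_outputs)

                # Concatenate the embedding and the context vector
                rnn_input = torch.cat((embedded, context), dim=2)

                # GRU step
                output, hidden = self.gru(rnn_input, hidden)

                # Project to vocabulary scores
                output = self.fc(output.squeeze(1))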

                return output, hidden, attn_weights.squeeze(1)

        # Quick test
        encoder = Encoder(1000, 256)
        decoder = AttentionDecoder(256, 1000)

        src = torch.randint(0, 1000, (32, 10))
        encoder_outputs, hidden = encoder(src)

        input = torch.randint(0, 1000, (32, 1))
        output, hidden, attn_weights = decoder(input, hidden, encoder_outputs)

        print(f"Decoder output: {output.shape}")
        print(f"Attention weights: {attn_weights.shape}")
        ---
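The formula softmax(QK^T / √d_k)V quoted in the principle above can be written out directly. The sketch below is a NumPy version of scaled dot-product attention, given as an illustration of the formula rather than as the attention used by `AttentionDecoder`; the function name, the shapes, and the choice of using the keys as values are assumptions.

```python
import numpy as np

def scaled_dot_product_attention(Q, K, V):
    """Return (context, weights) for batched queries, keys, and values."""
    d_k = Q.shape[-1]
    # Similarity of every query with every key, scaled by sqrt(d_k)
    scores = Q @ K.swapaxes(-1, -2) / np.sqrt(d_k)
    # Numerically stable softmax over the key axis
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    # Weighted sum of the values
    return weights @ V, weights

rng = np.random.default_rng(0)
Q = rng.normal(size=(32, 1, 256))    # one decoder query per batch element
K = rng.normal(size=(32, 10, 256))   # ten encoder positions
V = K                                # values taken equal to the keys here

context, weights = scaled_dot_product_attention(Q, K, V)
print(context.shape, weights.shape)
```

Unlike the additive form, this variant has no learned parameters of its own; the scaling by √d_k keeps the scores from growing with the feature dimension.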

4 The PyTorch Deep Learning Framework

4.1 Tensor Operations

01.Creating Tensors
    a.Basic creation
        ---
        import torch
        import numpy as np

        # From a Python list
        tensor1 = torch.tensor([1, 2, 3, 4])
        print(f"From a list: {tensor1}")

        # From a NumPy array
        arr = np.array([1, 2, 3, 4])
        tensor2 = torch.from_numpy(arr)
        print(f"From NumPy: {tensor2}")

        # All zeros
        zeros = torch.zeros(2, 3)
        print(f"Zeros:\n{zeros}")

        # All ones
        ones = torch.ones(2, 3)
        print(f"Ones:\n{ones}")

        # Random tensors
        rand = torch.rand(2, 3)  # uniform on [0, 1)
        randn = torch.randn(2, 3)  # standard normal
        print(f"Uniform:\n{rand}")
        print(f"Normal:\n{randn}")

        # Evenly stepped range
        arange = torch.arange(0, 10, 2)
        print(f"arange: {arange}")

        # Evenly spaced points
        linspace = torch.linspace(0, 1, 5)
        print(f"linspace: {linspace}")
        ---
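One detail of the creation functions above is easy to miss: `torch.from_numpy` shares memory with the source array, while `torch.tensor` copies the data. A short sketch of the difference follows; the variable names are illustrative.

```python
import numpy as np
import torch

arr = np.array([1, 2, 3, 4])
shared = torch.from_numpy(arr)   # shares the array's memory
copied = torch.tensor(arr)       # makes an independent copy

arr[0] = 100                     # modify the NumPy array in place

print(f"from_numpy sees the change: {shared}")
print(f"torch.tensor does not:      {copied}")
```

Sharing avoids a copy for large arrays, but it also means in-place edits on either side are visible on the other.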

02.Manipulating Tensors
    a.Shape operations
        ---
        # Create a tensor
        x = torch.randn(2, 3, 4)
        print(f"Original shape: {x.shape}")

        # view: reinterpret the shape without copying (needs a compatible memory layout)
        y = x.view(2, 12)
        print(f"After view: {y.shape}")

        # reshape: like view, but copies when a view is not possible
        z = x.reshape(6, 4)
        print(f"After reshape: {z.shape}")

        # Transpose a 2-D tensor
        x2d = torch.randn(3, 4)
        x_t = x2d.t()
        print(f"Transpose: {x2d.shape} -> {x_t.shape}")

        # Reorder dimensions
        x3d = torch.randn(2, 3, 4)
        x_perm = x3d.permute(2, 0, 1)
        print(f"permute: {x3d.shape} -> {x_perm.shape}")

        # Remove size-1 dimensions
        x_squeeze = torch.randn(1, 3, 1, 4)
        x_sq = x_squeeze.squeeze()
        print(f"squeeze: {x_squeeze.shape} -> {x_sq.shape}")

        # Add a size-1 dimension
        x_unsq = x_sq.unsqueeze(0)
        print(f"unsqueeze: {x_sq.shape} -> {x_unsq.shape}")
        ---
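The difference between `view` and `reshape` only shows up on non-contiguous tensors, such as the result of a transpose. The sketch below illustrates it on a small example; the tensor values and variable names are assumptions chosen for readability.

```python
import torch

x = torch.arange(12).reshape(3, 4)
x_t = x.t()                       # transposed view, no longer contiguous in memory
print(f"contiguous after t(): {x_t.is_contiguous()}")

# view needs a compatible memory layout and refuses here
try:
    x_t.view(12)
    view_failed = False
except RuntimeError:
    view_failed = True
print(f"view raised RuntimeError: {view_failed}")

# reshape falls back to a copy when a view is impossible
flat = x_t.reshape(12)

# Equivalent explicit form: make the memory contiguous first, then view
flat_via_copy = x_t.contiguous().view(12)
print(flat)
```

A reasonable rule of thumb is to use `reshape` unless a guaranteed zero-copy view is required, in which case `view` fails loudly instead of copying silently.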
    b.Math operations
        ---
        # Element-wise arithmetic
        a = torch.tensor([1.0, 2.0, 3.0])
        b = torch.tensor([4.0, 5.0, 6.0])

        # Add, subtract, multiply, divide
        print(f"Addition: {a + b}")
        print(f"Subtraction: {a - b}")
        print(f"Multiplication: {a * b}")
        print(f"Division: {a / b}")

        # Matrix multiplication
        A = torch.randn(2, 3)
        B = torch.randn(3, 4)
        C = torch.mm(A, B)  # or A @ B
        print(f"Matrix product: {A.shape} @ {B.shape} = {C.shape}")

        # Batched matrix multiplication
        batch_A = torch.randn(10, 2, 3)
        batch_B = torch.randn(10, 3, 4)
        batch_C = torch.bmm(batch_A, batch_B)
        print(f"Batched matrix product: {batch_C.shape}")

        # Reductions
        x = torch.randn(3, 4)
        print(f"Sum: {x.sum()}")
        print(f"Mean: {x.mean()}")
        print(f"Max: {x.max()}")
        print(f"Min: {x.min()}")
        print(f"Standard deviation: {x.std()}")

        # Reductions along one dimension
        print(f"Row sums (dim=1): {x.sum(dim=1)}")
        print(f"Column sums (dim=0): {x.sum(dim=0)}")
        ---

03.Indexing and Slicing
    a.Basic indexing
        ---
        x = torch.randn(4, 5)

        # Single element
        print(f"Element [0,0]: {x[0, 0]}")

        # Row slice
        print(f"First row: {x[0, :]}")

        # Column slice
        print(f"First column: {x[:, 0]}")

        # Range slice
        print(f"Submatrix:\n{x[1:3, 2:4]}")

        # Boolean indexing
        mask = x > 0
        positive = x[mask]
        print(f"Positive entries: {positive}")

        # Indexing with a tensor of indices
        indices = torch.tensor([0, 2])
        selected = x[indices]
        print(f"Selected rows:\n{selected}")
        ---
    b.Concatenating and splitting
        ---
        # Concatenation
        a = torch.randn(2, 3)
        b = torch.randn(2, 3)

        # Concatenate along rows (dim=0)
        c = torch.cat([a, b], dim=0)
        print(f"cat along dim 0: {c.shape}")

        # Concatenate along columns (dim=1)
        d = torch.cat([a, b], dim=1)
        print(f"cat along dim 1: {d.shape}")

        # Stack along a new dimension
        e = torch.stack([a, b], dim=0)
        print(f"stack: {e.shape}")

        # Split into equal chunks
        x = torch.randn(6, 4)
        chunks = torch.chunk(x, 3, dim=0)
        print(f"Split into 3 chunks: {[c.shape for c in chunks]}")

        # Split by explicit sizes
        splits = torch.split(x, [2, 2, 2], dim=0)
        print(f"Split by size: {[s.shape for s in splits]}")
        ---

4.2 Automatic Differentiation

01.Computing Gradients
    a.Basic usage
        ---
        # Create a tensor that tracks gradients
        x = torch.tensor([2.0], requires_grad=True)
        y = x ** 2 + 3 * x + 1

        # Backpropagate
        y.backward()

        print(f"x = {x.item()}")
        print(f"y = {y.item()}")
        print(f"dy/dx = {x.grad.item()}")
        print(f"Analytic value: 2*x + 3 = {2*x.item() + 3}")
        ---
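    b.Gradients of multivariable functions
        ---
        # A function of several variables
        x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
        y = torch.sum(x ** 2)

        y.backward()

        # detach() is the recommended replacement for the legacy .data attribute
        print(f"x = {x.detach()}")
        print(f"y = {y.item()}")
        print(f"dy/dx = {x.grad}")
        print(f"Analytic value: 2*x = {2 * x.detach()}")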
        ---
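    c.Gradient accumulation
        ---
        x = torch.tensor([1.0], requires_grad=True)

        # First backward pass: d(x^2)/dx = 2 at x = 1
        y1 = x ** 2
        y1.backward()
        print(f"Gradient after the first pass: {x.grad}")

        # Second backward pass: gradients accumulate, giving 2 + d(x^3)/dx = 2 + 3 = 5
        y2 = x ** 3
        y2.backward()
        print(f"Accumulated gradient: {x.grad}")

        # Zero the gradient before an independent computation
        x.grad.zero_()
        y3 = x ** 2
        y3.backward()
        print(f"Gradient after zeroing: {x.grad}")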
        ---

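02.Computation Graphs
    a.Dynamic graphs
        ---
        # PyTorch builds the computation graph dynamically, as the code runs
        def forward(x, n):
            # Computes x^(n+1) using n multiplications
            y = x
            for i in range(n):
                y = y * x
            return y

        x = torch.tensor([2.0], requires_grad=True)

        # Each value of n produces a different graph
        for n in [2, 3, 4]:
            x.grad = None  # reset the gradient
            y = forward(x, n)
            y.backward()
            print(f"n={n}, dy/dx = {x.grad.item()}")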
        ---
    b.Controlling gradient tracking
        ---
        # Disable gradient computation
        x = torch.randn(3, 4, requires_grad=True)

        # Option 1: the torch.no_grad() context manager
        with torch.no_grad():
            y = x * 2
            print(f"y.requires_grad = {y.requires_grad}")

        # Option 2: the @torch.no_grad() decorator
        @torch.no_grad()
        def inference(x):
            return x * 2

        z = inference(x)
        print(f"z.requires_grad = {z.requires_grad}")

        # Option 3: detach() returns a tensor cut off from the graph
        w = x.detach()
        print(f"w.requires_grad = {w.requires_grad}")
        ---

03.Custom Functions
    a.The Function class
        ---
        class MyReLU(torch.autograd.Function):
            @staticmethod
            def forward(ctx, input):
                ctx.save_for_backward(input)
                return input.clamp(min=0)

            @staticmethod
            def backward(ctx, grad_output):
                input, = ctx.saved_tensors
                grad_input = grad_output.clone()
                grad_input[input < 0] = 0
                return grad_input

        # Use the custom function
        x = torch.randn(5, requires_grad=True)
        y = MyReLU.apply(x)
        loss = y.sum()
        loss.backward()

        print(f"Input: {x.detach()}")
        print(f"Output: {y.detach()}")
        print(f"Gradient: {x.grad}")
        ---
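A hand-written `backward` is easy to get subtly wrong, so it is worth comparing it against numerical differentiation. The sketch below uses `torch.autograd.gradcheck`, which perturbs the inputs and compares finite differences with the analytic gradient. `MyReLU` is repeated here only to keep the example self-contained; the test points are an assumption, chosen away from 0 where ReLU is not differentiable, and double precision is used because the numerical comparison needs it.

```python
import torch

class MyReLU(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        (input,) = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0   # gradient is zero where the input was negative
        return grad_input

# Double-precision inputs, kept away from the kink at 0
x = torch.tensor([-2.0, -1.0, 0.5, 1.5, 3.0], dtype=torch.double, requires_grad=True)

# Returns True when analytic and numerical gradients agree within tolerance
ok = torch.autograd.gradcheck(MyReLU.apply, (x,), eps=1e-6, atol=1e-4)
print(f"gradcheck passed: {ok}")
```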

4.3 Building Models

01.nn.Module
    a.Basic structure
        ---
        import torch.nn as nn
        import torch.nn.functional as F

        class SimpleNet(nn.Module):
            def __init__(self, input_size, hidden_size, output_size):
                super(SimpleNet, self).__init__()
                self.fc1 = nn.Linear(input_size, hidden_size)
                self.fc2 = nn.Linear(hidden_size, output_size)

            def forward(self, x):
                x = F.relu(self.fc1(x))
                x = self.fc2(x)
                return x

        # Create the model
        model = SimpleNet(10, 20, 5)
        print(model)

        # Forward pass
        x = torch.randn(3, 10)
        output = model(x)
        print(f"Output: {output.shape}")

        # Inspect the parameters
        for name, param in model.named_parameters():
            print(f"{name}: {param.shape}")
        ---
    b.The Sequential container
        ---
        # Use Sequential
        model = nn.Sequential(
            nn.Linear(10, 20),
            nn.ReLU(),
            nn.Linear(20, 30),
            nn.ReLU(),
            nn.Linear(30, 5)
        )

        print(model)

        # Access a layer by index
        print(f"First layer: {model[0]}")

        # Append a layer
        model.add_module('output', nn.Softmax(dim=1))
        print(model)
        ---
    c.ModuleList and ModuleDict
        ---
        # ModuleList
        class DynamicNet(nn.Module):
            def __init__(self, layers):
                super(DynamicNet, self).__init__()
                self.layers = nn.ModuleList([
                    nn.Linear(layers[i], layers[i+1])
                    for i in range(len(layers)-1)
                ])

            def forward(self, x):
                for layer in self.layers[:-1]:
                    x = F.relu(layer(x))
                x = self.layers[-1](x)
                return x

        model = DynamicNet([10, 20, 30, 5])
        print(model)

        # ModuleDict
        class FlexibleNet(nn.Module):
            def __init__(self):
                super(FlexibleNet, self).__init__()
                self.layers = nn.ModuleDict({
                    'linear1': nn.Linear(10, 20),
                    'linear2': nn.Linear(20, 5)
                })

            def forward(self, x, layer_name):
                return self.layers[layer_name](x)

        model = FlexibleNet()
        x = torch.randn(3, 10)
        output = model(x, 'linear1')
        print(f"Output: {output.shape}")
        ---

02.Common Layers
    a.Fully connected layer
        ---
        # Linear layer
        fc = nn.Linear(in_features=10, out_features=5, bias=True)

        x = torch.randn(3, 10)
        output = fc(x)

        print(f"Weight: {fc.weight.shape}")
        print(f"Bias: {fc.bias.shape}")
        print(f"Output: {output.shape}")
        ---
    b.Convolutional layers
        ---
        # Conv2d
        conv = nn.Conv2d(in_channels=3, out_channels=64,
                        kernel_size=3, stride=1, padding=1)

        x = torch.randn(1, 3, 224, 224)
        output = conv(x)

        print(f"Kernel: {conv.weight.shape}")
        print(f"Output: {output.shape}")

        # Conv1d for sequences: input is (batch, channels, length)
        conv1d = nn.Conv1d(in_channels=10, out_channels=20, kernel_size=3)
        x1d = torch.randn(1, 10, 100)
        output1d = conv1d(x1d)
        print(f"Conv1d output: {output1d.shape}")
        ---
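    c.Normalization layers
        ---
        # BatchNorm: normalizes each channel over the batch and spatial dimensions
        bn = nn.BatchNorm2d(64)
        x = torch.randn(32, 64, 28, 28)
        output = bn(x)
        print(f"BatchNorm output: {output.shape}")

        # LayerNorm: normalizes over the last dimension of each sample
        ln = nn.LayerNorm(64)
        x = torch.randn(32, 10, 64)
        output = ln(x)
        print(f"LayerNorm output: {output.shape}")

        # InstanceNorm: normalizes each channel of each sample separately
        in_norm = nn.InstanceNorm2d(64)
        x = torch.randn(32, 64, 28, 28)
        output = in_norm(x)
        print(f"InstanceNorm output: {output.shape}")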
        ---
    d.Dropout
        ---
        # Dropout
        dropout = nn.Dropout(p=0.5)

        x = torch.randn(10, 20)
        print(f"Original:\n{x[0, :5]}")

        # Training mode: elements are zeroed at random, survivors are rescaled
        dropout.train()
        output_train = dropout(x)
        print(f"Training mode:\n{output_train[0, :5]}")

        # Evaluation mode: dropout is the identity
        dropout.eval()
        output_eval = dropout(x)
        print(f"Evaluation mode:\n{output_eval[0, :5]}")
        ---
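        The difference between the two modes can be sketched without PyTorch. The function below is a hypothetical stand-in for inverted dropout on a list of floats, not the `nn.Dropout` implementation itself:

```python
import random

def dropout(values, p=0.5, training=True, rng=random):
    """Inverted dropout on a list of floats (illustrative only)."""
    if not 0.0 <= p < 1.0:
        raise ValueError("this sketch assumes 0 <= p < 1")
    if not training or p == 0.0:
        return list(values)
    keep_scale = 1.0 / (1.0 - p)
    # Each element is dropped with probability p; survivors are rescaled
    # so the expected value of every entry stays the same.
    return [0.0 if rng.random() < p else v * keep_scale for v in values]

x = [1.0, 2.0, 3.0, 4.0]
print(dropout(x, p=0.5, training=True))
print(dropout(x, p=0.5, training=False))
```

        Rescaling during training is what allows evaluation mode to be a plain identity.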

03.Complete model example
    a.Image classification network
        ---
        class ImageClassifier(nn.Module):
            def __init__(self, num_classes=10):
                super(ImageClassifier, self).__init__()

                # Feature extractor
                self.features = nn.Sequential(
                    nn.Conv2d(3, 64, 3, padding=1),
                    nn.BatchNorm2d(64),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    nn.Conv2d(64, 128, 3, padding=1),
                    nn.BatchNorm2d(128),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    nn.Conv2d(128, 256, 3, padding=1),
                    nn.BatchNorm2d(256),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),
                )

                # Classifier (a 32x32 input is 4x4 after three 2x2 poolings)
                self.classifier = nn.Sequential(
                    nn.Dropout(0.5),
                    nn.Linear(256 * 4 * 4, 512),
                    nn.ReLU(inplace=True),
                    nn.Dropout(0.5),
                    nn.Linear(512, num_classes)
                )

            def forward(self, x):
                x = self.features(x)
                x = x.view(x.size(0), -1)
                x = self.classifier(x)
                return x

        # Test
        model = ImageClassifier(num_classes=10)
        x = torch.randn(4, 3, 32, 32)
        output = model(x)
        print(f"Output: {output.shape}")

        # Count parameters
        total_params = sum(p.numel() for p in model.parameters())
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"Total parameters: {total_params:,}")
        print(f"Trainable parameters: {trainable_params:,}")
        ---
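        The parameter count reported by `model.parameters()` can be cross-checked by hand. A sketch under the assumption that the layer sizes match `ImageClassifier` above; the helper names are hypothetical:

```python
def conv2d_params(in_ch, out_ch, kernel, bias=True):
    return in_ch * out_ch * kernel * kernel + (out_ch if bias else 0)

def batchnorm_params(channels):
    # gamma and beta only; running statistics are buffers, not parameters
    return 2 * channels

def linear_params(in_features, out_features, bias=True):
    return in_features * out_features + (out_features if bias else 0)

def image_classifier_params(num_classes=10):
    total = 0
    channels = [3, 64, 128, 256]
    for in_ch, out_ch in zip(channels[:-1], channels[1:]):
        total += conv2d_params(in_ch, out_ch, 3) + batchnorm_params(out_ch)
    total += linear_params(256 * 4 * 4, 512)
    total += linear_params(512, num_classes)
    return total

print(f"{image_classifier_params():,}")
```

        Most of the parameters sit in the first fully connected layer, which is one reason later architectures replace the flatten step with global average pooling.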

4.4 Training workflow

01.Complete training loop
    a.Basic workflow
        ---
        import torch.optim as optim
        from torch.utils.data import DataLoader, TensorDataset

        # Prepare the data
        X_train = torch.randn(1000, 10)
        y_train = torch.randint(0, 2, (1000,))
        train_dataset = TensorDataset(X_train, y_train)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

        # Build the model
        model = nn.Sequential(
            nn.Linear(10, 20),
            nn.ReLU(),
            nn.Linear(20, 2)
        )

        # Loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Training loop
        num_epochs = 10
        for epoch in range(num_epochs):
            model.train()
            total_loss = 0
            correct = 0
            total = 0

            for batch_idx, (data, target) in enumerate(train_loader):
                # Forward pass
                output = model(data)
                loss = criterion(output, target)

                # Backward pass: clear old gradients, backpropagate, update weights
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Accumulate statistics
                total_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()

            # Report the epoch (avg_loss is the mean of the per-batch mean losses)
            avg_loss = total_loss / len(train_loader)
            accuracy = 100. * correct / total
            print(f'Epoch {epoch+1}/{num_epochs}, '
                  f'Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
        ---
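        The loop above averages per-batch mean losses. With 1000 samples and batch_size=32 the last batch holds only 8 samples, so it is slightly over-weighted. A sketch of the difference, using made-up loss values and hypothetical helper names:

```python
def mean_of_batch_means(batch_losses):
    return sum(batch_losses) / len(batch_losses)

def sample_weighted_mean(batch_losses, batch_sizes):
    total = sum(loss * n for loss, n in zip(batch_losses, batch_sizes))
    return total / sum(batch_sizes)

# 1000 samples with batch_size=32 give 31 full batches and one batch of 8
batch_sizes = [32] * 31 + [8]
batch_losses = [0.5] * 31 + [2.0]  # illustrative values, not measured results

print(mean_of_batch_means(batch_losses))
print(sample_weighted_mean(batch_losses, batch_sizes))
```

        In the training loop the weighted version corresponds to accumulating `loss.item() * target.size(0)` and dividing by `total`. The gap is usually small, but it matters when batch sizes vary a lot.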
    b.Validation loop
        ---
        def validate(model, val_loader, criterion):
            model.eval()
            val_loss = 0
            correct = 0
            total = 0

            with torch.no_grad():
                for data, target in val_loader:
                    output = model(data)
                    loss = criterion(output, target)

                    val_loss += loss.item()
                    _, predicted = output.max(1)
                    total += target.size(0)
                    correct += predicted.eq(target).sum().item()

            avg_loss = val_loss / len(val_loader)
            accuracy = 100. * correct / total

            return avg_loss, accuracy

        # Run validation
        X_val = torch.randn(200, 10)
        y_val = torch.randint(0, 2, (200,))
        val_dataset = TensorDataset(X_val, y_val)
        val_loader = DataLoader(val_dataset, batch_size=32)

        val_loss, val_acc = validate(model, val_loader, criterion)
        print(f'Validation Loss: {val_loss:.4f}, Accuracy: {val_acc:.2f}%')
        ---

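02.Saving and loading models
    a.Saving a model
        ---
        # Save the whole model (pickles the class, so loading needs the same code available)
        torch.save(model, 'model.pth')

        # Save only the parameters (recommended)
        torch.save(model.state_dict(), 'model_params.pth')

        # Save the training state so training can be resumed
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss.item(),  # store a plain float rather than a tensor
        }
        torch.save(checkpoint, 'checkpoint.pth')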
        ---
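    b.Loading a model
        ---
        # Load the whole model. Recent PyTorch releases default to weights_only=True,
        # which rejects pickled modules; pass weights_only=False only for trusted files.
        model = torch.load('model.pth', weights_only=False)

        # Load only the parameters: rebuild the architecture first
        model = nn.Sequential(
            nn.Linear(10, 20),
            nn.ReLU(),
            nn.Linear(20, 2)
        )
        model.load_state_dict(torch.load('model_params.pth'))

        # Restore the training state. The optimizer is recreated so that it
        # tracks the parameters of the rebuilt model.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        checkpoint = torch.load('checkpoint.pth')
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        loss = checkpoint['loss']

        # Resume training
        model.train()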
        ---

03.GPU acceleration
    a.Device management
        ---
        # Check for a GPU
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        # Move the model to the device
        model = model.to(device)

        # Move the data to the device
        x = torch.randn(10, 10).to(device)
        y = model(x)

        # Multiple GPUs
        if torch.cuda.device_count() > 1:
            print(f"Using {torch.cuda.device_count()} GPUs")
            model = nn.DataParallel(model)

        model = model.to(device)
        ---
    b.Complete GPU training loop
        ---
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)

        for epoch in range(num_epochs):
            for data, target in train_loader:
                # Move the batch to the same device as the model
                data, target = data.to(device), target.to(device)

                # Training step
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
        ---

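5 Advanced techniques

5.1 Residual networks

01.How residual connections work
    a.The vanishing gradient problem
        Deep networks are hard to train
        Gradients shrink as they are backpropagated through many layers
    b.Residual block design
        ---
        y = F(x) + x

        where F(x) is the residual mapping
        and x is carried unchanged by the identity shortcut connection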
        ---
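        Differentiating y = F(x) + x gives dy/dx = F'(x) + 1, so the shortcut always contributes a direct gradient path. A toy sketch with scalar layers, where the local derivative 0.05 is an arbitrary illustrative value and `gradient_through_stack` is a hypothetical helper:

```python
def gradient_through_stack(local_grads, residual):
    """Product of per-layer derivatives for a stack of scalar layers."""
    grad = 1.0
    for g in local_grads:
        # Plain layer: dy/dx = F'(x).  Residual layer: dy/dx = F'(x) + 1.
        grad *= (1.0 + g) if residual else g
    return grad

local_grads = [0.05] * 20  # 20 layers, each with a small local derivative
print(gradient_through_stack(local_grads, residual=False))
print(gradient_through_stack(local_grads, residual=True))
```

        Without shortcuts the product collapses toward zero, while with shortcuts it stays on the order of one. Real networks are not scalar chains, so this only illustrates the mechanism.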
    c.Basic residual block
        ---
        import torch
        import torch.nn as nn
        import torch.nn.functional as F

        class BasicBlock(nn.Module):
            def __init__(self, in_channels, out_channels, stride=1):
                super(BasicBlock, self).__init__()

                self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
                self.bn1 = nn.BatchNorm2d(out_channels)
                self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
                self.bn2 = nn.BatchNorm2d(out_channels)

                # Shortcut connection (1x1 projection when the shape changes)
                self.shortcut = nn.Sequential()
                if stride != 1 or in_channels != out_channels:
                    self.shortcut = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
                        nn.BatchNorm2d(out_channels)
                    )

            def forward(self, x):
                out = F.relu(self.bn1(self.conv1(x)))
                out = self.bn2(self.conv2(out))
                out += self.shortcut(x)
                out = F.relu(out)
                return out

        # Test
        block = BasicBlock(64, 128, stride=2)
        x = torch.randn(4, 64, 32, 32)
        output = block(x)
        print(f"Input: {x.shape}")
        print(f"Output: {output.shape}")
        ---

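02.Bottleneck residual block
    a.Design idea
        Use 1x1 convolutions to reduce and then restore the channel count
        This lowers the computational cost of the 3x3 convolution
    b.Implementation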
        ---
        class Bottleneck(nn.Module):
            expansion = 4

            def __init__(self, in_channels, out_channels, stride=1):
                super(Bottleneck, self).__init__()

                # 1x1 convolution reduces the channel count
                self.conv1 = nn.Conv2d(in_channels, out_channels, 1, bias=False)
                self.bn1 = nn.BatchNorm2d(out_channels)

                # 3x3 convolution
                self.conv2 = nn.Conv2d(out_channels, out_channels, 3, stride, 1, bias=False)
                self.bn2 = nn.BatchNorm2d(out_channels)

                # 1x1 convolution expands the channel count
                self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, 1, bias=False)
                self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)

                # Shortcut
                self.shortcut = nn.Sequential()
                if stride != 1 or in_channels != out_channels * self.expansion:
                    self.shortcut = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels * self.expansion, 1, stride, bias=False),
                        nn.BatchNorm2d(out_channels * self.expansion)
                    )

            def forward(self, x):
                out = F.relu(self.bn1(self.conv1(x)))
                out = F.relu(self.bn2(self.conv2(out)))
                out = self.bn3(self.conv3(out))
                out += self.shortcut(x)
                out = F.relu(out)
                return out

        # Test
        bottleneck = Bottleneck(256, 64, stride=1)
        x = torch.randn(4, 256, 28, 28)
        output = bottleneck(x)
        print(f"Bottleneck input: {x.shape}")
        print(f"Bottleneck output: {output.shape}")
        ---
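        The saving from the bottleneck design can be estimated by counting convolution weights. A sketch that ignores BatchNorm and shortcut parameters; the helper names are hypothetical:

```python
def conv_weights(in_ch, out_ch, kernel):
    return in_ch * out_ch * kernel * kernel

def bottleneck_weights(in_ch, mid_ch, expansion=4):
    out_ch = mid_ch * expansion
    return (conv_weights(in_ch, mid_ch, 1)      # 1x1 reduce
            + conv_weights(mid_ch, mid_ch, 3)   # 3x3 on the narrow tensor
            + conv_weights(mid_ch, out_ch, 1))  # 1x1 expand

def basic_weights(channels):
    # Two 3x3 convolutions at full width, as in BasicBlock
    return 2 * conv_weights(channels, channels, 3)

print(bottleneck_weights(256, 64))
print(basic_weights(256))
```

        For a 256-channel input the bottleneck needs far fewer weights than two full-width 3x3 convolutions, which is what makes deeper variants such as ResNet50 affordable.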

03.Complete ResNet implementation
    a.ResNet architecture
        ---
        class ResNet(nn.Module):
            def __init__(self, block, num_blocks, num_classes=10):
                super(ResNet, self).__init__()
                self.in_channels = 64

                # Stem convolution (3x3, suited to small inputs such as 32x32 images)
                self.conv1 = nn.Conv2d(3, 64, 3, 1, 1, bias=False)
                self.bn1 = nn.BatchNorm2d(64)

                # Residual stages
                self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
                self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
                self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
                self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

                # Global average pooling and classifier
                self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
                self.fc = nn.Linear(512 * block.expansion, num_classes)

            def _make_layer(self, block, out_channels, num_blocks, stride):
                strides = [stride] + [1] * (num_blocks - 1)
                layers = []
                for stride in strides:
                    layers.append(block(self.in_channels, out_channels, stride))
                    self.in_channels = out_channels * block.expansion
                return nn.Sequential(*layers)

            def forward(self, x):
                out = F.relu(self.bn1(self.conv1(x)))
                out = self.layer1(out)
                out = self.layer2(out)
                out = self.layer3(out)
                out = self.layer4(out)
                out = self.avgpool(out)
                out = out.view(out.size(0), -1)
                out = self.fc(out)
                return out

        # Build the different ResNet variants
        def ResNet18():
            return ResNet(BasicBlock, [2, 2, 2, 2])

        def ResNet34():
            return ResNet(BasicBlock, [3, 4, 6, 3])

        def ResNet50():
            return ResNet(Bottleneck, [3, 4, 6, 3])

        # Test
        model = ResNet18()
        x = torch.randn(4, 3, 32, 32)
        output = model(x)
        print(f"ResNet18 output: {output.shape}")

        # Count parameters
        total_params = sum(p.numel() for p in model.parameters())
        print(f"ResNet18 parameters: {total_params:,}")
        ---
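        The numbers in the names ResNet18/34/50 can be recovered from the block lists above. A sketch assuming the usual counting convention (stem convolution, convolutions on the main path of each block, final fully connected layer; shortcut projections are not counted):

```python
def resnet_depth(convs_per_block, num_blocks):
    # 1 stem convolution + main-path convolutions + 1 fully connected layer
    return 1 + convs_per_block * sum(num_blocks) + 1

print(resnet_depth(2, [2, 2, 2, 2]))  # BasicBlock has 2 convolutions
print(resnet_depth(2, [3, 4, 6, 3]))
print(resnet_depth(3, [3, 4, 6, 3]))  # Bottleneck has 3 convolutions
```

        ResNet34 and ResNet50 share the same block list and differ only in the block type.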

04.Residual network variants
    a.Pre-activation ResNet
        ---
        class PreActBlock(nn.Module):
            def __init__(self, in_channels, out_channels, stride=1):
                super(PreActBlock, self).__init__()

                self.bn1 = nn.BatchNorm2d(in_channels)
                self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
                self.bn2 = nn.BatchNorm2d(out_channels)
                self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)

                self.shortcut = nn.Sequential()
                if stride != 1 or in_channels != out_channels:
                    self.shortcut = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, 1, stride, bias=False)
                    )

            def forward(self, x):
                out = F.relu(self.bn1(x))
                # A projection shortcut takes the pre-activated tensor;
                # an identity shortcut passes x through untouched.
                shortcut = self.shortcut(out) if len(self.shortcut) > 0 else x
                out = self.conv1(out)
                out = self.conv2(F.relu(self.bn2(out)))
                out += shortcut
                return out

        # Test
        preact_block = PreActBlock(64, 128, stride=2)
        x = torch.randn(4, 64, 32, 32)
        output = preact_block(x)
        print(f"Pre-activation output: {output.shape}")
        ---
    b.Wide ResNet
        ---
        class WideBasicBlock(nn.Module):
            def __init__(self, in_channels, out_channels, stride=1, dropout=0.3):
                super(WideBasicBlock, self).__init__()

                self.bn1 = nn.BatchNorm2d(in_channels)
                self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
                self.dropout = nn.Dropout(dropout)
                self.bn2 = nn.BatchNorm2d(out_channels)
                self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)

                self.shortcut = nn.Sequential()
                if stride != 1 or in_channels != out_channels:
                    self.shortcut = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, 1, stride, bias=False)
                    )

            def forward(self, x):
                out = self.conv1(F.relu(self.bn1(x)))
                out = self.dropout(out)
                out = self.conv2(F.relu(self.bn2(out)))
                out += self.shortcut(x)
                return out

        # Test
        wide_block = WideBasicBlock(64, 128, stride=2)
        x = torch.randn(4, 64, 32, 32)
        output = wide_block(x)
        print(f"Wide ResNet output: {output.shape}")
        ---

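5.2 Transfer learning

01.How transfer learning works
    a.Basic idea
        Reuse the knowledge stored in a pretrained model
        Fine-tune it on the new task
    b.When to use it
        The new dataset is small
        The new task is similar to the pretraining task
        Compute resources are limited
    c.Transfer strategies
        ---
        # Strategy 1: feature extraction (freeze the pretrained layers)
        # Strategy 2: fine-tuning
        # Strategy 3: retrain the last few layers from scratch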
        ---

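02.Using a pretrained model
    a.Loading a pretrained ResNet
        ---
        import torchvision.models as models

        # Load ResNet18 with ImageNet weights. torchvision 0.13+ uses the `weights`
        # argument; older releases use pretrained=True instead.
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        print(model)

        # Inspect the final layer
        print(f"\nFinal layer: {model.fc}")

        # Test
        x = torch.randn(1, 3, 224, 224)
        output = model(x)
        print(f"Output shape: {output.shape}")  # (1, 1000): the number of ImageNet classes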
        ---
    b.Replacing the classifier
        ---
        # Replace the final fully connected layer
        num_classes = 10  # number of classes in the new task
        num_features = model.fc.in_features
        model.fc = nn.Linear(num_features, num_classes)

        print(f"New classifier: {model.fc}")

        # Test
        x = torch.randn(1, 3, 224, 224)
        output = model(x)
        print(f"New output shape: {output.shape}")  # (1, 10)
        ---

03.Feature extraction
    a.Freezing the pretrained layers
        ---
        # Freeze every layer
        for param in model.parameters():
            param.requires_grad = False

        # Train only the new classifier (a freshly created layer has requires_grad=True)
        model.fc = nn.Linear(num_features, num_classes)

        # Verify
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        total_params = sum(p.numel() for p in model.parameters())
        print(f"Trainable parameters: {trainable_params:,}")
        print(f"Total parameters: {total_params:,}")
        print(f"Frozen fraction: {(1 - trainable_params/total_params)*100:.2f}%")
        ---
    b.Training the classifier
        ---
        import torch.optim as optim
        from torch.utils.data import DataLoader, TensorDataset

        # Prepare the data (random tensors standing in for a real dataset)
        X_train = torch.randn(1000, 3, 224, 224)
        y_train = torch.randint(0, num_classes, (1000,))
        train_loader = DataLoader(TensorDataset(X_train, y_train),
                                 batch_size=32, shuffle=True)

        # Optimize only the classifier parameters
        optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

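        # Train. Note that model.train() still lets the frozen BatchNorm layers
        # update their running statistics, even though their weights are fixed.
        model.train()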
        for epoch in range(5):
            total_loss = 0
            correct = 0
            total = 0

            for data, target in train_loader:
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()

            avg_loss = total_loss / len(train_loader)
            accuracy = 100. * correct / total
            print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}, Acc: {accuracy:.2f}%')
        ---

04.Fine-tuning
    a.Partial unfreezing
        ---
        # Load the pretrained model (torchvision 0.13+ API; older releases use pretrained=True)
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        model.fc = nn.Linear(model.fc.in_features, num_classes)

        # Freeze everything except layer4 and fc
        for name, param in model.named_parameters():
            if 'layer4' not in name and 'fc' not in name:
                param.requires_grad = False

        # List the trainable layers
        print("Trainable layers:")
        for name, param in model.named_parameters():
            if param.requires_grad:
                print(f"  {name}: {param.shape}")
        ---
    b.Differential learning rates
        ---
        # Smaller learning rate for pretrained layers, larger one for the new classifier
        params = [
            {'params': model.layer4.parameters(), 'lr': 1e-4},
            {'params': model.fc.parameters(), 'lr': 1e-3}
        ]
        optimizer = optim.Adam(params)

        # Train
        for epoch in range(10):
            model.train()
            for data, target in train_loader:
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
        ---
    c.Gradual unfreezing
        ---
        def unfreeze_layers(model, layer_names):
            for name, param in model.named_parameters():
                if any(layer in name for layer in layer_names):
                    param.requires_grad = True

        # Stage 1: train only the classifier
        # (torchvision 0.13+ API; older releases use pretrained=True)
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        for param in model.parameters():
            param.requires_grad = False
        model.fc = nn.Linear(model.fc.in_features, num_classes)

        optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
        # Train for a few epochs...

        # Stage 2: unfreeze layer4 and lower the learning rate
        unfreeze_layers(model, ['layer4', 'fc'])
        optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                              lr=0.0001)
        # Continue training...

        # Stage 3: unfreeze more layers and lower the learning rate again
        unfreeze_layers(model, ['layer3', 'layer4', 'fc'])
        optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                              lr=0.00001)
        # Continue training...
        ---
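        `unfreeze_layers` selects parameters by substring, which is safe for the torchvision ResNet names but could over-match in other architectures. A sketch of prefix-based selection on plain strings; the helper names and the example parameter names are hypothetical:

```python
def matches_prefix(param_name, prefixes):
    # Match whole name components: "fc" matches "fc.weight" but not "aux_fc.weight"
    return any(param_name == p or param_name.startswith(p + ".") for p in prefixes)

def select_trainable(param_names, prefixes):
    return [n for n in param_names if matches_prefix(n, prefixes)]

names = ["conv1.weight", "layer3.0.conv1.weight", "layer4.1.bn2.bias",
         "fc.weight", "fc.bias"]
stages = [["fc"], ["layer4", "fc"], ["layer3", "layer4", "fc"]]

for stage, prefixes in enumerate(stages, start=1):
    print(f"Stage {stage}: {select_trainable(names, prefixes)}")
```

        The same test could be applied to the names yielded by `model.named_parameters()` when deciding which `requires_grad` flags to set at each stage.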

05.Complete transfer learning example
    a.Transfer learning on CIFAR-10
        ---
        import torchvision
        import torchvision.transforms as transforms

        # Preprocessing: resize to the ImageNet input size and apply ImageNet normalization
        transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])

        # Load CIFAR-10
        train_dataset = torchvision.datasets.CIFAR10(
            root='./data', train=True, download=True, transform=transform
        )
        test_dataset = torchvision.datasets.CIFAR10(
            root='./data', train=False, download=True, transform=transform
        )

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

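        # Load the pretrained model (torchvision 0.13+ API; older releases use pretrained=True)
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        model.fc = nn.Linear(model.fc.in_features, 10)

        # Freeze every layer except the final one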
        for name, param in model.named_parameters():
            if 'fc' not in name:
                param.requires_grad = False

        # Training setup
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

        def train_epoch(model, loader, criterion, optimizer):
            model.train()
            total_loss = 0
            correct = 0
            total = 0

            for data, target in loader:
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()

            return total_loss / len(loader), 100. * correct / total

        def test(model, loader, criterion):
            model.eval()
            total_loss = 0
            correct = 0
            total = 0

            with torch.no_grad():
                for data, target in loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    loss = criterion(output, target)

                    total_loss += loss.item()
                    _, predicted = output.max(1)
                    total += target.size(0)
                    correct += predicted.eq(target).sum().item()

            return total_loss / len(loader), 100. * correct / total

        # Training loop
        for epoch in range(10):
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
            test_loss, test_acc = test(model, test_loader, criterion)

            print(f'Epoch {epoch+1}:')
            print(f'  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
            print(f'  Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')
        ---

06.Data augmentation
    a.Common augmentations
        ---
        # Augmentation applied during training
        train_transform = transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])

        # Test time: basic preprocessing only
        test_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])
        ---
    b.Custom augmentation
        ---
        class CustomAugmentation:
            def __init__(self, p=0.5):
                self.p = p

            def __call__(self, img):
                if torch.rand(1) < self.p:
                    # Add Gaussian noise and clamp back to the valid [0, 1] range
                    noise = torch.randn_like(img) * 0.1
                    img = img + noise
                    img = torch.clamp(img, 0, 1)
                return img

        # Usage (after ToTensor, so the input is a tensor in [0, 1])
        transform = transforms.Compose([
            transforms.ToTensor(),
            CustomAugmentation(p=0.5),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        ])
        ---
    c.Mixup
        ---
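        def mixup_data(x, y, alpha=1.0):
            # Sample the mixing coefficient from Beta(alpha, alpha)
            if alpha > 0:
                lam = torch.distributions.Beta(alpha, alpha).sample().item()
            else:
                lam = 1.0

            batch_size = x.size(0)
            # Create the permutation on the same device as the batch
            index = torch.randperm(batch_size, device=x.device)

            # Blend each sample with a randomly chosen partner
            mixed_x = lam * x + (1 - lam) * x[index]
            y_a, y_b = y, y[index]

            return mixed_x, y_a, y_b, lam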

        def mixup_criterion(criterion, pred, y_a, y_b, lam):
            return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

        # Usage during training
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            # Mixup
            data, target_a, target_b, lam = mixup_data(data, target, alpha=1.0)

            optimizer.zero_grad()
            output = model(data)
            loss = mixup_criterion(criterion, output, target_a, target_b, lam)
            loss.backward()
            optimizer.step()
        ---
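        The arithmetic behind Mixup can be checked on small lists without PyTorch. A sketch using the standard library's `random.betavariate`; `mixup_pair` and `mixup_loss` are hypothetical helpers that mirror `mixup_data` and `mixup_criterion` for a single pair:

```python
import random

def mixup_pair(x_a, x_b, alpha=1.0, rng=random):
    # lam ~ Beta(alpha, alpha); alpha <= 0 disables mixing
    lam = rng.betavariate(alpha, alpha) if alpha > 0 else 1.0
    mixed = [lam * a + (1.0 - lam) * b for a, b in zip(x_a, x_b)]
    return mixed, lam

def mixup_loss(loss_a, loss_b, lam):
    # The loss is blended with the same coefficient as the inputs
    return lam * loss_a + (1.0 - lam) * loss_b

random.seed(0)
mixed, lam = mixup_pair([1.0, 0.0], [0.0, 1.0], alpha=1.0)
print(lam, mixed)
print(mixup_loss(2.0, 4.0, lam))
```

        With alpha=1.0 the coefficient is uniform on [0, 1]; smaller alpha values concentrate it near 0 or 1, so most mixed samples stay close to one of the originals.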

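5.3 Batch normalization

01.How BatchNorm works
    a.Formulas
        ---
        # For each feature dimension, over a mini-batch of m values:
        μ_B = 1/m * Σx_i                    # batch mean
        σ²_B = 1/m * Σ(x_i - μ_B)²         # batch variance
        x̂_i = (x_i - μ_B) / √(σ²_B + ε)   # normalize
        y_i = γ * x̂_i + β                  # scale and shift

        where γ and β are learnable parameters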
        ---
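        The four formulas can be traced step by step on a single feature. A minimal sketch in plain Python, assuming a hypothetical `batch_norm` helper that handles one feature across a mini-batch:

```python
import math

def batch_norm(values, gamma=1.0, beta=0.0, eps=1e-5):
    m = len(values)
    mean = sum(values) / m                            # batch mean
    var = sum((v - mean) ** 2 for v in values) / m    # biased batch variance
    normalized = [(v - mean) / math.sqrt(var + eps) for v in values]
    return [gamma * x_hat + beta for x_hat in normalized]  # scale and shift

out = batch_norm([1.0, 2.0, 3.0, 4.0])
print(out)
print(sum(out) / len(out))
```

        With gamma=1 and beta=0 the output has mean close to 0 and variance close to 1; the learnable gamma and beta let the network undo the normalization where that helps.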
    b.BatchNorm1d (for fully connected layers)
        ---
        # Input shape: (N, C) or (N, C, L)
        bn1d = nn.BatchNorm1d(num_features=20)

        # Use after a fully connected layer
        x = torch.randn(32, 20)
        output = bn1d(x)
        print(f"Input: {x.shape}, output: {output.shape}")

        # Learnable parameters
        print(f"gamma (weight): {bn1d.weight.shape}")
        print(f"beta (bias): {bn1d.bias.shape}")

        # Running statistics
        print(f"running_mean: {bn1d.running_mean.shape}")
        print(f"running_var: {bn1d.running_var.shape}")
        ---
    c.BatchNorm2d (for convolutional layers)
        ---
        # Input shape: (N, C, H, W)
        bn2d = nn.BatchNorm2d(num_features=64)

        # Use after a convolutional layer
        x = torch.randn(32, 64, 28, 28)
        output = bn2d(x)
        print(f"Input: {x.shape}, output: {output.shape}")

        # Complete convolution block
        conv_block = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        x = torch.randn(16, 3, 32, 32)
        output = conv_block(x)
        print(f"Convolution block output: {output.shape}")
        ---

02.Using BatchNorm
    a.Training and evaluation modes
        ---
        bn = nn.BatchNorm2d(64)

        # Training mode: uses the statistics of the current batch
        bn.train()
        x_train = torch.randn(32, 64, 28, 28)
        output_train = bn(x_train)
        print(f"Training mode - mean: {output_train.mean():.4f}, std: {output_train.std():.4f}")

        # Evaluation mode: uses the running statistics
        bn.eval()
        x_test = torch.randn(1, 64, 28, 28)
        output_test = bn(x_test)
        print(f"Evaluation mode - mean: {output_test.mean():.4f}, std: {output_test.std():.4f}")
        ---
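        The running statistics used in evaluation mode are exponential moving averages collected during training. A sketch of the update for one feature, assuming PyTorch's convention (default momentum 0.1, unbiased variance for the running estimate); the helper name is hypothetical:

```python
def update_running_stats(running_mean, running_var, batch, momentum=0.1):
    m = len(batch)
    mean = sum(batch) / m
    # The running variance is updated with the unbiased (m - 1) estimate
    unbiased_var = sum((v - mean) ** 2 for v in batch) / (m - 1)
    new_mean = (1 - momentum) * running_mean + momentum * mean
    new_var = (1 - momentum) * running_var + momentum * unbiased_var
    return new_mean, new_var

running_mean, running_var = 0.0, 1.0  # initial values of the buffers
for _ in range(50):
    running_mean, running_var = update_running_stats(
        running_mean, running_var, [4.0, 6.0, 8.0])
print(running_mean, running_var)
```

        After a single training batch the running statistics are still close to their initial values, which is why a barely trained BatchNorm layer can behave very differently in evaluation mode.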
    b.Complete network example
        ---
        class BNNet(nn.Module):
            def __init__(self, num_classes=10):
                super(BNNet, self).__init__()

                self.features = nn.Sequential(
                    # Block 1
                    nn.Conv2d(3, 64, 3, padding=1),
                    nn.BatchNorm2d(64),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 2
                    nn.Conv2d(64, 128, 3, padding=1),
                    nn.BatchNorm2d(128),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),

                    # Block 3
                    nn.Conv2d(128, 256, 3, padding=1),
                    nn.BatchNorm2d(256),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2, 2),
                )

                self.classifier = nn.Sequential(
                    nn.Linear(256 * 4 * 4, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(inplace=True),
                    nn.Dropout(0.5),
                    nn.Linear(512, num_classes)
                )

            def forward(self, x):
                x = self.features(x)
                x = x.view(x.size(0), -1)
                x = self.classifier(x)
                return x

        # Test
        model = BNNet(num_classes=10)
        x = torch.randn(16, 3, 32, 32)

        model.train()
        output = model(x)
        print(f"Output: {output.shape}")
        ---

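03.Other normalization methods
    a.LayerNorm
        Idea: normalize each sample over its feature dimensions (the trailing normalized_shape dimensions)
        ---
        # Input shape: (N, *), normalized over the last len(normalized_shape) dimensions
        ln = nn.LayerNorm(normalized_shape=64)

        # Sequence data
        x = torch.randn(32, 10, 64)  # (batch, seq_len, features)
        output = ln(x)
        print(f"LayerNorm output: {output.shape}")

        # Check the effect: with normalized_shape=64 each position is normalized on its own
        print(f"Mean at one position: {output[0, 0].mean():.4f}")
        print(f"Std at one position: {output[0, 0].std(unbiased=False):.4f}")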
        ---
    b.InstanceNorm
        Idea: normalize each channel of each sample separately
        ---
        # Used in tasks such as style transfer
        in_norm = nn.InstanceNorm2d(64)

        x = torch.randn(32, 64, 28, 28)
        output = in_norm(x)
        print(f"InstanceNorm output: {output.shape}")

        # Check: every channel is normalized independently
        print(f"First sample, first channel mean: {output[0, 0].mean():.4f}")
        print(f"First sample, first channel std: {output[0, 0].std():.4f}")
        ---
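    c.GroupNorm
        Idea: split the channels into groups and normalize within each group
        ---
        # num_groups: number of groups
        # num_channels: number of channels (must be divisible by num_groups)
        gn = nn.GroupNorm(num_groups=8, num_channels=64)

        x = torch.randn(32, 64, 28, 28)
        output = gn(x)
        print(f"GroupNorm output: {output.shape}")

        # Compare the normalization layers on the same input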
        bn = nn.BatchNorm2d(64)
        ln = nn.LayerNorm([64, 28, 28])
        in_norm = nn.InstanceNorm2d(64)

        x = torch.randn(4, 64, 28, 28)
        print(f"BatchNorm: {bn(x).shape}")
        print(f"LayerNorm: {ln(x).shape}")
        print(f"InstanceNorm: {in_norm(x).shape}")
        print(f"GroupNorm: {gn(x).shape}")
        ---
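        All four layers return the same shape; they differ in which elements share a mean and variance. A sketch that tabulates this for an (N, C, H, W) input, assuming BatchNorm is in training mode and LayerNorm is applied over [C, H, W]; the function name is hypothetical:

```python
def norm_statistics(n, c, h, w, num_groups=8):
    """Map each layer to (number of mean/variance pairs, elements per pair)."""
    return {
        "BatchNorm2d": (c, n * h * w),
        "LayerNorm([C, H, W])": (n, c * h * w),
        "InstanceNorm2d": (n * c, h * w),
        "GroupNorm": (n * num_groups, (c // num_groups) * h * w),
    }

for name, (groups, size) in norm_statistics(4, 64, 28, 28).items():
    print(f"{name}: {groups} statistics over {size} elements each")
```

        Only BatchNorm pools across the batch dimension, which is why it is the one layer here whose behavior depends on batch size and on the training/evaluation mode.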

04.Normalization comparison experiment
    a.Training loss and time comparison
        ---
        import time

        def train_with_norm(model, train_loader, epochs=5):
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.Adam(model.parameters(), lr=0.001)

            start_time = time.time()
            for epoch in range(epochs):
                model.train()
                total_loss = 0
                for data, target in train_loader:
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    total_loss += loss.item()

                avg_loss = total_loss / len(train_loader)
                print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}')

            elapsed = time.time() - start_time
            return elapsed

        # Prepare the data (random tensors, so only the relative behavior is meaningful)
        X = torch.randn(1000, 3, 32, 32)
        y = torch.randint(0, 10, (1000,))
        train_loader = DataLoader(TensorDataset(X, y), batch_size=32)

        # No normalization
        model_no_norm = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * 16 * 16, 10)
        )

        # BatchNorm
        model_bn = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * 16 * 16, 10)
        )

        print("No normalization:")
        time_no_norm = train_with_norm(model_no_norm, train_loader)
        print(f"Training time: {time_no_norm:.2f}s\n")

        print("BatchNorm:")
        time_bn = train_with_norm(model_bn, train_loader)
        print(f"Training time: {time_bn:.2f}s")
        ---

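6 Hands-on projects

6.1 Text sentiment analysis

01.Project overview
    a.Goal
        Classify the sentiment of a text as positive or negative
        A basic NLP application
        Practice with sequence models
    b.Dataset
        IMDB movie review dataset
        25,000 training samples
        25,000 test samples
        Binary classification
    c.Approach
        Text preprocessing
        Word embeddings
        RNN/LSTM models
        Attention mechanism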

02.Data preprocessing
    a.Text cleaning and tokenization
        ---
        import re
        import nltk
        from nltk.corpus import stopwords
        from nltk.tokenize import word_tokenize

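        # Download the required resources
        nltk.download('punkt')
        nltk.download('punkt_tab')  # assumed to be needed by word_tokenize in newer NLTK releases
        nltk.download('stopwords')
        nltk.download('wordnet')

        # Build the stop-word set once instead of on every call
        STOP_WORDS = set(stopwords.words('english'))

        # Text cleaning function
        def clean_text(text):
            # Lowercase
            text = text.lower()
            # Replace HTML tags with a space so neighbouring words are not glued together
            text = re.sub(r'<.*?>', ' ', text)
            # Remove URLs
            text = re.sub(r'http\S+|www\S+', ' ', text)
            # Remove non-letter characters
            text = re.sub(r'[^a-zA-Z\s]', '', text)
            # Tokenize
            words = word_tokenize(text)
            # Remove stop words
            words = [word for word in words if word not in STOP_WORDS]
            return ' '.join(words)

        # Try the cleaning function
        sample_text = "This movie was <b>amazing</b>! I loved it. Check out https://example.com"
        cleaned = clean_text(sample_text)
        print(f"Original text: {sample_text}")
        print(f"Cleaned: {cleaned}")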
        ---
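        The cleaning step above removes every stopword, and common English stopword lists also contain negations such as "not". The sketch below shows one way to keep negations while dropping other stopwords. It uses only the standard library; the small stopword set, the NEGATIONS set, and the function name clean_text_keep_negation are illustrative assumptions, not part of NLTK or of the original pipeline.

```python
import re

# Hypothetical, hand-picked stopword list for illustration only;
# it is much smaller than NLTK's English list.
BASIC_STOPWORDS = {"a", "an", "the", "this", "that", "was", "is", "it",
                   "i", "and", "of", "to", "not", "no"}
# Negation words that a sentiment model usually needs to see.
NEGATIONS = {"not", "no", "nor", "never"}

def clean_text_keep_negation(text, stop_words=BASIC_STOPWORDS):
    """Lowercase, strip HTML/URLs/non-letters, drop stopwords except negations."""
    text = text.lower()
    text = re.sub(r"<.*?>", " ", text)            # HTML tags
    text = re.sub(r"http\S+|www\S+", " ", text)   # URLs
    text = re.sub(r"[^a-z\s]", " ", text)         # anything that is not a letter
    kept = [w for w in text.split() if w in NEGATIONS or w not in stop_words]
    return " ".join(kept)

result = clean_text_keep_negation(
    "This movie was <b>not</b> good. See https://example.com"
)
print(result)  # movie not good see
```

        Replacing removed spans with a space rather than an empty string also avoids gluing neighbouring words together when a tag or punctuation mark sits between them.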
    b.Vocabulary construction
        ---
        import torch
        from torch.utils.data import Dataset, DataLoader
        import numpy as np

        class Vocabulary:
            def __init__(self):
                self.word2idx = {"<PAD>": 0, "<UNK>": 1}
                self.idx2word = {0: "<PAD>", 1: "<UNK>"}
                self.word_counts = {}

            def add_word(self, word):
                if word not in self.word_counts:
                    self.word_counts[word] = 0
                self.word_counts[word] += 1

            def build_vocab(self, min_freq=2):
                # Sort by frequency
                sorted_words = sorted(self.word_counts.items(), key=lambda x: x[1], reverse=True)

                # Add words that meet the frequency threshold
                for word, count in sorted_words:
                    if count >= min_freq and word not in self.word2idx:
                        idx = len(self.word2idx)
                        self.word2idx[word] = idx
                        self.idx2word[idx] = word

            def encode(self, text):
                words = word_tokenize(text.lower())
                return [self.word2idx.get(word, 1) for word in words]

            def decode(self, indices):
                return ' '.join([self.idx2word[idx] for idx in indices if idx != 0])

            def __len__(self):
                return len(self.word2idx)

        # Create the vocabulary
        vocab = Vocabulary()

        # Add words from a few mock sentences (in practice, build from the dataset)
        sample_texts = [
            "this movie was great",
            "i really liked this film",
            "terrible acting and plot",
            "amazing cinematography"
        ]

        for text in sample_texts:
            cleaned = clean_text(text)
            words = word_tokenize(cleaned)
            for word in words:
                vocab.add_word(word)

        vocab.build_vocab(min_freq=1)
        print(f"Vocabulary size: {len(vocab)}")
        print(f"First 10 entries: {list(vocab.word2idx.items())[:10]}")
        ---
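        The frequency threshold and the <UNK> fallback used by the Vocabulary class can also be expressed with collections.Counter. This is a minimal standard-library sketch that works on pre-tokenized text; the function names build_word2idx and encode_tokens are illustrative and not part of the class above.

```python
from collections import Counter

def build_word2idx(tokenized_texts, min_freq=2):
    """Map each word seen at least min_freq times to an index; 0 and 1 are reserved."""
    counts = Counter(tok for toks in tokenized_texts for tok in toks)
    word2idx = {"<PAD>": 0, "<UNK>": 1}
    # most_common() yields words from most to least frequent
    for word, count in counts.most_common():
        if count >= min_freq:
            word2idx[word] = len(word2idx)
    return word2idx

def encode_tokens(tokens, word2idx):
    """Replace every out-of-vocabulary token with the <UNK> index."""
    return [word2idx.get(tok, word2idx["<UNK>"]) for tok in tokens]

corpus = [["great", "movie"], ["great", "plot"], ["boring", "movie"]]
word2idx = build_word2idx(corpus, min_freq=2)
encoded = encode_tokens(["great", "boring", "movie"], word2idx)
print(word2idx)
print(encoded)  # "boring" occurs once, so it maps to <UNK> (index 1)
```

        With min_freq=2 only "great" and "movie" enter the vocabulary; rarer words share the single <UNK> vector, which keeps the embedding table small.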
    c.Dataset class
        ---
        class SentimentDataset(Dataset):
            def __init__(self, texts, labels, vocab, max_length=100):
                self.texts = texts
                self.labels = labels
                self.vocab = vocab
                self.max_length = max_length

            def __len__(self):
                return len(self.texts)

            def __getitem__(self, idx):
                text = self.texts[idx]
                label = self.labels[idx]

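                # Clean and encode the text
                cleaned_text = clean_text(text)
                encoded = self.vocab.encode(cleaned_text)

                # True token count before padding, taken from the encoded sequence so it
                # always matches input_ids; at least 1, because pack_padded_sequence
                # rejects zero-length sequences
                length = max(1, min(len(encoded), self.max_length))

                # Pad or truncate
                if len(encoded) < self.max_length:
                    encoded.extend([0] * (self.max_length - len(encoded)))
                else:
                    encoded = encoded[:self.max_length]

                return {
                    'input_ids': torch.tensor(encoded, dtype=torch.long),
                    'label': torch.tensor(label, dtype=torch.long),
                    'length': torch.tensor(length, dtype=torch.long)
                }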

        # Create a sample dataset
        texts = [
            "This movie was absolutely fantastic! Great acting and story.",
            "I hated this film. Boring and predictable.",
            "Not bad, but could be better. Average movie overall.",
            "One of the best movies I've ever seen! Highly recommend."
        ]
        labels = [1, 0, 0, 1]  # 1: positive, 0: negative

        dataset = SentimentDataset(texts, labels, vocab, max_length=50)
        sample = dataset[0]
        print(f"Input IDs: {sample['input_ids'][:10]}...")
        print(f"Label: {sample['label']}")
        print(f"Length: {sample['length']}")
        ---
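        The dataset above pads every sample to a fixed max_length. An alternative is to pad each batch only to its own longest sequence inside a collate function. The sketch below assumes a hypothetical dataset whose items are (token_id_list, label) pairs rather than the dictionaries returned above; collate_dynamic_padding is an illustrative name.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

def collate_dynamic_padding(batch, pad_idx=0):
    """batch: list of (token_id_list, label) pairs of varying length."""
    seqs = [torch.tensor(ids, dtype=torch.long) for ids, _ in batch]
    lengths = torch.tensor([len(s) for s in seqs], dtype=torch.long)
    labels = torch.tensor([label for _, label in batch], dtype=torch.long)
    # Pad to the longest sequence in this batch only
    input_ids = pad_sequence(seqs, batch_first=True, padding_value=pad_idx)
    return {"input_ids": input_ids, "label": labels, "length": lengths}

batch = collate_dynamic_padding([([5, 6, 7], 1), ([8, 9], 0)])
print(batch["input_ids"])
print(batch["length"])
```

        Such a function would be passed to DataLoader through its collate_fn argument. The returned keys match the ones the training loop reads, so the models could consume these batches unchanged.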

03.Word embeddings
    a.Embedding layer
        ---
        import torch.nn as nn

        class EmbeddingLayer(nn.Module):
            def __init__(self, vocab_size, embedding_dim, pretrained_embeddings=None):
                super(EmbeddingLayer, self).__init__()
                self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

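                # Load pretrained embeddings if provided
                if pretrained_embeddings is not None:
                    with torch.no_grad():
                        self.embedding.weight.copy_(pretrained_embeddings)
                        # Re-initialize the <UNK> vector randomly
                        self.embedding.weight[1].normal_(0, 0.1)
                        # copy_ overwrites the zero row set up by padding_idx; restore it
                        self.embedding.weight[0].zero_()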

            def forward(self, input_ids):
                return self.embedding(input_ids)

        # Create the embedding layer
        embedding_dim = 100
        embedding_layer = EmbeddingLayer(len(vocab), embedding_dim)

        # Quick test
        sample_input = torch.randint(0, len(vocab), (2, 10))  # batch_size=2, seq_len=10
        embedded = embedding_layer(sample_input)
        print(f"Input shape: {sample_input.shape}")
        print(f"Embedded shape: {embedded.shape}")
        ---
    b.Using pretrained word vectors
        ---
        # Simulate loading pretrained vectors (in practice, load them from a file)
        def load_pretrained_embeddings(vocab, embedding_file=None, embedding_dim=100):
            vocab_size = len(vocab)
            embeddings = torch.randn(vocab_size, embedding_dim) * 0.1

            # A few mock pretrained vectors
            pretrained_words = {
                'good': torch.tensor([0.5, 0.3, 0.8, -0.2] + [0.1] * (embedding_dim - 4)),
                'bad': torch.tensor([-0.5, -0.3, -0.8, 0.2] + [-0.1] * (embedding_dim - 4)),
                'movie': torch.tensor([0.1, 0.2, 0.1, -0.1] + [0.0] * (embedding_dim - 4)),
                'film': torch.tensor([0.1, 0.2, 0.1, -0.1] + [0.0] * (embedding_dim - 4)),
            }

            for word, vector in pretrained_words.items():
                if word in vocab.word2idx:
                    idx = vocab.word2idx[word]
                    embeddings[idx] = vector

            return embeddings

        # Load the pretrained embeddings
        pretrained_embeddings = load_pretrained_embeddings(vocab, embedding_dim=embedding_dim)
        embedding_layer_pretrained = EmbeddingLayer(len(vocab), embedding_dim, pretrained_embeddings)
        ---
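        The function above only simulates pretrained vectors. As a rough sketch of the "load from a file" case, the code below parses the plain-text layout used by GloVe-style files, where each line holds a word followed by its space-separated vector components. An in-memory io.StringIO stands in for the real file, and the function name load_text_embeddings and the toy vocabulary are assumptions made for illustration.

```python
import io
import numpy as np

def load_text_embeddings(lines, word2idx, embedding_dim, seed=0):
    """Fill an embedding matrix from 'word v1 v2 ...' lines; unknown words stay random."""
    rng = np.random.default_rng(seed)
    matrix = rng.normal(0.0, 0.1, size=(len(word2idx), embedding_dim)).astype(np.float32)
    matrix[0] = 0.0  # keep the <PAD> row at zero
    found = 0
    for line in lines:
        parts = line.rstrip().split(" ")
        word, values = parts[0], parts[1:]
        # Skip words outside the vocabulary and lines with the wrong dimension
        if word in word2idx and len(values) == embedding_dim:
            matrix[word2idx[word]] = np.array([float(v) for v in values], dtype=np.float32)
            found += 1
    return matrix, found

fake_file = io.StringIO("good 0.5 0.3 0.8\nbad -0.5 -0.3 -0.8\nunused 0.0 0.0 0.0\n")
toy_word2idx = {"<PAD>": 0, "<UNK>": 1, "good": 2, "bad": 3, "film": 4}
matrix, found = load_text_embeddings(fake_file, toy_word2idx, embedding_dim=3)
print(matrix.shape, found)
```

        The resulting array could be converted with torch.from_numpy and passed as pretrained_embeddings. Reporting how many vocabulary words were found is a cheap check that the file and the tokenization actually agree.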

04.LSTM sentiment classification model
    a.Basic LSTM model
        ---
        class LSTMSentimentClassifier(nn.Module):
            def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=2, dropout=0.5):
                super(LSTMSentimentClassifier, self).__init__()

                # 词嵌入
                self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

                # LSTM层
                self.lstm = nn.LSTM(
                    embedding_dim,
                    hidden_dim,
                    num_layers=num_layers,
                    batch_first=True,
                    dropout=dropout if num_layers > 1 else 0,
                    bidirectional=True
                )

                # Dropout
                self.dropout = nn.Dropout(dropout)

                # 分类器
                self.classifier = nn.Linear(hidden_dim * 2, output_dim)  # *2因为双向LSTM

            def forward(self, input_ids, lengths=None):
                # 词嵌入
                embedded = self.embedding(input_ids)  # (batch_size, seq_len, embedding_dim)

                # 打包序列(变长序列处理)
                if lengths is not None:
                    embedded = nn.utils.rnn.pack_padded_sequence(
                        embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
                    )

                # LSTM
                lstm_out, (hidden, cell) = self.lstm(embedded)

                # 如果使用了打包,需要解包
                if lengths is not None:
                    lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)

                # 使用最后时刻的隐藏状态
                # 双向LSTM:前向和后向拼接
                hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)

                # Dropout和分类
                hidden = self.dropout(hidden)
                output = self.classifier(hidden)

                return output

        # 创建模型
        hidden_dim = 128
        output_dim = 2  # 二分类
        model = LSTMSentimentClassifier(
            vocab_size=len(vocab),
            embedding_dim=embedding_dim,
            hidden_dim=hidden_dim,
            output_dim=output_dim,
            num_layers=2,
            dropout=0.5
        )

        print(f"模型参数数量: {sum(p.numel() for p in model.parameters()):,}")
        ---
    b.Model with attention
        ---
        class Attention(nn.Module):
            def __init__(self, hidden_dim):
                super(Attention, self).__init__()
                self.attention = nn.Sequential(
                    nn.Linear(hidden_dim * 2, hidden_dim * 2),
                    nn.Tanh(),
                    nn.Linear(hidden_dim * 2, 1)
                )

            def forward(self, lstm_output, mask=None):
                # lstm_output: (batch_size, seq_len, hidden_dim * 2)
                # mask: optional (batch_size, seq_len) bool tensor, True at real tokens
                scores = self.attention(lstm_output)  # (batch_size, seq_len, 1)
                if mask is not None:
                    # Exclude padded positions from the softmax
                    scores = scores.masked_fill(~mask.unsqueeze(-1), float('-inf'))
                attention_weights = torch.softmax(scores, dim=1)

                # Weighted sum
                context_vector = torch.sum(attention_weights * lstm_output, dim=1)
                return context_vector, attention_weights

        class AttentionLSTMClassifier(nn.Module):
            def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=2, dropout=0.5):
                super(AttentionLSTMClassifier, self).__init__()

                self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
                self.lstm = nn.LSTM(
                    embedding_dim,
                    hidden_dim,
                    num_layers=num_layers,
                    batch_first=True,
                    dropout=dropout if num_layers > 1 else 0,
                    bidirectional=True
                )
                self.attention = Attention(hidden_dim)
                self.dropout = nn.Dropout(dropout)
                self.classifier = nn.Linear(hidden_dim * 2, output_dim)

            def forward(self, input_ids, lengths=None):
                embedded = self.embedding(input_ids)

                if lengths is not None:
                    embedded = nn.utils.rnn.pack_padded_sequence(
                        embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
                    )

                lstm_out, _ = self.lstm(embedded)

                mask = None
                if lengths is not None:
                    lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)
                    # lstm_out is padded to the longest sequence in the batch;
                    # mark which positions hold real tokens
                    positions = torch.arange(lstm_out.size(1), device=lstm_out.device)
                    mask = positions.unsqueeze(0) < lengths.to(lstm_out.device).unsqueeze(1)

                # Attention over real tokens only
                context_vector, attention_weights = self.attention(lstm_out, mask)

                # Classification
                output = self.classifier(self.dropout(context_vector))

                return output, attention_weights

        # 创建注意力模型
        attention_model = AttentionLSTMClassifier(
            vocab_size=len(vocab),
            embedding_dim=embedding_dim,
            hidden_dim=hidden_dim,
            output_dim=output_dim
        )

        print(f"注意力模型参数数量: {sum(p.numel() for p in attention_model.parameters()):,}")
        ---

05.Training and evaluation
    a.Training functions
        ---
        import torch.optim as optim
        from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

        def train_sentiment_model(model, train_loader, val_loader, num_epochs=10, learning_rate=0.001):
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model = model.to(device)

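            # Loss function and optimizer
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)
            # verbose is omitted: it is deprecated in recent PyTorch releases,
            # and the learning rate is printed every epoch below
            scheduler = optim.lr_scheduler.ReduceLROnPlateau(
                optimizer, mode='min', factor=0.5, patience=2
            )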

            # 记录训练过程
            train_losses = []
            val_losses = []
            train_accs = []
            val_accs = []
            best_val_acc = 0
            best_model = None

            for epoch in range(num_epochs):
                # 训练阶段
                model.train()
                train_loss = 0
                train_predictions = []
                train_targets = []

                for batch in train_loader:
                    input_ids = batch['input_ids'].to(device)
                    labels = batch['label'].to(device)
                    lengths = batch['length'].to(device)

                    optimizer.zero_grad()

                    if isinstance(model, AttentionLSTMClassifier):
                        outputs, _ = model(input_ids, lengths)
                    else:
                        outputs = model(input_ids, lengths)

                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()

                    train_loss += loss.item()
                    _, predicted = torch.max(outputs.data, 1)
                    train_predictions.extend(predicted.cpu().numpy())
                    train_targets.extend(labels.cpu().numpy())

                train_loss /= len(train_loader)
                train_acc = accuracy_score(train_targets, train_predictions) * 100

                # 验证阶段
                val_loss, val_acc, val_predictions, val_targets = evaluate_sentiment_model(
                    model, val_loader, criterion, device
                )

                # 学习率调度
                scheduler.step(val_loss)

                # 记录结果
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                train_accs.append(train_acc)
                val_accs.append(val_acc)

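                # Save the best model. Clone the tensors: state_dict().copy() is
                # shallow, so later optimizer steps would overwrite the snapshot.
                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_model = {k: v.detach().clone() for k, v in model.state_dict().items()}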

                # 打印进度
                print(f'Epoch {epoch+1}/{num_epochs}:')
                print(f'  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
                print(f'  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
                print(f'  Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')
                print('-' * 50)

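            # Restore the best model (skipped if no epoch improved on the initial 0% accuracy)
            if best_model is not None:
                model.load_state_dict(best_model)
            return model, train_losses, val_losses, train_accs, val_accs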

        def evaluate_sentiment_model(model, data_loader, criterion, device):
            model.eval()
            total_loss = 0
            predictions = []
            targets = []

            with torch.no_grad():
                for batch in data_loader:
                    input_ids = batch['input_ids'].to(device)
                    labels = batch['label'].to(device)
                    lengths = batch['length'].to(device)

                    if isinstance(model, AttentionLSTMClassifier):
                        outputs, _ = model(input_ids, lengths)
                    else:
                        outputs = model(input_ids, lengths)

                    loss = criterion(outputs, labels)
                    total_loss += loss.item()

                    _, predicted = torch.max(outputs.data, 1)
                    predictions.extend(predicted.cpu().numpy())
                    targets.extend(labels.cpu().numpy())

            avg_loss = total_loss / len(data_loader)
            accuracy = accuracy_score(targets, predictions) * 100

            return avg_loss, accuracy, predictions, targets
        ---
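        The training code imports precision, recall, and F1 from scikit-learn but only reports accuracy. As a reminder of what those metrics compute, here is a small standard-library sketch that derives them from the targets and predictions lists returned by an evaluation function. The function name binary_classification_report is illustrative and is not a scikit-learn API.

```python
def binary_classification_report(targets, predictions, positive_label=1):
    """Compute accuracy, precision, recall, and F1 for a binary task."""
    pairs = list(zip(targets, predictions))
    tp = sum(1 for t, p in pairs if t == positive_label and p == positive_label)
    fp = sum(1 for t, p in pairs if t != positive_label and p == positive_label)
    fn = sum(1 for t, p in pairs if t == positive_label and p != positive_label)

    # Guard every ratio against an empty denominator
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    accuracy = sum(1 for t, p in pairs if t == p) / len(pairs) if pairs else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

report = binary_classification_report([1, 0, 1, 1, 0], [1, 1, 1, 0, 0])
print(report)
```

        On a balanced dataset such as IMDB, accuracy and F1 tend to move together; the separate precision and recall values become more informative when the classes are imbalanced.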
    b.Model training
        ---
        # Build mock data loaders
        from torch.utils.data import DataLoader

        # Generate more sample data (labels are random, so expect accuracy near 50%)
        np.random.seed(42)
        num_samples = 1000
        texts = [f"This is sample text number {i}" for i in range(num_samples)]
        labels = np.random.randint(0, 2, num_samples).tolist()

        # 划分数据集
        from sklearn.model_selection import train_test_split
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, test_size=0.2, random_state=42
        )

        # 创建数据集
        train_dataset = SentimentDataset(train_texts, train_labels, vocab, max_length=50)
        val_dataset = SentimentDataset(val_texts, val_labels, vocab, max_length=50)

        # 创建数据加载器
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

        # Train the basic LSTM model
        print("Training the basic LSTM model...")
        (lstm_model, lstm_train_losses, lstm_val_losses,
         lstm_train_accs, lstm_val_accs) = train_sentiment_model(
            model, train_loader, val_loader, num_epochs=5, learning_rate=0.001
        )

        # Train the attention model
        print("\nTraining the attention LSTM model...")
        (attention_model, att_train_losses, att_val_losses,
         att_train_accs, att_val_accs) = train_sentiment_model(
            attention_model, train_loader, val_loader, num_epochs=5, learning_rate=0.001
        )
        ---

06.Model interpretation and visualization
    a.Visualizing attention weights
        ---
        import matplotlib.pyplot as plt
        import seaborn as sns

        def visualize_attention(model, text, vocab):
            """可视化注意力权重"""
            # Use whichever device the model already lives on
            device = next(model.parameters()).device
            model.eval()

            # Preprocess the text; encode() expects a string and tokenizes it itself
            cleaned_text = clean_text(text)
            words = word_tokenize(cleaned_text)[:50]
            encoded = vocab.encode(cleaned_text)[:50]

            # Build the input (padded to length 50)
            input_ids = torch.tensor([encoded + [0] * (50 - len(encoded))], dtype=torch.long).to(device)
            length = torch.tensor([len(encoded)], dtype=torch.long).to(device)

            # Get the attention weights
            with torch.no_grad():
                output, attention_weights = model(input_ids, length)

            # Extract the weights as a 1-D array: (seq_len, 1) -> (seq_len,)
            attention = attention_weights[0, :len(encoded), 0].cpu().numpy()

            # 可视化
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6))

            # 文本和注意力权重
            ax1.bar(range(len(words)), attention)
            ax1.set_xticks(range(len(words)))
            ax1.set_xticklabels(words, rotation=45, ha='right')
            ax1.set_ylabel('Attention Weight')
            ax1.set_title('Attention Weights for Each Word')
            ax1.grid(True, alpha=0.3)

            # 热图
            attention_matrix = attention.reshape(1, -1)
            im = ax2.imshow(attention_matrix, cmap='Reds', aspect='auto')
            ax2.set_xticks(range(len(words)))
            ax2.set_xticklabels(words, rotation=45, ha='right')
            ax2.set_yticks([0])
            ax2.set_yticklabels(['Attention'])
            ax2.set_title('Attention Heatmap')

            # 添加颜色条
            cbar = plt.colorbar(im, ax=ax2, orientation='horizontal', pad=0.1)
            cbar.set_label('Attention Weight')

            plt.tight_layout()
            plt.show()

            # Prediction
            _, predicted = torch.max(output.data, 1)
            sentiment = "Positive" if predicted.item() == 1 else "Negative"
            confidence = torch.softmax(output, dim=1)[0, predicted.item()].item()

            print(f"Text: {text}")
            print(f"Predicted sentiment: {sentiment}")
            print(f"Confidence: {confidence:.3f}")

            return words, attention

        # Try the attention visualization
        test_text = "This movie was absolutely amazing and I loved every minute of it"
        words, attention = visualize_attention(attention_model, test_text, vocab)
        ---
    b.Feature importance analysis
        ---
        def analyze_feature_importance(model, data_loader, vocab, top_k=20):
            """分析词汇重要性"""
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.eval()

            # 统计每个词的平均注意力权重
            word_importance = {}
            word_count = {}

            with torch.no_grad():
                for batch in data_loader:
                    input_ids = batch['input_ids'].to(device)
                    labels = batch['label'].to(device)
                    lengths = batch['length'].to(device)

                    if isinstance(model, AttentionLSTMClassifier):
                        outputs, attention_weights = model(input_ids, lengths)

                        for i in range(input_ids.size(0)):
                            seq_len = lengths[i].item()
                            # (seq_len, 1) -> (seq_len,) so that each weight is a scalar
                            attention = attention_weights[i, :seq_len, 0].cpu().numpy()
                            ids = input_ids[i, :seq_len].cpu().numpy()

                            for word_id, attn in zip(ids, attention):
                                word = vocab.idx2word.get(int(word_id), "<UNK>")
                                if word != "<PAD>":
                                    if word not in word_importance:
                                        word_importance[word] = 0
                                        word_count[word] = 0
                                    word_importance[word] += float(attn)
                                    word_count[word] += 1

            # 计算平均注意力
            for word in word_importance:
                word_importance[word] /= word_count[word]

            # 排序并可视化
            sorted_words = sorted(word_importance.items(), key=lambda x: x[1], reverse=True)
            top_words = sorted_words[:top_k]

            plt.figure(figsize=(12, 8))
            words, importances = zip(*top_words)
            plt.barh(range(len(words)), importances)
            plt.yticks(range(len(words)), words)
            plt.xlabel('Average Attention Weight')
            plt.title(f'Top {top_k} Most Important Words')
            plt.gca().invert_yaxis()
            plt.tight_layout()
            plt.show()

            return top_words

        # 分析词汇重要性
        important_words = analyze_feature_importance(attention_model, val_loader, vocab, top_k=20)
        ---

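6.2 Time series forecasting

01.Project overview
    a.Task goal
        Forecast stock prices or price direction
        Multivariate time series analysis
        Use historical data to predict future values
    b.Data sources
        Stock price data (open, high, low, close, volume)
        Technical indicators (moving averages, RSI, etc.)
        Possible external factors (news sentiment, market indices, etc.)
    c.Technical challenges
        Non-stationarity of the time series
        Noise
        Short- and long-term dependencies

02.Data preparation
    a.Data loading and preprocessing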
        ---
        import pandas as pd
        import numpy as np
        from sklearn.preprocessing import MinMaxScaler

        # Load stock data (this example uses simulated data)
        def generate_stock_data(start_date='2020-01-01', end_date='2023-12-31'):
            """Generate simulated stock data."""
            # Business days, so that 252 periods is roughly one trading year
            dates = pd.date_range(start=start_date, end=end_date, freq='B')

            # Base price series (with drift)
            np.random.seed(42)
            base_price = 100
            returns = np.random.normal(0.0005, 0.02, len(dates))
            price_series = base_price * np.exp(np.cumsum(returns))

            # Add seasonality
            seasonal = 5 * np.sin(2 * np.pi * np.arange(len(dates)) / 252)

            # Derive OHLC from the same close series so that
            # Low <= min(Open, Close) and High >= max(Open, Close)
            close = price_series + seasonal
            open_ = close * (1 + np.random.uniform(-0.01, 0.01, len(dates)))
            high = np.maximum(open_, close) * (1 + np.random.uniform(0, 0.03, len(dates)))
            low = np.minimum(open_, close) * (1 + np.random.uniform(-0.03, 0, len(dates)))

            data = pd.DataFrame({
                'Date': dates,
                'Close': close,
                'Volume': np.random.randint(1000000, 10000000, len(dates)),
                'High': high,
                'Low': low,
                'Open': open_
            })

            data.set_index('Date', inplace=True)
            return data

        # Generate the data
        stock_data = generate_stock_data()
        print(f"Data shape: {stock_data.shape}")
        print("\nData preview:")
        print(stock_data.head())

        # Visualize the data
        import matplotlib.pyplot as plt

        plt.figure(figsize=(15, 10))

        plt.subplot(2, 2, 1)
        plt.plot(stock_data.index, stock_data['Close'])
        plt.title('Closing Price')
        plt.xlabel('Date')
        plt.ylabel('Price')

        plt.subplot(2, 2, 2)
        plt.plot(stock_data.index, stock_data['Volume'])
        plt.title('Volume')
        plt.xlabel('Date')
        plt.ylabel('Volume')

        plt.subplot(2, 2, 3)
        plt.plot(stock_data.index, stock_data['High'], label='High', alpha=0.7)
        plt.plot(stock_data.index, stock_data['Low'], label='Low', alpha=0.7)
        plt.plot(stock_data.index, stock_data['Open'], label='Open', alpha=0.7)
        plt.title('OHLC Prices')
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()

        plt.subplot(2, 2, 4)
        daily_returns = stock_data['Close'].pct_change()
        plt.hist(daily_returns.dropna(), bins=50, alpha=0.7)
        plt.title('Daily Returns Distribution')
        plt.xlabel('Daily Return')
        plt.ylabel('Frequency')

        plt.tight_layout()
        plt.show()
        ---
    b.Feature engineering
        ---
        def create_features(df):
            """创建技术指标特征"""
            df = df.copy()

            # 价格变化
            df['Returns'] = df['Close'].pct_change()
            df['Log_Returns'] = np.log(df['Close'] / df['Close'].shift(1))

            # 移动平均线
            for window in [5, 10, 20, 50]:
                df[f'MA_{window}'] = df['Close'].rolling(window=window).mean()
                df[f'Close_MA_{window}_Ratio'] = df['Close'] / df[f'MA_{window}']

            # 指数移动平均线
            for span in [12, 26]:
                df[f'EMA_{span}'] = df['Close'].ewm(span=span).mean()

            # MACD
            df['MACD'] = df['EMA_12'] - df['EMA_26']
            df['MACD_Signal'] = df['MACD'].ewm(span=9).mean()
            df['MACD_Histogram'] = df['MACD'] - df['MACD_Signal']

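            # RSI (Relative Strength Index), simple-moving-average variant.
            # Wilder's original definition uses exponential smoothing instead.
            def calculate_rsi(prices, window=14):
                delta = prices.diff()
                gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
                loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
                rs = gain / loss  # inf when the window contains gains but no losses, giving RSI = 100
                rsi = 100 - (100 / (1 + rs))
                return rsi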

            df['RSI'] = calculate_rsi(df['Close'])

            # 布林带
            df['BB_Upper'] = df['Close'].rolling(window=20).mean() + 2 * df['Close'].rolling(window=20).std()
            df['BB_Lower'] = df['Close'].rolling(window=20).mean() - 2 * df['Close'].rolling(window=20).std()
            df['BB_Position'] = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower'])

            # 成交量指标
            df['Volume_MA'] = df['Volume'].rolling(window=20).mean()
            df['Volume_Ratio'] = df['Volume'] / df['Volume_MA']

            # 价格波动性
            df['Volatility'] = df['Returns'].rolling(window=20).std() * np.sqrt(252)

            # 星期几、月份等时间特征
            df['Day_of_Week'] = df.index.dayofweek
            df['Month'] = df.index.month
            df['Quarter'] = df.index.quarter

            return df

        # Create the features
        stock_features = create_features(stock_data)

        # Drop rows with NaN values (the warm-up period of the rolling windows)
        stock_features = stock_features.dropna()

        print(f"Data shape after feature engineering: {stock_features.shape}")
        print("\nFeature columns:")
        print(stock_features.columns.tolist())

        # Correlation analysis
        import seaborn as sns

        correlation_matrix = stock_features.corr()
        plt.figure(figsize=(15, 12))
        mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
        sns.heatmap(correlation_matrix, mask=mask, cmap='coolwarm', center=0,
                   square=True, linewidths=.5, cbar_kws={"shrink": .8})
        plt.title('Feature Correlation Matrix')
        plt.tight_layout()
        plt.show()
        ---
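        The RSI feature above averages gains and losses with a simple rolling mean. For comparison, the sketch below implements Wilder's smoothed variant in plain NumPy: the first average is a simple mean over the first window of changes, and each later average blends in the newest change with weight 1/window. The function name rsi_wilder and the choice to return 50 for a completely flat window are assumptions made for this illustration.

```python
import numpy as np

def rsi_wilder(prices, window=14):
    """RSI with Wilder's smoothing; the first `window` entries are NaN."""
    prices = np.asarray(prices, dtype=float)
    delta = np.diff(prices)                     # delta[j] = prices[j+1] - prices[j]
    gains = np.where(delta > 0, delta, 0.0)
    losses = np.where(delta < 0, -delta, 0.0)

    rsi = np.full(len(prices), np.nan)
    if len(delta) < window:
        return rsi

    # Seed the averages with a simple mean over the first `window` changes
    avg_gain = gains[:window].mean()
    avg_loss = losses[:window].mean()
    for i in range(window, len(prices)):
        if i > window:
            # Wilder's recursive update using the change that ends at prices[i]
            avg_gain = (avg_gain * (window - 1) + gains[i - 1]) / window
            avg_loss = (avg_loss * (window - 1) + losses[i - 1]) / window
        if avg_loss == 0:
            # No losses: 100 if there were gains, 50 for a flat series (a convention assumed here)
            rsi[i] = 100.0 if avg_gain > 0 else 50.0
        else:
            rs = avg_gain / avg_loss
            rsi[i] = 100.0 - 100.0 / (1.0 + rs)
    return rsi

rising = rsi_wilder(np.arange(1.0, 31.0))
falling = rsi_wilder(np.arange(30.0, 0.0, -1.0))
print(rising[14], falling[14])
```

        A strictly rising series saturates at 100 and a strictly falling one at 0, which is a quick sanity check for either variant.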
    c.Data scaling and sequence construction
        ---
        import torch
        from torch.utils.data import Dataset, DataLoader

        class StockDataset(Dataset):
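            def __init__(self, data, sequence_length=30, target_column='Close', fit_fraction=0.8):
                self.data = data
                self.sequence_length = sequence_length
                self.target_column = target_column

                # Select feature columns (exclude the target and the raw OHLC columns)
                self.feature_columns = [col for col in data.columns
                                       if col not in ['Open', 'High', 'Low', 'Close', target_column]]

                # Scaling: fit on the earliest fit_fraction of rows only (the training
                # period), so later data does not leak into the scalers.
                # Later values may therefore fall outside [0, 1].
                self.scaler_features = MinMaxScaler()
                self.scaler_target = MinMaxScaler()
                fit_end = max(1, int(len(data) * fit_fraction))

                self.scaler_features.fit(data[self.feature_columns].values[:fit_end])
                self.scaler_target.fit(data[[target_column]].values[:fit_end])

                self.features_scaled = self.scaler_features.transform(
                    data[self.feature_columns].values
                )
                self.target_scaled = self.scaler_target.transform(
                    data[[target_column]].values
                )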

                # 创建序列
                self.sequences = []
                self.targets = []

                for i in range(sequence_length, len(data)):
                    self.sequences.append(self.features_scaled[i-sequence_length:i])
                    self.targets.append(self.target_scaled[i])

            def __len__(self):
                return len(self.sequences)

            def __getitem__(self, idx):
                return {
                    'features': torch.FloatTensor(self.sequences[idx]),
                    'target': torch.FloatTensor(self.targets[idx])
                }

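        # Create the dataset
        sequence_length = 30
        dataset = StockDataset(stock_features, sequence_length=sequence_length)

        # Split into training, validation, and test sets in chronological order.
        # A random split would put overlapping windows from the same period into
        # different splits and leak future information into training.
        train_size = int(0.8 * len(dataset))
        val_size = int(0.1 * len(dataset))
        test_size = len(dataset) - train_size - val_size

        train_dataset = torch.utils.data.Subset(dataset, range(0, train_size))
        val_dataset = torch.utils.data.Subset(dataset, range(train_size, train_size + val_size))
        test_dataset = torch.utils.data.Subset(dataset, range(train_size + val_size, len(dataset)))

        # Create the data loaders
        batch_size = 32
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        print(f"Training set: {len(train_dataset)} sequences")
        print(f"Validation set: {len(val_dataset)} sequences")
        print(f"Test set: {len(test_dataset)} sequences")
        print(f"Number of features: {len(dataset.feature_columns)}")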
        ---
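        For time series, the order of splitting, scaling, and windowing matters. The NumPy sketch below illustrates one leakage-averse order on a toy series: split chronologically, fit min-max statistics on the training rows only, then build sliding windows inside each split. The helper names (chronological_split, fit_minmax, make_windows) are illustrative assumptions and are not part of the StockDataset class.

```python
import numpy as np

def chronological_split(n_samples, train_frac=0.8, val_frac=0.1):
    """Return slices for train/val/test that preserve time order."""
    train_end = int(n_samples * train_frac)
    val_end = train_end + int(n_samples * val_frac)
    return slice(0, train_end), slice(train_end, val_end), slice(val_end, n_samples)

def fit_minmax(train_values):
    """Min and range per column, computed from training rows only."""
    lo = train_values.min(axis=0)
    hi = train_values.max(axis=0)
    span = np.where(hi > lo, hi - lo, 1.0)  # avoid division by zero for constant columns
    return lo, span

def make_windows(features, target, sequence_length):
    """Each sample is `sequence_length` past rows; the label is the next target value."""
    X = np.stack([features[i - sequence_length:i]
                  for i in range(sequence_length, len(features))])
    y = target[sequence_length:]
    return X, y

values = np.arange(100, dtype=float).reshape(-1, 1)  # toy upward-trending series
train_idx, val_idx, test_idx = chronological_split(len(values))
lo, span = fit_minmax(values[train_idx])
scaled = (values - lo) / span

X_train, y_train = make_windows(scaled[train_idx], scaled[train_idx, 0], sequence_length=5)
print(X_train.shape, y_train.shape)
print(scaled[train_idx].max(), scaled[test_idx].min())
```

        Because the toy series keeps rising, the scaled test values exceed 1. That is expected when the scaler has only seen the training period, and it is a reminder that a trending target is often easier to model as returns or differences than as raw price levels.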

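7 Learning path and resources

7.1 Learning roadmap

01.Stage planning
    a.Foundation stage (1-2 months)
        ---
        Goal: build a solid foundation in mathematics and programming

        Topics:
        1. Python programming basics
           - Basic syntax, data types, control flow
           - Functions, classes, modules
           - NumPy array operations
           - Pandas data processing
           - Matplotlib data visualization

        2. Mathematical foundations
           - Linear algebra (vectors, matrix operations)
           - Calculus (derivatives, gradients)
           - Probability and statistics (probability distributions, expectation, variance)

        3. Machine learning basics
           - Supervised and unsupervised learning
           - Model evaluation metrics
           - Overfitting and underfitting
           - Cross-validation

        Practice projects:
        - Implement simple linear regression
        - Build a decision tree classifier
        - K-means clustering
        ---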
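    b.Introductory stage (2-3 months)
        ---
        Goal: understand the core concepts of deep learning

        Topics:
        1. Neural network fundamentals
           - Perceptron model
           - Multilayer perceptron
           - Activation functions
           - Backpropagation

        2. Deep learning frameworks
           - PyTorch basics
           - Tensor operations
           - Automatic differentiation
           - Model construction

        3. Optimization methods
           - Gradient descent variants
           - Adam optimizer
           - Learning rate scheduling

        Practice projects:
        - Handwritten digit recognition
        - Image classification
        - Time series forecasting
        ---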
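    c.Intermediate stage (3-4 months)
        ---
        Goal: master mainstream deep learning techniques

        Topics:
        1. Convolutional neural networks
           - Principles of convolution and pooling
           - Classic CNN architectures
           - Introduction to object detection

        2. Recurrent neural networks
           - RNN, LSTM, GRU
           - Sequence modeling
           - Attention mechanisms

        3. Practical techniques
           - Data augmentation
           - Transfer learning
           - Model tuning

        Practice projects:
        - CIFAR-10 image classification
        - Text sentiment analysis
        - Stock price forecasting
        ---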
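    d.Specialization stage (4-6 months)
        ---
        Goal: go deep in a specific area

        Directions to choose from:
        1. Computer vision
           - Object detection
           - Image segmentation
           - Generative adversarial networks

        2. Natural language processing
           - Transformer
           - BERT/GPT
           - Machine translation

        3. Other areas
           - Reinforcement learning
           - Recommender systems
           - Speech recognition

        Practice projects:
        - A complete end-to-end application
        - Kaggle competitions
        - A published paper or an open-source project
        ---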

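7.2 Recommended Learning Resources

01.Online courses
    a.Introductory courses
        ---
        1. Andrew Ng's Deep Learning Specialization
           - Hosted on Coursera
           - Chinese and English subtitles
           - Combines theory with practice
           Link: https://www.coursera.org/specializations/deep-learning
        2. Dive into Deep Learning (动手学深度学习), Mu Li et al.
           - Free and open source
           - PyTorch implementation available
           - Extensive Chinese-language material
           Link: https://zh.d2l.ai/
        3. fast.ai courses
           - Practice-oriented
           - Top-down teaching approach
           - Many hands-on projects
           Link: https://www.fast.ai/
        4. Stanford CS231n
           - Focused on computer vision
           - Sufficient theoretical depth
           - Chinese subtitles available
           Link: http://cs231n.stanford.edu/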
        ---
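    b.Advanced courses
        ---
        1. CS224n: Natural Language Processing
           - Stanford University
           - From fundamentals to the frontier
           - Covers recent research
        2. DeepMind x UCL lecture series
           - Focused on reinforcement learning
           - Frontier research content
           - Theoretically demanding
        3. MIT 6.S191: Introduction to Deep Learning
           - Solid mathematical grounding
           - Detailed theoretical derivations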
        ---

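02.Recommended books
    a.Theory
        ---
        1. "Deep Learning" - Ian Goodfellow, Yoshua Bengio, and Aaron Courville
           - Widely regarded as an authoritative theoretical reference
           - Detailed mathematical derivations
           - Known among Chinese readers as the "flower book" (花书)
        2. "Pattern Recognition and Machine Learning" - Christopher Bishop
           - Statistical perspective
           - Bayesian methods
           - Good for building theoretical foundations
        3. "Machine Learning" (《机器学习》) - Zhou Zhihua
           - Classic Chinese-language textbook
           - Broad coverage
           - Suitable for beginners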
        ---
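    b.Practice
        ---
        1. "Dive into Deep Learning" (《动手学深度学习》)
           - PyTorch implementation
           - Plenty of code
           - Available in Chinese
        2. "Deep Learning with Python" (《Python深度学习》)
           - Uses the Keras framework
           - Many worked examples
           - Quick to get started with
        3. 《深度学习实战》 (Deep Learning in Practice)
           - Project-driven
           - End-to-end case studies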
        ---

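03.Online resources
    a.Technical blogs
        ---
        1. Distill.pub
           - Visual explanations
           - Makes deep theory accessible
        2. Towards Data Science
           - Hosted on Medium
           - Many practical case studies
        3. Jiqizhixin (机器之心) and QbitAI (量子位)
           - Chinese-language news
           - Coverage of the latest developments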
        ---
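    b.Video resources
        ---
        1. 3Blue1Brown
           - Visualized mathematics
           - Builds intuition
        2. StatQuest
           - Statistics concepts
           - Easy to follow
        3. Two Minute Papers
           - Paper walkthroughs
           - Helps you keep up with current research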
        ---

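7.3 Practice Platforms

01.Competition platforms
    a.Kaggle
        ---
        Features:
        - One of the world's largest data science competition platforms
        - Provides real datasets and challenges
        - Offers a large amount of learning material
        - Lets you study strong published solutions

        Recommended competitions:
        1. Digit Recognizer
           - MNIST dataset
           - Entry-level competition

        2. Titanic survival prediction
           - Classic beginner project
           - Good data-processing practice

        3. House price prediction
           - Regression task
           - Feature engineering practice

        Strategy:
        - Start with entry-level competitions
        - Study highly upvoted solutions
        - Take part in the discussion forums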
        ---
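    b.Tianchi (天池)
        ---
        Features:
        - Run by Alibaba Cloud
        - Chinese-language environment
        - Based on real business scenarios

        Best for: Chinese-speaking users and anyone who wants exposure to industry applications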
        ---

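02.Experiment environments
    a.Cloud platforms
        ---
        1. Google Colab
           - Free GPU access
           - Jupyter environment
           - Well suited to learning and experimentation

        2. Kaggle Notebooks (formerly Kaggle Kernels)
           - Integrated with competition datasets
           - Code shared by the community

        3. Baidu AI Studio
           - Fast access from within mainland China
           - Chinese documentation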
        ---
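    b.Local environment
        ---
        Hardware recommendations:
        - GPU: RTX 3060/3070 or better
        - RAM: 16 GB or more
        - Storage: SSD, 512 GB or more

        Software setup:
        - Anaconda for environment management
        - PyTorch installation
        - Jupyter or VS Code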
        ---
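After setting up the software listed above, a quick way to confirm that the environment is usable is to check which packages Python can actually import. The following is a minimal sketch using only the standard library; the helper name `check_packages` and the package list are assumptions made for illustration, not part of any official setup procedure.

```python
import importlib.util
import sys


def check_packages(names):
    """Map each top-level package name to True if Python can find it."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == "__main__":
    # Hypothetical package list; adjust it to match your own setup.
    wanted = ["numpy", "pandas", "matplotlib", "torch", "notebook"]
    print(f"Python {sys.version_info.major}.{sys.version_info.minor}")
    for name, installed in check_packages(wanted).items():
        print(f"{name:12s} {'ok' if installed else 'MISSING'}")
```

Running this inside the activated conda environment shows at a glance which of the listed packages still need to be installed.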

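7.4 Open-Source Projects

01.Frameworks and tools
    a.Deep learning frameworks
        ---
        1. PyTorch
           - Developed by Facebook (now Meta)
           - Dynamic computation graph
           - Research-friendly

        2. TensorFlow
           - Developed by Google
           - Strong for production deployment
           - Mature ecosystem

        3. Keras
           - High-level API
           - Rapid prototyping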
        ---
    b.Tool libraries
        ---
        1. scikit-learn
           - Classical machine learning
           - Data preprocessing utilities

        2. OpenCV
           - Computer vision
           - Image processing

        3. NLTK/spaCy
           - Natural language processing
        ---

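02.Notable open-source projects
    a.Computer vision
        ---
        1. Detectron2
           - Object detection framework from Facebook AI Research (now Meta AI)
           - Includes implementations of many recent models

        2. mmcv
           - Foundation library of the OpenMMLab toolboxes (associated with SenseTime)
           - The toolboxes built on it, such as MMDetection, provide extensive model zoos

        3. Albumentations
           - Image augmentation library
           - Fast and effective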
        ---
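    b.Natural language processing
        ---
        1. Hugging Face Transformers
           - Library of pretrained models
           - BERT, GPT, and others

        2. spaCy
           - Industrial-strength NLP library
           - Multilingual support

        3. Gensim
           - Topic modeling
           - Word2Vec implementation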
        ---

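7.5 Frequently Asked Questions

01.Learning
    a.What if my math background is weak?
        ---
        Answer:
        1. Don't worry too much; many concepts become clear through practice
        2. Focus on: basic linear algebra, calculus, probability and statistics
        3. Recommended resources:
           - 3Blue1Brown's linear algebra videos
           - Khan Academy's math courses
           - 《程序员的数学》 (Mathematics for Programmers)

        4. Practical advice:
           - Write code first, then fill in the theory
           - Look up any concept you don't understand as soon as you meet it
           - Keep exercising your mathematical thinking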
        ---
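    b.Do I need to learn machine learning before deep learning?
        ---
        Answer:
        1. Learning the basics of machine learning first is strongly recommended
        2. Reasons:
           - You learn the basic concepts (overfitting, cross-validation, and so on)
           - You learn how to evaluate models
           - You build the right way of thinking about modeling problems

        3. Learning path:
           - Linear regression → logistic regression → decision trees
           - Random forests → support vector machines
           - Then move on to neural networks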
        ---
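    c.PyTorch or TensorFlow?
        ---
        Answer:
        1. Beginners are advised to choose PyTorch
           - More intuitive API design
           - Dynamic graphs are easy to debug
           - Active community

        2. Strengths of TensorFlow:
           - More mature production deployment
           - TensorBoard tooling
           - Good mobile support

        3. Recommendation:
           - Use PyTorch while learning
           - Pick up TensorFlow when your work requires it
           - The core concepts of the two are the same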
        ---

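02.Practice
    a.How can I learn without a GPU?
        ---
        Answer:
        1. Use free cloud platforms:
           - Google Colab (free GPU, typically with about 15 GB of GPU memory; availability is not guaranteed)
           - Kaggle Notebooks (about 30 GPU hours per week; the quota may vary)

        2. Reduce the local workload:
           - Use small datasets
           - Simplify the model architecture
           - Reduce the batch size

        3. Suggested workflow:
           - Write and debug code locally
           - Train on a cloud platform
           - Use the GPU only for key experiments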
        ---
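Reducing the batch size lowers memory use, but it also makes each gradient estimate noisier. A common companion technique, not mentioned in the answer above and shown here only as an illustration, is gradient accumulation: sum the gradients of several small micro-batches before updating the weights. The sketch below uses NumPy and assumes a linear model with mean squared error; the function names and the synthetic data are made up for the example.

```python
import numpy as np


def mse_gradient(X, y, w):
    """Gradient of mean squared error for a linear model y_hat = X @ w."""
    residual = X @ w - y
    return 2.0 * X.T @ residual / len(y)


def accumulated_gradient(X, y, w, micro_batch_size):
    """Accumulate micro-batch gradients so the result matches the full-batch gradient."""
    n_samples = len(y)
    total = np.zeros_like(w)
    for start in range(0, n_samples, micro_batch_size):
        xb = X[start:start + micro_batch_size]
        yb = y[start:start + micro_batch_size]
        # Weight each micro-batch by its share of the samples so that
        # the sum equals the mean gradient over all samples.
        total += mse_gradient(xb, yb, w) * (len(yb) / n_samples)
    return total


# Synthetic data for illustration only.
rng = np.random.default_rng(0)
X = rng.normal(size=(64, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + 0.1 * rng.normal(size=64)
w = np.zeros(3)

full = mse_gradient(X, y, w)
accumulated = accumulated_gradient(X, y, w, micro_batch_size=8)
print(np.allclose(full, accumulated))
```

Only one micro-batch has to be processed at a time, so peak memory scales with the micro-batch size rather than the effective batch size. In PyTorch the same idea is usually expressed by calling `backward()` on several micro-batches before a single `optimizer.step()`.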
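    b.How do I avoid overfitting?
        ---
        Answer:
        1. Data:
           - Collect more training data
           - Apply data augmentation
           - Split the dataset sensibly

        2. Model:
           - Reduce model complexity
           - Add regularization (L1/L2)
           - Use Dropout
           - Use early stopping

        3. Training techniques:
           - Choose a suitable learning rate
           - Use batch normalization
           - Use cross-validation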
        ---
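Of the measures listed above, early stopping is the easiest to show in a few lines. The following is a minimal, framework-independent sketch; the class name `EarlyStopping`, its `patience` and `min_delta` parameters, and the validation losses are assumptions made for illustration rather than the API of any particular library.

```python
class EarlyStopping:
    """Stop training when validation loss has not improved for `patience` epochs."""

    def __init__(self, patience=3, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.bad_epochs = 0

    def step(self, val_loss):
        """Record one epoch's validation loss; return True if training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience


# Made-up validation losses for illustration only.
val_losses = [0.90, 0.70, 0.60, 0.61, 0.62, 0.63, 0.50]
stopper = EarlyStopping(patience=3)
stopped_at = None
for epoch, loss in enumerate(val_losses):
    if stopper.step(loss):
        stopped_at = epoch
        break
print(stopped_at, stopper.best_loss)
```

In a real training loop you would call `step` once per epoch with the measured validation loss, and usually also save a checkpoint whenever `best_loss` improves so that the best weights can be restored after stopping.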
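    c.How do I debug a deep learning model?
        ---
        Answer:
        1. Debug step by step:
           - Test on a small dataset first
           - Check that the loss is decreasing
           - Verify input and output shapes

        2. Visualization and inspection:
           - Monitor training with TensorBoard
           - Print intermediate results
           - Inspect the gradient distribution

        3. Common things to check:
           - Correctness of data preprocessing
           - Loss function configuration
           - Whether the label range matches the model output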
        ---
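Several of the checks above (input/output shapes, label range) can be automated with a small assertion helper that runs on one batch before training starts. The sketch below uses NumPy arrays and assumes a classification setup with integer class labels; the function name `check_classification_batch` is hypothetical.

```python
import numpy as np


def check_classification_batch(logits, labels):
    """Raise ValueError if a batch of logits and integer labels is inconsistent."""
    if logits.ndim != 2:
        raise ValueError(f"expected logits of shape (batch, classes), got {logits.shape}")
    batch_size, num_classes = logits.shape
    if labels.shape != (batch_size,):
        raise ValueError(f"expected labels of shape ({batch_size},), got {labels.shape}")
    if not np.issubdtype(labels.dtype, np.integer):
        raise ValueError(f"expected integer labels, got dtype {labels.dtype}")
    if labels.min() < 0 or labels.max() >= num_classes:
        raise ValueError(
            f"labels must lie in [0, {num_classes - 1}], "
            f"got range [{labels.min()}, {labels.max()}]"
        )
    if not np.isfinite(logits).all():
        raise ValueError("logits contain NaN or infinity")


logits = np.zeros((4, 3))
check_classification_batch(logits, np.array([0, 2, 1, 1]))  # passes silently

try:
    # Labels 1..3 with only 3 classes: a common off-by-one mistake.
    check_classification_batch(logits, np.array([1, 2, 3, 1]))
except ValueError as err:
    print(err)
```

With PyTorch tensors the same checks apply after converting a batch with `.detach().cpu().numpy()`.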

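7.6 Career Development

01.Skill tree
    a.Technical skills
        ---
        Core skills:
        1. Programming language: Python (required)
        2. Deep learning frameworks: PyTorch/TensorFlow
        3. Data processing: Pandas, NumPy
        4. Visualization: Matplotlib, Seaborn

        Advanced skills:
        1. Cloud platforms: AWS, Azure, GCP
        2. Containerization: Docker, Kubernetes
        3. Version control: Git, GitHub
        4. Databases: SQL, NoSQL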
        ---
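    b.Soft skills
        ---
        1. Problem decomposition
           - Break complex problems into parts
           - Plan experiments

        2. Communication
           - Explain technical concepts
           - Report experimental results

        3. Learning ability
           - Pick up new technologies quickly
           - Read research papers

        4. Business understanding
           - Understand business requirements
           - Turn technology into working products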
        ---

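02.Career directions
    a.Algorithm engineer
        ---
        Responsibilities:
        - Algorithm design and implementation
        - Model tuning and deployment
        - Performance optimization

        Requirements:
        - Solid grounding in algorithms
        - Strong programming skills
        - Good mathematical foundation

        Career path:
        Junior → Senior → Staff → Expert/Architect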
        ---
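    b.Data scientist
        ---
        Responsibilities:
        - Data analysis and mining
        - Extracting business insights
        - Building predictive models

        Requirements:
        - Foundation in statistics
        - Business understanding
        - Visualization skills

        Career path:
        Analyst → Data Scientist → Senior Data Scientist → Chief Scientist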
        ---
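    c.Researcher
        ---
        Responsibilities:
        - Research on frontier techniques
        - Publishing papers
        - Algorithmic innovation

        Requirements:
        - Deep theoretical foundation
        - Strong capacity for innovation
        - Strong academic background

        Career path:
        Assistant Researcher → Researcher → Senior Researcher → Chief Scientist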
        ---

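03.Continuous learning
    a.Staying current
        ---
        1. Follow the top conferences:
           - NeurIPS, ICML, ICLR
           - CVPR, ICCV (computer vision)
           - ACL, EMNLP (NLP)

        2. Read recent papers:
           - arXiv preprints
           - Papers with Code

        3. Take part in the community:
           - Contribute to open source on GitHub
           - Write technical blog posts
           - Attend meetups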
        ---
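    b.Improving your skills
        ---
        1. Enter competitions:
           - Kaggle competitions
           - Tianchi competitions

        2. Build projects:
           - Personal projects
           - Open-source contributions

        3. Certifications:
           - Cloud vendor certifications
           - Online course certificates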
        ---