1 课程概述
1.1 课程定位
01.在学习路径中的位置
a.承上启下
a.前置课程
课程一:AI的数学与编程基础,已掌握线性代数、微积分、概率论和Python编程。
b.后续课程
为深度学习、NLP、大模型课程打下算法基础。
c.核心作用
机器学习是AI的核心,理解机器学习算法是理解深度学习的前提。
b.与深度学习的关系
a.理论基础
深度学习是机器学习的子集,神经网络是机器学习算法的一种。
b.算法思想
梯度下降、损失函数、正则化等核心思想在两者中通用。
c.学习顺序
先学机器学习的经典算法,再学深度学习,循序渐进。
02.课程特点
a.理论与实践结合
每个算法都包含数学原理推导和代码实现。
b.从简单到复杂
从线性回归开始,逐步深入到集成学习、支持向量机等复杂算法。
c.注重应用
每章都有实际案例,帮助理解算法的应用场景。
d.面向岗位需求
覆盖算法工程师、机器学习工程师面试的核心知识点。
03.与岗位的关系
a.算法工程师
a.核心技能
必须深入理解机器学习算法原理,能手推公式,能从零实现。
b.面试重点
线性回归、逻辑回归、决策树、SVM的数学推导和代码实现。
c.工作应用
根据业务场景选择合适算法,调优模型性能。
b.机器学习工程师
a.核心技能
熟练使用Scikit-learn等库,能快速搭建和调优模型。
b.面试重点
模型评估、特征工程、过拟合处理、超参数调优。
c.工作应用
端到端的机器学习项目,从数据处理到模型部署。
c.NLP工程师
a.基础要求
理解分类、回归等基本任务,为文本分类、情感分析打基础。
b.特征工程
文本特征提取(TF-IDF、词袋模型)是传统NLP的核心。
1.2 学习目标
01.理论目标
a.理解机器学习核心概念
a.监督学习与无监督学习
理解两者的区别、应用场景、典型算法。
b.损失函数与优化
理解不同任务的损失函数设计,掌握梯度下降等优化方法。
c.泛化能力
理解偏差-方差权衡,掌握防止过拟合的方法。
b.掌握经典算法原理
a.线性模型
线性回归、逻辑回归的数学推导、梯度计算、正则化。
b.树模型
决策树的分裂准则、剪枝策略、随机森林的集成原理。
c.支持向量机
最大间隔原理、核技巧、软间隔SVM。
d.聚类算法
K-Means、层次聚类、DBSCAN的原理和适用场景。
e.降维算法
PCA的数学原理、SVD分解、t-SNE可视化。
02.实践目标
a.从零实现算法
a.线性回归
用NumPy实现梯度下降、正规方程两种方法。
b.逻辑回归
实现二分类和多分类,包含正则化。
c.决策树
实现ID3或CART算法,理解递归分裂过程。
d.K-Means
实现完整的聚类算法,包含初始化和收敛判断。
b.使用Scikit-learn
a.模型训练
熟练使用fit、predict、score等API。
b.模型评估
使用cross_val_score、GridSearchCV等工具。
c.Pipeline构建
构建完整的数据处理和建模流程。
c.完成实战项目
a.回归任务
房价预测,包含特征工程、模型对比、结果可视化。
b.分类任务
信用评分,处理不平衡数据、选择合适评估指标。
c.聚类任务
客户分群,确定最优聚类数、解释聚类结果。
03.能力目标
a.算法选择能力
a.根据任务类型
回归、分类、聚类、降维,选择合适的算法类别。
b.根据数据特点
数据量、特征维度、是否线性可分,选择具体算法。
c.根据业务需求
可解释性、训练速度、预测精度的权衡。
b.模型调优能力
a.特征工程
特征提取、特征选择、特征变换、特征编码。
b.超参数调优
网格搜索、随机搜索、贝叶斯优化。
c.模型融合
Bagging、Boosting、Stacking等集成方法。
c.问题诊断能力
a.识别过拟合
通过学习曲线、验证集性能判断过拟合。
b.识别欠拟合
模型复杂度不足,需要增加特征或使用更复杂模型。
c.数据问题
数据泄露、样本不平衡、特征共线性等问题的识别和处理。
04.岗位技能对应
a.P0级别
a.线性回归和逻辑回归
所有岗位必考,必须能手推公式、从零实现。
b.决策树和随机森林
应用最广泛的算法,必须深入理解。
c.模型评估
准确率、精确率、召回率、F1、AUC等指标的计算和应用。
b.P1级别
a.支持向量机
算法工程师岗位重点,需要理解核技巧。
b.集成学习
XGBoost、LightGBM等工业界常用算法。
c.特征工程
特征选择、特征变换的系统方法。
c.P2级别
a.算法优化
自定义损失函数、改进算法性能。
b.大规模机器学习
分布式训练、在线学习、增量学习。
1.3 前置要求
01.数学基础
a.必须掌握
a.线性代数
矩阵运算、特征值分解,用于理解PCA、SVM等算法。
b.微积分
梯度、链式法则,用于理解梯度下降、反向传播。
c.概率论
概率分布、贝叶斯定理,用于理解朴素贝叶斯、最大似然估计。
b.检验标准
能手推线性回归的梯度,理解正则化的数学原理。
02.编程基础
a.必须掌握
a.Python核心语法
函数、类、列表推导式、异常处理。
b.NumPy
数组操作、矩阵运算、广播机制。
c.Pandas
数据读取、清洗、转换、分组聚合。
d.Matplotlib
折线图、散点图、直方图等基本可视化。
b.检验标准
能独立完成数据加载、预处理、可视化的完整流程。
03.推荐学习路径
a.已完成课程一
如果已完成课程一(AI的数学与编程基础),可以直接学习本课程。
b.未完成课程一
a.补充数学
重点学习线性代数的矩阵运算、微积分的梯度计算。
b.补充编程
重点学习NumPy的矩阵操作、Pandas的数据处理。
c.时间安排
建议先花2-3周补充基础,再开始本课程。
04.学习环境
a.硬件要求
a.个人电脑
8GB以上内存,无需GPU。
b.存储空间
至少10GB用于数据集和模型。
b.软件环境
a.Python环境
Python 3.7+,推荐使用Anaconda。
b.核心库
NumPy、Pandas、Matplotlib、Scikit-learn。
c.安装命令
---
# 使用conda安装
conda install numpy pandas matplotlib scikit-learn
# 或使用pip安装
pip install numpy pandas matplotlib scikit-learn
# 验证安装
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import datasets
print(f"NumPy版本: {np.__version__}")
print(f"Pandas版本: {pd.__version__}")
print(f"Scikit-learn版本: {sklearn.__version__}")
---
c.开发工具
a.Jupyter Notebook
交互式编程,适合学习和实验。
b.VS Code
功能强大的代码编辑器,支持Python调试。
c.PyCharm
专业的Python IDE,适合大型项目。
1.4 学习时长
01.总体时长规划
a.全日制学习
a.学习时长
每天6-8小时,持续6-8周完成。
b.适用人群
全职学习者、准备求职的应届生、转行人员。
c.学习重点
深入理解算法原理,从零实现核心算法,完成多个实战项目。
b.业余学习
a.学习时长
每天2-3小时,持续4-6个月完成。
b.适用人群
在职学习者、利用业余时间提升技能。
c.学习重点
理解算法思想,熟练使用Scikit-learn,完成1-2个项目。
02.各章节时长分配
a.第1章
a.学习时长
3-5天,理解机器学习的基本概念和流程。
b.学习方法
阅读理论、观看视频、理解核心术语。
b.第2章
a.学习时长
2-3周,深入学习监督学习算法。
b.学习方法
每个算法都要手推公式、从零实现、使用Scikit-learn对比。
c.重点算法
线性回归、逻辑回归、决策树、随机森林。
c.第3章
a.学习时长
1-2周,学习无监督学习算法。
b.学习方法
理解算法原理,实现K-Means和PCA,应用到实际数据。
d.第4章
a.学习时长
1-2周,学习模型评估和优化方法。
b.学习方法
理解评估指标,实践交叉验证和超参数调优。
e.第5章
a.学习时长
1周,系统学习特征工程方法。
b.学习方法
在实际项目中应用特征工程技巧。
f.第6章
a.学习时长
2-3周,完成实战项目。
b.学习方法
独立完成端到端的机器学习项目。
03.学习节奏建议
a.理论与实践交替
a.上午学理论
观看视频、阅读教材、理解算法原理。
b.下午写代码
从零实现算法、使用Scikit-learn、完成练习。
c.晚上做项目
将当天学习的内容应用到实际项目中。
b.每周复习
a.周末复习
回顾本周学习内容,整理笔记,做总结。
b.查漏补缺
找出薄弱环节,重点复习。
c.阶段性测试
a.每章结束
完成章节练习题,检验学习效果。
b.每月测试
做综合测试题,评估整体掌握程度。
04.时间管理技巧
a.制定学习计划
a.周计划
每周日制定下周学习计划,明确每天学习内容。
b.日计划
每天早上制定当天学习任务,晚上检查完成情况。
b.番茄工作法
a.25分钟专注
设置定时器,25分钟内专注学习,不被打扰。
b.5分钟休息
休息时离开电脑,活动身体,放松大脑。
c.4个番茄后
休息15-30分钟,回顾学习内容。
c.避免拖延
a.立即开始
不要等"准备好"才开始,先做起来再说。
b.分解任务
大任务分解成小任务,逐个完成。
c.奖励机制
完成任务后给自己小奖励,保持动力。
1.5 岗位关联
01.核心岗位需求
a.机器学习工程师
a.技能要求
a.算法理解
深入理解监督学习、无监督学习的核心算法。
b.工程实现
熟练使用Scikit-learn、XGBoost等库,能快速搭建模型。
c.项目经验
有完整的机器学习项目经验,从数据处理到模型部署。
b.薪资范围
a.初级
15-25K,要求掌握基础算法,有1-2个项目经验。
b.中级
25-40K,要求独立完成项目,有模型优化经验。
c.高级
40K+,要求架构设计能力,能解决复杂业务问题。
c.面试重点
a.算法原理
手推线性回归、逻辑回归、决策树的公式。
b.代码实现
现场实现梯度下降、K-Means等算法。
c.项目经验
详细讲解项目中的特征工程、模型选择、调优过程。
b.算法工程师
a.技能要求
a.数学功底
能推导算法公式,理解优化理论。
b.算法创新
能改进现有算法,提出新的解决方案。
c.论文阅读
能读懂顶会论文,复现算法。
b.薪资范围
a.初级
20-30K,要求硕士学历,扎实的数学基础。
b.中级
30-50K,要求有算法优化经验,发表过论文。
c.高级
50K+,要求技术深度,能带团队。
c.面试重点
a.数学推导
SVM的对偶问题、核技巧的数学原理。
b.算法优化
如何改进算法性能,降低时间复杂度。
c.论文讨论
讨论最新的机器学习论文和技术趋势。
c.数据科学家
a.技能要求
a.数据分析
熟练使用Pandas、SQL进行数据分析。
b.统计建模
理解统计学原理,能进行假设检验、A/B测试。
c.业务理解
能将业务问题转化为机器学习问题。
b.薪资范围
20-40K,根据经验和行业不同有较大差异。
c.面试重点
a.数据分析
给定数据集,进行探索性数据分析。
b.建模思路
针对业务问题,设计建模方案。
c.结果解释
向非技术人员解释模型结果。
02.技能优先级
a.P0级别
a.线性回归和逻辑回归
所有岗位必考,必须能手推公式、从零实现、使用Scikit-learn。
b.决策树和随机森林
应用最广泛,必须深入理解分裂准则、剪枝、集成原理。
c.模型评估
准确率、精确率、召回率、F1、AUC、混淆矩阵的计算和应用。
d.特征工程
特征提取、特征选择、特征编码的基本方法。
b.P1级别
a.支持向量机
算法工程师重点,需要理解最大间隔、核技巧、软间隔。
b.集成学习
XGBoost、LightGBM、CatBoost等工业界常用算法。
c.超参数调优
网格搜索、随机搜索、贝叶斯优化的原理和应用。
d.不平衡数据处理
过采样、欠采样、SMOTE、调整类别权重等方法。
c.P2级别
a.算法优化
自定义损失函数、改进算法性能、降低计算复杂度。
b.大规模机器学习
分布式训练、在线学习、增量学习。
c.AutoML
自动特征工程、自动模型选择、神经架构搜索。
03.行业应用
a.互联网行业
a.推荐系统
协同过滤、内容推荐、排序模型。
b.广告投放
点击率预估、转化率预估、出价策略。
c.风控系统
信用评分、欺诈检测、异常交易识别。
b.金融行业
a.信贷风控
信用评分模型、违约预测、反欺诈。
b.量化交易
股票预测、因子挖掘、组合优化。
c.客户分析
客户分群、流失预测、精准营销。
c.制造业
a.质量控制
缺陷检测、异常识别、预测性维护。
b.供应链优化
需求预测、库存优化、物流规划。
c.设备监控
故障预测、健康评估、寿命预测。
d.医疗健康
a.疾病诊断
影像识别、疾病预测、风险评估。
b.药物研发
分子性质预测、药物筛选、临床试验优化。
c.健康管理
健康评分、个性化建议、疾病预防。
04.职业发展路径
a.技术路线
a.初级工程师
1-3年,掌握基础算法,完成项目开发。
b.中级工程师
3-5年,独立负责项目,有算法优化经验。
c.高级工程师
5-8年,技术专家,能解决复杂问题。
d.技术专家
8年以上,行业影响力,技术深度和广度。
b.管理路线
a.技术Leader
带领小团队,负责技术方案设计。
b.技术经理
管理多个团队,负责技术规划。
c.技术总监
负责整个技术部门,制定技术战略。
c.创业路线
利用机器学习技术创业,提供AI解决方案。
2 机器学习基础
2.1 机器学习定义
01.什么是机器学习
a.定义
a.Tom Mitchell定义
如果一个程序在任务T上的性能P随着经验E而提高,则称该程序从经验E中学习。
b.通俗理解
让计算机从数据中自动学习规律,而不是人工编写规则。
c.核心思想
用数据驱动,而非规则驱动。
b.与传统编程的区别
a.传统编程
人工设计规则,输入数据,输出结果。
b.机器学习
输入数据和结果,自动学习规则。
c.代码示例
---
# 传统编程:判断邮件是否为垃圾邮件
def is_spam_traditional(email):
spam_words = ['中奖', '免费', '点击', '优惠']
for word in spam_words:
if word in email:
return True
return False
# 机器学习:从数据中学习规则
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer
# 训练数据
emails = [
'恭喜中奖,点击领取',
'明天开会通知',
'免费获取优惠券',
'项目进度报告'
]
labels = [1, 0, 1, 0] # 1表示垃圾邮件,0表示正常邮件
# 特征提取
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(emails)
# 训练模型
model = MultinomialNB()
model.fit(X, labels)
# 预测新邮件
new_email = ['限时优惠活动']
X_new = vectorizer.transform(new_email)
prediction = model.predict(X_new)
print(f"预测结果: {'垃圾邮件' if prediction[0] == 1 else '正常邮件'}")
---
02.机器学习的要素
a.数据
a.训练数据
用于训练模型的数据,包含特征和标签。
b.测试数据
用于评估模型性能的数据,模型未见过。
c.数据质量
数据质量决定模型上限,垃圾进垃圾出。
b.模型
a.定义
从输入到输出的映射函数,如y = f(x)。
b.模型复杂度
简单模型(线性模型)vs 复杂模型(深度神经网络)。
c.模型选择
根据任务和数据选择合适的模型。
c.算法
a.定义
学习模型参数的方法,如梯度下降。
b.优化目标
最小化损失函数,找到最优参数。
c.代码示例
---
# 机器学习的三要素示例
import numpy as np
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
# 1. 数据
np.random.seed(42)
X = 2 * np.random.rand(100, 1)
y = 4 + 3 * X + np.random.randn(100, 1)
# 2. 模型
model = LinearRegression()
# 3. 算法(训练)
model.fit(X, y)
# 预测
X_new = np.array([[0], [2]])
y_pred = model.predict(X_new)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X, y, alpha=0.5, label='训练数据')
plt.plot(X_new, y_pred, 'r-', linewidth=2, label='模型预测')
plt.xlabel('X')
plt.ylabel('y')
plt.title('线性回归示例')
plt.legend()
plt.grid(True)
plt.show()
print(f"学习到的参数: w={model.coef_[0][0]:.2f}, b={model.intercept_[0]:.2f}")
---
03.机器学习的应用
a.计算机视觉
图像分类、目标检测、人脸识别、图像生成。
b.自然语言处理
文本分类、情感分析、机器翻译、问答系统。
c.推荐系统
商品推荐、内容推荐、好友推荐。
d.语音识别
语音转文字、语音助手、声纹识别。
e.金融风控
信用评分、欺诈检测、风险预测。
f.医疗健康
疾病诊断、药物研发、健康管理。
2.2 学习范式
01.监督学习
a.定义
a.基本概念
训练数据包含输入特征和对应的标签,模型学习从输入到输出的映射。
b.数学表示
给定训练集D = {(x₁,y₁), (x₂,y₂), ..., (xₙ,yₙ)},学习函数f: X → Y。
c.目标
在新数据上也能准确预测标签。
b.任务类型
a.分类
a.定义
预测离散的类别标签,如垃圾邮件识别、图像分类。
b.二分类
只有两个类别,如是/否、正/负。
c.多分类
多个类别,如手写数字识别(0-9)。
d.代码示例
---
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, classification_report
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练分类模型
clf = DecisionTreeClassifier(random_state=42)
clf.fit(X_train, y_train)
# 预测
y_pred = clf.predict(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")
print(f"\n分类报告:\n{classification_report(y_test, y_pred)}")
---
b.回归
a.定义
预测连续的数值,如房价预测、股票价格预测。
b.线性回归
假设输入和输出之间是线性关系。
c.非线性回归
使用非线性模型拟合数据。
d.代码示例
---
from sklearn.datasets import load_boston
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
# 加载数据
boston = load_boston()
X, y = boston.data, boston.target
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练回归模型
reg = LinearRegression()
reg.fit(X_train, y_train)
# 预测
y_pred = reg.predict(X_test)
# 评估
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"均方误差: {mse:.4f}")
print(f"R²分数: {r2:.4f}")
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred, alpha=0.5)
plt.plot([y_test.min(), y_test.max()],
[y_test.min(), y_test.max()],
'r--', linewidth=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('回归预测结果')
plt.grid(True)
plt.show()
---
c.常见算法
线性回归、逻辑回归、决策树、随机森林、SVM、神经网络。
02.无监督学习
a.定义
a.基本概念
训练数据只有输入特征,没有标签,模型自动发现数据中的模式。
b.目标
发现数据的内在结构、降维、异常检测。
b.任务类型
a.聚类
a.定义
将相似的数据点分到同一组。
b.应用
客户分群、图像分割、文档聚类。
c.代码示例
---
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
# 生成数据
X, y_true = make_blobs(n_samples=300, centers=3,
n_features=2, random_state=42)
# K-Means聚类
kmeans = KMeans(n_clusters=3, random_state=42)
y_pred = kmeans.fit_predict(X)
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis')
plt.title('真实标签')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
plt.scatter(kmeans.cluster_centers_[:, 0],
kmeans.cluster_centers_[:, 1],
c='red', marker='X', s=200, label='中心点')
plt.title('K-Means聚类结果')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.tight_layout()
plt.show()
---
b.降维
a.定义
减少特征数量,保留主要信息。
b.应用
数据可视化、特征提取、去噪。
c.代码示例
---
from sklearn.decomposition import PCA
# 加载高维数据
iris = load_iris()
X = iris.data
y = iris.target
# PCA降维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
# 可视化
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1],
c=y, cmap='viridis', alpha=0.6)
plt.xlabel('第一主成分')
plt.ylabel('第二主成分')
plt.title('PCA降维可视化')
plt.colorbar(scatter, label='类别')
plt.grid(True)
plt.show()
print(f"解释方差比例: {pca.explained_variance_ratio_}")
print(f"累计解释方差: {pca.explained_variance_ratio_.sum():.2%}")
---
c.常见算法
K-Means、层次聚类、DBSCAN、PCA、t-SNE、自编码器。
03.半监督学习
a.定义
结合少量标注数据和大量未标注数据进行学习。
b.应用场景
标注成本高的任务,如医疗影像、语音识别。
c.方法
自训练、协同训练、生成模型。
04.强化学习
a.定义
智能体通过与环境交互,学习最优策略以最大化累积奖励。
b.核心概念
状态、动作、奖励、策略、价值函数。
c.应用
游戏AI、机器人控制、自动驾驶、推荐系统。
2.3 核心概念
01.特征与标签
a.特征
a.定义
描述样本的属性,也称为自变量、输入变量。
b.特征类型
a.数值特征
连续值(身高、体重)或离散值(年龄、数量)。
b.类别特征
有限的离散类别(性别、城市、颜色)。
c.文本特征
需要转换为数值表示(TF-IDF、词向量)。
c.代码示例
---
import pandas as pd
# 创建示例数据
data = {
'age': [25, 30, 35, 28, 32],
'salary': [5000, 8000, 12000, 6000, 9000],
'city': ['北京', '上海', '深圳', '北京', '上海'],
'gender': ['男', '女', '男', '女', '男'],
'purchased': [0, 1, 1, 0, 1]
}
df = pd.DataFrame(data)
print("原始数据:")
print(df)
# 特征类型分析
print(f"\n数值特征: {df.select_dtypes(include=['int64', 'float64']).columns.tolist()}")
print(f"类别特征: {df.select_dtypes(include=['object']).columns.tolist()}")
# 类别特征编码
from sklearn.preprocessing import LabelEncoder
le_city = LabelEncoder()
le_gender = LabelEncoder()
df['city_encoded'] = le_city.fit_transform(df['city'])
df['gender_encoded'] = le_gender.fit_transform(df['gender'])
print("\n编码后的数据:")
print(df)
---
b.标签
a.定义
样本的输出值,也称为因变量、目标变量。
b.标签类型
a.分类标签
离散的类别值(0/1、猫/狗/鸟)。
b.回归标签
连续的数值(价格、温度、评分)。
02.训练集、验证集、测试集
a.训练集
a.作用
用于训练模型,学习参数。
b.比例
通常占总数据的60-80%。
b.验证集
a.作用
用于调整超参数,选择模型。
b.比例
通常占总数据的10-20%。
c.重要性
防止在测试集上过拟合。
c.测试集
a.作用
用于最终评估模型性能,模拟真实场景。
b.比例
通常占总数据的10-20%。
c.原则
测试集只能使用一次,不能用于调参。
d.代码示例
---
from sklearn.model_selection import train_test_split
# 生成数据
X = np.random.rand(1000, 10)
y = np.random.randint(0, 2, 1000)
# 第一次划分:训练集 + 临时集(验证集+测试集)
X_train, X_temp, y_train, y_temp = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 第二次划分:验证集 + 测试集
X_val, X_test, y_val, y_test = train_test_split(
X_temp, y_temp, test_size=0.5, random_state=42
)
print(f"训练集大小: {X_train.shape[0]} ({X_train.shape[0]/len(X):.1%})")
print(f"验证集大小: {X_val.shape[0]} ({X_val.shape[0]/len(X):.1%})")
print(f"测试集大小: {X_test.shape[0]} ({X_test.shape[0]/len(X):.1%})")
# 使用验证集选择模型
from sklearn.tree import DecisionTreeClassifier
best_score = 0
best_depth = 0
for depth in range(1, 11):
model = DecisionTreeClassifier(max_depth=depth, random_state=42)
model.fit(X_train, y_train)
score = model.score(X_val, y_val)
print(f"深度={depth}, 验证集准确率={score:.4f}")
if score > best_score:
best_score = score
best_depth = depth
print(f"\n最佳深度: {best_depth}")
# 使用最佳参数在测试集上评估
final_model = DecisionTreeClassifier(max_depth=best_depth, random_state=42)
final_model.fit(X_train, y_train)
test_score = final_model.score(X_test, y_test)
print(f"测试集准确率: {test_score:.4f}")
---
03.损失函数
a.定义
衡量模型预测值与真实值之间差异的函数。
b.回归任务
a.均方误差
a.公式
MSE = (1/n) * Σ(yᵢ - ŷᵢ)²
b.特点
对异常值敏感,常用于线性回归。
c.代码示例
---
# 均方误差
y_true = np.array([3, -0.5, 2, 7])
y_pred = np.array([2.5, 0.0, 2, 8])
mse = np.mean((y_true - y_pred) ** 2)
print(f"MSE: {mse:.4f}")
# 使用sklearn
from sklearn.metrics import mean_squared_error
mse_sklearn = mean_squared_error(y_true, y_pred)
print(f"MSE (sklearn): {mse_sklearn:.4f}")
---
b.平均绝对误差
a.公式
MAE = (1/n) * Σ|yᵢ - ŷᵢ|
b.特点
对异常值不敏感,更稳健。
c.分类任务
a.交叉熵损失
a.二分类
Loss = -[y*log(ŷ) + (1-y)*log(1-ŷ)]
b.多分类
Loss = -Σ yᵢ * log(ŷᵢ)
c.代码示例
---
# 二分类交叉熵
def binary_cross_entropy(y_true, y_pred):
epsilon = 1e-15
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
return -np.mean(y_true * np.log(y_pred) +
(1 - y_true) * np.log(1 - y_pred))
y_true = np.array([1, 0, 1, 1, 0])
y_pred = np.array([0.9, 0.1, 0.8, 0.7, 0.2])
loss = binary_cross_entropy(y_true, y_pred)
print(f"交叉熵损失: {loss:.4f}")
# 使用sklearn
from sklearn.metrics import log_loss
loss_sklearn = log_loss(y_true, y_pred)
print(f"交叉熵损失 (sklearn): {loss_sklearn:.4f}")
---
04.模型参数与超参数
a.模型参数
a.定义
模型内部的变量,通过训练数据学习得到。
b.示例
线性回归的权重和偏置、神经网络的权重。
c.特点
由算法自动学习,不需要人工设置。
b.超参数
a.定义
模型外部的配置,需要人工设置或调优。
b.示例
学习率、正则化系数、决策树深度、K-Means的K值。
c.调优方法
网格搜索、随机搜索、贝叶斯优化。
d.代码示例
---
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
# 定义超参数网格
param_grid = {
'n_estimators': [10, 50, 100],
'max_depth': [3, 5, 7, None],
'min_samples_split': [2, 5, 10]
}
# 网格搜索
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=5,
scoring='accuracy', n_jobs=-1)
grid_search.fit(X, y)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
# 查看所有结果
results = pd.DataFrame(grid_search.cv_results_)
print("\n前5个结果:")
print(results[['params', 'mean_test_score']].head())
---
2.4 工作流程
01.数据收集
a.数据来源
a.公开数据集
Kaggle、UCI、ImageNet、COCO等。
b.业务数据
公司内部数据库、日志、用户行为数据。
c.爬虫数据
从网站爬取数据(需遵守法律法规)。
b.数据格式
CSV、JSON、数据库、图片、文本等。
c.代码示例
---
# 从CSV加载数据
df = pd.read_csv('data.csv')
print(f"数据形状: {df.shape}")
print(f"\n前5行:\n{df.head()}")
# 从数据库加载
import sqlite3
conn = sqlite3.connect('database.db')
df_db = pd.read_sql_query("SELECT * FROM users", conn)
conn.close()
# 从API获取
import requests
response = requests.get('https://api.example.com/data')
data = response.json()
df_api = pd.DataFrame(data)
---
02.数据探索
a.基本统计
a.描述性统计
均值、中位数、标准差、最大值、最小值。
b.数据分布
直方图、箱线图、密度图。
c.代码示例
---
# 加载数据
df = pd.read_csv('data.csv')
# 基本信息
print("数据信息:")
print(df.info())
print("\n描述性统计:")
print(df.describe())
# 缺失值统计
print("\n缺失值:")
print(df.isnull().sum())
# 数值特征分布
df.hist(figsize=(15, 10), bins=30)
plt.tight_layout()
plt.show()
# 类别特征分布
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
print(f"\n{col}的分布:")
print(df[col].value_counts())
---
b.相关性分析
a.数值特征相关性
计算相关系数矩阵,识别高度相关的特征。
b.可视化
热力图、散点图矩阵。
c.代码示例
---
import seaborn as sns
# 相关系数矩阵
corr = df.corr()
# 热力图
plt.figure(figsize=(12, 10))
sns.heatmap(corr, annot=True, fmt='.2f',
cmap='coolwarm', center=0,
square=True, linewidths=1)
plt.title('特征相关性热力图')
plt.show()
# 散点图矩阵
from pandas.plotting import scatter_matrix
scatter_matrix(df.iloc[:, :5], figsize=(15, 15), alpha=0.5)
plt.show()
---
03.数据预处理
a.缺失值处理
a.删除
删除含缺失值的行或列。
b.填充
用均值、中位数、众数或特定值填充。
c.预测
用其他特征预测缺失值。
d.代码示例
---
# 创建含缺失值的数据
df_missing = pd.DataFrame({
'A': [1, 2, np.nan, 4, 5],
'B': [np.nan, 2, 3, 4, 5],
'C': [1, 2, 3, 4, 5]
})
print("原始数据:")
print(df_missing)
# 删除含缺失值的行
df_drop = df_missing.dropna()
print("\n删除缺失值后:")
print(df_drop)
# 用均值填充
df_mean = df_missing.fillna(df_missing.mean())
print("\n用均值填充:")
print(df_mean)
# 用前一个值填充
df_ffill = df_missing.fillna(method='ffill')
print("\n前向填充:")
print(df_ffill)
# 使用sklearn的Imputer
from sklearn.impute import SimpleImputer
imputer = SimpleImputer(strategy='mean')
df_imputed = pd.DataFrame(
imputer.fit_transform(df_missing),
columns=df_missing.columns
)
print("\nSimpleImputer填充:")
print(df_imputed)
---
b.异常值处理
a.识别
箱线图、3σ原则、IQR方法。
b.处理
删除、替换、保留(根据业务判断)。
c.代码示例
---
# 生成含异常值的数据
np.random.seed(42)
data = np.random.randn(1000)
data = np.append(data, [10, -10, 15]) # 添加异常值
# 箱线图识别
plt.figure(figsize=(10, 6))
plt.boxplot(data)
plt.title('箱线图识别异常值')
plt.ylabel('值')
plt.show()
# IQR方法
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = (data < lower_bound) | (data > upper_bound)
print(f"异常值数量: {outliers.sum()}")
print(f"异常值: {data[outliers]}")
# 删除异常值
data_clean = data[~outliers]
print(f"清洗后数据量: {len(data_clean)}")
---
c.特征缩放
a.标准化
转换为均值0、标准差1的分布。
b.归一化
缩放到[0,1]或[-1,1]区间。
c.代码示例
---
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# 生成数据
X = np.random.rand(100, 3) * 100
# 标准化
scaler_std = StandardScaler()
X_std = scaler_std.fit_transform(X)
print("标准化后:")
print(f"均值: {X_std.mean(axis=0)}")
print(f"标准差: {X_std.std(axis=0)}")
# 归一化
scaler_minmax = MinMaxScaler()
X_norm = scaler_minmax.fit_transform(X)
print("\n归一化后:")
print(f"最小值: {X_norm.min(axis=0)}")
print(f"最大值: {X_norm.max(axis=0)}")
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].hist(X[:, 0], bins=30)
axes[0].set_title('原始数据')
axes[1].hist(X_std[:, 0], bins=30)
axes[1].set_title('标准化')
axes[2].hist(X_norm[:, 0], bins=30)
axes[2].set_title('归一化')
plt.tight_layout()
plt.show()
---
04.模型训练与评估
a.模型选择
根据任务类型和数据特点选择合适的算法。
b.训练模型
使用训练集拟合模型参数。
c.模型评估
使用验证集或交叉验证评估性能。
d.完整流程示例
---
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
# 构建Pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])
# 交叉验证
scores = cross_val_score(pipeline, X, y, cv=5)
print(f"交叉验证分数: {scores}")
print(f"平均分数: {scores.mean():.4f} (+/- {scores.std():.4f})")
# 训练最终模型
pipeline.fit(X, y)
# 预测新数据
X_new = [[5.0, 3.0, 1.5, 0.2]]
prediction = pipeline.predict(X_new)
proba = pipeline.predict_proba(X_new)
print(f"\n预测类别: {prediction[0]}")
print(f"预测概率: {proba[0]}")
---
05.模型部署
a.模型保存
使用pickle或joblib保存训练好的模型。
b.模型加载
在生产环境加载模型进行预测。
c.代码示例
---
import joblib
# 保存模型
joblib.dump(pipeline, 'model.pkl')
print("模型已保存")
# 加载模型
loaded_model = joblib.load('model.pkl')
print("模型已加载")
# 使��加载的模型预测
prediction = loaded_model.predict(X_new)
print(f"预测结果: {prediction[0]}")
---
2.5 常用术语
01.模型相关
a.拟合
a.欠拟合
模型过于简单,无法捕捉数据的模式,训练集和测试集性能都差。
b.过拟合
模型过于复杂,记住了训练数据的噪声,训练集性能好但测试集性能差。
c.良好拟合
模型复杂度适中,在训练集和测试集上都有良好表现。
d.代码示例
---
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
# 生成数据
np.random.seed(42)
X = np.sort(np.random.rand(50, 1) * 10, axis=0)
y = np.sin(X).ravel() + np.random.randn(50) * 0.1
# 划分数据
X_train, X_test = X[:40], X[40:]
y_train, y_test = y[:40], y[40:]
# 不同复杂度的模型
degrees = [1, 4, 15]
plt.figure(figsize=(15, 5))
for i, degree in enumerate(degrees):
# 多项式特征
poly = PolynomialFeatures(degree=degree)
X_train_poly = poly.fit_transform(X_train)
X_test_poly = poly.transform(X_test)
# 训练模型
model = LinearRegression()
model.fit(X_train_poly, y_train)
# 预测
X_plot = np.linspace(0, 10, 100).reshape(-1, 1)
X_plot_poly = poly.transform(X_plot)
y_plot = model.predict(X_plot_poly)
# 评估
train_mse = mean_squared_error(y_train, model.predict(X_train_poly))
test_mse = mean_squared_error(y_test, model.predict(X_test_poly))
# 可视化
plt.subplot(1, 3, i+1)
plt.scatter(X_train, y_train, label='训练数据')
plt.scatter(X_test, y_test, label='测试数据')
plt.plot(X_plot, y_plot, 'r-', linewidth=2, label='模型')
plt.xlabel('X')
plt.ylabel('y')
plt.title(f'度={degree}\n训练MSE={train_mse:.4f}, 测试MSE={test_mse:.4f}')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
b.泛化能力
模型在未见过的数据上的表现能力。
c.偏差与方差
a.偏差
模型预测值与真实值的平均差异,高偏差导致欠拟合。
b.方差
���型在不同训练集上预测值的波动,高方差导致过拟合。
c.权衡
需要在偏差和方差之间找到平衡点。
02.评估相关
a.准确率
a.定义
正确预测的样本数占总样本数的比例。
b.公式
Accuracy = (TP + TN) / (TP + TN + FP + FN)
c.局限性
在类别不平衡时不适用。
b.精确率
a.定义
预测为正类的样本中,真正为正类的比例。
b.公式
Precision = TP / (TP + FP)
c.应用
关注误报���,如垃圾邮件过滤。
c.召回率
a.定义
真正为正类的样本中,被正确预测的比例。
b.公式
Recall = TP / (TP + FN)
c.应用
关注漏报率,如疾病诊断。
d.F1分数
a.定义
精确率和召回率的调和平均。
b.公式
F1 = 2 * (Precision * Recall) / (Precision + Recall)
c.应用
综合考虑精确率和召回率。
e.代码示例
---
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, classification_report
# 模拟预测结果
y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0, 1, 0])
y_pred = np.array([1, 0, 1, 0, 0, 1, 1, 0, 1, 0])
# 计算指标
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
print(f"准确率: {accuracy:.4f}")
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")
# 混淆矩阵
cm = confusion_matrix(y_true, y_pred)
print(f"\n混淆矩阵:\n{cm}")
# 分类报告
print(f"\n分类报告:\n{classification_report(y_true, y_pred)}")
# 可视化混淆矩阵
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.title('混淆矩阵')
plt.show()
---
03.优化相关
a.学习率
a.定义
梯度下降中参数更新的步长。
b.影响
过大导致震荡,过小导致收敛慢。
c.调整策略
学习率衰减、自适应学习率(Adam)。
b.正则化
a.定义
在损失函数中添加惩罚项,防止过拟合。
b.L1正则化
惩罚权重的绝对值之和,产生稀疏解。
c.L2正则化
惩罚权重的平方和,权重趋向于小值。
d.代码示例
---
from sklearn.linear_model import Ridge, Lasso
# 生成数据
X = np.random.rand(100, 20)
y = np.random.rand(100)
# 无正则化
lr = LinearRegression()
lr.fit(X, y)
# L2正则化(Ridge)
ridge = Ridge(alpha=1.0)
ridge.fit(X, y)
# L1正则化(Lasso)
lasso = Lasso(alpha=0.1)
lasso.fit(X, y)
# 比较权重
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.bar(range(20), lr.coef_)
plt.title('无正则化')
plt.xlabel('特征')
plt.ylabel('权重')
plt.subplot(1, 3, 2)
plt.bar(range(20), ridge.coef_)
plt.title('L2正则化(Ridge)')
plt.xlabel('特征')
plt.ylabel('权重')
plt.subplot(1, 3, 3)
plt.bar(range(20), lasso.coef_)
plt.title('L1正则化(Lasso)')
plt.xlabel('特征')
plt.ylabel('权重')
plt.tight_layout()
plt.show()
print(f"L1正则化后非零权重数: {np.sum(lasso.coef_ != 0)}")
---
c.批量大小
a.定义
每次更新参数使用的样本数量。
b.类型
批量梯度下降(全部样本)、随机梯度下降(单个样本)、小批量梯度下降(部分样本)。
c.权衡
大批量稳定但慢,小批量快但不稳定。
04.数据相关
a.样本不平衡
a.定义
不同类别的样本数量差异很大。
b.影响
模型倾向于预测多数类。
c.解决方法
过采样、欠采样、SMOTE、调整类别权重。
b.数据泄露
a.定义
测试集信息泄露到训练过程中。
b.常见情况
在划分数据前进行特征缩放、使用未来信息。
c.避免方法
先划分数据,再进行预处理;使用Pipeline。
c.特征共线性
a.定义
特征之间高度相关。
b.影响
导致模型不稳定,权重解释困难。
c.解决方法
删除相关特征、PCA降维、使用正则化。
3 监督学习
3.1 线性回归
01.数学原理
a.模型定义
a.单变量线性回归
y = wx + b,其中w是权重,b是偏置。
b.多变量线性回归
y = w₁x₁ + w₂x₂ + ... + wₙxₙ + b,向量形式:y = w^T x + b。
c.矩阵形式
Y = XW,其中X是特征矩阵,W是权重向量。
b.损失函数
a.均方误差
J(w,b) = (1/2m) * Σ(ŷᵢ - yᵢ)²
b.目标
找到使损失函数最小的w和b。
c.梯度计算
a.权重梯度
∂J/∂w = (1/m) * X^T(Xw - y)
b.偏置梯度
∂J/∂b = (1/m) * Σ(ŷᵢ - yᵢ)
02.求解方法
a.正规方程
a.公式
w = (X^T X)^(-1) X^T y
b.优点
一步求解,无需迭代。
c.缺点
当特征数量很大时,计算逆矩阵很慢。
d.代码实现
---
# 正规方程实现线性回归
class LinearRegressionNormal:
def __init__(self):
self.weights = None
self.bias = None
def fit(self, X, y):
# 添加偏置列
X_b = np.c_[np.ones((X.shape[0], 1)), X]
# 正规方程
theta = np.linalg.inv(X_b.T @ X_b) @ X_b.T @ y
self.bias = theta[0]
self.weights = theta[1:]
def predict(self, X):
return X @ self.weights + self.bias
# 测试
np.random.seed(42)
X = 2 * np.random.rand(100, 1)
y = 4 + 3 * X.flatten() + np.random.randn(100)
model = LinearRegressionNormal()
model.fit(X, y)
print(f"权重: {model.weights[0]:.4f}")
print(f"偏置: {model.bias:.4f}")
# 预测
X_new = np.array([[0], [2]])
y_pred = model.predict(X_new)
print(f"预测: {y_pred}")
---
b.梯度下降
a.算法流程
初始化参数 → 计算梯度 → 更新参数 → 重复直到收敛。
b.更新公式
w = w - α * ∂J/∂w,b = b - α * ∂J/∂b
c.代码实现
---
# 梯度下降实现线性回归
class LinearRegressionGD:
def __init__(self, learning_rate=0.01, iterations=1000):
self.lr = learning_rate
self.iterations = iterations
self.weights = None
self.bias = None
self.loss_history = []
def fit(self, X, y):
n_samples, n_features = X.shape
self.weights = np.zeros(n_features)
self.bias = 0
for i in range(self.iterations):
# 预测
y_pred = X @ self.weights + self.bias
# 计算损失
loss = np.mean((y_pred - y) ** 2)
self.loss_history.append(loss)
# 计算梯度
dw = (2/n_samples) * X.T @ (y_pred - y)
db = (2/n_samples) * np.sum(y_pred - y)
# 更新参数
self.weights -= self.lr * dw
self.bias -= self.lr * db
if i % 100 == 0:
print(f"Iteration {i}: Loss = {loss:.6f}")
def predict(self, X):
return X @ self.weights + self.bias
# 测试
model_gd = LinearRegressionGD(learning_rate=0.1, iterations=1000)
model_gd.fit(X, y)
print(f"\n权重: {model_gd.weights[0]:.4f}")
print(f"偏置: {model_gd.bias:.4f}")
# 可视化损失
plt.figure(figsize=(10, 6))
plt.plot(model_gd.loss_history)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('训练损失曲线')
plt.grid(True)
plt.show()
---
03.使用Scikit-learn
a.基本使用
---
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
# 训练模型
model_sklearn = LinearRegression()
model_sklearn.fit(X, y)
# 预测
y_pred = model_sklearn.predict(X)
# 评估
mse = mean_squared_error(y, y_pred)
r2 = r2_score(y, y_pred)
print(f"MSE: {mse:.4f}")
print(f"R²: {r2:.4f}")
print(f"权重: {model_sklearn.coef_}")
print(f"偏置: {model_sklearn.intercept_:.4f}")
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X, y, alpha=0.5, label='数据点')
plt.plot(X, y_pred, 'r-', linewidth=2, label='拟合线')
plt.xlabel('X')
plt.ylabel('y')
plt.title('线性回归')
plt.legend()
plt.grid(True)
plt.show()
---
b.多元线性回归
---
# 生成多特征数��
from sklearn.datasets import make_regression
X_multi, y_multi = make_regression(n_samples=100, n_features=5,
noise=10, random_state=42)
# 划分数据
X_train, X_test, y_train, y_test = train_test_split(
X_multi, y_multi, test_size=0.2, random_state=42
)
# 训练
model_multi = LinearRegression()
model_multi.fit(X_train, y_train)
# 评估
train_score = model_multi.score(X_train, y_train)
test_score = model_multi.score(X_test, y_test)
print(f"训练集R²: {train_score:.4f}")
print(f"测试集R²: {test_score:.4f}")
# 特征重要性
feature_importance = np.abs(model_multi.coef_)
plt.figure(figsize=(10, 6))
plt.bar(range(len(feature_importance)), feature_importance)
plt.xlabel('特征索引')
plt.ylabel('权重绝对值')
plt.title('特征重要性')
plt.grid(True)
plt.show()
---
04.正则化线性回归
a.Ridge回归
a.原理
L2正则化,损失函数:J = MSE + α * Σw²
b.效果
权重趋向于小值,防止过拟合。
c.代码示例
---
from sklearn.linear_model import Ridge
# 不同正则化强度
alphas = [0.001, 0.1, 1, 10, 100]
plt.figure(figsize=(15, 5))
for i, alpha in enumerate(alphas):
ridge = Ridge(alpha=alpha)
ridge.fit(X_train, y_train)
train_score = ridge.score(X_train, y_train)
test_score = ridge.score(X_test, y_test)
plt.subplot(1, len(alphas), i+1)
plt.bar(range(len(ridge.coef_)), ridge.coef_)
plt.title(f'α={alpha}\n训练R²={train_score:.3f}\n测试R²={test_score:.3f}')
plt.xlabel('特征')
plt.ylabel('权重')
plt.tight_layout()
plt.show()
---
b.Lasso回归
a.原理
L1正则化,损失函数:J = MSE + α * Σ|w|
b.效果
产生稀疏解,自动进行特征选择。
c.代码示例
---
from sklearn.linear_model import Lasso
# Lasso回归
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)
print(f"非零权重数: {np.sum(lasso.coef_ != 0)}")
print(f"权重: {lasso.coef_}")
# 可视化
plt.figure(figsize=(10, 6))
plt.bar(range(len(lasso.coef_)), lasso.coef_)
plt.xlabel('特征索引')
plt.ylabel('权重')
plt.title('Lasso回归权重(稀疏)')
plt.grid(True)
plt.show()
---
3.2 逻辑回归
01.数学原理
a.模型定义
a.线性组合
z = w^T x + b
b.Sigmoid函数
σ(z) = 1 / (1 + e^(-z)),将线性输出映射到[0,1]。
c.概率解释
P(y=1|x) = σ(w^T x + b)
b.损失函数
a.交叉熵损失
J = -(1/m) * Σ[y*log(ŷ) + (1-y)*log(1-ŷ)]
b.梯度
∂J/∂w = (1/m) * X^T(ŷ - y)
c.代码实现
---
# 从零实现逻辑回归
class LogisticRegression:
def __init__(self, learning_rate=0.01, iterations=1000):
self.lr = learning_rate
self.iterations = iterations
self.weights = None
self.bias = None
self.loss_history = []
def sigmoid(self, z):
return 1 / (1 + np.exp(-z))
def fit(self, X, y):
n_samples, n_features = X.shape
self.weights = np.zeros(n_features)
self.bias = 0
for i in range(self.iterations):
# 前向传播
z = X @ self.weights + self.bias
y_pred = self.sigmoid(z)
# 计算损失
epsilon = 1e-15
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
loss = -np.mean(y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred))
self.loss_history.append(loss)
# 计算梯度
dw = (1/n_samples) * X.T @ (y_pred - y)
db = (1/n_samples) * np.sum(y_pred - y)
# 更新参数
self.weights -= self.lr * dw
self.bias -= self.lr * db
if i % 100 == 0:
print(f"Iteration {i}: Loss = {loss:.6f}")
def predict_proba(self, X):
z = X @ self.weights + self.bias
return self.sigmoid(z)
def predict(self, X, threshold=0.5):
return (self.predict_proba(X) >= threshold).astype(int)
# 测试
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
n_informative=2, random_state=42, n_clusters_per_class=1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练
model = LogisticRegression(learning_rate=0.1, iterations=1000)
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
accuracy = np.mean(y_pred == y_test)
print(f"\n准确率: {accuracy:.4f}")
# 可视化决策边界
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(model.loss_history)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('训练损失')
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(X_test[y_test==0, 0], X_test[y_test==0, 1], label='类别0', alpha=0.6)
plt.scatter(X_test[y_test==1, 0], X_test[y_test==1, 1], label='类别1', alpha=0.6)
# 绘制决策边界
x_min, x_max = X_test[:, 0].min() - 1, X_test[:, 0].max() + 1
y_min, y_max = X_test[:, 1].min() - 1, X_test[:, 1].max() + 1
xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
np.linspace(y_min, y_max, 100))
Z = model.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, levels=[0.5], colors='red', linewidths=2)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('决策边界')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
02.使用Scikit-learn
a.二分类
---
from sklearn.linear_model import LogisticRegression as LR
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score, roc_curve
# 训练
model_sklearn = LR(random_state=42)
model_sklearn.fit(X_train, y_train)
# 预测
y_pred = model_sklearn.predict(X_test)
y_proba = model_sklearn.predict_proba(X_test)[:, 1]
# 评估
accuracy = accuracy_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_proba)
print(f"准确率: {accuracy:.4f}")
print(f"AUC: {auc:.4f}")
print(f"\n分类报告:\n{classification_report(y_test, y_pred)}")
# ROC曲线
fpr, tpr, thresholds = roc_curve(y_test, y_proba)
plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, linewidth=2, label=f'ROC (AUC = {auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--', label='随机猜测')
plt.xlabel('假正率 (FPR)')
plt.ylabel('真正率 (TPR)')
plt.title('ROC曲线')
plt.legend()
plt.grid(True)
plt.show()
---
b.多分类
---
# 多分类数据
iris = load_iris()
X_multi, y_multi = iris.data, iris.target
X_train_m, X_test_m, y_train_m, y_test_m = train_test_split(
X_multi, y_multi, test_size=0.2, random_state=42
)
# 训练多分类模型
model_multi = LR(multi_class='multinomial', max_iter=1000, random_state=42)
model_multi.fit(X_train_m, y_train_m)
# 预测
y_pred_m = model_multi.predict(X_test_m)
y_proba_m = model_multi.predict_proba(X_test_m)
# 评估
accuracy_m = accuracy_score(y_test_m, y_pred_m)
print(f"多分类准确率: {accuracy_m:.4f}")
print(f"\n分类报告:\n{classification_report(y_test_m, y_pred_m)}")
# 混淆矩阵
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test_m, y_pred_m)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.title('混淆矩阵')
plt.show()
---
03.正则化
a.L1正则化
---
# L1正则化(Lasso)
model_l1 = LR(penalty='l1', C=1.0, solver='liblinear', random_state=42)
model_l1.fit(X_train, y_train)
print(f"L1权重: {model_l1.coef_}")
print(f"非零权重数: {np.sum(model_l1.coef_ != 0)}")
---
b.L2正则化
---
# L2正则化(Ridge)
model_l2 = LR(penalty='l2', C=1.0, random_state=42)
model_l2.fit(X_train, y_train)
print(f"L2权重: {model_l2.coef_}")
---
c.正则化强度对比
---
# 不同C值(C越小,正则化越强)
C_values = [0.001, 0.01, 0.1, 1, 10, 100]
train_scores = []
test_scores = []
for C in C_values:
model = LR(C=C, random_state=42)
model.fit(X_train, y_train)
train_scores.append(model.score(X_train, y_train))
test_scores.append(model.score(X_test, y_test))
plt.figure(figsize=(10, 6))
plt.plot(C_values, train_scores, 'o-', label='训练集', linewidth=2)
plt.plot(C_values, test_scores, 's-', label='测试集', linewidth=2)
plt.xscale('log')
plt.xlabel('C (正则化强度的倒数)')
plt.ylabel('准确率')
plt.title('正则化强度对模型性能的影响')
plt.legend()
plt.grid(True)
plt.show()
---
3.3 决策树
01.基本原理
a.树结构
a.根节点
包含所有训练样本。
b.内部节点
表示一个特征上的测试。
c.分支
表示测试的结果。
d.叶节点
表示类别标签或数值。
b.决策过程
a.从根节点开始
根据特征值选择分支。
b.递归向下
直到到达叶节点。
c.输出结果
叶节点的标签或数值。
c.优点
a.易于理解
可视化决策过程,可解释性强。
b.无需特征缩放
对特征的尺度不敏感。
c.处理非线性
能捕捉特征之间的非线性关系。
d.缺点
a.容易过拟合
树太深会记住训练数据的噪声。
b.不稳定
数据的小变化可能导致完全不同的树。
02.分裂准则
a.分类树
a.信息增益
a.熵
H(D) = -Σ pᵢ * log₂(pᵢ),衡量数据的不确定性。
b.信息增益
IG = H(D) - Σ |Dᵥ|/|D| * H(Dᵥ),选择信息增益最大的特征。
c.代码示例
---
# 计算熵
def entropy(y):
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return -np.sum(probabilities * np.log2(probabilities + 1e-10))
# 计算信息增益
def information_gain(X, y, feature_idx, threshold):
# 父节点熵
parent_entropy = entropy(y)
# 分裂
left_mask = X[:, feature_idx] <= threshold
right_mask = ~left_mask
if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
return 0
# 子节点熵
n = len(y)
n_left, n_right = np.sum(left_mask), np.sum(right_mask)
e_left, e_right = entropy(y[left_mask]), entropy(y[right_mask])
# 信息增益
child_entropy = (n_left / n) * e_left + (n_right / n) * e_right
return parent_entropy - child_entropy
# 测试
y = np.array([0, 0, 1, 1, 1])
print(f"熵: {entropy(y):.4f}")
X = np.array([[1], [2], [3], [4], [5]])
ig = information_gain(X, y, 0, 2.5)
print(f"信息增益: {ig:.4f}")
---
b.基尼不纯度
a.公式
Gini(D) = 1 - Σ pᵢ²,值越小越纯。
b.基尼增益
选择基尼增益最大的特征。
c.代码示例
---
# 计算基尼不纯度
def gini(y):
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return 1 - np.sum(probabilities ** 2)
# 计算基尼增益
def gini_gain(X, y, feature_idx, threshold):
# 父节点基尼
parent_gini = gini(y)
# 分裂
left_mask = X[:, feature_idx] <= threshold
right_mask = ~left_mask
if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
return 0
# 子节点基尼
n = len(y)
n_left, n_right = np.sum(left_mask), np.sum(right_mask)
g_left, g_right = gini(y[left_mask]), gini(y[right_mask])
# 基尼增益
child_gini = (n_left / n) * g_left + (n_right / n) * g_right
return parent_gini - child_gini
# 测试
print(f"基尼不纯度: {gini(y):.4f}")
gg = gini_gain(X, y, 0, 2.5)
print(f"基尼增益: {gg:.4f}")
---
b.回归树
a.均方误差
选择使MSE减少最多的分裂点。
b.分裂后预测
叶节点预测值为该节点样本的均值。
03.剪枝策
a.预剪枝
a.最大深度
限制树的最大深度。
b.最小样本数
节点样本数少于阈值时停止分裂。
c.最小信息增益
信息增益小于阈值时停止分裂。
b.后剪枝
a.代价复杂度剪枝
在完整树的基础上,删除对性能提升不大的子树。
b.优点
通常比预剪枝效果更好。
04.使用Scikit-learn
a.分类树
---
from sklearn.tree import DecisionTreeClassifier, plot_tree
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练决策树
clf = DecisionTreeClassifier(max_depth=3, random_state=42)
clf.fit(X_train, y_train)
# 预测
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")
# 可视化决策树
plt.figure(figsize=(20, 10))
plot_tree(clf, feature_names=iris.feature_names,
class_names=iris.target_names, filled=True, rounded=True)
plt.title('决策树可视化')
plt.show()
# 特征重要性
importances = clf.feature_importances_
indices = np.argsort(importances)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(importances)), importances[indices])
plt.xticks(range(len(importances)),
[iris.feature_names[i] for i in indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('特征重要性')
plt.tight_layout()
plt.show()
---
b.回归树
---
from sklearn.tree import DecisionTreeRegressor
# 生成回归数据
np.random.seed(42)
X_reg = np.sort(5 * np.random.rand(80, 1), axis=0)
y_reg = np.sin(X_reg).ravel() + np.random.randn(80) * 0.1
# 训练不同深度的回归树
depths = [2, 5, 10]
plt.figure(figsize=(15, 5))
for i, depth in enumerate(depths):
# 训练
reg = DecisionTreeRegressor(max_depth=depth, random_state=42)
reg.fit(X_reg, y_reg)
# 预测
X_test_reg = np.linspace(0, 5, 100).reshape(-1, 1)
y_pred_reg = reg.predict(X_test_reg)
# 可视化
plt.subplot(1, 3, i+1)
plt.scatter(X_reg, y_reg, s=20, label='数据')
plt.plot(X_test_reg, y_pred_reg, 'r-', linewidth=2, label='预测')
plt.xlabel('X')
plt.ylabel('y')
plt.title(f'深度={depth}')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
c.超参数调优
---
from sklearn.model_selection import GridSearchCV
# 定义参数网格
param_grid = {
'max_depth': [3, 5, 7, 10, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'criterion': ['gini', 'entropy']
}
# 网格搜索
clf_grid = DecisionTreeClassifier(random_state=42)
grid_search = GridSearchCV(clf_grid, param_grid, cv=5,
scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
# 使用最佳参数评估
best_clf = grid_search.best_estimator_
test_score = best_clf.score(X_test, y_test)
print(f"测试集准确率: {test_score:.4f}")
---
05.从零实现决策树
a.简化版实现
---
# 简化的决策树实现(仅支持二分类)
class SimpleDecisionTree:
def __init__(self, max_depth=5):
self.max_depth = max_depth
self.tree = None
def gini(self, y):
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return 1 - np.sum(probabilities ** 2)
def split(self, X, y, feature_idx, threshold):
left_mask = X[:, feature_idx] <= threshold
right_mask = ~left_mask
return X[left_mask], y[left_mask], X[right_mask], y[right_mask]
def find_best_split(self, X, y):
best_gain = 0
best_feature = None
best_threshold = None
n_features = X.shape[1]
parent_gini = self.gini(y)
for feature_idx in range(n_features):
thresholds = np.unique(X[:, feature_idx])
for threshold in thresholds:
X_left, y_left, X_right, y_right = self.split(
X, y, feature_idx, threshold
)
if len(y_left) == 0 or len(y_right) == 0:
continue
# 计算基尼增益
n = len(y)
gini_left = self.gini(y_left)
gini_right = self.gini(y_right)
weighted_gini = (len(y_left) / n) * gini_left + \
(len(y_right) / n) * gini_right
gain = parent_gini - weighted_gini
if gain > best_gain:
best_gain = gain
best_feature = feature_idx
best_threshold = threshold
return best_feature, best_threshold
def build_tree(self, X, y, depth=0):
n_samples, n_features = X.shape
n_classes = len(np.unique(y))
# 停止条件
if depth >= self.max_depth or n_classes == 1 or n_samples < 2:
leaf_value = np.bincount(y).argmax()
return {'leaf': True, 'value': leaf_value}
# 找最佳分裂
feature_idx, threshold = self.find_best_split(X, y)
if feature_idx is None:
leaf_value = np.bincount(y).argmax()
return {'leaf': True, 'value': leaf_value}
# 分裂
X_left, y_left, X_right, y_right = self.split(
X, y, feature_idx, threshold
)
# 递归构建子树
left_subtree = self.build_tree(X_left, y_left, depth + 1)
right_subtree = self.build_tree(X_right, y_right, depth + 1)
return {
'leaf': False,
'feature': feature_idx,
'threshold': threshold,
'left': left_subtree,
'right': right_subtree
}
def fit(self, X, y):
self.tree = self.build_tree(X, y)
def predict_sample(self, x, tree):
if tree['leaf']:
return tree['value']
if x[tree['feature']] <= tree['threshold']:
return self.predict_sample(x, tree['left'])
else:
return self.predict_sample(x, tree['right'])
def predict(self, X):
return np.array([self.predict_sample(x, self.tree) for x in X])
# 测试
X_simple, y_simple = make_classification(n_samples=200, n_features=2,
n_redundant=0, n_informative=2,
random_state=42, n_clusters_per_class=1)
X_train_s, X_test_s, y_train_s, y_test_s = train_test_split(
X_simple, y_simple, test_size=0.2, random_state=42
)
# 训练
tree = SimpleDecisionTree(max_depth=5)
tree.fit(X_train_s, y_train_s)
# 预测
y_pred_s = tree.predict(X_test_s)
accuracy_s = np.mean(y_pred_s == y_test_s)
print(f"自实现决策树准确率: {accuracy_s:.4f}")
---
3.4 随机森林
01.集成学习原理
a.集成学习思想
a.三个臭皮匠
多个弱学习器组合成强学习器。
b.降低方差
通过平均多个模型的预测,降低过拟合风险。
c.提高稳定性
单个模型的错误可以被其他模型纠正。
b.集成方法
a.Bagging
并行训练多个模型,投票或平均预测结果。
b.Boosting
串行训练模型,后续模型关注前面模型的错误。
c.Stacking
用元学习器组合多个基学习器的预测。
02.随机森林算法
a.核心思想
a.Bootstrap采样
从训练集中有放回地随机采样,生成多个子集。
b.随机特征选择
每次分裂时,从随机选择的特征子集中选择最佳特征。
c.多棵决策树
训练多棵决策树,每棵树使用不同的数据和特征子集。
d.投票或平均
分类任务投票,回归任务平均。
b.算法流程
a.步骤1
从训练集中Bootstrap采样,生成n个子集。
b.步骤2
对每个子集训练一棵决策树,分裂时随机选择特征子集。
c.步骤3
预测时,所有树投票(分类)或平均(回归)。
c.优点
a.准确率高
通常比单棵决策树准确率更高。
b.抗过拟合
通过集成降低方差,不容易过拟合。
c.特征重要性
可以评估特征的重要性。
d.并行训练
每棵树独立训练,可以并行加速。
d.缺点
a.模型复杂
包含多棵树,模型较大。
b.训练时间长
需要训练多棵树。
c.可解释性差
难以解释单个预测的原因。
03.使用Scikit-learn
a.分类任务
---
from sklearn.ensemble import RandomForestClassifier
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练随机森林
rf_clf = RandomForestClassifier(n_estimators=100, max_depth=5,
random_state=42, n_jobs=-1)
rf_clf.fit(X_train, y_train)
# 预测
y_pred = rf_clf.predict(X_test)
y_proba = rf_clf.predict_proba(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")
print(f"\n分类报告:\n{classification_report(y_test, y_pred)}")
# 特征重要性
importances = rf_clf.feature_importances_
indices = np.argsort(importances)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(importances)), importances[indices])
plt.xticks(range(len(importances)),
[iris.feature_names[i] for i in indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('随机森林特征重要性')
plt.tight_layout()
plt.show()
# 单棵树 vs 随机森林对比
single_tree = DecisionTreeClassifier(max_depth=5, random_state=42)
single_tree.fit(X_train, y_train)
single_accuracy = single_tree.score(X_test, y_test)
print(f"\n单棵决策树准确率: {single_accuracy:.4f}")
print(f"随机森林准确率: {accuracy:.4f}")
print(f"提升: {(accuracy - single_accuracy) * 100:.2f}%")
---
b.回归任务
---
from sklearn.ensemble import RandomForestRegressor
# 生成回归数据
X_reg, y_reg = make_regression(n_samples=200, n_features=10,
noise=10, random_state=42)
X_train_r, X_test_r, y_train_r, y_test_r = train_test_split(
X_reg, y_reg, test_size=0.2, random_state=42
)
# 训练随机森林回归
rf_reg = RandomForestRegressor(n_estimators=100, max_depth=10,
random_state=42, n_jobs=-1)
rf_reg.fit(X_train_r, y_train_r)
# 预测
y_pred_r = rf_reg.predict(X_test_r)
# 评估
mse = mean_squared_error(y_test_r, y_pred_r)
r2 = r2_score(y_test_r, y_pred_r)
print(f"MSE: {mse:.4f}")
print(f"R²: {r2:.4f}")
# 可视化预测结果
plt.figure(figsize=(10, 6))
plt.scatter(y_test_r, y_pred_r, alpha=0.5)
plt.plot([y_test_r.min(), y_test_r.max()],
[y_test_r.min(), y_test_r.max()],
'r--', linewidth=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('随机森林回归预测')
plt.grid(True)
plt.show()
---
c.超参数调优
---
# 定义参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'max_features': ['sqrt', 'log2', None]
}
# 随机搜索(比网格搜索更快)
from sklearn.model_selection import RandomizedSearchCV
rf_random = RandomForestClassifier(random_state=42, n_jobs=-1)
random_search = RandomizedSearchCV(rf_random, param_grid, n_iter=20,
cv=5, scoring='accuracy',
random_state=42, n_jobs=-1)
random_search.fit(X_train, y_train)
print(f"最佳参数: {random_search.best_params_}")
print(f"最佳分数: {random_search.best_score_:.4f}")
# 测试集评估
best_rf = random_search.best_estimator_
test_score = best_rf.score(X_test, y_test)
print(f"测试集准确率: {test_score:.4f}")
---
04.树的数量对性能的影响
a.实验
---
# 不同树数量的性能
n_estimators_range = [1, 5, 10, 20, 50, 100, 200, 500]
train_scores = []
test_scores = []
for n in n_estimators_range:
rf = RandomForestClassifier(n_estimators=n, random_state=42, n_jobs=-1)
rf.fit(X_train, y_train)
train_scores.append(rf.score(X_train, y_train))
test_scores.append(rf.score(X_test, y_test))
# 可视化
plt.figure(figsize=(10, 6))
plt.plot(n_estimators_range, train_scores, 'o-', label='训练集', linewidth=2)
plt.plot(n_estimators_range, test_scores, 's-', label='测试集', linewidth=2)
plt.xlabel('树的数量')
plt.ylabel('准确率')
plt.title('树的数量对性能的影响')
plt.legend()
plt.grid(True)
plt.show()
---
05.Out-of-Bag评估
a.原理
a.Bootstrap采样
每棵树使用约63.2%的样本训练。
b.OOB样本
剩余约36.8%的样本未被使用,称为Out-of-Bag样本。
c.OOB评估
用OOB样本评估模型,无需单独的验证集。
b.代码示例
---
# 启用OOB评估
rf_oob = RandomForestClassifier(n_estimators=100, oob_score=True,
random_state=42, n_jobs=-1)
rf_oob.fit(X_train, y_train)
print(f"OOB分数: {rf_oob.oob_score_:.4f}")
print(f"测试集分数: {rf_oob.score(X_test, y_test):.4f}")
---
06.从零实现随机森林
a.简化版实现
---
# 简化的随机森林实现
class SimpleRandomForest:
def __init__(self, n_estimators=10, max_depth=5, max_features='sqrt'):
self.n_estimators = n_estimators
self.max_depth = max_depth
self.max_features = max_features
self.trees = []
def bootstrap_sample(self, X, y):
n_samples = X.shape[0]
indices = np.random.choice(n_samples, n_samples, replace=True)
return X[indices], y[indices]
def fit(self, X, y):
self.trees = []
n_features = X.shape[1]
# 确定每棵树使用的特征数
if self.max_features == 'sqrt':
max_features = int(np.sqrt(n_features))
elif self.max_features == 'log2':
max_features = int(np.log2(n_features))
else:
max_features = n_features
for _ in range(self.n_estimators):
# Bootstrap采样
X_sample, y_sample = self.bootstrap_sample(X, y)
# 随机选择特征
feature_indices = np.random.choice(n_features, max_features,
replace=False)
# 训练决策树
tree = DecisionTreeClassifier(max_depth=self.max_depth,
random_state=None)
tree.fit(X_sample[:, feature_indices], y_sample)
self.trees.append((tree, feature_indices))
def predict(self, X):
# 收集所有树的预测
predictions = np.zeros((X.shape[0], self.n_estimators))
for i, (tree, feature_indices) in enumerate(self.trees):
predictions[:, i] = tree.predict(X[:, feature_indices])
# 投票
return np.apply_along_axis(
lambda x: np.bincount(x.astype(int)).argmax(),
axis=1, arr=predictions
)
# 测试
srf = SimpleRandomForest(n_estimators=50, max_depth=5)
srf.fit(X_train, y_train)
y_pred_srf = srf.predict(X_test)
accuracy_srf = np.mean(y_pred_srf == y_test)
print(f"自实现随机森林准确率: {accuracy_srf:.4f}")
---
3.5 支持向量机
01.基本原理
a.最大间隔
a.核心思想
找到一个超平面,使得两类样本之间的间隔最大。
b.支持向量
距离超平面最近的样本点,决定了超平面的位置。
c.几何间隔
样本点到超平面的距离,间隔越大,分类越可靠。
b.线性可分SVM
a.决策函数
f(x) = w^T x + b,超平面方程:w^T x + b = 0
b.分类规则
y = sign(w^T x + b),正类为+1,负类为-1。
c.优化目标
a.最大化间隔
max 2/||w||,等价于 min (1/2)||w||²
b.约束条件
yᵢ(w^T xᵢ + b) ≥ 1,所有样本正确分类且间隔≥1。
c.对偶问题
a.拉格朗日乘子法
引入拉格朗日乘子α,将约束优化转为无约束优化。
b.对偶形式
max Σαᵢ - (1/2)ΣΣαᵢαⱼyᵢyⱼxᵢ^T xⱼ,约束:Σαᵢyᵢ = 0, αᵢ ≥ 0
c.KKT条件
αᵢ[yᵢ(w^T xᵢ + b) - 1] = 0,支持向量对应的αᵢ > 0。
02.软间隔SVM
a.线性不可分
a.问题
实际数据往往线性不可分,存在噪声和异常点。
b.解决方法
引入松弛变量ξᵢ,允许部分样本违反间隔约束。
b.优化目标
a.目标函数
min (1/2)||w||² + C * Σξᵢ
b.约束条件
yᵢ(w^T xᵢ + b) ≥ 1 - ξᵢ, ξᵢ ≥ 0
c.参数C
惩罚系数,C越大,对误分类的惩罚越大。
c.代码示例
---
from sklearn.svm import SVC
# 生成线性不可分数据
X, y = make_classification(n_samples=200, n_features=2, n_redundant=0,
n_informative=2, random_state=42,
n_clusters_per_class=1, flip_y=0.1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 不同C值的SVM
C_values = [0.01, 0.1, 1, 10, 100]
plt.figure(figsize=(20, 4))
for i, C in enumerate(C_values):
# 训练SVM
svm = SVC(kernel='linear', C=C, random_state=42)
svm.fit(X_train, y_train)
# 评估
train_score = svm.score(X_train, y_train)
test_score = svm.score(X_test, y_test)
# 可视化决策边界
plt.subplot(1, len(C_values), i+1)
# 绘制数据点
plt.scatter(X_train[y_train==0, 0], X_train[y_train==0, 1],
c='blue', label='类别0', alpha=0.6)
plt.scatter(X_train[y_train==1, 0], X_train[y_train==1, 1],
c='red', label='类别1', alpha=0.6)
# 绘制决策边界和间隔
ax = plt.gca()
xlim = ax.get_xlim()
ylim = ax.get_ylim()
xx = np.linspace(xlim[0], xlim[1], 30)
yy = np.linspace(ylim[0], ylim[1], 30)
YY, XX = np.meshgrid(yy, xx)
xy = np.vstack([XX.ravel(), YY.ravel()]).T
Z = svm.decision_function(xy).reshape(XX.shape)
# 决策边界和间隔
ax.contour(XX, YY, Z, colors='k', levels=[-1, 0, 1],
alpha=0.5, linestyles=['--', '-', '--'])
# 支持向量
ax.scatter(svm.support_vectors_[:, 0], svm.support_vectors_[:, 1],
s=100, linewidth=1, facecolors='none', edgecolors='k',
label='支持向量')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title(f'C={C}\n训练={train_score:.3f}, 测试={test_score:.3f}')
plt.legend()
plt.tight_layout()
plt.show()
---
03.核技巧
a.非线性问题
a.问题
很多实际问题在原始空间线性不可分。
b.解决思路
将数据映射到高维空间,使其线性可分。
b.核函数
a.定义
K(x, z) = φ(x)^T φ(z),直接计算内积,无需显式映射。
b.常用核函数
a.线性核
K(x, z) = x^T z
b.多项式核
K(x, z) = (γx^T z + r)^d
c.RBF核
K(x, z) = exp(-γ||x - z||²),最常用。
d.Sigmoid核
K(x, z) = tanh(γx^T z + r)
c.RBF核SVM
---
# RBF核SVM
# 生成非线性数据
from sklearn.datasets import make_moons
X_moon, y_moon = make_moons(n_samples=200, noise=0.15, random_state=42)
X_train_m, X_test_m, y_train_m, y_test_m = train_test_split(
X_moon, y_moon, test_size=0.2, random_state=42
)
# 线性核 vs RBF核
kernels = ['linear', 'rbf']
plt.figure(figsize=(15, 5))
for i, kernel in enumerate(kernels):
# 训练
svm = SVC(kernel=kernel, C=1.0, gamma='scale', random_state=42)
svm.fit(X_train_m, y_train_m)
# 评估
train_score = svm.score(X_train_m, y_train_m)
test_score = svm.score(X_test_m, y_test_m)
# 可视化
plt.subplot(1, 2, i+1)
# 数据点
plt.scatter(X_train_m[y_train_m==0, 0], X_train_m[y_train_m==0, 1],
c='blue', label='类别0', alpha=0.6)
plt.scatter(X_train_m[y_train_m==1, 0], X_train_m[y_train_m==1, 1],
c='red', label='类别1', alpha=0.6)
# 决策边界
ax = plt.gca()
xlim = ax.get_xlim()
ylim = ax.get_ylim()
xx = np.linspace(xlim[0], xlim[1], 200)
yy = np.linspace(ylim[0], ylim[1], 200)
YY, XX = np.meshgrid(yy, xx)
xy = np.vstack([XX.ravel(), YY.ravel()]).T
Z = svm.decision_function(xy).reshape(XX.shape)
ax.contourf(XX, YY, Z, alpha=0.3, levels=20, cmap='RdBu')
ax.contour(XX, YY, Z, colors='k', levels=[0], linewidths=2)
# 支持向量
ax.scatter(svm.support_vectors_[:, 0], svm.support_vectors_[:, 1],
s=100, linewidth=1, facecolors='none', edgecolors='k')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title(f'{kernel.upper()}核\n训练={train_score:.3f}, 测试={test_score:.3f}')
plt.legend()
plt.tight_layout()
plt.show()
---
d.超参数调优
---
# RBF核的C和gamma调优
param_grid = {
'C': [0.1, 1, 10, 100],
'gamma': [0.001, 0.01, 0.1, 1, 'scale', 'auto']
}
svm_grid = SVC(kernel='rbf', random_state=42)
grid_search = GridSearchCV(svm_grid, param_grid, cv=5,
scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train_m, y_train_m)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
# 测试集评估
best_svm = grid_search.best_estimator_
test_score = best_svm.score(X_test_m, y_test_m)
print(f"测试集准确率: {test_score:.4f}")
# 可视化参数影响
scores = grid_search.cv_results_['mean_test_score'].reshape(
len(param_grid['C']), len(param_grid['gamma'])
)
plt.figure(figsize=(10, 8))
plt.imshow(scores, interpolation='nearest', cmap='viridis')
plt.xlabel('gamma')
plt.ylabel('C')
plt.colorbar(label='准确率')
plt.xticks(np.arange(len(param_grid['gamma'])), param_grid['gamma'])
plt.yticks(np.arange(len(param_grid['C'])), param_grid['C'])
plt.title('C和gamma对准确率的影响')
plt.show()
---
04.多分类SVM
a.一对一
训练k(k-1)/2个二分类器,投票决定最终类别。
b.一对多
训练k个二分类器,选择决策函数值最大的类别。
c.代码示例
---
# 多分类SVM
iris = load_iris()
X_iris, y_iris = iris.data, iris.target
X_train_i, X_test_i, y_train_i, y_test_i = train_test_split(
X_iris, y_iris, test_size=0.2, random_state=42
)
# 训练多分类SVM
svm_multi = SVC(kernel='rbf', C=1.0, gamma='scale',
decision_function_shape='ovr', random_state=42)
svm_multi.fit(X_train_i, y_train_i)
# 预测
y_pred_i = svm_multi.predict(X_test_i)
# 评估
accuracy_i = accuracy_score(y_test_i, y_pred_i)
print(f"多分类准确率: {accuracy_i:.4f}")
print(f"\n分类报告:\n{classification_report(y_test_i, y_pred_i)}")
# 混淆矩阵
cm = confusion_matrix(y_test_i, y_pred_i)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.title('多分类SVM混淆矩阵')
plt.show()
---
05.SVM回归
a.SVR原理
a.ε-不敏感损失
|y - f(x)| ≤ ε时损失为0,超过ε才计算损失。
b.支持向量
落在ε管道边界上或外部的样本。
b.代码示例
---
from sklearn.svm import SVR
# 生成回归数据
X_svr = np.sort(5 * np.random.rand(100, 1), axis=0)
y_svr = np.sin(X_svr).ravel() + np.random.randn(100) * 0.1
# 训练SVR
svr = SVR(kernel='rbf', C=100, gamma=0.1, epsilon=0.1)
svr.fit(X_svr, y_svr)
# 预测
X_test_svr = np.linspace(0, 5, 100).reshape(-1, 1)
y_pred_svr = svr.predict(X_test_svr)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_svr, y_svr, c='blue', label='训练数据', alpha=0.6)
plt.plot(X_test_svr, y_pred_svr, 'r-', linewidth=2, label='SVR预测')
# ε管道
plt.fill_between(X_test_svr.ravel(),
y_pred_svr - svr.epsilon,
y_pred_svr + svr.epsilon,
alpha=0.2, color='red', label='ε管道')
plt.xlabel('X')
plt.ylabel('y')
plt.title('支持向量回归')
plt.legend()
plt.grid(True)
plt.show()
---
3.6 集成学习
01.Bagging方法
a.Bootstrap Aggregating
a.原理
通过Bootstrap采样生成多个训练集,训练多个模型,聚合预测结果。
b.降低方差
通过平均多个模型的预测,降低模型的方差,减少过拟合。
c.并行训练
每个模型独立训练,可以并行加速。
b.代码示例
---
from sklearn.ensemble import BaggingClassifier
# 基学习器:决策树
base_clf = DecisionTreeClassifier(random_state=42)
# Bagging集成
bagging = BaggingClassifier(base_estimator=base_clf,
n_estimators=50,
max_samples=0.8,
max_features=0.8,
random_state=42,
n_jobs=-1)
# 训练
X, y = make_classification(n_samples=1000, n_features=20,
n_informative=15, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
bagging.fit(X_train, y_train)
# 对比单个模型
base_clf.fit(X_train, y_train)
print(f"单个决策树准确率: {base_clf.score(X_test, y_test):.4f}")
print(f"Bagging准确率: {bagging.score(X_test, y_test):.4f}")
---
02.Boosting方法
a.AdaBoost
a.原理
串行训练多个弱学习器,每个学习器关注前面学习器的错误。
b.样本权重
错误分类的样本权重增加,正确分类的样本权重减少。
c.模型权重
准确率高的模型权重大,准确率低的模型权重小。
d.代码示例
---
from sklearn.ensemble import AdaBoostClassifier
# AdaBoost
ada = AdaBoostClassifier(base_estimator=DecisionTreeClassifier(max_depth=1),
n_estimators=50,
learning_rate=1.0,
random_state=42)
ada.fit(X_train, y_train)
# 评估
print(f"AdaBoost准确率: {ada.score(X_test, y_test):.4f}")
# 可视化学习过程
train_scores = []
test_scores = []
for i, y_pred in enumerate(ada.staged_predict(X_train)):
train_scores.append(accuracy_score(y_train, y_pred))
for i, y_pred in enumerate(ada.staged_predict(X_test)):
test_scores.append(accuracy_score(y_test, y_pred))
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(train_scores) + 1), train_scores,
label='训练集', linewidth=2)
plt.plot(range(1, len(test_scores) + 1), test_scores,
label='测试集', linewidth=2)
plt.xlabel('迭代次数')
plt.ylabel('准确率')
plt.title('AdaBoost学习曲线')
plt.legend()
plt.grid(True)
plt.show()
---
b.Gradient Boosting
a.原理
每个新模型拟合前面模型的残差(负梯度)。
b.损失函数
可以使用不同的损失函数(MSE、交叉熵等)。
c.代码示例
---
from sklearn.ensemble import GradientBoostingClassifier
# Gradient Boosting
gb = GradientBoostingClassifier(n_estimators=100,
learning_rate=0.1,
max_depth=3,
random_state=42)
gb.fit(X_train, y_train)
# 评估
print(f"Gradient Boosting准确率: {gb.score(X_test, y_test):.4f}")
# 特征重要性
importances = gb.feature_importances_
indices = np.argsort(importances)[::-1][:10]
plt.figure(figsize=(10, 6))
plt.bar(range(len(indices)), importances[indices])
plt.xlabel('特征索引')
plt.ylabel('重要性')
plt.title('Gradient Boosting特征重要性(Top 10)')
plt.xticks(range(len(indices)), indices)
plt.grid(True)
plt.show()
---
03.XGBoost
a.核心特性
a.正则化
在目标函数中加入L1和L2正则化,防止过拟合。
b.并行计算
特征预排序,并行寻找最佳分裂点。
c.缺失值处理
自动学习缺失值的分裂方向。
d.剪枝
从叶到根的后剪枝,更高效。
b.安装与使用
---
# 安装:pip install xgboost
import xgboost as xgb
from xgboost import XGBClassifier
# XGBoost分类
xgb_clf = XGBClassifier(n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42,
use_label_encoder=False,
eval_metric='logloss')
xgb_clf.fit(X_train, y_train)
# 评估
print(f"XGBoost准确率: {xgb_clf.score(X_test, y_test):.4f}")
# 特征重要性
xgb.plot_importance(xgb_clf, max_num_features=10)
plt.title('XGBoost特征重要性')
plt.show()
---
c.超参数调优
---
# XGBoost参数网格
param_grid_xgb = {
'n_estimators': [50, 100, 200],
'max_depth': [3, 5, 7],
'learning_rate': [0.01, 0.1, 0.3],
'subsample': [0.8, 1.0],
'colsample_bytree': [0.8, 1.0]
}
xgb_grid = XGBClassifier(random_state=42, use_label_encoder=False,
eval_metric='logloss')
grid_xgb = GridSearchCV(xgb_grid, param_grid_xgb, cv=3,
scoring='accuracy', n_jobs=-1)
grid_xgb.fit(X_train, y_train)
print(f"XGBoost最佳参数: {grid_xgb.best_params_}")
print(f"XGBoost最佳分数: {grid_xgb.best_score_:.4f}")
---
04.LightGBM
a.核心特性
a.基于直方图
将连续特征离散化为直方图,加速训练。
b.叶子生长策略
Leaf-wise生长,而非Level-wise,更高效。
c.类别特征支持
直接支持类别特征,无需编码。
b.使用示例
---
# 安装:pip install lightgbm
import lightgbm as lgb
from lightgbm import LGBMClassifier
# LightGBM分类
lgb_clf = LGBMClassifier(n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42)
lgb_clf.fit(X_train, y_train)
# 评估
print(f"LightGBM准确率: {lgb_clf.score(X_test, y_test):.4f}")
# 特征重要性
lgb.plot_importance(lgb_clf, max_num_features=10)
plt.title('LightGBM特征重要性')
plt.show()
---
05.CatBoost
a.核心特性
a.类别特征处理
自动处理类别特征,无需手动编码。
b.有序Boosting
减少预测偏移,提高泛化能力。
c.GPU加速
支持GPU训练,速度更快。
b.使用示例
---
# 安装:pip install catboost
from catboost import CatBoostClassifier
# CatBoost分类
cat_clf = CatBoostClassifier(iterations=100,
learning_rate=0.1,
depth=5,
random_state=42,
verbose=False)
cat_clf.fit(X_train, y_train)
# 评估
print(f"CatBoost准确率: {cat_clf.score(X_test, y_test):.4f}")
# 特征重要性
importances = cat_clf.get_feature_importance()
indices = np.argsort(importances)[::-1][:10]
plt.figure(figsize=(10, 6))
plt.bar(range(len(indices)), importances[indices])
plt.xlabel('特征索引')
plt.ylabel('重要性')
plt.title('CatBoost特征重要性(Top 10)')
plt.xticks(range(len(indices)), indices)
plt.grid(True)
plt.show()
---
06.模型对比
a.性能对比
---
# 对比多个集成模型
models = {
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'AdaBoost': AdaBoostClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'XGBoost': XGBClassifier(n_estimators=100, random_state=42,
use_label_encoder=False, eval_metric='logloss'),
'LightGBM': LGBMClassifier(n_estimators=100, random_state=42),
'CatBoost': CatBoostClassifier(iterations=100, random_state=42, verbose=False)
}
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {'train': train_score, 'test': test_score}
print(f"{name}: 训练={train_score:.4f}, 测试={test_score:.4f}")
# 可视化对比
names = list(results.keys())
train_scores = [results[name]['train'] for name in names]
test_scores = [results[name]['test'] for name in names]
x = np.arange(len(names))
width = 0.35
plt.figure(figsize=(12, 6))
plt.bar(x - width/2, train_scores, width, label='训练集')
plt.bar(x + width/2, test_scores, width, label='测试集')
plt.xlabel('模型')
plt.ylabel('准确率')
plt.title('集成学习模型性能对比')
plt.xticks(x, names, rotation=45, ha='right')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
---
4 无监督学习
4.1 聚类算法
01.K-Means聚类
a.算法原理
a.基本思想
将数据分成K个簇,使得簇内样本相似度高,簇间样本相似度低。
b.目标函数
最小化簇内平方和:J = ΣΣ||xᵢ - μₖ||²
c.算法流程
a.初始化
随机选择K个样本作为初始中心点。
b.分配
将每个样本分配到最近的中心点。
c.更新
重新计算每个簇的中心点(均值)。
d.迭代
重复分配和更新,直到中心点不再变化或达到最大迭代次数。
b.从零实现
---
# K-Means从零实现
class KMeans:
def __init__(self, n_clusters=3, max_iters=100, random_state=None):
self.n_clusters = n_clusters
self.max_iters = max_iters
self.random_state = random_state
self.centroids = None
self.labels = None
def fit(self, X):
np.random.seed(self.random_state)
# 随机初始化中心点
n_samples = X.shape[0]
random_indices = np.random.choice(n_samples, self.n_clusters, replace=False)
self.centroids = X[random_indices].copy()
for iteration in range(self.max_iters):
# 分配样本到最近的中心
distances = np.sqrt(((X - self.centroids[:, np.newaxis])**2).sum(axis=2))
self.labels = np.argmin(distances, axis=0)
# 更新中心点
new_centroids = np.array([
X[self.labels == k].mean(axis=0) if np.sum(self.labels == k) > 0
else self.centroids[k]
for k in range(self.n_clusters)
])
# 检查收敛
if np.allclose(self.centroids, new_centroids):
print(f"收敛于第{iteration}次迭代")
break
self.centroids = new_centroids
return self
def predict(self, X):
distances = np.sqrt(((X - self.centroids[:, np.newaxis])**2).sum(axis=2))
return np.argmin(distances, axis=0)
def fit_predict(self, X):
self.fit(X)
return self.labels
# 测试
from sklearn.datasets import make_blobs
X, y_true = make_blobs(n_samples=300, centers=4, n_features=2,
cluster_std=0.6, random_state=42)
# 训练
kmeans = KMeans(n_clusters=4, random_state=42)
y_pred = kmeans.fit_predict(X)
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.6)
plt.title('真实标签')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.6)
plt.scatter(kmeans.centroids[:, 0], kmeans.centroids[:, 1],
c='red', marker='X', s=200, edgecolors='black', label='中心点')
plt.title('K-Means聚类结果')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
c.使用Scikit-learn
---
from sklearn.cluster import KMeans as SKLearnKMeans
# 训练
kmeans_sklearn = SKLearnKMeans(n_clusters=4, random_state=42, n_init=10)
y_pred_sklearn = kmeans_sklearn.fit_predict(X)
# 评估
from sklearn.metrics import silhouette_score, davies_bouldin_score
silhouette = silhouette_score(X, y_pred_sklearn)
davies_bouldin = davies_bouldin_score(X, y_pred_sklearn)
print(f"轮廓系数: {silhouette:.4f} (越接近1越好)")
print(f"Davies-Bouldin指数: {davies_bouldin:.4f} (越小越好)")
print(f"簇内平方和: {kmeans_sklearn.inertia_:.4f}")
---
d.确定最优K值
a.肘部法则
---
# 肘部法则
inertias = []
K_range = range(2, 11)
for k in K_range:
kmeans_k = SKLearnKMeans(n_clusters=k, random_state=42, n_init=10)
kmeans_k.fit(X)
inertias.append(kmeans_k.inertia_)
plt.figure(figsize=(10, 6))
plt.plot(K_range, inertias, 'bo-', linewidth=2, markersize=8)
plt.xlabel('K值')
plt.ylabel('簇内平方和')
plt.title('肘部法则确定最优K值')
plt.grid(True)
plt.show()
---
b.轮廓系数法
---
# 轮廓系数法
silhouette_scores = []
for k in K_range:
kmeans_k = SKLearnKMeans(n_clusters=k, random_state=42, n_init=10)
labels = kmeans_k.fit_predict(X)
score = silhouette_score(X, labels)
silhouette_scores.append(score)
plt.figure(figsize=(10, 6))
plt.plot(K_range, silhouette_scores, 'go-', linewidth=2, markersize=8)
plt.xlabel('K值')
plt.ylabel('轮廓系数')
plt.title('轮廓系数法确定最优K值')
plt.grid(True)
plt.show()
best_k = K_range[np.argmax(silhouette_scores)]
print(f"最优K值: {best_k}")
---
02.层次聚类
a.算法原理
a.凝聚层次聚类
自底向上,每个样本初始为一个簇,逐步合并最相似的簇。
b.分裂层次聚类
自顶向下,所有样本初始为一个簇,逐步分裂。
c.距离度量
a.单链接
两簇最近样本的距离。
b.全链接
两簇最远样本的距离。
c.平均链接
两簇所有样本对距离的平均。
d.Ward链接
合并后簇内平方和的增量最小。
b.使用Scikit-learn
---
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage
# 层次聚类
hierarchical = AgglomerativeClustering(n_clusters=4, linkage='ward')
y_hierarchical = hierarchical.fit_predict(X)
# 可视化聚类结果
plt.figure(figsize=(10, 6))
plt.scatter(X[:, 0], X[:, 1], c=y_hierarchical, cmap='viridis', alpha=0.6)
plt.title('层次聚类结果')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.grid(True)
plt.show()
# 树状图
plt.figure(figsize=(12, 6))
Z = linkage(X, method='ward')
dendrogram(Z)
plt.title('层次聚类树状图')
plt.xlabel('样本索引')
plt.ylabel('距离')
plt.show()
---
03.DBSCAN
a.算法原理
a.基于密度
将高密度区域的样本聚为一簇,能发现任意形状的簇。
b.核心概念
a.ε邻域
距离样本ε以内的所有样本。
b.核心点
ε邻域内至少有MinPts个样本的点。
c.边界点
不是核心点,但在某个核心点的ε邻域内。
d.噪声点
既不是核心点也不是边界点。
c.优点
a.无需指定K
不需要预先指定簇的数量。
b.任意形状
能发现任意形状的簇。
c.噪声处理
能识别噪声点。
b.使用Scikit-learn
---
from sklearn.cluster import DBSCAN
# 生成复杂形状数据
from sklearn.datasets import make_moons
X_moons, _ = make_moons(n_samples=300, noise=0.05, random_state=42)
# DBSCAN聚类
dbscan = DBSCAN(eps=0.2, min_samples=5)
y_dbscan = dbscan.fit_predict(X_moons)
# 可视化
plt.figure(figsize=(15, 5))
# K-Means(对比)
plt.subplot(1, 3, 1)
kmeans_moons = SKLearnKMeans(n_clusters=2, random_state=42)
y_kmeans_moons = kmeans_moons.fit_predict(X_moons)
plt.scatter(X_moons[:, 0], X_moons[:, 1], c=y_kmeans_moons, cmap='viridis', alpha=0.6)
plt.title('K-Means(不适合)')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.grid(True)
# DBSCAN
plt.subplot(1, 3, 2)
plt.scatter(X_moons[:, 0], X_moons[:, 1], c=y_dbscan, cmap='viridis', alpha=0.6)
plt.title(f'DBSCAN(簇数={len(set(y_dbscan)) - (1 if -1 in y_dbscan else 0)})')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.grid(True)
# 噪声点
plt.subplot(1, 3, 3)
noise_mask = y_dbscan == -1
plt.scatter(X_moons[~noise_mask, 0], X_moons[~noise_mask, 1],
c=y_dbscan[~noise_mask], cmap='viridis', alpha=0.6, label='正常点')
plt.scatter(X_moons[noise_mask, 0], X_moons[noise_mask, 1],
c='red', marker='x', s=100, label='噪声点')
plt.title(f'DBSCAN噪声检测(噪声点={noise_mask.sum()})')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
c.参数调优
---
# 不同eps和min_samples的影响
eps_values = [0.1, 0.2, 0.3, 0.4]
min_samples_values = [3, 5, 10]
fig, axes = plt.subplots(len(min_samples_values), len(eps_values),
figsize=(16, 12))
for i, min_samples in enumerate(min_samples_values):
for j, eps in enumerate(eps_values):
dbscan_param = DBSCAN(eps=eps, min_samples=min_samples)
y_param = dbscan_param.fit_predict(X_moons)
n_clusters = len(set(y_param)) - (1 if -1 in y_param else 0)
n_noise = list(y_param).count(-1)
axes[i, j].scatter(X_moons[:, 0], X_moons[:, 1],
c=y_param, cmap='viridis', alpha=0.6)
axes[i, j].set_title(f'eps={eps}, min_samples={min_samples}\n'
f'簇数={n_clusters}, 噪声={n_noise}')
axes[i, j].grid(True)
plt.tight_layout()
plt.show()
---
04.聚类评估
a.内部评估
a.轮廓系数
---
# 轮廓系数详解
from sklearn.metrics import silhouette_samples
# 计算每个样本的轮廓系数
silhouette_vals = silhouette_samples(X, y_pred_sklearn)
# 可视化轮廓图
plt.figure(figsize=(10, 6))
y_lower = 10
for i in range(4):
cluster_silhouette_vals = silhouette_vals[y_pred_sklearn == i]
cluster_silhouette_vals.sort()
size_cluster_i = cluster_silhouette_vals.shape[0]
y_upper = y_lower + size_cluster_i
plt.fill_betweenx(np.arange(y_lower, y_upper),
0, cluster_silhouette_vals,
alpha=0.7, label=f'簇{i}')
y_lower = y_upper + 10
plt.axvline(x=silhouette, color='red', linestyle='--',
label=f'平均轮廓系数={silhouette:.3f}')
plt.xlabel('轮廓系数')
plt.ylabel('簇')
plt.title('轮廓图')
plt.legend()
plt.grid(True)
plt.show()
---
b.Calinski-Harabasz指数
---
from sklearn.metrics import calinski_harabasz_score
ch_score = calinski_harabasz_score(X, y_pred_sklearn)
print(f"Calinski-Harabasz指数: {ch_score:.4f} (越大越好)")
---
b.外部评估
a.调整兰德指数
---
from sklearn.metrics import adjusted_rand_score
# 需要真实标签
ari = adjusted_rand_score(y_true, y_pred_sklearn)
print(f"调整兰德指数: {ari:.4f} (1表示完全一致)")
---
b.归一化互信息
---
from sklearn.metrics import normalized_mutual_info_score
nmi = normalized_mutual_info_score(y_true, y_pred_sklearn)
print(f"归一化互信息: {nmi:.4f} (1表示完全一致)")
---
4.2 降维算法
01.主成分分析
a.PCA原理
a.核心思想
找到数据方差最大的方向,将数据投影到这些方向上,实现降维。
b.数学推导
a.中心化
X_centered = X - mean(X)
b.协方差矩阵
C = (1/n) * X_centered^T * X_centered
c.特征分解
求协方差矩阵的特征值和特征向量。
d.选择主成分
选择特征值最大的k个特征向量作为主成分。
c.降维过程
X_reduced = X_centered * W,其中W是主成分矩阵。
b.从零实现
---
# PCA从零实现
class PCA:
def __init__(self, n_components):
self.n_components = n_components
self.components = None
self.mean = None
self.explained_variance = None
self.explained_variance_ratio = None
def fit(self, X):
# 中心化
self.mean = np.mean(X, axis=0)
X_centered = X - self.mean
# 协方差矩阵
cov_matrix = np.cov(X_centered.T)
# 特征分解
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
# 排序
idx = eigenvalues.argsort()[::-1]
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:, idx]
# 选择主成分
self.components = eigenvectors[:, :self.n_components]
self.explained_variance = eigenvalues[:self.n_components]
self.explained_variance_ratio = self.explained_variance / eigenvalues.sum()
return self
def transform(self, X):
X_centered = X - self.mean
return np.dot(X_centered, self.components)
def fit_transform(self, X):
self.fit(X)
return self.transform(X)
def inverse_transform(self, X_reduced):
return np.dot(X_reduced, self.components.T) + self.mean
# 测试
iris = load_iris()
X_iris = iris.data
y_iris = iris.target
# 降维到2维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_iris)
print(f"原始数据形状: {X_iris.shape}")
print(f"降维后形状: {X_pca.shape}")
print(f"解释方差比例: {pca.explained_variance_ratio}")
print(f"累计解释方差: {pca.explained_variance_ratio.sum():.2%}")
# 可视化
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y_iris,
cmap='viridis', alpha=0.6, edgecolors='black')
plt.xlabel(f'第一主成分 ({pca.explained_variance_ratio[0]:.2%})')
plt.ylabel(f'第二主成分 ({pca.explained_variance_ratio[1]:.2%})')
plt.title('PCA降维可视化')
plt.colorbar(scatter, label='类别')
plt.grid(True)
plt.show()
---
c.使用Scikit-learn
---
from sklearn.decomposition import PCA as SKLearnPCA
# PCA降维
pca_sklearn = SKLearnPCA(n_components=2)
X_pca_sklearn = pca_sklearn.fit_transform(X_iris)
print(f"解释方差比例: {pca_sklearn.explained_variance_ratio_}")
print(f"累计解释方差: {pca_sklearn.explained_variance_ratio_.sum():.2%}")
# 主成分
print(f"\n主成分矩阵:\n{pca_sklearn.components_}")
# 重构误差
X_reconstructed = pca_sklearn.inverse_transform(X_pca_sklearn)
reconstruction_error = np.mean((X_iris - X_reconstructed) ** 2)
print(f"\n重构误差: {reconstruction_error:.6f}")
---
d.确定主成分数量
a.累计解释方差
---
# 不同主成分数量的解释方差
pca_full = SKLearnPCA()
pca_full.fit(X_iris)
cumsum = np.cumsum(pca_full.explained_variance_ratio_)
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(cumsum) + 1), cumsum, 'bo-', linewidth=2)
plt.axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
plt.xlabel('主成分数量')
plt.ylabel('累计解释方差比例')
plt.title('主成分数量选择')
plt.legend()
plt.grid(True)
plt.show()
# 找到达到95%的主成分数量
n_components_95 = np.argmax(cumsum >= 0.95) + 1
print(f"达到95%解释方差需要{n_components_95}个主成分")
---
b.碎石图
---
plt.figure(figsize=(10, 6))
plt.bar(range(1, len(pca_full.explained_variance_) + 1),
pca_full.explained_variance_)
plt.xlabel('主成分')
plt.ylabel('解释方差')
plt.title('碎石图')
plt.grid(True)
plt.show()
---
02.t-SNE
a.算法原理
a.核心思想
保持数据在高维和低维空间中的相似度分布,适合可视化。
b.与PCA的区别
a.PCA
线性降维,保持全局结构。
b.t-SNE
非线性降维,保持局部结构,更适合可视化。
c.注意事项
a.计算复杂度高
不适合大规模数据。
b.随机性
每次运行结果可能不同。
c.超参数敏感
perplexity参数对结果影响大。
b.使用Scikit-learn
---
from sklearn.manifold import TSNE
# t-SNE降维
tsne = TSNE(n_components=2, perplexity=30, random_state=42)
X_tsne = tsne.fit_transform(X_iris)
# 可视化
plt.figure(figsize=(15, 5))
# PCA
plt.subplot(1, 2, 1)
plt.scatter(X_pca_sklearn[:, 0], X_pca_sklearn[:, 1],
c=y_iris, cmap='viridis', alpha=0.6, edgecolors='black')
plt.xlabel('第一主成分')
plt.ylabel('第二主成分')
plt.title('PCA降维')
plt.colorbar(label='类别')
plt.grid(True)
# t-SNE
plt.subplot(1, 2, 2)
plt.scatter(X_tsne[:, 0], X_tsne[:, 1],
c=y_iris, cmap='viridis', alpha=0.6, edgecolors='black')
plt.xlabel('t-SNE维度1')
plt.ylabel('t-SNE维度2')
plt.title('t-SNE降维')
plt.colorbar(label='类别')
plt.grid(True)
plt.tight_layout()
plt.show()
---
c.perplexity参数影响
---
# 不同perplexity的影响
perplexity_values = [5, 30, 50, 100]
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
for i, perplexity in enumerate(perplexity_values):
tsne_p = TSNE(n_components=2, perplexity=perplexity, random_state=42)
X_tsne_p = tsne_p.fit_transform(X_iris)
ax = axes[i // 2, i % 2]
scatter = ax.scatter(X_tsne_p[:, 0], X_tsne_p[:, 1],
c=y_iris, cmap='viridis', alpha=0.6, edgecolors='black')
ax.set_xlabel('t-SNE维度1')
ax.set_ylabel('t-SNE维度2')
ax.set_title(f'perplexity={perplexity}')
ax.grid(True)
plt.colorbar(scatter, ax=ax, label='类别')
plt.tight_layout()
plt.show()
---
03.其他降维方法
a.线性判别分析
a.LDA原理
有监督降维,最大化类间距离,最小化类内距离。
b.代码示例
---
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# LDA降维
lda = LinearDiscriminantAnalysis(n_components=2)
X_lda = lda.fit_transform(X_iris, y_iris)
# 可视化
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X_lda[:, 0], X_lda[:, 1],
c=y_iris, cmap='viridis', alpha=0.6, edgecolors='black')
plt.xlabel('LD1')
plt.ylabel('LD2')
plt.title('LDA降维(有监督)')
plt.colorbar(scatter, label='类别')
plt.grid(True)
plt.show()
print(f"解释方差比例: {lda.explained_variance_ratio_}")
---
b.截断SVD
a.原理
奇异值分解的截断版本,适合稀疏矩阵。
b.代码示例
---
from sklearn.decomposition import TruncatedSVD
# 生成稀疏数据
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
# 加载文本数据
categories = ['alt.atheism', 'talk.religion.misc']
newsgroups = fetch_20newsgroups(subset='train', categories=categories)
# TF-IDF特征
vectorizer = TfidfVectorizer(max_features=1000)
X_text = vectorizer.fit_transform(newsgroups.data)
print(f"原始特征数: {X_text.shape[1]}")
print(f"稀疏度: {1 - X_text.nnz / (X_text.shape[0] * X_text.shape[1]):.2%}")
# SVD降维
svd = TruncatedSVD(n_components=2, random_state=42)
X_svd = svd.fit_transform(X_text)
# 可视化
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X_svd[:, 0], X_svd[:, 1],
c=newsgroups.target, cmap='viridis', alpha=0.6)
plt.xlabel('SVD维度1')
plt.ylabel('SVD维度2')
plt.title('TruncatedSVD降维(文本数据)')
plt.colorbar(scatter, label='类别')
plt.grid(True)
plt.show()
print(f"解释方差比例: {svd.explained_variance_ratio_}")
---
04.降维应用
a.数据可视化
将高维数据降到2维或3维,便于可视化和理解。
b.特征提取
---
# 降维后的分类性能
from sklearn.linear_model import LogisticRegression
# 原始数据
lr_original = LogisticRegression(max_iter=1000, random_state=42)
scores_original = cross_val_score(lr_original, X_iris, y_iris, cv=5)
# PCA降维后
X_pca_2 = SKLearnPCA(n_components=2).fit_transform(X_iris)
lr_pca = LogisticRegression(max_iter=1000, random_state=42)
scores_pca = cross_val_score(lr_pca, X_pca_2, y_iris, cv=5)
print(f"原始数据准确率: {scores_original.mean():.4f} (+/- {scores_original.std():.4f})")
print(f"PCA降维后准确率: {scores_pca.mean():.4f} (+/- {scores_pca.std():.4f})")
---
c.去噪
---
# PCA去噪
# 添加噪声
np.random.seed(42)
X_noisy = X_iris + np.random.normal(0, 0.5, X_iris.shape)
# PCA去噪
pca_denoise = SKLearnPCA(n_components=3)
X_pca_denoise = pca_denoise.fit_transform(X_noisy)
X_denoised = pca_denoise.inverse_transform(X_pca_denoise)
# 计算误差
noise_error = np.mean((X_iris - X_noisy) ** 2)
denoise_error = np.mean((X_iris - X_denoised) ** 2)
print(f"噪声误差: {noise_error:.6f}")
print(f"去噪后误差: {denoise_error:.6f}")
print(f"误差降低: {(1 - denoise_error/noise_error) * 100:.2f}%")
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, (data, title) in enumerate([
(X_iris, '原始数据'),
(X_noisy, '添加噪声'),
(X_denoised, 'PCA去噪')
]):
axes[i].scatter(data[:, 0], data[:, 1], c=y_iris,
cmap='viridis', alpha=0.6)
axes[i].set_xlabel('特征1')
axes[i].set_ylabel('特征2')
axes[i].set_title(title)
axes[i].grid(True)
plt.tight_layout()
plt.show()
---
4.3 关联规则
01.基本概念
a.关联规则
a.定义
发现数据中项之间的关联关系,如"购买面包的顾客也购买牛奶"。
b.形式
X → Y,表示如果X发生,则Y也可能发生。
c.应用场景
购物篮分析、推荐系统、网页点击流分析。
b.核心指标
a.支持度
a.定义
Support(X) = 包含X的交易数 / 总交易数
b.意义
衡量项集的流行程度。
b.置信度
a.定义
Confidence(X→Y) = Support(X∪Y) / Support(X)
b.意义
衡量规则的可靠性。
c.提升度
a.定义
Lift(X→Y) = Confidence(X→Y) / Support(Y)
b.意义
Lift > 1表示正相关,Lift < 1表示负相关,Lift = 1表示独立。
02.Apriori算法
a.算法原理
a.先验性质
如果一个项集是频繁的,则它的所有子集也是频繁的。
b.算法流程
a.步骤1
找出所有频繁1-项集。
b.步骤2
由频繁k-项集生成候选(k+1)-项集。
c.步骤3
扫描数据库,计算候选项集的支持度。
d.步骤4
保留支持度≥最小支持度的项集,重复步骤2-4。
b.代码实现
---
# 简化的Apriori实现
from itertools import combinations
from collections import defaultdict
class Apriori:
def __init__(self, min_support=0.5, min_confidence=0.7):
self.min_support = min_support
self.min_confidence = min_confidence
self.frequent_itemsets = []
self.rules = []
def get_support(self, itemset, transactions):
count = sum(1 for t in transactions if itemset.issubset(t))
return count / len(transactions)
def find_frequent_1_itemsets(self, transactions):
items = set()
for t in transactions:
items.update(t)
frequent = {}
for item in items:
itemset = frozenset([item])
support = self.get_support(itemset, transactions)
if support >= self.min_support:
frequent[itemset] = support
return frequent
def generate_candidates(self, frequent_k):
candidates = set()
items = list(frequent_k.keys())
for i in range(len(items)):
for j in range(i + 1, len(items)):
union = items[i] | items[j]
if len(union) == len(items[i]) + 1:
candidates.add(union)
return candidates
def fit(self, transactions):
# 找频繁1-项集
frequent_k = self.find_frequent_1_itemsets(transactions)
self.frequent_itemsets.append(frequent_k)
k = 1
while frequent_k:
# 生成候选项集
candidates = self.generate_candidates(frequent_k)
# 计算支持度
frequent_k = {}
for candidate in candidates:
support = self.get_support(candidate, transactions)
if support >= self.min_support:
frequent_k[candidate] = support
if frequent_k:
self.frequent_itemsets.append(frequent_k)
k += 1
# 生成关联规则
self.generate_rules(transactions)
return self
def generate_rules(self, transactions):
for frequent in self.frequent_itemsets[1:]: # 跳过1-项集
for itemset in frequent.keys():
if len(itemset) < 2:
continue
# 生成所有非空子集
for i in range(1, len(itemset)):
for antecedent in combinations(itemset, i):
antecedent = frozenset(antecedent)
consequent = itemset - antecedent
# 计算置信度
support_union = frequent[itemset]
support_antecedent = self.get_support(antecedent, transactions)
if support_antecedent > 0:
confidence = support_union / support_antecedent
if confidence >= self.min_confidence:
# 计算提升度
support_consequent = self.get_support(consequent, transactions)
lift = confidence / support_consequent if support_consequent > 0 else 0
self.rules.append({
'antecedent': antecedent,
'consequent': consequent,
'support': support_union,
'confidence': confidence,
'lift': lift
})
# 测试数据
transactions = [
{'面包', '牛奶'},
{'面包', '尿布', '啤酒', '鸡蛋'},
{'牛奶', '尿布', '啤酒', '可乐'},
{'面包', '牛奶', '尿布', '啤酒'},
{'面包', '牛奶', '尿布', '可乐'}
]
# 训练
apriori = Apriori(min_support=0.4, min_confidence=0.6)
apriori.fit(transactions)
# 输出频繁项集
print("频繁项集:")
for i, frequent in enumerate(apriori.frequent_itemsets):
print(f"\n{i+1}-项集:")
for itemset, support in sorted(frequent.items(), key=lambda x: x[1], reverse=True):
print(f" {set(itemset)}: {support:.2f}")
# 输出关联规则
print("\n关联规则:")
for rule in sorted(apriori.rules, key=lambda x: x['lift'], reverse=True):
print(f"{set(rule['antecedent'])} → {set(rule['consequent'])}")
print(f" 支持度: {rule['support']:.2f}")
print(f" 置信度: {rule['confidence']:.2f}")
print(f" 提升度: {rule['lift']:.2f}\n")
---
c.使用mlxtend库
---
# 安装:pip install mlxtend
from mlxtend.frequent_patterns import apriori, association_rules
from mlxtend.preprocessing import TransactionEncoder
# 转换数据格式
te = TransactionEncoder()
te_ary = te.fit(transactions).transform(transactions)
df = pd.DataFrame(te_ary, columns=te.columns_)
print("交易数据:")
print(df)
# Apriori算法
frequent_itemsets = apriori(df, min_support=0.4, use_colnames=True)
print("\n频繁项集:")
print(frequent_itemsets)
# 生成关联规则
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.6)
print("\n关联规则:")
print(rules[['antecedents', 'consequents', 'support', 'confidence', 'lift']])
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(rules['support'], rules['confidence'],
s=rules['lift']*100, alpha=0.6, c=rules['lift'], cmap='viridis')
plt.xlabel('支持度')
plt.ylabel('置信度')
plt.title('关联规则可视化(气泡大小表示提升度)')
plt.colorbar(label='提升度')
plt.grid(True)
plt.show()
---
03.FP-Growth算法
a.算法原理
a.优势
��需扫描数据库两次,比Apriori更高效。
b.FP树
压缩数据库的树结构,保留频繁项信息。
b.使用mlxtend
---
from mlxtend.frequent_patterns import fpgrowth
# FP-Growth算法
frequent_itemsets_fp = fpgrowth(df, min_support=0.4, use_colnames=True)
print("FP-Growth频繁项集:")
print(frequent_itemsets_fp)
# 生成规则
rules_fp = association_rules(frequent_itemsets_fp, metric="confidence", min_threshold=0.6)
print("\nFP-Growth关联规则:")
print(rules_fp[['antecedents', 'consequents', 'support', 'confidence', 'lift']])
---
04.实际应用
a.购物篮分析
---
# 模拟超市交易数据
np.random.seed(42)
products = ['面包', '牛奶', '鸡蛋', '啤酒', '尿布', '可乐', '薯片', '巧克力']
# 生成100笔交易
transactions_large = []
for _ in range(100):
n_items = np.random.randint(2, 6)
transaction = set(np.random.choice(products, n_items, replace=False))
transactions_large.append(transaction)
# 转换格式
te_large = TransactionEncoder()
te_ary_large = te_large.fit(transactions_large).transform(transactions_large)
df_large = pd.DataFrame(te_ary_large, columns=te_large.columns_)
# 挖掘频繁项集
frequent_large = apriori(df_large, min_support=0.1, use_colnames=True)
# 生成规则
rules_large = association_rules(frequent_large, metric="lift", min_threshold=1.0)
rules_large = rules_large.sort_values('lift', ascending=False)
print("Top 10 关联规则:")
print(rules_large.head(10)[['antecedents', 'consequents', 'support', 'confidence', 'lift']])
# 可视化支持度-置信度-提升度
fig = plt.figure(figsize=(12, 5))
# 支持度 vs 置信度
ax1 = fig.add_subplot(121)
scatter1 = ax1.scatter(rules_large['support'], rules_large['confidence'],
c=rules_large['lift'], s=50, alpha=0.6, cmap='viridis')
ax1.set_xlabel('支持度')
ax1.set_ylabel('置信度')
ax1.set_title('支持度 vs 置信度')
ax1.grid(True)
plt.colorbar(scatter1, ax=ax1, label='提升度')
# 提升度分布
ax2 = fig.add_subplot(122)
ax2.hist(rules_large['lift'], bins=20, edgecolor='black', alpha=0.7)
ax2.axvline(x=1, color='r', linestyle='--', label='Lift=1(独立)')
ax2.set_xlabel('提升度')
ax2.set_ylabel('频数')
ax2.set_title('提升度分布')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.show()
---
4.4 异常检测
01.基本概念
a.异常检测定义
a.什么是异常
与大多数数据显著不同的样本,也称为离群点、异常值。
b.应用场景
欺诈检测、网络入侵检测、设备故障检测、医疗诊断。
c.挑战
a.异常样本稀少
正常样本远多于异常样本。
b.异常类型多样
异常的表现形式多种多样。
c.标注困难
很难获得大量标注的异常样本。
b.异常类型
a.点异常
单个数据点异常。
b.上下文异常
在特定上下文中异常,如夏天的低温。
c.集体异常
一组数据点共同表现异常。
02.统计方法
a.3σ原则
a.原理
假设数据服从正态分布,超过μ±3σ的样本为异常。
b.代码实现
---
# 3σ原则异常检测
def detect_outliers_3sigma(data):
mean = np.mean(data)
std = np.std(data)
threshold = 3 * std
outliers = np.abs(data - mean) > threshold
return outliers
# 生成数据
np.random.seed(42)
normal_data = np.random.randn(1000)
outliers_data = np.array([5, -5, 6, -6])
data = np.concatenate([normal_data, outliers_data])
# 检测异常
is_outlier = detect_outliers_3sigma(data)
print(f"异常点数量: {is_outlier.sum()}")
print(f"异常点: {data[is_outlier]}")
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(data, bins=50, alpha=0.7, edgecolor='black')
plt.axvline(x=np.mean(data) + 3*np.std(data), color='r',
linestyle='--', label='μ+3σ')
plt.axvline(x=np.mean(data) - 3*np.std(data), color='r',
linestyle='--', label='μ-3σ')
plt.xlabel('值')
plt.ylabel('频数')
plt.title('3σ原则异常检测')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(range(len(data)), data, c=is_outlier,
cmap='coolwarm', alpha=0.6)
plt.xlabel('索引')
plt.ylabel('值')
plt.title('异常点标记')
plt.grid(True)
plt.tight_layout()
plt.show()
---
b.箱线图方法
a.IQR方法
Q1 - 1.5*IQR 和 Q3 + 1.5*IQR 之外的点为异常。
b.代码实现
---
# IQR方法异常检测
def detect_outliers_iqr(data):
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = (data < lower_bound) | (data > upper_bound)
return outliers
# 检测异常
is_outlier_iqr = detect_outliers_iqr(data)
print(f"IQR方法检测到{is_outlier_iqr.sum()}个异常点")
# 箱线图
plt.figure(figsize=(10, 6))
plt.boxplot(data, vert=False)
plt.scatter(data[is_outlier_iqr],
np.ones(is_outlier_iqr.sum()),
c='red', s=100, label='异常点', zorder=3)
plt.xlabel('值')
plt.title('箱线图异常检测')
plt.legend()
plt.grid(True)
plt.show()
---
03.基于距离的方法
a.KNN异常检测
a.原理
计算每个样本到其K近邻的平均距离,距离大的为异常。
b.代码实现
---
from sklearn.neighbors import NearestNeighbors
# 生成2维数据
np.random.seed(42)
X_normal = np.random.randn(200, 2)
X_outliers = np.random.uniform(-4, 4, (10, 2))
X = np.vstack([X_normal, X_outliers])
# KNN异常检测
k = 5
nbrs = NearestNeighbors(n_neighbors=k+1) # +1因为包含自己
nbrs.fit(X)
distances, indices = nbrs.kneighbors(X)
# 排除自己,计算到K近邻的平均距离
avg_distances = distances[:, 1:].mean(axis=1)
# 设置阈值(例如95分位数)
threshold = np.percentile(avg_distances, 95)
is_outlier_knn = avg_distances > threshold
print(f"KNN检测到{is_outlier_knn.sum()}个异常点")
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[~is_outlier_knn, 0], X[~is_outlier_knn, 1],
c='blue', alpha=0.6, label='正常点')
plt.scatter(X[is_outlier_knn, 0], X[is_outlier_knn, 1],
c='red', s=100, marker='x', label='异常点')
plt.xlabel('特���1')
plt.ylabel('特征2')
plt.title('KNN异常检测结果')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.hist(avg_distances, bins=30, alpha=0.7, edgecolor='black')
plt.axvline(x=threshold, color='r', linestyle='--',
label=f'阈值={threshold:.2f}')
plt.xlabel('平均K近邻距离')
plt.ylabel('频数')
plt.title('距离分布')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
04.基于密度的方法
a.LOF算法
a.原理
局部异常因子,比较样本的局部密度与其邻居的局部密度。
b.使用Scikit-learn
---
from sklearn.neighbors import LocalOutlierFactor
# LOF异常检测
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred_lof = lof.fit_predict(X)
# -1表示异常,1表示正常
is_outlier_lof = y_pred_lof == -1
# LOF分数(越负越异常)
lof_scores = lof.negative_outlier_factor_
print(f"LOF检测到{is_outlier_lof.sum()}个异常点")
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[~is_outlier_lof, 0], X[~is_outlier_lof, 1],
c='blue', alpha=0.6, label='正常点')
plt.scatter(X[is_outlier_lof, 0], X[is_outlier_lof, 1],
c='red', s=100, marker='x', label='异常点')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('LOF异常检测结果')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=lof_scores,
cmap='coolwarm', alpha=0.6)
plt.colorbar(label='LOF分数')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('LOF分数分布')
plt.grid(True)
plt.tight_layout()
plt.show()
---
05.基于模型的方法
a.Isolation Forest
a.原理
异常点更容易被隔离,需要更少的分裂次数。
b.使用Scikit-learn
---
from sklearn.ensemble import IsolationForest
# Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
y_pred_iso = iso_forest.fit_predict(X)
is_outlier_iso = y_pred_iso == -1
# 异常分数
scores_iso = iso_forest.score_samples(X)
print(f"Isolation Forest检测到{is_outlier_iso.sum()}个异常点")
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[~is_outlier_iso, 0], X[~is_outlier_iso, 1],
c='blue', alpha=0.6, label='正常点')
plt.scatter(X[is_outlier_iso, 0], X[is_outlier_iso, 1],
c='red', s=100, marker='x', label='异常点')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('Isolation Forest异常检测')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=scores_iso,
cmap='coolwarm', alpha=0.6)
plt.colorbar(label='异常分数')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('异常分数分布')
plt.grid(True)
plt.tight_layout()
plt.show()
---
b.One-Class SVM
a.原理
学习正常数据的边界,边界外的点为异常。
b.使用Scikit-learn
---
from sklearn.svm import OneClassSVM
# One-Class SVM
oc_svm = OneClassSVM(nu=0.1, kernel='rbf', gamma='auto')
y_pred_svm = oc_svm.fit_predict(X)
is_outlier_svm = y_pred_svm == -1
print(f"One-Class SVM检测到{is_outlier_svm.sum()}个异常点")
# 可视化决策边界
xx, yy = np.meshgrid(np.linspace(X[:, 0].min()-1, X[:, 0].max()+1, 100),
np.linspace(X[:, 1].min()-1, X[:, 1].max()+1, 100))
Z = oc_svm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.figure(figsize=(10, 6))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7),
cmap='Blues', alpha=0.3)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X[~is_outlier_svm, 0], X[~is_outlier_svm, 1],
c='blue', alpha=0.6, label='正常点')
plt.scatter(X[is_outlier_svm, 0], X[is_outlier_svm, 1],
c='red', s=100, marker='x', label='异常点')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('One-Class SVM异常检测')
plt.legend()
plt.grid(True)
plt.show()
---
06.方法对比
a.性能对比
---
# 对比多种异常检测方法
methods = {
'LOF': LocalOutlierFactor(n_neighbors=20, contamination=0.1),
'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='auto')
}
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for i, (name, model) in enumerate(methods.items()):
y_pred = model.fit_predict(X)
is_outlier = y_pred == -1
axes[i].scatter(X[~is_outlier, 0], X[~is_outlier, 1],
c='blue', alpha=0.6, label='正常点')
axes[i].scatter(X[is_outlier, 0], X[is_outlier, 1],
c='red', s=100, marker='x', label='异常点')
axes[i].set_xlabel('特征1')
axes[i].set_ylabel('特征2')
axes[i].set_title(f'{name}\n检测到{is_outlier.sum()}个异常点')
axes[i].legend()
axes[i].grid(True)
plt.tight_layout()
plt.show()
---
5 模型评估与优化
5.1 评估指标
01.分类指标
a.准确率
a.定义
正确预测的样本数 / 总样本数
b.适用场景
类别平衡的数据集
c.局限性
类别不平衡时会误导
b.精确率与召回率
a.精确率
TP / (TP + FP),预测为正的样本中真正为正的比例
b.召回率
TP / (TP + FN),真正为正的样本中被预测为正的比例
c.F1分数
2 * (精确率 * 召回率) / (精确率 + 召回率),调和平均
d.代码示例
---
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import classification_report, confusion_matrix
# 生成不平衡数据
X_imb, y_imb = make_classification(n_samples=1000, n_features=20,
n_informative=15, n_redundant=5,
weights=[0.9, 0.1], random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
X_imb, y_imb, test_size=0.2, random_state=42
)
# 训练模型
clf = LogisticRegression(random_state=42)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print(f"准确率: {accuracy:.4f}")
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")
# 详细报告
print(f"\n分类报告:\n{classification_report(y_test, y_pred)}")
# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.title('混淆矩阵')
plt.show()
---
c.ROC曲线与AUC
a.ROC曲线
横轴FPR,纵轴TPR,展示不同阈值下的性能
b.AUC
ROC曲线下面积,越接近1越好
c.代码示例
---
from sklearn.metrics import roc_curve, roc_auc_score, auc
# 预测概率
y_proba = clf.predict_proba(X_test)[:, 1]
# 计算ROC曲线
fpr, tpr, thresholds = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
# 可视化
plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, linewidth=2, label=f'ROC (AUC = {roc_auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--', linewidth=1, label='随机猜测')
plt.xlabel('假正率 (FPR)')
plt.ylabel('真正率 (TPR)')
plt.title('ROC曲线')
plt.legend()
plt.grid(True)
plt.show()
# 不同模型对比
models = {
'Logistic Regression': LogisticRegression(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM': SVC(probability=True, random_state=42)
}
plt.figure(figsize=(10, 6))
for name, model in models.items():
model.fit(X_train, y_train)
y_proba = model.predict_proba(X_test)[:, 1]
fpr, tpr, _ = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, linewidth=2, label=f'{name} (AUC = {roc_auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--', linewidth=1, label='随机猜测')
plt.xlabel('假正率 (FPR)')
plt.ylabel('真正率 (TPR)')
plt.title('多模型ROC曲线对比')
plt.legend()
plt.grid(True)
plt.show()
---
02.回归指标
a.均方误差
MSE = (1/n) * Σ(yᵢ - ŷᵢ)²
b.均方根误差
RMSE = √MSE,与目标变量同单位
c.平均绝对误差
MAE = (1/n) * Σ|yᵢ - ŷᵢ|,对异常值不敏感
d.R²分数
R² = 1 - SS_res / SS_tot,解释方差比例
e.代码示例
---
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
# 生成回归数据
X_reg, y_reg = make_regression(n_samples=200, n_features=10,
noise=10, random_state=42)
X_train_r, X_test_r, y_train_r, y_test_r = train_test_split(
X_reg, y_reg, test_size=0.2, random_state=42
)
# 训练模型
reg = LinearRegression()
reg.fit(X_train_r, y_train_r)
y_pred_r = reg.predict(X_test_r)
# 评估
mse = mean_squared_error(y_test_r, y_pred_r)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test_r, y_pred_r)
r2 = r2_score(y_test_r, y_pred_r)
print(f"MSE: {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")
# 可视化预测结果
plt.figure(figsize=(10, 6))
plt.scatter(y_test_r, y_pred_r, alpha=0.5)
plt.plot([y_test_r.min(), y_test_r.max()],
[y_test_r.min(), y_test_r.max()],
'r--', linewidth=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title(f'回归预测结果 (R² = {r2:.4f})')
plt.grid(True)
plt.show()
# 残差分析
residuals = y_test_r - y_pred_r
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 残差分布
axes[0].hist(residuals, bins=30, edgecolor='black')
axes[0].set_xlabel('残差')
axes[0].set_ylabel('频数')
axes[0].set_title('残差分布')
axes[0].grid(True)
# 残差vs预测值
axes[1].scatter(y_pred_r, residuals, alpha=0.5)
axes[1].axhline(y=0, color='r', linestyle='--', linewidth=2)
axes[1].set_xlabel('预测值')
axes[1].set_ylabel('残差')
axes[1].set_title('残差vs预测值')
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
03.多分类指标
a.宏平均
对每个类别分别计算指标,然后平均
b.微平均
将所有类别的TP、FP、FN汇总后计算
c.加权平均
按类别样本数加权平均
d.代码示例
---
# 多分类数据
iris = load_iris()
X_iris, y_iris = iris.data, iris.target
X_train_i, X_test_i, y_train_i, y_test_i = train_test_split(
X_iris, y_iris, test_size=0.2, random_state=42
)
# 训练模型
clf_multi = RandomForestClassifier(n_estimators=100, random_state=42)
clf_multi.fit(X_train_i, y_train_i)
y_pred_i = clf_multi.predict(X_test_i)
# 不同平均方式
print("精确率:")
print(f" 宏平均: {precision_score(y_test_i, y_pred_i, average='macro'):.4f}")
print(f" 微平均: {precision_score(y_test_i, y_pred_i, average='micro'):.4f}")
print(f" 加权平均: {precision_score(y_test_i, y_pred_i, average='weighted'):.4f}")
print("\n召回率:")
print(f" 宏平均: {recall_score(y_test_i, y_pred_i, average='macro'):.4f}")
print(f" 微平均: {recall_score(y_test_i, y_pred_i, average='micro'):.4f}")
print(f" 加权平均: {recall_score(y_test_i, y_pred_i, average='weighted'):.4f}")
print("\nF1分数:")
print(f" 宏平均: {f1_score(y_test_i, y_pred_i, average='macro'):.4f}")
print(f" 微平均: {f1_score(y_test_i, y_pred_i, average='micro'):.4f}")
print(f" 加权平均: {f1_score(y_test_i, y_pred_i, average='weighted'):.4f}")
# 详细报告
print(f"\n分类报告:\n{classification_report(y_test_i, y_pred_i, target_names=iris.target_names)}")
---
5.2 交叉验证
01.基本原理
a.为什么需要交叉验证
a.单次划分的局限性
测试集的选择会影响评估结果
b.充分利用数据
所有数据都用于训练和测试
c.更可靠的评估
多次评估的平均结果更稳定
b.K折交叉验证
a.步骤
将数据分成K份,轮流用1份做测试,其余做训练
b.常用K值
K=5或K=10
c.优点
每个样本都被用于测试一次
02.Scikit-learn实现
a.K折交叉验证
---
from sklearn.model_selection import cross_val_score, cross_validate
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
# 模型
clf = RandomForestClassifier(n_estimators=100, random_state=42)
# 5折交叉验证
scores = cross_val_score(clf, X, y, cv=5, scoring='accuracy')
print(f"各折准确率: {scores}")
print(f"平均准确率: {scores.mean():.4f}")
print(f"标准差: {scores.std():.4f}")
# 多个指标
scoring = ['accuracy', 'precision_macro', 'recall_macro', 'f1_macro']
scores_multi = cross_validate(clf, X, y, cv=5, scoring=scoring)
for metric in scoring:
print(f"{metric}: {scores_multi[f'test_{metric}'].mean():.4f} "
f"(±{scores_multi[f'test_{metric}'].std():.4f})")
# 可视化
plt.figure(figsize=(10, 6))
plt.boxplot([scores_multi[f'test_{m}'] for m in scoring],
labels=scoring)
plt.ylabel('分数')
plt.title('交叉验证结果')
plt.grid(True)
plt.show()
---
b.分层K折
---
from sklearn.model_selection import StratifiedKFold
# 分层K折(保持类别比例)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores_stratified = cross_val_score(clf, X, y, cv=skf, scoring='accuracy')
print(f"分层K折准确率: {scores_stratified.mean():.4f} "
f"(±{scores_stratified.std():.4f})")
# 对比普通K折
from sklearn.model_selection import KFold
kf = KFold(n_splits=5, shuffle=True, random_state=42)
scores_normal = cross_val_score(clf, X, y, cv=kf, scoring='accuracy')
print(f"普通K折准确率: {scores_normal.mean():.4f} "
f"(±{scores_normal.std():.4f})")
---
c.留一交叉验证
---
from sklearn.model_selection import LeaveOneOut
# 留一交叉验证(适用于小数据集)
# 注意:计算量大,仅用于演示
X_small, y_small = X[:50], y[:50]
loo = LeaveOneOut()
scores_loo = cross_val_score(clf, X_small, y_small, cv=loo, scoring='accuracy')
print(f"留一交叉验证准确率: {scores_loo.mean():.4f}")
print(f"总共{len(scores_loo)}次迭代")
---
03.自定义交叉验证
a.手动实现
---
from sklearn.model_selection import KFold
# 手动K折交叉验证
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fold_scores = []
fold_num = 1
for train_idx, test_idx in kf.split(X):
X_train_fold, X_test_fold = X[train_idx], X[test_idx]
y_train_fold, y_test_fold = y[train_idx], y[test_idx]
# 训练
clf_fold = RandomForestClassifier(n_estimators=100, random_state=42)
clf_fold.fit(X_train_fold, y_train_fold)
# 评估
score = clf_fold.score(X_test_fold, y_test_fold)
fold_scores.append(score)
print(f"第{fold_num}折准确率: {score:.4f}")
fold_num += 1
print(f"\n平均准确率: {np.mean(fold_scores):.4f}")
print(f"标准差: {np.std(fold_scores):.4f}")
---
b.时间序列交叉验证
---
from sklearn.model_selection import TimeSeriesSplit
# 时间序列交叉验证
tscv = TimeSeriesSplit(n_splits=5)
# 生成时间序列数据
X_ts = np.arange(100).reshape(-1, 1)
y_ts = np.sin(X_ts.ravel() / 10) + np.random.randn(100) * 0.1
plt.figure(figsize=(15, 10))
for i, (train_idx, test_idx) in enumerate(tscv.split(X_ts)):
plt.subplot(5, 1, i+1)
plt.plot(train_idx, y_ts[train_idx], 'b.', label='训练集')
plt.plot(test_idx, y_ts[test_idx], 'r.', label='测试集')
plt.ylabel(f'第{i+1}折')
plt.legend()
plt.grid(True)
plt.xlabel('时间索引')
plt.tight_layout()
plt.show()
# 评估
reg_ts = LinearRegression()
scores_ts = cross_val_score(reg_ts, X_ts, y_ts, cv=tscv, scoring='r2')
print(f"时间序列交叉验证R²: {scores_ts.mean():.4f} "
f"(±{scores_ts.std():.4f})")
---
04.交叉验证与超参数调优
a.嵌套交叉验证
---
from sklearn.model_selection import GridSearchCV
# 外层交叉验证
outer_cv = KFold(n_splits=5, shuffle=True, random_state=42)
# 内层交叉验证(用于超参数调优)
inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)
# 参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, None]
}
# 嵌套交叉验证
nested_scores = []
for train_idx, test_idx in outer_cv.split(X):
X_train_outer, X_test_outer = X[train_idx], X[test_idx]
y_train_outer, y_test_outer = y[train_idx], y[test_idx]
# 内层网格搜索
grid = GridSearchCV(RandomForestClassifier(random_state=42),
param_grid, cv=inner_cv, scoring='accuracy')
grid.fit(X_train_outer, y_train_outer)
# 外层评估
score = grid.score(X_test_outer, y_test_outer)
nested_scores.append(score)
print(f"嵌套交叉验证准确率: {np.mean(nested_scores):.4f} "
f"(±{np.std(nested_scores):.4f})")
---
5.3 超参数调优
01.网格搜索
a.原理
穷举所有参数组合,找到最佳参数
b.优点
简单直接,保证找到最优组合
c.缺点
计算量大,参数多时很慢
d.代码示例
---
from sklearn.model_selection import GridSearchCV
# 参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
# 网格搜索
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=5,
scoring='accuracy', n_jobs=-1,
verbose=1)
# 训练
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
grid_search.fit(X_train, y_train)
# 结果
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
print(f"测试集分数: {grid_search.score(X_test, y_test):.4f}")
# 所有结果
results = pd.DataFrame(grid_search.cv_results_)
print(f"\n前5个结果:")
print(results[['params', 'mean_test_score', 'std_test_score']].head())
# 可视化参数影响
scores = results['mean_test_score'].values
params_n_estimators = [p['n_estimators'] for p in results['params']]
params_max_depth = [p['max_depth'] if p['max_depth'] is not None else 20
for p in results['params']]
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(params_n_estimators, scores, alpha=0.5)
plt.xlabel('n_estimators')
plt.ylabel('准确率')
plt.title('n_estimators对准确率的影响')
plt.grid(True)
plt.subplot(1, 2, 2)
plt.scatter(params_max_depth, scores, alpha=0.5)
plt.xlabel('max_depth')
plt.ylabel('准确率')
plt.title('max_depth对准确率的影响')
plt.grid(True)
plt.tight_layout()
plt.show()
---
02.随机搜索
a.原理
随机采样参数组合,而非穷举
b.优点
更快,适合参数空间大的情况
c.适用场景
参数很多或参数范围很大
d.代码示例
---
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform
# 参数分布
param_distributions = {
'n_estimators': randint(50, 500),
'max_depth': [5, 10, 15, 20, None],
'min_samples_split': randint(2, 20),
'min_samples_leaf': randint(1, 10),
'max_features': uniform(0.1, 0.9)
}
# 随机搜索
rf_random = RandomForestClassifier(random_state=42)
random_search = RandomizedSearchCV(rf_random, param_distributions,
n_iter=50, cv=5, scoring='accuracy',
random_state=42, n_jobs=-1, verbose=1)
random_search.fit(X_train, y_train)
# 结果
print(f"最佳参数: {random_search.best_params_}")
print(f"最佳交叉验证分数: {random_search.best_score_:.4f}")
print(f"测试集分数: {random_search.score(X_test, y_test):.4f}")
# 对比网格搜索
print(f"\n网格搜索最佳分数: {grid_search.best_score_:.4f}")
print(f"随机搜索最佳分数: {random_search.best_score_:.4f}")
---
03.贝叶斯优化
a.原理
使用贝叶斯方法建模参数与性能的关系,智能选择下一个参数
b.优点
比随机搜索更高效,适合昂贵的评估
c.代码示例
---
# 安装:pip install scikit-optimize
from skopt import BayesSearchCV
from skopt.space import Real, Integer
# 参数空间
param_space = {
'n_estimators': Integer(50, 500),
'max_depth': Integer(5, 20),
'min_samples_split': Integer(2, 20),
'min_samples_leaf': Integer(1, 10),
'max_features': Real(0.1, 1.0)
}
# 贝叶斯搜索
rf_bayes = RandomForestClassifier(random_state=42)
bayes_search = BayesSearchCV(rf_bayes, param_space, n_iter=50,
cv=5, scoring='accuracy',
random_state=42, n_jobs=-1, verbose=1)
bayes_search.fit(X_train, y_train)
# 结果
print(f"最佳参数: {bayes_search.best_params_}")
print(f"最佳交叉验证分数: {bayes_search.best_score_:.4f}")
print(f"测试集分数: {bayes_search.score(X_test, y_test):.4f}")
# 优化过程可视化
from skopt.plots import plot_convergence
plot_convergence(bayes_search.optimizer_results_[0])
plt.title('贝叶斯优化收敛过程')
plt.show()
---
04.学习曲线
a.原理
观察训练集大小对性能的影响
b.用途
判断模型是否过拟合或欠拟合
c.代码示例
---
from sklearn.model_selection import learning_curve
# 计算学习曲线
train_sizes, train_scores, test_scores = learning_curve(
RandomForestClassifier(n_estimators=100, random_state=42),
X, y, cv=5, n_jobs=-1,
train_sizes=np.linspace(0.1, 1.0, 10),
scoring='accuracy'
)
# 计算均值和标准差
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
test_std = np.std(test_scores, axis=1)
# 可视化
plt.figure(figsize=(10, 6))
plt.plot(train_sizes, train_mean, 'o-', linewidth=2, label='训练集')
plt.fill_between(train_sizes, train_mean - train_std,
train_mean + train_std, alpha=0.2)
plt.plot(train_sizes, test_mean, 's-', linewidth=2, label='验证集')
plt.fill_between(train_sizes, test_mean - test_std,
test_mean + test_std, alpha=0.2)
plt.xlabel('训练样本数')
plt.ylabel('准确率')
plt.title('学习曲线')
plt.legend()
plt.grid(True)
plt.show()
# 对比不同模型
models = {
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM': SVC(random_state=42)
}
plt.figure(figsize=(15, 5))
for i, (name, model) in enumerate(models.items()):
train_sizes, train_scores, test_scores = learning_curve(
model, X, y, cv=5, n_jobs=-1,
train_sizes=np.linspace(0.1, 1.0, 10)
)
train_mean = np.mean(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
plt.subplot(1, 3, i+1)
plt.plot(train_sizes, train_mean, 'o-', linewidth=2, label='训练集')
plt.plot(train_sizes, test_mean, 's-', linewidth=2, label='验证集')
plt.xlabel('训练样本数')
plt.ylabel('准确率')
plt.title(f'{name}学习曲线')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
---
05.验证曲线
a.原理
观察单个超参数对性能的影响
b.用途
选择合适的超参数范围
c.代码示例
---
from sklearn.model_selection import validation_curve
# 验证曲线:max_depth
param_range = np.arange(1, 21)
train_scores, test_scores = validation_curve(
RandomForestClassifier(n_estimators=100, random_state=42),
X, y, param_name='max_depth', param_range=param_range,
cv=5, scoring='accuracy', n_jobs=-1
)
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
test_std = np.std(test_scores, axis=1)
plt.figure(figsize=(10, 6))
plt.plot(param_range, train_mean, 'o-', linewidth=2, label='训练集')
plt.fill_between(param_range, train_mean - train_std,
train_mean + train_std, alpha=0.2)
plt.plot(param_range, test_mean, 's-', linewidth=2, label='验证集')
plt.fill_between(param_range, test_mean - test_std,
test_mean + test_std, alpha=0.2)
plt.xlabel('max_depth')
plt.ylabel('准确率')
plt.title('验证曲线:max_depth')
plt.legend()
plt.grid(True)
plt.show()
# 多个参数的验证曲线
params_to_test = {
'n_estimators': np.arange(10, 201, 20),
'max_depth': np.arange(1, 21, 2),
'min_samples_split': np.arange(2, 21, 2)
}
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for idx, (param_name, param_range) in enumerate(params_to_test.items()):
train_scores, test_scores = validation_curve(
RandomForestClassifier(random_state=42),
X, y, param_name=param_name, param_range=param_range,
cv=5, scoring='accuracy', n_jobs=-1
)
train_mean = np.mean(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
axes[idx].plot(param_range, train_mean, 'o-', linewidth=2, label='训练集')
axes[idx].plot(param_range, test_mean, 's-', linewidth=2, label='验证集')
axes[idx].set_xlabel(param_name)
axes[idx].set_ylabel('准确率')
axes[idx].set_title(f'验证曲线:{param_name}')
axes[idx].legend()
axes[idx].grid(True)
plt.tight_layout()
plt.show()
---
6 降维与特征工程
6.1 主成分分析
01.基本原理
a.核心思想
找到数据方差最大的方向,将高维数据投影到低维空间
b.目标
用较少的主成分保留大部分信息
c.步骤
a.数据标准化
b.计算协方差矩阵
c.计算特征值和特征向量
d.选择前k个主成分
e.投影数据
02.使用Scikit-learn
a.基本使用
---
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# PCA降维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
# 可视化
plt.figure(figsize=(10, 6))
colors = ['red', 'green', 'blue']
for i, color in enumerate(colors):
plt.scatter(X_pca[y == i, 0], X_pca[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
plt.title('PCA降维可视化')
plt.legend()
plt.grid(True)
plt.show()
# 解释方差比例
print(f"各主成分解释方差比例: {pca.explained_variance_ratio_}")
print(f"累计解释方差比例: {pca.explained_variance_ratio_.sum():.4f}")
# 主成分载荷
components = pd.DataFrame(
pca.components_,
columns=iris.feature_names,
index=['PC1', 'PC2']
)
print(f"\n主成分载荷:\n{components}")
---
b.选择主成分数量
---
# 计算所有主成分
pca_full = PCA()
pca_full.fit(X_scaled)
# 累计解释方差
cumsum = np.cumsum(pca_full.explained_variance_ratio_)
# 可视化
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(cumsum) + 1), cumsum, 'bo-', linewidth=2)
plt.axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
plt.xlabel('主成分数量')
plt.ylabel('累计解释方差比例')
plt.title('主成分数量选择')
plt.legend()
plt.grid(True)
plt.show()
# 自动选择保留95%方差的主成分
pca_auto = PCA(n_components=0.95)
X_pca_auto = pca_auto.fit_transform(X_scaled)
print(f"保留95%方差需要{pca_auto.n_components_}个主成分")
---
c.PCA用于降噪
---
# 添加噪声
X_noisy = X + np.random.randn(*X.shape) * 0.5
# PCA降噪
pca_denoise = PCA(n_components=2)
X_pca_noisy = pca_denoise.fit_transform(X_noisy)
X_denoised = pca_denoise.inverse_transform(X_pca_noisy)
# 对比
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for i, (data, title) in enumerate([
(X[:, 0], '原始数据'),
(X_noisy[:, 0], '噪声数据'),
(X_denoised[:, 0], '降噪数据')
]):
axes[i].hist(data, bins=30, edgecolor='black')
axes[i].set_xlabel('特征值')
axes[i].set_ylabel('频数')
axes[i].set_title(title)
axes[i].grid(True)
plt.tight_layout()
plt.show()
---
03.核PCA
a.原理
使用核技巧处理非线性数据
b.代码示例
---
from sklearn.decomposition import KernelPCA
# 生成非线性数据
from sklearn.datasets import make_circles
X_circles, y_circles = make_circles(n_samples=400, factor=0.3,
noise=0.05, random_state=42)
# 线性PCA vs 核PCA
pca_linear = PCA(n_components=2)
X_pca_linear = pca_linear.fit_transform(X_circles)
kpca = KernelPCA(n_components=2, kernel='rbf', gamma=10)
X_kpca = kpca.fit_transform(X_circles)
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 原始数据
axes[0].scatter(X_circles[y_circles == 0, 0], X_circles[y_circles == 0, 1],
c='red', alpha=0.6, label='类别0')
axes[0].scatter(X_circles[y_circles == 1, 0], X_circles[y_circles == 1, 1],
c='blue', alpha=0.6, label='类别1')
axes[0].set_title('原始数据')
axes[0].legend()
axes[0].grid(True)
# 线性PCA
axes[1].scatter(X_pca_linear[y_circles == 0, 0], X_pca_linear[y_circles == 0, 1],
c='red', alpha=0.6, label='类别0')
axes[1].scatter(X_pca_linear[y_circles == 1, 0], X_pca_linear[y_circles == 1, 1],
c='blue', alpha=0.6, label='类别1')
axes[1].set_title('线性PCA')
axes[1].legend()
axes[1].grid(True)
# 核PCA
axes[2].scatter(X_kpca[y_circles == 0, 0], X_kpca[y_circles == 0, 1],
c='red', alpha=0.6, label='类别0')
axes[2].scatter(X_kpca[y_circles == 1, 0], X_kpca[y_circles == 1, 1],
c='blue', alpha=0.6, label='类别1')
axes[2].set_title('核PCA (RBF)')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.show()
---
6.2 其他降维方法
01.t-SNE
a.原理
保持数据点之间的相似性,适合可视化
b.特点
非线性降维,主要用于2D/3D可视化
c.代码示例
---
from sklearn.manifold import TSNE
# t-SNE降维
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
X_tsne = tsne.fit_transform(X_scaled)
# 可视化
plt.figure(figsize=(10, 6))
for i, color in enumerate(['red', 'green', 'blue']):
plt.scatter(X_tsne[y == i, 0], X_tsne[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
plt.xlabel('t-SNE 1')
plt.ylabel('t-SNE 2')
plt.title('t-SNE降维可视化')
plt.legend()
plt.grid(True)
plt.show()
# 对比PCA和t-SNE
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# PCA
for i, color in enumerate(['red', 'green', 'blue']):
axes[0].scatter(X_pca[y == i, 0], X_pca[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
axes[0].set_xlabel('PC1')
axes[0].set_ylabel('PC2')
axes[0].set_title('PCA')
axes[0].legend()
axes[0].grid(True)
# t-SNE
for i, color in enumerate(['red', 'green', 'blue']):
axes[1].scatter(X_tsne[y == i, 0], X_tsne[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
axes[1].set_xlabel('t-SNE 1')
axes[1].set_ylabel('t-SNE 2')
axes[1].set_title('t-SNE')
axes[1].legend()
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
02.LDA
a.原理
线性判别分析,最大化类间距离,最小化类内距离
b.特点
监督降维,考虑类别信息
c.代码示例
---
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# LDA降维
lda = LinearDiscriminantAnalysis(n_components=2)
X_lda = lda.fit_transform(X_scaled, y)
# 可视化
plt.figure(figsize=(10, 6))
for i, color in enumerate(['red', 'green', 'blue']):
plt.scatter(X_lda[y == i, 0], X_lda[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
plt.xlabel('LD1')
plt.ylabel('LD2')
plt.title('LDA降维可视化')
plt.legend()
plt.grid(True)
plt.show()
# 对比PCA和LDA
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# PCA
for i, color in enumerate(['red', 'green', 'blue']):
axes[0].scatter(X_pca[y == i, 0], X_pca[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
axes[0].set_xlabel('PC1')
axes[0].set_ylabel('PC2')
axes[0].set_title('PCA (无监督)')
axes[0].legend()
axes[0].grid(True)
# LDA
for i, color in enumerate(['red', 'green', 'blue']):
axes[1].scatter(X_lda[y == i, 0], X_lda[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
axes[1].set_xlabel('LD1')
axes[1].set_ylabel('LD2')
axes[1].set_title('LDA (监督)')
axes[1].legend()
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
03.UMAP
a.原理
统一流形逼近与投影,保持全局和局部结构
b.特点
比t-SNE更快,保持更多全局结构
c.代码示例
---
# 安装:pip install umap-learn
import umap
# UMAP降维
reducer = umap.UMAP(n_components=2, random_state=42)
X_umap = reducer.fit_transform(X_scaled)
# 可视化
plt.figure(figsize=(10, 6))
for i, color in enumerate(['red', 'green', 'blue']):
plt.scatter(X_umap[y == i, 0], X_umap[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
plt.xlabel('UMAP 1')
plt.ylabel('UMAP 2')
plt.title('UMAP降维可视化')
plt.legend()
plt.grid(True)
plt.show()
# 对比PCA、t-SNE和UMAP
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
methods = [
(X_pca, 'PCA'),
(X_tsne, 't-SNE'),
(X_umap, 'UMAP')
]
for idx, (X_reduced, method_name) in enumerate(methods):
for i, color in enumerate(['red', 'green', 'blue']):
axes[idx].scatter(X_reduced[y == i, 0], X_reduced[y == i, 1],
c=color, label=iris.target_names[i], alpha=0.6)
axes[idx].set_title(method_name)
axes[idx].legend()
axes[idx].grid(True)
plt.tight_layout()
plt.show()
---
04.特征选择
a.过滤法
---
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
# 方差过滤
from sklearn.feature_selection import VarianceThreshold
selector_var = VarianceThreshold(threshold=0.1)
X_var = selector_var.fit_transform(X_scaled)
print(f"方差过滤后特征数: {X_var.shape[1]}")
# 单变量特征选择
selector_k = SelectKBest(f_classif, k=2)
X_k = selector_k.fit_transform(X, y)
# 特征得分
scores = selector_k.scores_
indices = np.argsort(scores)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(scores)), scores[indices])
plt.xticks(range(len(scores)),
[iris.feature_names[i] for i in indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('F分数')
plt.title('单变量特征选择')
plt.tight_layout()
plt.show()
# 互信息
selector_mi = SelectKBest(mutual_info_classif, k=2)
X_mi = selector_mi.fit_transform(X, y)
mi_scores = selector_mi.scores_
mi_indices = np.argsort(mi_scores)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(mi_scores)), mi_scores[mi_indices])
plt.xticks(range(len(mi_scores)),
[iris.feature_names[i] for i in mi_indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('互信息')
plt.title('互信息特征选择')
plt.tight_layout()
plt.show()
---
b.包装法
---
from sklearn.feature_selection import RFE
# 递归特征消除
clf_rfe = RandomForestClassifier(n_estimators=100, random_state=42)
rfe = RFE(clf_rfe, n_features_to_select=2)
X_rfe = rfe.fit_transform(X, y)
# 选中的特征
selected_features = [iris.feature_names[i] for i, selected in enumerate(rfe.support_) if selected]
print(f"RFE选中的特征: {selected_features}")
# 特征排名
ranking = rfe.ranking_
plt.figure(figsize=(10, 6))
plt.bar(range(len(ranking)), ranking)
plt.xticks(range(len(ranking)), iris.feature_names, rotation=45)
plt.xlabel('特征')
plt.ylabel('排名')
plt.title('RFE特征排名')
plt.tight_layout()
plt.show()
---
c.嵌入法
---
from sklearn.feature_selection import SelectFromModel
# 基于模型的特征选择
clf_tree = RandomForestClassifier(n_estimators=100, random_state=42)
clf_tree.fit(X, y)
# 特征重要性
importances = clf_tree.feature_importances_
indices = np.argsort(importances)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(importances)), importances[indices])
plt.xticks(range(len(importances)),
[iris.feature_names[i] for i in indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('随机森林特征重要性')
plt.tight_layout()
plt.show()
# 选择重要特征
selector_model = SelectFromModel(clf_tree, threshold='median')
X_model = selector_model.fit_transform(X, y)
print(f"选中的特征数: {X_model.shape[1]}")
# L1正则化特征选择
from sklearn.linear_model import LogisticRegression
clf_l1 = LogisticRegression(penalty='l1', solver='liblinear', C=0.1, random_state=42)
clf_l1.fit(X_scaled, y)
# 系数
coef = np.abs(clf_l1.coef_).mean(axis=0)
indices = np.argsort(coef)[::-1]
plt.figure(figsize=(10, 6))
plt.bar(range(len(coef)), coef[indices])
plt.xticks(range(len(coef)),
[iris.feature_names[i] for i in indices], rotation=45)
plt.xlabel('特征')
plt.ylabel('L1系数绝对值')
plt.title('L1正则化特征选择')
plt.tight_layout()
plt.show()
---
7 无监督学习
7.1 聚类算法
01.K-Means
a.原理
将数据分成K个簇,最小化簇内距离
b.算法步骤
a.随机初始化K个中心点
b.分配样本到最近的中心
c.更新中心为簇内样本均值
d.重复b-c直到收敛
c.代码示例
---
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
# 生成聚类数据
X_blobs, y_blobs = make_blobs(n_samples=300, centers=4,
cluster_std=0.6, random_state=42)
# K-Means聚类
kmeans = KMeans(n_clusters=4, random_state=42)
y_pred = kmeans.fit_predict(X_blobs)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred,
cmap='viridis', alpha=0.6)
plt.scatter(kmeans.cluster_centers_[:, 0],
kmeans.cluster_centers_[:, 1],
c='red', marker='X', s=200, label='中心点')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('K-Means聚类')
plt.legend()
plt.grid(True)
plt.show()
# 惯性(簇内距离平方和)
print(f"惯性: {kmeans.inertia_:.2f}")
---
d.选择K值
---
# 肘部法则
inertias = []
K_range = range(1, 11)
for k in K_range:
kmeans_k = KMeans(n_clusters=k, random_state=42)
kmeans_k.fit(X_blobs)
inertias.append(kmeans_k.inertia_)
plt.figure(figsize=(10, 6))
plt.plot(K_range, inertias, 'bo-', linewidth=2)
plt.xlabel('K值')
plt.ylabel('惯性')
plt.title('肘部法则选择K值')
plt.grid(True)
plt.show()
# 轮廓系数
from sklearn.metrics import silhouette_score
silhouette_scores = []
for k in range(2, 11):
kmeans_k = KMeans(n_clusters=k, random_state=42)
y_pred_k = kmeans_k.fit_predict(X_blobs)
score = silhouette_score(X_blobs, y_pred_k)
silhouette_scores.append(score)
plt.figure(figsize=(10, 6))
plt.plot(range(2, 11), silhouette_scores, 'bo-', linewidth=2)
plt.xlabel('K值')
plt.ylabel('轮廓系数')
plt.title('轮廓系数选择K值')
plt.grid(True)
plt.show()
---
e.K-Means++
---
# K-Means++初始化
kmeans_pp = KMeans(n_clusters=4, init='k-means++', random_state=42)
y_pred_pp = kmeans_pp.fit_predict(X_blobs)
# 对比随机初始化
kmeans_random = KMeans(n_clusters=4, init='random', random_state=42)
y_pred_random = kmeans_random.fit_predict(X_blobs)
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# K-Means++
axes[0].scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_pp,
cmap='viridis', alpha=0.6)
axes[0].scatter(kmeans_pp.cluster_centers_[:, 0],
kmeans_pp.cluster_centers_[:, 1],
c='red', marker='X', s=200)
axes[0].set_title(f'K-Means++ (惯性={kmeans_pp.inertia_:.2f})')
axes[0].grid(True)
# 随机初始化
axes[1].scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_random,
cmap='viridis', alpha=0.6)
axes[1].scatter(kmeans_random.cluster_centers_[:, 0],
kmeans_random.cluster_centers_[:, 1],
c='red', marker='X', s=200)
axes[1].set_title(f'随机初始化 (惯性={kmeans_random.inertia_:.2f})')
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
02.层次聚类
a.原理
自底向上或自顶向下构建聚类树
b.凝聚层次聚类
---
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage
# 层次聚类
agg = AgglomerativeClustering(n_clusters=4, linkage='ward')
y_pred_agg = agg.fit_predict(X_blobs)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_agg,
cmap='viridis', alpha=0.6)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('层次聚类')
plt.grid(True)
plt.show()
# 树状图
linkage_matrix = linkage(X_blobs, method='ward')
plt.figure(figsize=(12, 6))
dendrogram(linkage_matrix)
plt.xlabel('样本索引')
plt.ylabel('距离')
plt.title('层次聚类树状图')
plt.show()
---
c.不同连接方式
---
# 对比不同连接方式
linkage_methods = ['ward', 'complete', 'average', 'single']
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
for idx, method in enumerate(linkage_methods):
agg_method = AgglomerativeClustering(n_clusters=4, linkage=method)
y_pred_method = agg_method.fit_predict(X_blobs)
axes[idx].scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_method,
cmap='viridis', alpha=0.6)
axes[idx].set_title(f'连接方式: {method}')
axes[idx].grid(True)
plt.tight_layout()
plt.show()
---
03.DBSCAN
a.原理
基于密度的聚类,可以发现任意形状的簇
b.核心概念
a.核心点
邻域内样本数≥min_samples
b.边界点
不是核心点但在核心点邻域内
c.噪声点
既不是核心点也不是边界点
c.代码示例
---
from sklearn.cluster import DBSCAN
# 生成复杂形状数据
from sklearn.datasets import make_moons
X_moons, y_moons = make_moons(n_samples=200, noise=0.05, random_state=42)
# DBSCAN聚类
dbscan = DBSCAN(eps=0.3, min_samples=5)
y_pred_dbscan = dbscan.fit_predict(X_moons)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_moons[:, 0], X_moons[:, 1], c=y_pred_dbscan,
cmap='viridis', alpha=0.6)
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title(f'DBSCAN聚类 (簇数={len(set(y_pred_dbscan)) - (1 if -1 in y_pred_dbscan else 0)})')
plt.grid(True)
plt.show()
# 对比K-Means
kmeans_moons = KMeans(n_clusters=2, random_state=42)
y_pred_kmeans = kmeans_moons.fit_predict(X_moons)
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# K-Means
axes[0].scatter(X_moons[:, 0], X_moons[:, 1], c=y_pred_kmeans,
cmap='viridis', alpha=0.6)
axes[0].set_title('K-Means')
axes[0].grid(True)
# DBSCAN
axes[1].scatter(X_moons[:, 0], X_moons[:, 1], c=y_pred_dbscan,
cmap='viridis', alpha=0.6)
axes[1].set_title('DBSCAN')
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
d.参数调优
---
# 不同eps值
eps_values = [0.1, 0.2, 0.3, 0.5]
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
for idx, eps in enumerate(eps_values):
dbscan_eps = DBSCAN(eps=eps, min_samples=5)
y_pred_eps = dbscan_eps.fit_predict(X_moons)
n_clusters = len(set(y_pred_eps)) - (1 if -1 in y_pred_eps else 0)
n_noise = list(y_pred_eps).count(-1)
axes[idx].scatter(X_moons[:, 0], X_moons[:, 1], c=y_pred_eps,
cmap='viridis', alpha=0.6)
axes[idx].set_title(f'eps={eps} (簇数={n_clusters}, 噪声={n_noise})')
axes[idx].grid(True)
plt.tight_layout()
plt.show()
---
04.高斯混合模型
a.原理
假设数据由多个高斯分布混合而成
b.EM算法
a.E步骤
计算每个样本属于各个高斯分布的概率
b.M步骤
更新高斯分布的参数
c.代码示例
---
from sklearn.mixture import GaussianMixture
# 高斯混合模型
gmm = GaussianMixture(n_components=4, random_state=42)
y_pred_gmm = gmm.fit_predict(X_blobs)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_gmm,
cmap='viridis', alpha=0.6)
plt.scatter(gmm.means_[:, 0], gmm.means_[:, 1],
c='red', marker='X', s=200, label='均值')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('高斯混合模型')
plt.legend()
plt.grid(True)
plt.show()
# 概率分布
proba = gmm.predict_proba(X_blobs)
print(f"前5个样本的概率分布:\n{proba[:5]}")
# 对比K-Means
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# K-Means
axes[0].scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred,
cmap='viridis', alpha=0.6)
axes[0].set_title('K-Means (硬聚类)')
axes[0].grid(True)
# GMM
axes[1].scatter(X_blobs[:, 0], X_blobs[:, 1], c=y_pred_gmm,
cmap='viridis', alpha=0.6)
axes[1].set_title('GMM (软聚类)')
axes[1].grid(True)
plt.tight_layout()
plt.show()
---
d.选择组件数
---
# BIC和AIC
n_components_range = range(1, 11)
bic_scores = []
aic_scores = []
for n in n_components_range:
gmm_n = GaussianMixture(n_components=n, random_state=42)
gmm_n.fit(X_blobs)
bic_scores.append(gmm_n.bic(X_blobs))
aic_scores.append(gmm_n.aic(X_blobs))
plt.figure(figsize=(10, 6))
plt.plot(n_components_range, bic_scores, 'bo-', linewidth=2, label='BIC')
plt.plot(n_components_range, aic_scores, 'rs-', linewidth=2, label='AIC')
plt.xlabel('组件数')
plt.ylabel('信息准则')
plt.title('BIC和AIC选择组件数')
plt.legend()
plt.grid(True)
plt.show()
---
7.2 异常检测
01.基于统计的方法
a.Z-score
---
# 生成数据
np.random.seed(42)
X_normal = np.random.randn(1000, 2)
X_outliers = np.random.uniform(low=-4, high=4, size=(50, 2))
X_anomaly = np.vstack([X_normal, X_outliers])
# Z-score异常检测
from scipy import stats
z_scores = np.abs(stats.zscore(X_anomaly))
threshold = 3
outliers_zscore = (z_scores > threshold).any(axis=1)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_anomaly[~outliers_zscore, 0], X_anomaly[~outliers_zscore, 1],
c='blue', alpha=0.6, label='正常')
plt.scatter(X_anomaly[outliers_zscore, 0], X_anomaly[outliers_zscore, 1],
c='red', alpha=0.6, label='异常')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('Z-score异常检测')
plt.legend()
plt.grid(True)
plt.show()
print(f"检测到{outliers_zscore.sum()}个异常点")
---
b.IQR方法
---
# IQR异常检测
Q1 = np.percentile(X_anomaly, 25, axis=0)
Q3 = np.percentile(X_anomaly, 75, axis=0)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_iqr = ((X_anomaly < lower_bound) | (X_anomaly > upper_bound)).any(axis=1)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_anomaly[~outliers_iqr, 0], X_anomaly[~outliers_iqr, 1],
c='blue', alpha=0.6, label='正常')
plt.scatter(X_anomaly[outliers_iqr, 0], X_anomaly[outliers_iqr, 1],
c='red', alpha=0.6, label='异常')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('IQR异常检测')
plt.legend()
plt.grid(True)
plt.show()
print(f"检测到{outliers_iqr.sum()}个异常点")
---
02.Isolation Forest
a.原理
异常点更容易被隔离,需要更少的分裂次数
b.代码示例
---
from sklearn.ensemble import IsolationForest
# Isolation Forest
iso_forest = IsolationForest(contamination=0.05, random_state=42)
y_pred_iso = iso_forest.fit_predict(X_anomaly)
# -1表示异常,1表示正常
outliers_iso = y_pred_iso == -1
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_anomaly[~outliers_iso, 0], X_anomaly[~outliers_iso, 1],
c='blue', alpha=0.6, label='正常')
plt.scatter(X_anomaly[outliers_iso, 0], X_anomaly[outliers_iso, 1],
c='red', alpha=0.6, label='异常')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('Isolation Forest异常检测')
plt.legend()
plt.grid(True)
plt.show()
print(f"检测到{outliers_iso.sum()}个异常点")
# 异常分数
scores = iso_forest.score_samples(X_anomaly)
plt.figure(figsize=(10, 6))
plt.hist(scores, bins=50, edgecolor='black')
plt.xlabel('异常分数')
plt.ylabel('频数')
plt.title('异常分数分布')
plt.grid(True)
plt.show()
---
03.One-Class SVM
a.原理
学习正常数据的边界,边界外的为异常
b.代码示例
---
from sklearn.svm import OneClassSVM
# One-Class SVM
oc_svm = OneClassSVM(nu=0.05, kernel='rbf', gamma='auto')
y_pred_svm = oc_svm.fit_predict(X_anomaly)
outliers_svm = y_pred_svm == -1
# 可视化
plt.figure(figsize=(10, 6))
# 数据点
plt.scatter(X_anomaly[~outliers_svm, 0], X_anomaly[~outliers_svm, 1],
c='blue', alpha=0.6, label='正常')
plt.scatter(X_anomaly[outliers_svm, 0], X_anomaly[outliers_svm, 1],
c='red', alpha=0.6, label='异常')
# 决策边界
xx, yy = np.meshgrid(np.linspace(X_anomaly[:, 0].min() - 1,
X_anomaly[:, 0].max() + 1, 100),
np.linspace(X_anomaly[:, 1].min() - 1,
X_anomaly[:, 1].max() + 1, 100))
Z = oc_svm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
plt.contourf(xx, yy, Z, levels=[Z.min(), 0], alpha=0.2, colors='orange')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('One-Class SVM异常检测')
plt.legend()
plt.grid(True)
plt.show()
print(f"检测到{outliers_svm.sum()}个异常点")
---
04.Local Outlier Factor
a.原理
基于局部密度的异常检测
b.代码示例
---
from sklearn.neighbors import LocalOutlierFactor
# LOF
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
y_pred_lof = lof.fit_predict(X_anomaly)
outliers_lof = y_pred_lof == -1
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_anomaly[~outliers_lof, 0], X_anomaly[~outliers_lof, 1],
c='blue', alpha=0.6, label='正常')
plt.scatter(X_anomaly[outliers_lof, 0], X_anomaly[outliers_lof, 1],
c='red', alpha=0.6, label='异常')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('LOF异常检测')
plt.legend()
plt.grid(True)
plt.show()
print(f"检测到{outliers_lof.sum()}个异常点")
# 局部异常因子
lof_scores = -lof.negative_outlier_factor_
plt.figure(figsize=(10, 6))
plt.scatter(X_anomaly[:, 0], X_anomaly[:, 1],
c=lof_scores, cmap='viridis', alpha=0.6)
plt.colorbar(label='LOF分数')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('LOF分数分布')
plt.grid(True)
plt.show()
---
05.方法对比
a.综合对比
---
# 对比所有方法
methods = {
'Z-score': outliers_zscore,
'IQR': outliers_iqr,
'Isolation Forest': outliers_iso,
'One-Class SVM': outliers_svm,
'LOF': outliers_lof
}
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()
for idx, (name, outliers) in enumerate(methods.items()):
axes[idx].scatter(X_anomaly[~outliers, 0], X_anomaly[~outliers, 1],
c='blue', alpha=0.6, label='正常')
axes[idx].scatter(X_anomaly[outliers, 0], X_anomaly[outliers, 1],
c='red', alpha=0.6, label='异常')
axes[idx].set_xlabel('特征1')
axes[idx].set_ylabel('特征2')
axes[idx].set_title(f'{name} (异常={outliers.sum()})')
axes[idx].legend()
axes[idx].grid(True)
# 隐藏多余的子图
axes[-1].axis('off')
plt.tight_layout()
plt.show()
# 统计
print("各方法检测到的异常点数:")
for name, outliers in methods.items():
print(f" {name}: {outliers.sum()}")
---