1. 文件IO

1.1 文件打开模式

01.模式分类
    a.文本模式
        a.读取模式r
            以只读方式打开文件,文件指针位于文件开头,文件必须存在否则抛出FileNotFoundError异常。
        b.写入模式w
            以写入方式打开文件,如果文件存在则清空内容,如果文件不存在则创建新文件。
        c.追加模式a
            以追加方式打开文件,文件指针位于文件末尾,如果文件不存在则创建新文件。
        d.读写模式r+
            以读写方式打开文件,文件指针位于文件开头,文件必须存在,可以同时进行读写操作。
    b.二进制模式
        a.rb模式
            以二进制只读方式打开文件,用于读取图片、音频、视频等二进制文件。
        b.wb模式
            以二进制写入方式打开文件,如果文件存在则清空内容。
        c.ab模式
            以二进制追加方式打开文件,文件指针位于文件末尾。
        d.rb+模式
            以二进制读写方式打开文件,可以同时进行读写操作。
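
The mode rules listed above can be checked with a small experiment. This is a minimal sketch, not a reference implementation: it works in a temporary directory, and the file name `demo.txt` is a hypothetical placeholder.

```python
import os
import tempfile

# Work in a throwaway directory so the sketch does not touch real files.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'demo.txt')  # hypothetical file name

    # 'r' on a missing file raises FileNotFoundError.
    try:
        open(path, 'r', encoding='utf-8')
        missing_raised = False
    except FileNotFoundError:
        missing_raised = True

    # 'w' creates the file; opening with 'w' again truncates it first.
    with open(path, 'w', encoding='utf-8') as f:
        f.write('first')
    with open(path, 'w', encoding='utf-8') as f:
        f.write('second')

    # 'a' keeps the existing content and writes at the end.
    with open(path, 'a', encoding='utf-8') as f:
        f.write('+appended')

    # 'r+' starts at offset 0 and overwrites in place without truncating.
    with open(path, 'r+', encoding='utf-8') as f:
        f.write('SEC')

    with open(path, 'r', encoding='utf-8') as f:
        final_content = f.read()

print(missing_raised, final_content)
```

The last write shows the main difference between `w` and `r+`: `r+` replaces only the characters it writes and leaves the rest of the file untouched.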

02.模式组合
    a.常用组合
        a.功能说明
            Python支持多种模式组合,通过在基础模式后添加+号实现读写功能,添加b实现二进制操作。
        b.代码示例
            ---
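            # Text-mode examples
            # 'r': read-only; the file must exist
            with open('data.txt', 'r', encoding='utf-8') as f1:
                content = f1.read()

            # 'w': write; existing content is truncated
            with open('output.txt', 'w', encoding='utf-8') as f2:
                f2.write('new content')

            # 'a': append; existing content is kept
            with open('log.txt', 'a', encoding='utf-8') as f3:
                f3.write('appended log entry\n')

            # 'r+': read and write; the file must exist
            with open('data.txt', 'r+', encoding='utf-8') as f4:
                content = f4.read()           # the position is now at end of file
                f4.write('appended content')  # so this write lands after the existing content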
            ---
    b.二进制组合
        a.功能说明
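            Binary modes are for non-text files such as images, audio, and video. They read and write bytes objects, so the encoding argument must be omitted (passing it in binary mode raises ValueError).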
        b.代码示例
            ---
            # rb模式:二进制只读
            with open('image.png', 'rb') as f:
                image_data = f.read()
                print(f'文件大小: {len(image_data)} 字节')

            # wb模式:二进制写入
            with open('copy.png', 'wb') as f:
                f.write(image_data)

            # ab模式:二进制追加
            with open('data.bin', 'ab') as f:
                f.write(b'\x00\x01\x02\x03')

            # rb+模式:二进制读写
            with open('data.bin', 'rb+') as f:
                data = f.read(10)  # 读取前10字节
                f.seek(0)  # 回到文件开头
                f.write(b'\xFF\xFF')  # 写入2字节
            ---

03.编码参数
    a.encoding参数
        a.功能说明
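            In text mode the encoding argument is optional, but it should be given explicitly. When it is omitted, most Python 3 versions fall back to a locale-dependent default (typically GBK/cp936 on Chinese-language Windows and UTF-8 on most Linux and macOS systems), which is a common cause of garbled text. Common values are utf-8, gbk, and ascii.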
        b.代码示例
            ---
            # UTF-8编码(推荐)
            with open('utf8.txt', 'w', encoding='utf-8') as f:
                f.write('中文内容')

            # GBK编码(Windows中文)
            with open('gbk.txt', 'w', encoding='gbk') as f:
                f.write('中文内容')

            # 读取时指定编码
            with open('utf8.txt', 'r', encoding='utf-8') as f:
                content = f.read()

            # 处理编码错误
            with open('data.txt', 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()  # 忽略无法解码的字符
            ---
    b.errors参数
        a.strict模式
            默认模式,遇到编码错误抛出UnicodeDecodeError异常。
        b.ignore模式
            忽略无法解码的字符,跳过错误继续处理。
        c.replace模式
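            Replaces each undecodable byte with the replacement character U+FFFD (�) when reading; when writing, characters that cannot be encoded are replaced with ?.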
        d.代码示例
            ---
            # strict模式(默认)
            try:
                with open('bad_encoding.txt', 'r', encoding='utf-8') as f:
                    content = f.read()
            except UnicodeDecodeError as e:
                print(f'编码错误: {e}')

            # ignore模式
            with open('bad_encoding.txt', 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()  # 跳过错误字符

            # replace模式
            with open('bad_encoding.txt', 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()  # 用�替换错误字符
            ---
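
The three error handlers can be compared without any files, because `bytes.decode` and `str.encode` accept the same `errors` values as `open`. A minimal sketch, with a made-up byte string as the assumption:

```python
raw = b'abc\xffdef'  # 0xFF can never appear in valid UTF-8

# strict (the default): decoding fails
try:
    raw.decode('utf-8')
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True

# ignore: the bad byte is dropped silently
ignored = raw.decode('utf-8', errors='ignore')

# replace: the bad byte becomes U+FFFD when decoding...
replaced = raw.decode('utf-8', errors='replace')

# ...and an unencodable character becomes '?' when encoding
encoded = '中'.encode('ascii', errors='replace')

print(strict_failed, ignored, replaced, encoded)
```

Both `ignore` and `replace` lose information, so they suit display or best-effort parsing rather than round-tripping data.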

1.2 文本文件与二进制文件

01.文本文件
    a.特点
        a.编码格式
            文本文件以字符形式存储,需要指定编码格式如UTF-8、GBK等,内容可以用文本编辑器直接查看。
        b.换行符处理
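            Operating systems use different line endings: Windows uses \r\n, Unix/Linux and modern macOS use \n, and classic Mac OS (before OS X) used \r. By default, text mode translates all of them to \n when reading, and translates \n to the platform's line ending (os.linesep) when writing.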
        c.适用场景
            适合存储配置文件、日志文件、CSV文件、JSON文件等可读性强的数据。
    b.读写操作
        a.功能说明
            文本文件读写时Python自动进行编码解码,将字节流转换为字符串,支持按行读取和写入。
        b.代码示例
            ---
            # 写入文本文件
            with open('config.txt', 'w', encoding='utf-8') as f:
                f.write('server=localhost\n')
                f.write('port=8080\n')
                f.write('timeout=30\n')

            # 读取文本文件
            with open('config.txt', 'r', encoding='utf-8') as f:
                content = f.read()
                print(content)

            # 按行读取
            with open('config.txt', 'r', encoding='utf-8') as f:
                for line in f:
                    key, value = line.strip().split('=')
                    print(f'{key}: {value}')

            # 读取所有行到列表
            with open('config.txt', 'r', encoding='utf-8') as f:
                lines = f.readlines()
                print(f'共{len(lines)}行')
            ---
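
The newline handling described in the text-file notes above can be observed by writing raw bytes and reading them back in text mode. A minimal sketch; the file name is a placeholder, and the `newline=''` argument is shown only to contrast with the default:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'newlines.txt')  # hypothetical file name

    # Write three lines, each with a different line ending, as raw bytes.
    with open(path, 'wb') as f:
        f.write(b'unix\nwindows\r\nclassic-mac\r')

    # Default text mode (newline=None): every ending is translated to '\n'.
    with open(path, 'r', encoding='utf-8') as f:
        translated = f.read()

    # newline='': line endings are returned exactly as stored.
    with open(path, 'r', encoding='utf-8', newline='') as f:
        untranslated = f.read()

print(repr(translated))
print(repr(untranslated))
```

Passing `newline=''` is what the `csv` module documentation recommends for files handed to its reader and writer, because it lets the module handle line endings itself.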

02.二进制文件
    a.特点
        a.字节存储
            二进制文件以字节形式存储,不需要编码解码,直接操作原始字节数据,无法用文本编辑器查看。
        b.精确控制
            可以精确控制每个字节的读写,适合处理图片、音频、视频、压缩文件等非文本数据。
        c.跨平台
            二进制文件不受操作系统换行符影响,在不同平台间传输不会出现格式问题。
    b.读写操作
        a.功能说明
            二进制模式下读写bytes对象,不进行编码解码,保持数据原始格式。
        b.代码示例
            ---
            # 复制图片文件
            with open('source.jpg', 'rb') as src:
                with open('dest.jpg', 'wb') as dst:
                    data = src.read()
                    dst.write(data)
                    print(f'复制了{len(data)}字节')

            # 分块读取大文件
            chunk_size = 1024 * 1024  # 1MB
            with open('large_file.bin', 'rb') as f:
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    # 处理数据块
                    print(f'读取{len(chunk)}字节')

            # 写入二进制数据
            with open('data.bin', 'wb') as f:
                f.write(b'\x89PNG\r\n\x1a\n')  # PNG文件头
                f.write(bytes([0, 1, 2, 3, 4]))  # 字节序列
            ---

03.格式转换
    a.文本转二进制
        a.功能说明
            将字符串编码为字节序列,使用encode方法指定编码格式。
        b.代码示例
            ---
            # 字符串转字节
            text = '中文内容'

            # UTF-8编码
            utf8_bytes = text.encode('utf-8')
            print(f'UTF-8: {utf8_bytes}')
            print(f'长度: {len(utf8_bytes)}字节')

            # GBK编码
            gbk_bytes = text.encode('gbk')
            print(f'GBK: {gbk_bytes}')
            print(f'长度: {len(gbk_bytes)}字节')

            # 写入二进制文件
            with open('text_as_binary.bin', 'wb') as f:
                f.write(utf8_bytes)
            ---
    b.二进制转文本
        a.功能说明
            将字节序列解码为字符串,使用decode方法指定编码格式,需要确保编码格式正确。
        b.代码示例
            ---
            # 读取二进制数据
            with open('text_as_binary.bin', 'rb') as f:
                binary_data = f.read()

            # 解码为字符串
            text = binary_data.decode('utf-8')
            print(f'解码结果: {text}')

            # 处理解码错误
            try:
                text = binary_data.decode('ascii')
            except UnicodeDecodeError:
                print('ASCII解码失败')
                text = binary_data.decode('utf-8', errors='replace')

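            # Detect the encoding automatically
            # (chardet is a third-party package: pip install chardet)
            import chardet
            result = chardet.detect(binary_data)
            encoding = result['encoding'] or 'utf-8'  # detect() may return None; fall back to UTF-8
            text = binary_data.decode(encoding, errors='replace')
            print(f'Detected encoding: {encoding}')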
            ---

04.混合模式
    a.文本与二进制混合
        a.功能说明
            某些文件格式包含文本和二进制数据,需要根据文件结构选择合适的读取方式。
        b.代码示例
            ---
            # 读取带BOM的UTF-8文件
            with open('utf8_bom.txt', 'rb') as f:
                data = f.read()
                # 检查BOM标记
                if data.startswith(b'\xef\xbb\xbf'):
                    print('检测到UTF-8 BOM')
                    text = data[3:].decode('utf-8')
                else:
                    text = data.decode('utf-8')

            # 处理CSV文件(文本格式)
            import csv
            with open('data.csv', 'r', encoding='utf-8') as f:
                reader = csv.reader(f)
                for row in reader:
                    print(row)

            # Excel files (.xlsx) are binary; openpyxl is a third-party package (pip install openpyxl)
            import openpyxl
            wb = openpyxl.load_workbook('data.xlsx')
            ws = wb.active
            for row in ws.iter_rows(values_only=True):
                print(row)
            ---
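
The manual BOM check above can also be delegated to the `utf-8-sig` codec, which strips a leading BOM when one is present and behaves like `utf-8` otherwise. A minimal sketch under the assumption that the file is UTF-8; the file name is a placeholder:

```python
import codecs
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'utf8_bom.txt')  # hypothetical file name

    # Create a UTF-8 file that starts with a byte order mark.
    with open(path, 'wb') as f:
        f.write(codecs.BOM_UTF8 + 'hello'.encode('utf-8'))

    # Plain utf-8 keeps the BOM as the character U+FEFF.
    with open(path, 'r', encoding='utf-8') as f:
        with_bom = f.read()

    # utf-8-sig removes it.
    with open(path, 'r', encoding='utf-8-sig') as f:
        without_bom = f.read()

print(repr(with_bom), repr(without_bom))
```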
    b.性能对比
        a.文本模式性能
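            Text mode decodes bytes to str and translates line endings on top of the raw read, so it is somewhat slower than binary mode. The difference matters mainly for very large files.
        b.二进制模式性能
            Binary mode hands back the raw bytes without decoding, which makes it the faster choice for large files and the only correct choice for non-text data.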
        c.代码示例
            ---
            import time

            # NOTE: the test that runs second may benefit from the OS file cache,
            # so repeat each measurement several times before comparing

            # Text mode
            start = time.perf_counter()
            with open('large.txt', 'r', encoding='utf-8') as f:
                content = f.read()
            text_time = time.perf_counter() - start
            print(f'Text mode: {text_time:.3f} s')

            # Binary mode
            start = time.perf_counter()
            with open('large.txt', 'rb') as f:
                content = f.read()
            binary_time = time.perf_counter() - start
            print(f'Binary mode: {binary_time:.3f} s')

            # Relative difference
            improvement = (text_time - binary_time) / text_time * 100
            print(f'Time saved: {improvement:.1f}%')
            ---

1.3 文件对象方法

01.读取方法
    a.read方法
        a.功能说明
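            Reads up to the given number of characters (text mode) or bytes (binary mode); with no argument it reads to the end of the file. Returns str in text mode and bytes in binary mode.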
        b.代码示例
            ---
            # 读取全部内容
            with open('data.txt', 'r', encoding='utf-8') as f:
                content = f.read()
                print(f'文件大小: {len(content)}字符')

            # Read a fixed amount (text mode counts characters, not bytes)
            with open('data.txt', 'r', encoding='utf-8') as f:
                chunk = f.read(100)  # 读取100字符
                print(f'前100字符: {chunk}')

            # 二进制模式读取
            with open('image.png', 'rb') as f:
                header = f.read(8)  # 读取文件头
                print(f'文件头: {header.hex()}')
            ---
    b.readline方法
        a.功能说明
            读取一行内容,包含换行符,每次调用读取下一行,到达文件末尾返回空字符串。
        b.代码示例
            ---
            # 逐行读取
            with open('log.txt', 'r', encoding='utf-8') as f:
                line1 = f.readline()
                line2 = f.readline()
                print(f'第1行: {line1.strip()}')
                print(f'第2行: {line2.strip()}')

            # 读取所有行
            with open('log.txt', 'r', encoding='utf-8') as f:
                while True:
                    line = f.readline()
                    if not line:
                        break
                    print(line.strip())

            # 限制读取长度
            with open('log.txt', 'r', encoding='utf-8') as f:
                line = f.readline(50)  # 最多读取50字符
                print(f'部分行: {line}')
            ---
    c.readlines方法
        a.功能说明
            读取所有行到列表,每个元素是一行内容,包含换行符,适合处理小文件。
        b.代码示例
            ---
            # 读取所有行
            with open('config.txt', 'r', encoding='utf-8') as f:
                lines = f.readlines()
                print(f'共{len(lines)}行')
                for i, line in enumerate(lines, 1):
                    print(f'{i}: {line.strip()}')

            # 过滤空行
            with open('data.txt', 'r', encoding='utf-8') as f:
                lines = [line.strip() for line in f.readlines() if line.strip()]
                print(f'非空行数: {len(lines)}')

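            # Limit how much is read: the argument is a size hint, not a line count
            with open('large.txt', 'r', encoding='utf-8') as f:
                lines = f.readlines(1000)  # stops once the lines read so far exceed about 1000 characters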
                print(f'读取了{len(lines)}行')
            ---

02.写入方法
    a.write方法
        a.功能说明
            写入字符串或字节数据,返回写入的字符数或字节数,不自动添加换行符。
        b.代码示例
            ---
            # 写入文本
            with open('output.txt', 'w', encoding='utf-8') as f:
                n = f.write('第一行\n')
                print(f'写入{n}字符')
                f.write('第二行\n')
                f.write('第三行\n')

            # 写入二进制数据
            with open('data.bin', 'wb') as f:
                n = f.write(b'\x00\x01\x02\x03')
                print(f'写入{n}字节')

            # Append a timestamped entry
            import datetime
            with open('log.txt', 'a', encoding='utf-8') as f:
                timestamp = datetime.datetime.now()
                f.write(f'[{timestamp}] log message\n')
            ---
    b.writelines方法
        a.功能说明
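            Writes an iterable of strings, such as a list. No line separators are added, so each string must already end with \n if it should form its own line.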
        b.代码示例
            ---
            # 写入多行
            lines = ['第一行\n', '第二行\n', '第三行\n']
            with open('output.txt', 'w', encoding='utf-8') as f:
                f.writelines(lines)

            # 从列表生成文件
            data = ['apple', 'banana', 'orange']
            with open('fruits.txt', 'w', encoding='utf-8') as f:
                f.writelines(f'{item}\n' for item in data)

            # 复制文件
            with open('source.txt', 'r', encoding='utf-8') as src:
                with open('dest.txt', 'w', encoding='utf-8') as dst:
                    dst.writelines(src.readlines())
            ---

03.位置控制
    a.seek方法
        a.功能说明
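            Moves the file position. The first argument is the offset; the second (whence) is the reference point: 0 means the start of the file (the default), 1 the current position, and 2 the end of the file. In text mode only seeks from the start are supported, with an offset of 0 or a value returned by tell(), plus seek(0, 2) to jump to the end; arbitrary relative seeks require binary mode.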
        b.代码示例
            ---
            # 移动到文件开头
            with open('data.txt', 'r+', encoding='utf-8') as f:
                f.read(10)
                f.seek(0)  # 回到开头
                content = f.read()
                print(content)

            # 移动到指定位置
            with open('data.txt', 'rb') as f:
                f.seek(100)  # 跳过前100字节
                data = f.read(50)
                print(f'读取: {data}')

            # 移动到文件末尾
            with open('data.txt', 'rb') as f:
                f.seek(0, 2)  # 移到末尾
                size = f.tell()
                print(f'文件大小: {size}字节')

            # 相对当前位置移动
            with open('data.bin', 'rb') as f:
                f.read(10)
                f.seek(5, 1)  # 从当前位置向后移5字节
                data = f.read(10)
            ---
    b.tell方法
        a.功能说明
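            Returns the current file position. In binary mode this is the byte offset from the start of the file; in text mode it is an opaque number that is only meaningful when passed back to seek().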
        b.代码示例
            ---
            # 获取当前位置
            with open('data.txt', 'r', encoding='utf-8') as f:
                print(f'初始位置: {f.tell()}')
                f.read(50)
                print(f'读取后位置: {f.tell()}')

            # 记录和恢复位置
            with open('data.txt', 'r+', encoding='utf-8') as f:
                pos = f.tell()
                content = f.read(100)
                f.seek(pos)  # 恢复到之前位置
                f.write('修改内容')

            # 计算文件大小
            with open('data.bin', 'rb') as f:
                f.seek(0, 2)
                size = f.tell()
                print(f'文件大小: {size}字节')
            ---
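
In text mode the safe pattern is to treat the value from `tell()` as a bookmark and pass it back to `seek()` unchanged. The sketch below illustrates that, and contrasts it with binary mode, where the position is a plain byte offset. The file name and contents are assumptions made for the example:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'lines.txt')  # hypothetical file name

    # newline='\n' keeps the bytes on disk identical on every platform.
    with open(path, 'w', encoding='utf-8', newline='\n') as f:
        f.write('第一行\nsecond line\nthird line\n')

    with open(path, 'r', encoding='utf-8') as f:
        f.readline()                 # consume the first line
        bookmark = f.tell()          # opaque value in text mode
        second = f.readline()
        f.readline()                 # move further on
        f.seek(bookmark)             # return to the bookmark
        second_again = f.readline()

    with open(path, 'rb') as f:
        f.readline()
        byte_offset = f.tell()       # real byte offset in binary mode

print(second == second_again, byte_offset)
```

The first line holds three Chinese characters (three bytes each in UTF-8) plus a newline, so the binary offset is 10 even though only four characters were read.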

04.其他方法
    a.flush方法
        a.功能说明
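            Pushes Python's internal write buffer to the operating system without closing the file. The OS may still hold the data in its own cache, so to make the data survive a crash or power loss, follow flush() with os.fsync(f.fileno()).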
        b.代码示例
            ---
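            # Real-time log output
            import os
            import time
            with open('realtime.log', 'w', encoding='utf-8') as f:
                for i in range(10):
                    f.write(f'Progress: {i*10}%\n')
                    f.flush()  # hand the line to the OS so readers such as `tail -f` see it at once
                    time.sleep(1)

            # Make sure important data reaches the disk
            with open('important.txt', 'w', encoding='utf-8') as f:
                f.write('important data')
                f.flush()             # Python buffer -> operating system
                os.fsync(f.fileno())  # operating system cache -> storage device
                # continue with other work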
            ---
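
The flush-then-fsync sequence can be wrapped in a small helper. This is a sketch only: `durable_write` is a hypothetical name, and how strongly `os.fsync` guarantees durability depends on the operating system and the storage hardware.

```python
import os
import tempfile

def durable_write(path, text):
    """Write text and ask the OS to commit it to the storage device (hypothetical helper)."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)
        f.flush()             # Python's buffer -> operating system
        os.fsync(f.fileno())  # operating system cache -> disk

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'important.txt')
    durable_write(target, 'important data')
    with open(target, 'r', encoding='utf-8') as f:
        read_back = f.read()

print(read_back)
```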
    b.truncate方法
        a.功能说明
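            Resizes the file to the given size in bytes; with no argument it truncates at the current position and discards everything after it.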
        b.代码示例
            ---
            # 清空文件
            with open('data.txt', 'r+', encoding='utf-8') as f:
                f.truncate(0)  # 清空文件

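            # Truncate to a given size (the size is in bytes, so with multi-byte
            # encodings make sure the cut does not fall inside a character)
            with open('data.txt', 'r+', encoding='utf-8') as f:
                f.truncate(100)  # keep the first 100 bytes

            # Truncate at the current position
            with open('data.txt', 'r+', encoding='utf-8') as f:
                f.read(50)
                f.truncate()  # drop everything after the first 50 characters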
            ---
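
Because `truncate` counts bytes, its effect is easiest to verify in binary mode. A minimal sketch using a ten-byte temporary file (the file name is a placeholder):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'data.bin')  # hypothetical file name
    with open(path, 'wb') as f:
        f.write(b'0123456789')

    with open(path, 'rb+') as f:
        f.truncate(4)                          # explicit size: keep 4 bytes
        size_after_explicit = os.path.getsize(path)

        f.seek(2)
        f.truncate()                           # no argument: cut at the current position
        size_after_implicit = os.path.getsize(path)

    with open(path, 'rb') as f:
        remaining = f.read()

print(size_after_explicit, size_after_implicit, remaining)
```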
    c.fileno方法
        a.功能说明
            返回文件描述符,是一个整数,用于底层系统调用。
        b.代码示例
            ---
            # 获取文件描述符
            with open('data.txt', 'r', encoding='utf-8') as f:
                fd = f.fileno()
                print(f'文件描述符: {fd}')

            # 使用os模块操作
            import os
            with open('data.txt', 'r', encoding='utf-8') as f:
                fd = f.fileno()
                # 获取文件状态
                stat = os.fstat(fd)
                print(f'文件大小: {stat.st_size}')
                print(f'修改时间: {stat.st_mtime}')
            ---
    d.isatty方法
        a.功能说明
            判断文件是否连接到终端设备,返回布尔值。
        b.代码示例
            ---
            # 检查是否是终端
            import sys
            if sys.stdout.isatty():
                print('输出到终端')
            else:
                print('输出被重定向')

            # 文件对象检查
            with open('data.txt', 'r', encoding='utf-8') as f:
                if f.isatty():
                    print('连接到终端')
                else:
                    print('普通文件')
            ---

1.4 上下文管理器

01.with语句
    a.基本用法
        a.功能说明
            with语句自动管理资源,确保文件在使用后正确关闭,即使发生异常也能保证资源释放。
        b.代码示例
            ---
            # 基本with语句
            with open('data.txt', 'r', encoding='utf-8') as f:
                content = f.read()
                print(content)
            # 文件自动关闭

            # 等价的传统写法
            f = open('data.txt', 'r', encoding='utf-8')
            try:
                content = f.read()
                print(content)
            finally:
                f.close()

            # 异常处理
            try:
                with open('data.txt', 'r', encoding='utf-8') as f:
                    content = f.read()
                    # 即使这里抛出异常,文件也会被关闭
                    result = 1 / 0
            except ZeroDivisionError:
                print('发生错误,但文件已关闭')
            ---
    b.多文件操作
        a.功能说明
            with语句支持同时打开多个文件,使用逗号分隔,所有文件都会自动关闭。
        b.代码示例
            ---
            # 同时打开两个文件
            with open('source.txt', 'r', encoding='utf-8') as src, \
                 open('dest.txt', 'w', encoding='utf-8') as dst:
                content = src.read()
                dst.write(content)

            # 多文件处理
            with open('file1.txt', 'r', encoding='utf-8') as f1, \
                 open('file2.txt', 'r', encoding='utf-8') as f2, \
                 open('output.txt', 'w', encoding='utf-8') as out:
                out.write(f1.read())
                out.write('\n---分隔线---\n')
                out.write(f2.read())

            # 嵌套with语句
            with open('source.txt', 'r', encoding='utf-8') as src:
                with open('dest.txt', 'w', encoding='utf-8') as dst:
                    for line in src:
                        dst.write(line.upper())
            ---

02.自定义上下文管理器
    a.类实现
        a.功能说明
            实现__enter__和__exit__方法创建自定义上下文管理器,__enter__在进入with块时调用,__exit__在退出时调用。
        b.代码示例
            ---
            # 自定义文件管理器
            class FileManager:
                def __init__(self, filename, mode):
                    self.filename = filename
                    self.mode = mode
                    self.file = None

                def __enter__(self):
                    print(f'打开文件: {self.filename}')
                    self.file = open(self.filename, self.mode, encoding='utf-8')
                    return self.file

                def __exit__(self, exc_type, exc_val, exc_tb):
                    if self.file:
                        self.file.close()
                        print(f'关闭文件: {self.filename}')
                    if exc_type:
                        print(f'发生异常: {exc_type.__name__}')
                    return False  # 不抑制异常

            # 使用自定义管理器
            with FileManager('test.txt', 'w') as f:
                f.write('测试内容')
            ---
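
The return value of `__exit__` decides whether an exception raised inside the `with` block propagates: a true value suppresses it, a false value lets it continue. The following sketch shows both outcomes. `SuppressMissingFile` is a hypothetical class written for this illustration, not part of the standard library:

```python
import os
import tempfile

class SuppressMissingFile:
    """Hypothetical manager that swallows FileNotFoundError and nothing else."""

    def __enter__(self):
        self.suppressed = False
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning True tells Python the exception has been handled.
        self.suppressed = exc_type is not None and issubclass(exc_type, FileNotFoundError)
        return self.suppressed

with tempfile.TemporaryDirectory() as tmp:
    missing = os.path.join(tmp, 'missing.txt')  # guaranteed not to exist
    with SuppressMissingFile() as guard:
        open(missing, 'r', encoding='utf-8')    # raises FileNotFoundError

# Other exception types still propagate.
try:
    with SuppressMissingFile():
        raise ValueError('not suppressed')
    propagated = False
except ValueError:
    propagated = True

print(guard.suppressed, propagated)
```

For this specific purpose the standard library already offers `contextlib.suppress(FileNotFoundError)`; the class form is shown only to make the `__exit__` protocol visible.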
    b.装饰器实现
        a.功能说明
            使用contextlib.contextmanager装饰器将生成器函数转换为上下文管理器,yield前的代码在进入时执行,yield后的代码在退出时执行。
        b.代码示例
            ---
            from contextlib import contextmanager

            # 装饰器方式
            @contextmanager
            def file_manager(filename, mode):
                print(f'打开文件: {filename}')
                f = open(filename, mode, encoding='utf-8')
                try:
                    yield f
                finally:
                    f.close()
                    print(f'关闭文件: {filename}')

            # 使用装饰器管理器
            with file_manager('test.txt', 'w') as f:
                f.write('测试内容')

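            # Manager that reports a failed open instead of raising
            @contextmanager
            def safe_file(filename, mode):
                try:
                    f = open(filename, mode, encoding='utf-8')
                except OSError as e:
                    # open() failed: report it and hand None to the with block
                    print(f'Could not open file: {e}')
                    yield None
                    return
                try:
                    yield f  # a generator-based manager must yield exactly once per path
                finally:
                    f.close()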

            with safe_file('test.txt', 'r') as f:
                if f:
                    content = f.read()
            ---

03.资源管理
    a.自动清理
        a.功能说明
            上下文管理器确保资源正确释放,避免资源泄漏,即使发生异常也能保证清理。
        b.代码示例
            ---
            # File lock management (fcntl is available on Unix only, not on Windows)
            import fcntl
            from contextlib import contextmanager

            @contextmanager
            def file_lock(filename):
                f = open(filename, 'r+', encoding='utf-8')
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                    print('获取文件锁')
                    yield f
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                    f.close()
                    print('释放文件锁')

            # 使用文件锁
            with file_lock('shared.txt') as f:
                content = f.read()
                f.seek(0)
                f.write('修改内容')

            # 临时文件管理
            import tempfile

            @contextmanager
            def temp_file():
                f = tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8')
                try:
                    yield f
                finally:
                    f.close()
                    import os
                    os.unlink(f.name)
                    print('删除临时文件')

            with temp_file() as f:
                f.write('临时数据')
                f.seek(0)
                print(f.read())
            ---
    b.嵌套管理
        a.功能说明
            使用contextlib.ExitStack管理多个上下文,动态添加和管理资源。
        b.代码示例
            ---
            from contextlib import ExitStack

            # 动态管理多个文件
            def process_files(filenames):
                with ExitStack() as stack:
                    files = [stack.enter_context(open(f, 'r', encoding='utf-8'))
                            for f in filenames]
                    for f in files:
                        print(f.read())

            process_files(['file1.txt', 'file2.txt', 'file3.txt'])

            # 条件资源管理
            def conditional_write(filename, condition):
                with ExitStack() as stack:
                    if condition:
                        f = stack.enter_context(open(filename, 'w', encoding='utf-8'))
                        f.write('条件满足,写入数据')
                    else:
                        print('条件不满足,跳过写入')

            conditional_write('output.txt', True)

            # 批量文件处理
            def merge_files(input_files, output_file):
                with ExitStack() as stack:
                    output = stack.enter_context(open(output_file, 'w', encoding='utf-8'))
                    for filename in input_files:
                        f = stack.enter_context(open(filename, 'r', encoding='utf-8'))
                        output.write(f.read())
                        output.write('\n')

            merge_files(['a.txt', 'b.txt', 'c.txt'], 'merged.txt')
            ---
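
`ExitStack` releases what it manages in reverse order of registration, mirroring nested `with` statements. The sketch below makes that order visible by registering plain callbacks instead of files; the event names are made up for the illustration:

```python
from contextlib import ExitStack

events = []

with ExitStack() as stack:
    for name in ['a', 'b', 'c']:
        # callback() registers a function to run when the stack unwinds.
        stack.callback(events.append, f'close {name}')
    events.append('body')

print(events)
```

The same ordering applies to files registered with `enter_context`: the file opened last is closed first.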

04.最佳实践
    a.异常处理
        a.功能说明
            上下文管理器的__exit__方法接收异常信息,可以选择处理或传播异常。
        b.代码示例
            ---
            from contextlib import contextmanager

            # Tolerate a missing file
            @contextmanager
            def ignore_errors(filename, mode):
                try:
                    f = open(filename, mode, encoding='utf-8')
                except FileNotFoundError:
                    print('File not found, ignored')
                    yield None  # the with block receives None
                    return
                try:
                    yield f
                finally:
                    f.close()

            with ignore_errors('missing.txt', 'r') as f:
                if f:
                    content = f.read()

            # Log failures, then re-raise
            @contextmanager
            def logged_file(filename, mode):
                import logging
                f = None
                try:
                    f = open(filename, mode, encoding='utf-8')
                    yield f
                except Exception as e:
                    logging.error(f'File operation failed: {e}')
                    raise
                finally:
                    if f is not None:
                        f.close()
            ---
    b.性能优化
        a.功能说明
            合理使用上下文管理器可以提高代码性能和可读性,避免资源泄漏。
        b.代码示例
            ---
            from contextlib import contextmanager

            # Batch processing
            def process_large_files(filenames):
                for filename in filenames:
                    # Open one file at a time; each is closed before the next is
                    # opened, so only one file descriptor is in use at any moment
                    with open(filename, 'r', encoding='utf-8') as f:
                        for line in f:
                            process_line(line)

            # 缓冲区管理
            @contextmanager
            def buffered_write(filename, buffer_size=8192):
                f = open(filename, 'w', encoding='utf-8', buffering=buffer_size)
                try:
                    yield f
                finally:
                    f.flush()
                    f.close()

            with buffered_write('output.txt') as f:
                for i in range(10000):
                    f.write(f'行{i}\n')

            def process_line(line):
                pass
            ---

1.5 文件指针操作

01.指针位置
    a.tell方法
        a.功能说明
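            Returns the current file position, used to record a place and return to it later. In binary mode the value is the byte offset from the start of the file; in text mode it is an opaque number that should only be passed back to seek().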
        b.代码示例
            ---
            # 获取当前位置
            with open('data.txt', 'r', encoding='utf-8') as f:
                print(f'初始位置: {f.tell()}')  # 0
                f.read(10)
                print(f'读取10字符后: {f.tell()}')
                f.read(20)
                print(f'再读取20字符后: {f.tell()}')

            # 二进制模式位置
            with open('data.bin', 'rb') as f:
                print(f'初始: {f.tell()}')
                f.read(100)
                pos = f.tell()
                print(f'当前位置: {pos}字节')

            # 计算文件大小
            with open('file.txt', 'rb') as f:
                f.seek(0, 2)  # 移到末尾
                size = f.tell()
                print(f'文件大小: {size}字节')
            ---
    b.seek方法
        a.功能说明
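            Moves the file position. The first argument is the offset and the second is the reference point: 0 for the start of the file, 1 for the current position, 2 for the end. Offsets relative to the current position or the end (other than 0) work only in binary mode.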
        b.代码示例
            ---
            # 移动到文件开头
            with open('data.txt', 'r+', encoding='utf-8') as f:
                f.read(50)
                f.seek(0)  # 回到开头
                content = f.read()

            # 移动到指定位置
            with open('data.txt', 'rb') as f:
                f.seek(100)  # 跳到第100字节
                data = f.read(50)

            # 从当前位置移动
            with open('data.bin', 'rb') as f:
                f.read(10)
                f.seek(5, 1)  # 从当前位置向后移5字节
                data = f.read()

            # 从文件末尾移动
            with open('data.txt', 'rb') as f:
                f.seek(-100, 2)  # 100 bytes before the end; raises OSError if the file is shorter than that
                tail = f.read()
            ---

02.随机访问
    a.读取指定位置
        a.功能说明
            通过seek和tell组合实现随机访问,可以跳转到文件任意位置读取数据。
        b.代码示例
            ---
            # 读取文件中间部分
            with open('large.txt', 'rb') as f:
                # 读取1000-2000字节
                f.seek(1000)
                chunk = f.read(1000)
                print(f'读取{len(chunk)}字节')

            # 读取多个片段
            with open('data.bin', 'rb') as f:
                positions = [0, 100, 200, 300]
                for pos in positions:
                    f.seek(pos)
                    data = f.read(10)
                    print(f'位置{pos}: {data.hex()}')

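            # Skip a prefix of the file: seek in binary mode, because a text-mode
            # seek only accepts offsets returned by tell()
            with open('log.txt', 'rb') as f:
                f.seek(500)   # skip the first 500 bytes
                f.readline()  # discard the (possibly partial) line we landed in
                for line in f:
                    print(line.decode('utf-8').strip())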
            ---
    b.修改指定位置
        a.功能说明
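            The r+ and rb+ modes allow content to be modified at any position. A write overwrites existing bytes in place and never shifts what follows, so inserting data means rewriting everything after the insertion point.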
        b.代码示例
            ---
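            # Overwrite content in the middle of a file (binary mode, so offsets are
            # byte offsets; make sure offset 10 is a character boundary)
            with open('data.txt', 'rb+') as f:
                f.seek(10)  # jump to byte 10
                f.write('修改'.encode('utf-8'))  # overwrites 6 bytes in place

            # Modify a binary file
            with open('data.bin', 'rb+') as f:
                f.seek(100)
                f.write(b'\xFF\xFF\xFF\xFF')  # overwrite 4 bytes

            # Insert data (everything after the insertion point must be rewritten)
            with open('data.txt', 'rb+') as f:
                f.seek(10)
                rest = f.read()  # save the tail
                f.seek(10)
                f.write('插入内容'.encode('utf-8'))
                f.write(rest)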
            ---
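
The difference between overwriting and inserting is easy to see on a short byte string. A minimal sketch with a ten-byte temporary file; the file name and contents are assumptions for the example:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'data.bin')  # hypothetical file name
    with open(path, 'wb') as f:
        f.write(b'0123456789')

    # Overwrite: two bytes are replaced, the file length stays the same.
    with open(path, 'rb+') as f:
        f.seek(3)
        f.write(b'XY')
    with open(path, 'rb') as f:
        overwritten = f.read()

    # Insert: read the tail, then write the new bytes followed by the tail.
    with open(path, 'rb+') as f:
        f.seek(3)
        rest = f.read()
        f.seek(3)
        f.write(b'--' + rest)
    with open(path, 'rb') as f:
        inserted = f.read()

print(overwritten, inserted)
```

Reading the whole tail into memory is fine for small files; for large ones, writing a new file and renaming it over the original is usually the more practical approach.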

03.分块处理
    a.固定大小分块
        a.功能说明
            将大文件分成固定大小的块处理,避免一次性加载整个文件到内存。
        b.代码示例
            ---
            # 分块读取大文件
            chunk_size = 1024 * 1024  # 1MB
            with open('large_file.bin', 'rb') as f:
                chunk_num = 0
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    chunk_num += 1
                    print(f'处理第{chunk_num}块,大小{len(chunk)}字节')
                    # 处理数据块
                    process_chunk(chunk)

            # 分块复制文件
            def copy_file(src, dst, chunk_size=8192):
                with open(src, 'rb') as f_src:
                    with open(dst, 'wb') as f_dst:
                        while True:
                            chunk = f_src.read(chunk_size)
                            if not chunk:
                                break
                            f_dst.write(chunk)

            copy_file('source.bin', 'dest.bin')

            def process_chunk(chunk):
                pass
            ---
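
The read loop above can be packaged as a generator so that callers simply iterate over chunks. A minimal sketch, assuming Python 3.8+ for the `:=` operator; `iter_chunks` is a hypothetical helper name:

```python
import os
import tempfile

def iter_chunks(path, chunk_size=8192):
    """Yield successive chunks of at most chunk_size bytes (hypothetical helper)."""
    with open(path, 'rb') as f:
        # read() returns b'' at end of file, which ends the loop.
        while chunk := f.read(chunk_size):
            yield chunk

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'blob.bin')
    with open(path, 'wb') as f:
        f.write(bytes(range(20)))

    sizes = [len(chunk) for chunk in iter_chunks(path, chunk_size=8)]
    rebuilt = b''.join(iter_chunks(path, chunk_size=8))

print(sizes)
```

Only the last chunk may be shorter than `chunk_size`, so memory use stays bounded regardless of file size.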
    b.按行分块
        a.功能说明
            对文本文件按行分块处理,适合日志文件和CSV文件等行式数据。
        b.代码示例
            ---
            # 批量处理行
            batch_size = 1000
            with open('large.log', 'r', encoding='utf-8') as f:
                batch = []
                for line in f:
                    batch.append(line.strip())
                    if len(batch) >= batch_size:
                        process_batch(batch)
                        batch = []
                # 处理剩余行
                if batch:
                    process_batch(batch)

            # 分页读取
            def read_page(filename, page_num, page_size=100):
                with open(filename, 'r', encoding='utf-8') as f:
                    # 跳过前面的页
                    for _ in range(page_num * page_size):
                        if not f.readline():
                            return []
                    # 读取当前页
                    lines = []
                    for _ in range(page_size):
                        line = f.readline()
                        if not line:
                            break
                        lines.append(line.strip())
                    return lines

            page1 = read_page('data.txt', 0)  # 第1页
            page2 = read_page('data.txt', 1)  # 第2页

            def process_batch(batch):
                pass
            ---
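
The batching loop above can also be written with `itertools.islice`, which takes the next `batch_size` items from any iterator, including an open file. A minimal sketch; `iter_batches` is a hypothetical helper, and `io.StringIO` stands in for a real log file:

```python
import io
from itertools import islice

def iter_batches(lines, batch_size):
    """Yield lists of up to batch_size items from any iterable (hypothetical helper)."""
    iterator = iter(lines)
    # islice returns an empty list once the iterator is exhausted.
    while batch := list(islice(iterator, batch_size)):
        yield batch

fake_log = io.StringIO('a\nb\nc\nd\ne\n')  # stands in for open('large.log', ...)
batches = [[line.strip() for line in batch] for batch in iter_batches(fake_log, 2)]

print(batches)
```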

04.高级技巧
    a.双向遍历
        a.功能说明
            通过seek实现文件的双向遍历,可以从后向前读取文件。
        b.代码示例
            ---
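            # Read lines from last to first
            # (one seek and one read per byte: simple, but slow on large files)
            def read_reverse(filename):
                with open(filename, 'rb') as f:
                    f.seek(0, 2)  # move to the end
                    size = f.tell()
                    position = size - 1  # offset of the last byte
                    lines = []
                    buffer = bytearray()

                    while position >= 0:
                        f.seek(position)
                        char = f.read(1)
                        if char == b'\n':
                            # A newline closes the line collected so far; the newline
                            # that terminates the file does not start a new line
                            if position != size - 1:
                                lines.append(buffer[::-1].decode('utf-8'))
                                buffer.clear()
                        else:
                            buffer += char
                        position -= 1

                    if size > 0:
                        lines.append(buffer[::-1].decode('utf-8'))  # first line of the file
                    return lines

            # Read the last N lines, returned in file order
            def tail(filename, n=10):
                lines = read_reverse(filename)
                return lines[:n][::-1]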

            last_lines = tail('log.txt', 20)
            for line in last_lines:
                print(line)
            ---
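
Reading one byte at a time is easy to follow but slow. A common alternative is to read fixed-size blocks from the end until enough newlines have been collected. The sketch below is one way to do that, not the method used above; `tail_lines` and its parameters are hypothetical names, and it assumes the file is encoded in UTF-8 or another encoding in which the newline byte never appears inside a character.

```python
import os
import tempfile

def tail_lines(path, n=10, block_size=4096, encoding='utf-8'):
    """Return the last n lines of a file, reading blocks backwards from the end."""
    if n <= 0:
        return []
    with open(path, 'rb') as f:
        f.seek(0, os.SEEK_END)
        position = f.tell()
        data = b''
        # More than n newlines guarantees that the last n lines are complete,
        # whether or not the file ends with a newline.
        while position > 0 and data.count(b'\n') <= n:
            step = min(block_size, position)
            position -= step
            f.seek(position)
            data = f.read(step) + data
    return [line.decode(encoding) for line in data.splitlines()[-n:]]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'log.txt')
    with open(path, 'wb') as f:
        f.write(b''.join(b'line %d\n' % i for i in range(100)))

    last_three = tail_lines(path, 3, block_size=16)
    everything = tail_lines(path, 500)

print(last_three)
```

Only the selected lines are decoded, so a block boundary that falls inside a multi-byte character in an earlier line does no harm.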
    b.索引建立
        a.功能说明
            为大文件建立索引,记录关键位置,实现快速定位和查找。
        b.代码示例
            ---
            # 建立行索引
            def build_line_index(filename):
                index = [0]  # 第一行从0开始
                with open(filename, 'rb') as f:
                    while True:
                        line = f.readline()
                        if not line:
                            break
                        index.append(f.tell())
                return index

            # Read a given line (zero-based) through the index
            def read_line_by_number(filename, line_num, index):
                # The last index entry is the end-of-file offset, not a line start
                if line_num < 0 or line_num >= len(index) - 1:
                    return None
                with open(filename, 'rb') as f:
                    f.seek(index[line_num])
                    line = f.readline()
                    return line.decode('utf-8').strip()

            # Usage
            index = build_line_index('large.txt')
            line_100 = read_line_by_number('large.txt', 100, index)
            print(f'Line at index 100 (the 101st line): {line_100}')

            # 建立关键字索引
            def build_keyword_index(filename, keyword):
                positions = []
                with open(filename, 'rb') as f:
                    while True:
                        pos = f.tell()
                        line = f.readline()
                        if not line:
                            break
                        if keyword.encode() in line:
                            positions.append(pos)
                return positions

            # 查找所有包含关键字的行
            keyword_pos = build_keyword_index('log.txt', 'ERROR')
            print(f'找到{len(keyword_pos)}个错误')
            ---
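The two indexes can be combined: a byte offset returned by `build_keyword_index` can be converted to a line number by a binary search over the line index. The helper below is a sketch, and `line_number_at` is a hypothetical name. It assumes the index is sorted and ends with the end-of-file offset, as `build_line_index` produces.

```python
from bisect import bisect_right

def line_number_at(offset, index):
    """Map a byte offset to a zero-based line number.

    `index` is assumed to be a sorted list of line-start offsets whose
    final entry is the end-of-file offset.
    """
    if not 0 <= offset < index[-1]:
        raise ValueError('offset outside the file')
    # The number of line starts <= offset, minus one, is the line number
    return bisect_right(index, offset) - 1

# Hypothetical index for a file containing b'ab\ncde\nf\n'
sample_index = [0, 3, 7, 9]
print([line_number_at(pos, sample_index) for pos in (0, 2, 3, 6, 7, 8)])
```

Each lookup takes O(log n) time instead of a scan of the file.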
    c.内存映射
        a.功能说明
            使用mmap将文件映射到内存,实现高效的随机访问和修改。
        b.代码示例
            ---
            import mmap

            # 内存映射读取
            with open('large.bin', 'rb') as f:
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
                    # 像字符串一样访问
                    print(f'文件大小: {len(m)}')
                    print(f'前10字节: {m[:10].hex()}')

                    # 查找内容
                    pos = m.find(b'pattern')
                    if pos != -1:
                        print(f'找到位置: {pos}')

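            # Modify through a memory map
            with open('data.bin', 'r+b') as f:
                with mmap.mmap(f.fileno(), 0) as m:
                    # Overwrite 4 bytes in place (the file must be at least 104 bytes long)
                    m[100:104] = b'\xFF\xFF\xFF\xFF'
                    # Find and replace: slice assignment cannot change the size
                    # of the map, so the replacement must have the same length
                    m[:] = m[:].replace(b'old', b'new')

            # Shared (write-through) mapping: changes reach the underlying file
            with open('shared.dat', 'r+b') as f:
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) as m:
                    data = b'shared data'  # 11 bytes
                    m[0:len(data)] = data  # slice length must equal the data length
                    m.flush()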
            ---
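Because an mmap object supports the buffer protocol, the `re` module can search it directly with a bytes pattern, without first loading the file into a bytes object. The following sketch assumes a small log-like file. `find_all` and the sample content are illustrative only.

```python
import mmap
import os
import re
import tempfile

def find_all(filename, pattern):
    """Return (offset, matched bytes) for every match of a bytes pattern."""
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            return [(match.start(), match.group()) for match in re.finditer(pattern, m)]

# Hypothetical sample log written in binary mode
with tempfile.NamedTemporaryFile('wb', delete=False) as tmp:
    tmp.write(b'2024-01-01 ERROR disk full\n'
              b'2024-01-02 INFO ok\n'
              b'2024-01-03 ERROR timeout\n')
    demo_path = tmp.name

hits = find_all(demo_path, rb'ERROR [^\n]*')
os.remove(demo_path)
for offset, text in hits:
    print(offset, text.decode('ascii'))
```

Note that mapping an empty file with length 0 raises `ValueError`, so callers may want to check the file size first.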

2. 标准输入输出

2.1 sys标准流

01.标准流概述
    a.三个标准流
        a.stdin标准输入
            sys.stdin是标准输入流,默认从键盘读取数据,可以重定向到文件或管道。
        b.stdout标准输出
            sys.stdout是标准输出流,默认输出到终端,print函数默认写入此流。
        c.stderr标准错误
            sys.stderr是标准错误流,用于输出错误信息,默认输出到终端,与stdout分离。
    b.基本使用
        a.功能说明
            标准流是类文件对象,支持read、write、readline等方法,可以像操作文件一样操作标准流。
        b.代码示例
            ---
            import sys

            # 读取标准输入
            line = sys.stdin.readline()
            print(f'读取到: {line.strip()}')

            # 写入标准输出
            sys.stdout.write('输出到stdout\n')
            sys.stdout.flush()

            # 写入标准错误
            sys.stderr.write('错误信息\n')
            sys.stderr.flush()

            # 检查是否是终端
            if sys.stdout.isatty():
                print('输出到终端')
            else:
                print('输出被重定向')
            ---
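One way to see that stdout and stderr really are separate channels is to run a child interpreter and capture each stream on its own. This sketch uses `subprocess.run` with `capture_output=True` (Python 3.7+). The child script is a made-up example.

```python
import subprocess
import sys

# A tiny child program that writes to both streams
child_code = (
    "import sys\n"
    "print('result: 42')\n"
    "print('warning: low disk space', file=sys.stderr)\n"
)

completed = subprocess.run(
    [sys.executable, '-c', child_code],
    capture_output=True,  # collect stdout and stderr separately
    text=True,            # decode bytes to str
)

print('stdout:', completed.stdout.strip())
print('stderr:', completed.stderr.strip())
```

The same separation is what lets a shell redirect `>` and `2>` to different files.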

02.标准输入操作
    a.读取方法
        a.功能说明
            sys.stdin支持read、readline、readlines等方法,可以逐行或批量读取输入。
        b.代码示例
            ---
            import sys

            # 读取一行
            print('请输入一行文本:')
            line = sys.stdin.readline()
            print(f'你输入了: {line.strip()}')

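            # Read all remaining lines (blocks until end of input)
            print('Enter several lines (end with Ctrl+D, or Ctrl+Z then Enter on Windows):')
            lines = sys.stdin.readlines()
            print(f'{len(lines)} lines entered')

            # Process line by line (run this separately: once readlines()
            # has reached end of input, nothing is left to iterate over)
            print('Enter numbers (type quit to exit):')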
            for line in sys.stdin:
                line = line.strip()
                if line == 'quit':
                    break
                try:
                    num = int(line)
                    print(f'数字的平方: {num ** 2}')
                except ValueError:
                    print('请输入有效数字')
            ---
    b.缓冲控制
        a.功能说明
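            When attached to a terminal, standard input normally arrives one line at a time, and Python adds its own buffer in sys.stdin.buffer. To bypass Python's buffering, read from the file descriptor with os.read; non-blocking mode is a separate setting that decides whether a read waits for data.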
        b.代码示例
            ---
            import sys

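            # Get the file descriptor
            fd = sys.stdin.fileno()
            print(f'stdin file descriptor: {fd}')

            # Inspect buffering through public attributes
            import io
            if isinstance(sys.stdin, io.TextIOWrapper):
                print(f'Line buffering: {sys.stdin.line_buffering}')
                print(f'Default buffer size: {io.DEFAULT_BUFFER_SIZE}')

            # Non-blocking, unbuffered read with the os module
            # (os.set_blocking: Unix; on Windows only for pipes, Python 3.12+)
            import os
            os.set_blocking(fd, False)
            try:
                data = os.read(fd, 1)  # bypasses Python's buffers and returns bytes
                print(f'Read: {data!r}')
            except BlockingIOError:
                print('No data available')
            finally:
                os.set_blocking(fd, True)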
            ---

03.标准输出操作
    a.写入方法
        a.功能说明
            sys.stdout.write直接写入字符串,不自动添加换行符,返回写入的字符数。
        b.代码示例
            ---
            import sys

            # 基本写入
            sys.stdout.write('Hello ')
            sys.stdout.write('World\n')

            # Formatted output
            name = 'Python'
            version = '3.9'  # a string, so that a version such as '3.10' is not shown as 3.1
            sys.stdout.write(f'{name} {version}\n')

            # 进度条示例
            import time
            for i in range(101):
                sys.stdout.write(f'\r进度: {i}%')
                sys.stdout.flush()
                time.sleep(0.05)
            sys.stdout.write('\n')

            # 彩色输出(ANSI转义码)
            sys.stdout.write('\033[31m红色文本\033[0m\n')
            sys.stdout.write('\033[32m绿色文本\033[0m\n')
            sys.stdout.write('\033[33m黄色文本\033[0m\n')
            ---
    b.刷新缓冲
        a.功能说明
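            When attached to a terminal, sys.stdout is line-buffered and flushes at each newline; when redirected to a file or pipe it is block-buffered and flushes only when the buffer fills. Call flush to force output at any time.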
        b.代码示例
            ---
            import sys
            import time

            # 实时输出
            sys.stdout.write('加载中')
            for i in range(5):
                sys.stdout.write('.')
                sys.stdout.flush()  # 立即显示
                time.sleep(0.5)
            sys.stdout.write(' 完成\n')

            # 日志实时写入
            def log(message):
                timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
                sys.stdout.write(f'[{timestamp}] {message}\n')
                sys.stdout.flush()

            log('程序启动')
            log('正在处理数据')
            log('处理完成')

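            # Force line buffering (Python 3.7+); for fully unbuffered output,
            # run the interpreter with `python -u` or set PYTHONUNBUFFERED=1
            sys.stdout.reconfigure(line_buffering=True)
            print('line-buffered output')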
            ---

04.标准错误操作
    a.错误输出
        a.功能说明
            sys.stderr专门用于输出错误和警告信息,与stdout分离,便于重定向和日志管理。
        b.代码示例
            ---
            import sys

            # 输出错误信息
            sys.stderr.write('错误: 文件不存在\n')

            # 警告信息
            def warning(msg):
                sys.stderr.write(f'警告: {msg}\n')
                sys.stderr.flush()

            warning('配置文件缺失')
            warning('使用默认配置')

            # 异常信息
            try:
                result = 10 / 0
            except ZeroDivisionError as e:
                sys.stderr.write(f'异常: {type(e).__name__}: {e}\n')

            # 调试信息
            DEBUG = True
            def debug(msg):
                if DEBUG:
                    sys.stderr.write(f'DEBUG: {msg}\n')

            debug('变量值: x=10')
            debug('进入函数: process_data')
            ---
    b.错误处理
        a.功能说明
            合理使用stderr可以将正常输出和错误信息分离,便于日志分析和错误追踪。
        b.代码示例
            ---
            import sys

            # 分离输出和错误
            def process_file(filename):
                try:
                    with open(filename, 'r') as f:
                        content = f.read()
                        sys.stdout.write(f'成功读取: {filename}\n')
                        return content
                except FileNotFoundError:
                    sys.stderr.write(f'错误: 文件不存在 {filename}\n')
                    return None
                except PermissionError:
                    sys.stderr.write(f'错误: 无权限访问 {filename}\n')
                    return None

            # 批量处理
            files = ['file1.txt', 'file2.txt', 'missing.txt']
            for f in files:
                process_file(f)

            # 错误统计
            error_count = 0
            def error(msg):
                global error_count
                error_count += 1
                sys.stderr.write(f'错误 #{error_count}: {msg}\n')

            error('连接超时')
            error('数据格式错误')
            sys.stderr.write(f'\n总错误数: {error_count}\n')
            ---

2.2 input与print

01.input函数
    a.基本用法
        a.功能说明
            input函数从标准输入读取一行文本,自动去除末尾换行符,返回字符串类型。
        b.代码示例
            ---
            # 基本输入
            name = input('请输入你的名字: ')
            print(f'你好, {name}!')

            # 数字输入
            age = int(input('请输入年龄: '))
            print(f'你{age}岁了')

            # 多个输入
            x = float(input('输入x坐标: '))
            y = float(input('输入y坐标: '))
            distance = (x**2 + y**2) ** 0.5
            print(f'距离原点: {distance:.2f}')

            # 输入验证
            while True:
                try:
                    num = int(input('输入正整数: '))
                    if num > 0:
                        break
                    print('必须是正数')
                except ValueError:
                    print('输入无效,请输入数字')
            ---
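The validation loop above can be wrapped in a reusable helper. The sketch below is one possible shape, not a standard API: `ask_int`, its `reader` parameter, and `max_attempts` are assumptions introduced so that the function can be exercised without a keyboard.

```python
def ask_int(prompt, minimum=None, reader=input, max_attempts=3):
    """Ask for an integer, retrying on invalid input.

    `reader` is a hypothetical injection point: it defaults to input(),
    but tests can pass any callable that takes a prompt and returns a string.
    """
    for _ in range(max_attempts):
        text = reader(prompt)
        try:
            value = int(text)
        except ValueError:
            print('Invalid input, please enter an integer')
            continue
        if minimum is not None and value < minimum:
            print(f'Value must be at least {minimum}')
            continue
        return value
    raise ValueError(f'no valid input after {max_attempts} attempts')

# Scripted answers stand in for a user typing at the keyboard
answers = iter(['abc', '-5', '7'])
result = ask_int('Enter a positive integer: ', minimum=1,
                 reader=lambda prompt: next(answers))
print(result)
```

Bounding the number of attempts avoids an endless loop when input comes from a file or pipe that keeps supplying invalid data.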
    b.高级用法
        a.功能说明
            input可以处理多种输入格式,支持默认值、列表输入等复杂场景。
        b.代码示例
            ---
            # 带默认值的输入
            def input_with_default(prompt, default):
                value = input(f'{prompt} [{default}]: ')
                return value if value else default

            host = input_with_default('服务器地址', 'localhost')
            port = int(input_with_default('端口', '8080'))

            # 列表输入
            numbers = input('输入多个数字(空格分隔): ')
            num_list = [int(x) for x in numbers.split()]
            print(f'总和: {sum(num_list)}')

            # 密码输入(隐藏显示)
            import getpass
            password = getpass.getpass('请输入密码: ')
            print('密码已接收')

            # 多行输入
            print('输入多行文本(输入END结束):')
            lines = []
            while True:
                line = input()
                if line == 'END':
                    break
                lines.append(line)
            text = '\n'.join(lines)
            print(f'共输入{len(lines)}行')
            ---

02.print函数
    a.基本用法
        a.功能说明
            print函数输出到标准输出,自动添加换行符,支持多个参数和格式化输出。
        b.代码示例
            ---
            # 基本输出
            print('Hello World')

            # 多个参数
            name = 'Python'
            version = 3.9
            print('语言:', name, '版本:', version)

            # 格式化输出
            pi = 3.14159
            print(f'圆周率: {pi:.2f}')

            # 不换行输出
            print('加载中', end='')
            print('.', end='')
            print('.', end='')
            print('.', end=' ')
            print('完成')

            # 自定义分隔符
            print('apple', 'banana', 'orange', sep=', ')
            print('2024', '01', '15', sep='-')
            ---
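To see exactly which characters `sep` and `end` produce, the output can be sent to an in-memory buffer through the `file` parameter and inspected with `repr`. This is a small illustration; the variable names are arbitrary.

```python
from io import StringIO

buffer = StringIO()
print('apple', 'banana', 'orange', sep=', ', file=buffer)
print('loading', end='', file=buffer)       # no newline
print('...', end=' ', file=buffer)          # ends with a space instead
print('done', file=buffer)                  # default end='\n'
print(1, 2.5, None, [1, 2], sep='|', file=buffer)  # each argument goes through str()

captured = buffer.getvalue()
print(repr(captured))
```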
    b.高级用法
        a.功能说明
            print支持重定向输出、刷新缓冲、格式化等高级功能。
        b.代码示例
            ---
            import sys

            # 输出到文件
            with open('output.txt', 'w') as f:
                print('写入文件', file=f)
                print('第二行', file=f)

            # 输出到stderr
            print('错误信息', file=sys.stderr)

            # 强制刷新
            import time
            for i in range(5):
                print(f'进度: {i}', end=' ', flush=True)
                time.sleep(1)
            print()

            # 格式化表格
            data = [
                ('张三', 25, 5000),
                ('李四', 30, 6000),
                ('王五', 28, 5500)
            ]
            print(f'{"姓名":<6} {"年龄":<4} {"工资":<6}')
            print('-' * 20)
            for name, age, salary in data:
                print(f'{name:<6} {age:<4} {salary:<6}')
            ---
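Format widths such as `<6` count characters, not terminal columns, so columns containing CJK text can drift out of alignment because those characters usually occupy two columns. A possible workaround, sketched here with `unicodedata.east_asian_width`, pads by display width instead. `display_width` and `pad` are hypothetical helpers, and the two-column assumption depends on the terminal and font.

```python
import unicodedata

def display_width(text):
    """Approximate terminal columns: wide and fullwidth characters count as 2."""
    return sum(2 if unicodedata.east_asian_width(ch) in ('W', 'F') else 1
               for ch in text)

def pad(text, width):
    """Left-align text to a display width by appending spaces."""
    return text + ' ' * max(0, width - display_width(text))

rows = [('张三', 25, 5000), ('Li Si', 30, 6000)]
table = [f'{pad(name, 8)}{age:<4}{salary:>6}' for name, age, salary in rows]
for line in table:
    print(line)
```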

03.格式化输出
    a.f-string格式化
        a.功能说明
            f-string是Python 3.6+推荐的格式化方式,支持表达式、格式说明符等。
        b.代码示例
            ---
            # 基本格式化
            name = 'Alice'
            age = 25
            print(f'姓名: {name}, 年龄: {age}')

            # 表达式
            x = 10
            y = 20
            print(f'{x} + {y} = {x + y}')

            # 格式说明符
            pi = 3.14159
            print(f'保留2位: {pi:.2f}')
            print(f'保留4位: {pi:.4f}')
            print(f'科学计数: {pi:.2e}')

            # 对齐和填充
            print(f'{"左对齐":<10}|')
            print(f'{"右对齐":>10}|')
            print(f'{"居中":^10}|')
            print(f'{"填充":*^10}|')

            # 数字格式
            num = 1234567
            print(f'千分位: {num:,}')
            print(f'百分比: {0.85:.1%}')
            print(f'十六进制: {255:#x}')
            print(f'二进制: {10:#b}')
            ---
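A few further f-string features complement the examples above: nested fields inside the format specification, the `=` debugging specifier (Python 3.8+), the `!r` conversion, and type-specific format codes such as those of `datetime.date`. The values below are arbitrary.

```python
import datetime

pi = 3.14159
width, precision = 10, 3

nested = f'{pi:{width}.{precision}f}'   # width and precision taken from variables
debug = f'{width=}'                     # expression text plus its value (3.8+)
repr_conv = f'{"text"!r}'               # apply repr() before formatting
dated = f'{datetime.date(2024, 1, 15):%Y-%m-%d}'  # strftime-style codes

print(repr(nested), debug, repr_conv, dated)
```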
    b.format方法
        a.功能说明
            format方法是传统的格式化方式,支持位置参数和关键字参数。
        b.代码示例
            ---
            # 位置参数
            print('{} + {} = {}'.format(10, 20, 30))

            # 索引参数
            print('{0} {1} {0}'.format('Hello', 'World'))

            # 关键字参数
            print('{name}今年{age}岁'.format(name='张三', age=25))

            # 混合使用
            print('{0}的成绩: 数学{math}, 英语{english}'.format(
                '李四', math=90, english=85))

            # 格式说明符
            print('{:.2f}'.format(3.14159))
            print('{:>10}'.format('右对齐'))
            print('{:0>5}'.format(42))  # 补零

            # 字典格式化
            person = {'name': '王五', 'age': 30}
            print('{name}今年{age}岁'.format(**person))
            ---

04.输入输出组合
    a.交互式程序
        a.功能说明
            结合input和print实现交互式命令行程序,提供友好的用户界面。
        b.代码示例
            ---
            # 简单计算器
            def calculator():
                print('=== 简单计算器 ===')
                while True:
                    print('\n操作: 1.加法 2.减法 3.乘法 4.除法 0.退出')
                    choice = input('请选择: ')

                    if choice == '0':
                        print('再见!')
                        break

                    if choice not in ['1', '2', '3', '4']:
                        print('无效选择')
                        continue

                    try:
                        a = float(input('第一个数: '))
                        b = float(input('第二个数: '))

                        if choice == '1':
                            print(f'结果: {a + b}')
                        elif choice == '2':
                            print(f'结果: {a - b}')
                        elif choice == '3':
                            print(f'结果: {a * b}')
                        elif choice == '4':
                            if b != 0:
                                print(f'结果: {a / b}')
                            else:
                                print('错误: 除数不能为0')
                    except ValueError:
                        print('错误: 请输入有效数字')

            calculator()
            ---
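The chain of `if`/`elif` branches in the calculator can also be expressed as a lookup table, which keeps the arithmetic separate from the input and output code and makes it testable on its own. This is a sketch of that idea. `OPERATIONS` and `calculate` are names chosen for illustration.

```python
import operator

# Menu choice -> (label, function)
OPERATIONS = {
    '1': ('add', operator.add),
    '2': ('subtract', operator.sub),
    '3': ('multiply', operator.mul),
    '4': ('divide', operator.truediv),
}

def calculate(choice, a, b):
    """Return the result for a menu choice; raise ValueError for bad input."""
    if choice not in OPERATIONS:
        raise ValueError(f'invalid choice: {choice!r}')
    _, func = OPERATIONS[choice]
    if func is operator.truediv and b == 0:
        raise ValueError('division by zero')
    return func(a, b)

print(calculate('1', 2.0, 3.0), calculate('4', 9.0, 3.0))
```

The interactive loop then only needs to read the inputs, call `calculate`, and print either the result or the error message.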
    b.菜单系统
        a.功能说明
            使用循环和条件语句实现多级菜单系统,提供清晰的导航结构。
        b.代码示例
            ---
            # 文件管理菜单
            def file_menu():
                while True:
                    print('\n' + '='*30)
                    print('文件管理系统')
                    print('='*30)
                    print('1. 查看文件')
                    print('2. 创建文件')
                    print('3. 删除文件')
                    print('4. 重命名文件')
                    print('0. 返回')
                    print('='*30)

                    choice = input('请选择: ')

                    if choice == '0':
                        break
                    elif choice == '1':
                        filename = input('文件名: ')
                        print(f'查看文件: {filename}')
                    elif choice == '2':
                        filename = input('文件名: ')
                        print(f'创建文件: {filename}')
                    elif choice == '3':
                        filename = input('文件名: ')
                        confirm = input(f'确认删除 {filename}? (y/n): ')
                        if confirm.lower() == 'y':
                            print('文件已删除')
                    elif choice == '4':
                        old = input('原文件名: ')
                        new = input('新文件名: ')
                        print(f'重命名: {old} -> {new}')
                    else:
                        print('无效选择')

            file_menu()
            ---

2.3 重定向标准流

01.输出重定向
    a.重定向到文件
        a.功能说明
            通过替换sys.stdout将标准输出重定向到文件,所有print输出都会写入文件。
        b.代码示例
            ---
            import sys

            # 保存原始stdout
            original_stdout = sys.stdout

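            # Redirect to a file
            with open('output.log', 'w') as f:
                sys.stdout = f
                try:
                    print('this line is written to the file')
                    print('second line')
                    print('third line')
                finally:
                    # Restore stdout even if an exception occurs,
                    # and before the file is closed
                    sys.stdout = original_stdout

            print('this line goes to the terminal')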

            # 使用上下文管理器
            class RedirectStdout:
                def __init__(self, filename):
                    self.filename = filename
                    self.original = None
                    self.file = None

                def __enter__(self):
                    self.original = sys.stdout
                    self.file = open(self.filename, 'w')
                    sys.stdout = self.file
                    return self.file

                def __exit__(self, *args):
                    sys.stdout = self.original
                    self.file.close()

            with RedirectStdout('log.txt'):
                print('重定向输出')
            ---
    b.重定向到StringIO
        a.功能说明
            重定向到StringIO对象可以捕获输出内容到内存,便于测试和处理。
        b.代码示例
            ---
            import sys
            from io import StringIO

            # 捕获输出
            output = StringIO()
            sys.stdout = output

            print('第一行')
            print('第二行')

            # 获取输出内容
            sys.stdout = sys.__stdout__
            content = output.getvalue()
            print(f'捕获的内容:\n{content}')

            # 测试函数输出
            def test_function():
                output = StringIO()
                old_stdout = sys.stdout
                sys.stdout = output
                try:
                    print('测试输出')
                    return output.getvalue()
                finally:
                    sys.stdout = old_stdout

            result = test_function()
            print(f'函数输出: {result}')
            ---

02.输入重定向
    a.从文件读取
        a.功能说明
            重定向sys.stdin可以从文件读取输入,模拟用户输入进行测试。
        b.代码示例
            ---
            import sys

            # 创建测试输入文件
            with open('input.txt', 'w') as f:
                f.write('Alice\n')
                f.write('25\n')
                f.write('Beijing\n')

            # 重定向stdin
            original_stdin = sys.stdin
            with open('input.txt', 'r') as f:
                sys.stdin = f
                name = input('姓名: ')
                age = input('年龄: ')
                city = input('城市: ')

            sys.stdin = original_stdin
            print(f'{name}, {age}岁, 来自{city}')

            # 批量测试
            test_data = ['10', '20', '30']
            from io import StringIO
            sys.stdin = StringIO('\n'.join(test_data))

            numbers = []
            for _ in range(3):
                num = int(input())
                numbers.append(num)

            sys.stdin = sys.__stdin__
            print(f'总和: {sum(numbers)}')
            ---
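For tests, an alternative to replacing `sys.stdin` is to patch `input` itself with `unittest.mock.patch`. The sketch below assumes a small function, `read_profile`, invented for the example. The mock feeds it scripted answers and records the prompts it was called with.

```python
from unittest import mock

def read_profile():
    """Hypothetical function under test: asks three questions via input()."""
    name = input('Name: ')
    age = int(input('Age: '))
    city = input('City: ')
    return {'name': name, 'age': age, 'city': city}

# side_effect supplies one return value per call, in order
with mock.patch('builtins.input', side_effect=['Alice', '25', 'Beijing']) as fake_input:
    profile = read_profile()
    prompts = [call.args[0] for call in fake_input.call_args_list]

print(profile, prompts)
```

The patch is undone automatically when the `with` block exits, so the real `input` cannot be left replaced by accident.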
    b.管道输入
        a.功能说明
            从管道读取输入,实现命令行工具的数据流处理。
        b.代码示例
            ---
            import sys

            # 检查是否有管道输入
            if not sys.stdin.isatty():
                print('从管道读取数据')
                for line in sys.stdin:
                    print(f'处理: {line.strip()}')
            else:
                print('从终端读取数据')
                line = input('输入: ')
                print(f'你输入了: {line}')

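            # Count lines (a separate script: stdin can be consumed only once)
            line_count = 0
            for line in sys.stdin:
                line_count += 1
            print(f'{line_count} lines in total')

            # Filter data (again a separate script)
            for line in sys.stdin:
                if 'ERROR' in line:
                    sys.stdout.write(line)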
            ---
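Pipeline-style filters are easier to test when the streams are passed in as parameters that default to the standard streams. The following sketch shows that pattern. `filter_lines` and its parameter names are illustrative.

```python
import sys
from io import StringIO

def filter_lines(keyword, source=None, sink=None):
    """Copy lines containing keyword from source to sink; return the match count."""
    source = sys.stdin if source is None else source
    sink = sys.stdout if sink is None else sink
    matches = 0
    for line in source:
        if keyword in line:
            sink.write(line)
            matches += 1
    return matches

# In-memory streams stand in for a pipe
fake_input = StringIO('INFO start\nERROR disk full\nINFO retry\nERROR timeout\n')
fake_output = StringIO()
count = filter_lines('ERROR', fake_input, fake_output)
print(count, repr(fake_output.getvalue()))
```

In a real script the call would simply be `filter_lines('ERROR')`, used for example as `cat app.log | python filter.py`, where the file names are placeholders.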

03.错误重定向
    a.重定向stderr
        a.功能说明
            重定向sys.stderr可以将错误信息输出到文件,便于日志记录和调试。
        b.代码示例
            ---
            import sys

            # 重定向stderr到文件
            original_stderr = sys.stderr
            with open('error.log', 'w') as f:
                sys.stderr = f
                print('错误信息1', file=sys.stderr)
                print('错误信息2', file=sys.stderr)

            sys.stderr = original_stderr

            # 同时重定向stdout和stderr
            with open('output.log', 'w') as out, \
                 open('error.log', 'w') as err:
                sys.stdout = out
                sys.stderr = err

                print('正常输出')
                print('错误输出', file=sys.stderr)

                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__

            # 合并输出和错误
            with open('combined.log', 'w') as f:
                sys.stdout = f
                sys.stderr = f
                print('正常信息')
                print('错误信息', file=sys.stderr)
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            ---
    b.分离输出
        a.功能说明
            将正常输出和错误输出分离到不同文件,便于日志分析。
        b.代码示例
            ---
            import sys
            from contextlib import contextmanager

            @contextmanager
            def redirect_streams(stdout_file, stderr_file):
                old_out = sys.stdout
                old_err = sys.stderr
                # Open both files before swapping the streams, so that a failed
                # open leaves the original stdout and stderr untouched
                with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
                    sys.stdout = out
                    sys.stderr = err
                    try:
                        yield
                    finally:
                        sys.stdout = old_out
                        sys.stderr = old_err

            # 使用重定向
            with redirect_streams('output.log', 'error.log'):
                print('正常日志')
                print('错误日志', file=sys.stderr)
                try:
                    1 / 0
                except Exception as e:
                    print(f'异常: {e}', file=sys.stderr)
            ---

04.contextlib重定向
    a.redirect_stdout
        a.功能说明
            contextlib提供的redirect_stdout上下文管理器,简化输出重定向操作。
        b.代码示例
            ---
            from contextlib import redirect_stdout
            from io import StringIO

            # 捕获输出
            f = StringIO()
            with redirect_stdout(f):
                print('重定向输出')
                print('第二行')

            output = f.getvalue()
            print(f'捕获内容: {output}')

            # 重定向到文件
            with open('output.txt', 'w') as f:
                with redirect_stdout(f):
                    print('写入文件')
                    for i in range(5):
                        print(f'行{i}')

            # 嵌套重定向
            with open('outer.txt', 'w') as f1:
                with redirect_stdout(f1):
                    print('外层输出')
                    with open('inner.txt', 'w') as f2:
                        with redirect_stdout(f2):
                            print('内层输出')
                    print('返回外层')
            ---
    b.redirect_stderr
        a.功能说明
            redirect_stderr用于重定向标准错误流,用法与redirect_stdout类似。
        b.代码示例
            ---
            import sys
            from contextlib import redirect_stderr
            from io import StringIO

            # 捕获错误
            f = StringIO()
            with redirect_stderr(f):
                print('错误信息', file=sys.stderr)
                import warnings
                warnings.warn('警告信息')

            errors = f.getvalue()
            print(f'错误内容: {errors}')

            # 同时重定向
            from contextlib import redirect_stdout, redirect_stderr

            with open('output.log', 'w') as out, \
                 open('error.log', 'w') as err:
                with redirect_stdout(out), redirect_stderr(err):
                    print('正常输出')
                    print('错误输出', file=sys.stderr)

            # Suppress output (the with statement also closes the devnull handle)
            import os
            with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
                print('this line is not shown')
            ---
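The redirect target does not have to be a real file: `print` only needs an object with a `write` method. That allows a small "tee" object that duplicates output to several destinations. The `Tee` class below is a minimal sketch written for this note, not a standard-library class.

```python
from contextlib import redirect_stdout
from io import StringIO

class Tee:
    """Minimal write-only stream that forwards text to several targets."""

    def __init__(self, *targets):
        self.targets = targets

    def write(self, text):
        for target in self.targets:
            target.write(text)
        return len(text)

    def flush(self):
        for target in self.targets:
            target.flush()

log_a, log_b = StringIO(), StringIO()
with redirect_stdout(Tee(log_a, log_b)):
    print('both copies')

print(repr(log_a.getvalue()), repr(log_b.getvalue()))
```

Passing `sys.stdout` as one of the targets would keep the output visible on the terminal while also saving it.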

2.4 缓冲机制

01.缓冲类型
    a.行缓冲
        a.功能说明
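            In line-buffered mode the buffer is flushed at each newline. Standard output is line-buffered when it is attached to a terminal; when redirected to a file or pipe it is block-buffered.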
        b.代码示例
            ---
            import sys

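            # Standard output is line-buffered when attached to a terminal
            sys.stdout.write('first line\n')  # flushed at the newline
            sys.stdout.write('second line')   # stays in the buffer
            sys.stdout.write('\n')            # flushed

            # Enable line buffering explicitly (Python 3.7+)
            sys.stdout.reconfigure(line_buffering=True)
            print('line-buffered output')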

            # 测试行缓冲
            import time
            for i in range(5):
                print(f'行{i}')  # 每行立即显示
                time.sleep(1)
            ---
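Line buffering is hard to observe on a terminal, so the sketch below wraps an in-memory `BytesIO` in a `TextIOWrapper` with `line_buffering=True` and checks what has reached the underlying object after each write. The behaviour shown is that of CPython's text layer, and the variable names are arbitrary.

```python
import io

raw = io.BytesIO()
# newline='\n' disables newline translation so the bytes are the same on every OS
stream = io.TextIOWrapper(raw, encoding='utf-8', newline='\n', line_buffering=True)

stream.write('partial')
before_newline = raw.getvalue()   # nothing has reached the underlying object yet

stream.write(' line\n')
after_newline = raw.getvalue()    # the newline triggered a flush

stream.write('tail')
stream.flush()                    # manual flush pushes the remainder
after_flush = raw.getvalue()

print(before_newline, after_newline, after_flush)
```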
    b.全缓冲
        a.功能说明
            全缓冲模式下,缓冲区满或手动flush时才刷新,适合文件IO提高性能。
        b.代码示例
            ---
            # 设置全缓冲
            buffer_size = 8192
            f = open('output.txt', 'w', buffering=buffer_size)

            # 写入数据
            for i in range(1000):
                f.write(f'行{i}\n')
            # 数据在缓冲区中

            f.flush()  # 强制刷新
            f.close()

            # 大缓冲区提高性能
            with open('large.txt', 'w', buffering=65536) as f:
                for i in range(100000):
                    f.write(f'数据{i}\n')
            ---
    c.无缓冲
        a.功能说明
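            In unbuffered mode every write is handed to the operating system immediately. It is available only in binary mode and suits real-time logs; to force the data onto the disk itself, also call os.fsync.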
        b.代码示例
            ---
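            # Unbuffered writing is not supported in text mode:
            # open('realtime.log', 'w', buffering=0) raises ValueError
            # Use binary mode instead
            f = open('realtime.log', 'wb', buffering=0)
            # A bytes literal may contain only ASCII, so encode text explicitly
            f.write('real-time write\n'.encode('utf-8'))
            f.close()

            # Unbuffered standard output: run `python -u script.py` or set
            # PYTHONUNBUFFERED=1; inside a script, write_through disables
            # the text-layer buffer (Python 3.7+)
            import sys
            sys.stdout.reconfigure(write_through=True)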

            # 实时日志
            import time
            with open('log.txt', 'wb', buffering=0) as f:
                for i in range(10):
                    f.write(f'日志{i}\n'.encode())
                    time.sleep(0.5)
            ---
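The restriction that `buffering=0` works only in binary mode can be checked directly. This sketch uses a temporary directory so it leaves no files behind. The file name is a placeholder.

```python
import os
import tempfile

demo_dir = tempfile.mkdtemp()
path = os.path.join(demo_dir, 'realtime.log')

# Text mode refuses buffering=0
try:
    open(path, 'w', buffering=0)
    text_mode_error = None
except ValueError as exc:
    text_mode_error = str(exc)

# Binary mode accepts it: each write goes straight to the operating system
with open(path, 'wb', buffering=0) as f:
    f.write('event 1\n'.encode('utf-8'))
    size_while_open = os.path.getsize(path)  # visible before close or flush

os.remove(path)
os.rmdir(demo_dir)
print(text_mode_error, size_while_open)
```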

02.缓冲区大小
    a.默认缓冲
        a.功能说明
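            Python chooses the buffering automatically: text streams attached to a terminal are line-buffered, while regular files are fully buffered with a fixed-size buffer (typically the file system block size or io.DEFAULT_BUFFER_SIZE).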
        b.代码示例
            ---
            import sys
            import io

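            # Default buffer size, read from the public constant
            print(f'Default buffer size: {io.DEFAULT_BUFFER_SIZE}')
            with open('test.txt', 'w') as f:
                print(f'File line buffering: {f.line_buffering}')  # False for regular files

            # Standard output buffering
            if isinstance(sys.stdout, io.TextIOWrapper):
                print(f'stdout line buffering: {sys.stdout.line_buffering}')

            # File system block size (st_blksize is not available on Windows)
            import os
            stat = os.stat('test.txt')
            print(f'Block size: {getattr(stat, "st_blksize", "n/a")}')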
            ---
    b.自定义缓冲
        a.功能说明
            通过buffering参数设置缓冲区大小,优化IO性能。
        b.代码示例
            ---
            # 小缓冲区
            with open('output.txt', 'w', buffering=512) as f:
                f.write('小缓冲区\n')

            # 大缓冲区
            with open('output.txt', 'w', buffering=65536) as f:
                for i in range(10000):
                    f.write(f'行{i}\n')

            # 性能对比
            import time

            # 默认缓冲
            start = time.time()
            with open('test1.txt', 'w') as f:
                for i in range(10000):
                    f.write(f'行{i}\n')
            time1 = time.time() - start

            # 大缓冲
            start = time.time()
            with open('test2.txt', 'w', buffering=65536) as f:
                for i in range(10000):
                    f.write(f'行{i}\n')
            time2 = time.time() - start

            print(f'默认: {time1:.3f}秒')
            print(f'大缓冲: {time2:.3f}秒')
            ---

03.刷新控制
    a.自动刷新
        a.功能说明
            Buffers are flushed automatically when they fill up, when the file is closed, and, in line-buffered mode, at each newline.
        b.代码示例
            ---
            # 行缓冲自动刷新
            import sys
            for i in range(5):
                print(f'行{i}')  # 自动刷新

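            # Automatic flush when the buffer fills (binary mode, where the
            # buffering argument is the buffer size in bytes)
            with open('output.bin', 'wb', buffering=100) as f:
                f.write(b'x' * 50)   # fits in the buffer
                f.write(b'x' * 60)   # does not fit, so buffered data is written out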

            # 文件关闭自动刷新
            f = open('test.txt', 'w')
            f.write('数据')
            f.close()  # 自动刷新并关闭
            ---
    b.手动刷新
        a.功能说明
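            flush pushes Python's buffer to the operating system immediately; to make sure the data also reaches the disk, follow it with os.fsync.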
        b.代码示例
            ---
            import sys
            import time

            # 进度显示
            for i in range(101):
                sys.stdout.write(f'\r进度: {i}%')
                sys.stdout.flush()  # 立即显示
                time.sleep(0.05)
            print()

            # 实时日志
            with open('log.txt', 'w') as f:
                for i in range(10):
                    f.write(f'[{time.time()}] 事件{i}\n')
                    f.flush()  # 立即写入
                    time.sleep(1)

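            # Critical data
            import os
            with open('important.txt', 'w') as f:
                f.write('important data')
                f.flush()              # Python buffer -> operating system
                os.fsync(f.fileno())   # operating system cache -> disk
                # continue with other work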
            ---
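The effect of `flush` can be observed from outside the file object by checking the file size before and after the call. The sketch below assumes a small write that fits in the default buffer. On typical platforms the size stays 0 until the flush, though that detail is an observation about common implementations rather than a guarantee.

```python
import os
import tempfile

demo_dir = tempfile.mkdtemp()
path = os.path.join(demo_dir, 'important.txt')

with open(path, 'w', encoding='utf-8') as f:
    f.write('important data')
    size_before_flush = os.path.getsize(path)  # data is still in Python's buffer
    f.flush()                                  # hand the data to the operating system
    size_after_flush = os.path.getsize(path)
    os.fsync(f.fileno())                       # ask the OS to commit it to disk

os.remove(path)
os.rmdir(demo_dir)
print(size_before_flush, size_after_flush)
```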

04.性能优化
    a.批量写入
        a.功能说明
            使用大缓冲区和批量写入减少系统调用次数,提高IO性能。
        b.代码示例
            ---
            import time

            # 逐行写入(慢)
            start = time.time()
            with open('test1.txt', 'w') as f:
                for i in range(10000):
                    f.write(f'行{i}\n')
                    f.flush()  # 每次都刷新
            time1 = time.time() - start

            # 批量写入(快)
            start = time.time()
            with open('test2.txt', 'w', buffering=65536) as f:
                lines = [f'行{i}\n' for i in range(10000)]
                f.writelines(lines)
            time2 = time.time() - start

            print(f'逐行: {time1:.3f}秒')
            print(f'批量: {time2:.3f}秒')
            print(f'提升: {time1/time2:.1f}倍')

            # 分块写入
            chunk_size = 1000
            with open('output.txt', 'w', buffering=65536) as f:
                for i in range(0, 10000, chunk_size):
                    chunk = [f'行{j}\n' for j in range(i, min(i+chunk_size, 10000))]
                    f.writelines(chunk)
            ---
    b.缓冲策略
        a.功能说明
            根据应用场景选择合适的缓冲策略,平衡性能和实时性。
        b.代码示例
            ---
            # 高性能写入
            def high_performance_write(filename, data):
                with open(filename, 'w', buffering=65536) as f:
                    f.writelines(data)

            # 实时日志
            def realtime_log(filename, message):
                with open(filename, 'a', buffering=1) as f:
                    f.write(f'{message}\n')

            # Critical data
            import os
            def critical_write(filename, data):
                with open(filename, 'wb', buffering=0) as f:
                    f.write(data.encode('utf-8'))
                    os.fsync(f.fileno())  # ask the OS to write through to disk

            # 使用示例
            data = [f'行{i}\n' for i in range(10000)]
            high_performance_write('fast.txt', data)

            import time
            for i in range(5):
                realtime_log('log.txt', f'事件{i}')
                time.sleep(1)

            critical_write('important.txt', '关键数据')

            # 自适应缓冲
            def adaptive_write(filename, data, realtime=False):
                buffering = 1 if realtime else 65536
                with open(filename, 'w', buffering=buffering) as f:
                    if isinstance(data, list):
                        f.writelines(data)
                    else:
                        f.write(data)
                    if realtime:
                        f.flush()

            adaptive_write('output.txt', data, realtime=False)
            ---

3. 字节流与字符流

3.1 BytesIO与StringIO

01.BytesIO
    a.基本操作
        a.功能说明
            BytesIO在内存中操作二进制数据,提供类似文件对象的接口,无需创建实际文件。
        b.代码示例
            ---
            from io import BytesIO

            # 创建BytesIO对象
            bio = BytesIO()
            bio.write(b'Hello ')
            bio.write(b'World')
            print(f'当前位置: {bio.tell()}')

            # 读取内容
            bio.seek(0)
            content = bio.read()
            print(f'内容: {content}')

            # 从已有数据创建
            bio = BytesIO(b'Initial data')
            print(bio.read())

            # 获取所有数据
            bio = BytesIO()
            bio.write(b'test data')
            data = bio.getvalue()  # 不改变指针位置
            print(f'数据: {data}')
            ---
    b.Use cases
        a.Description
            BytesIO suits temporary binary data, network payloads, image processing and similar cases, and avoids disk I/O overhead.
        b.Code example
            ---
            from io import BytesIO
            from PIL import Image

            # 图片处理
            img = Image.open('photo.jpg')
            bio = BytesIO()
            img.save(bio, format='PNG')
            png_data = bio.getvalue()
            print(f'PNG大小: {len(png_data)}字节')

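            # Buffer network data (requests is a third-party package)
            import requests
            response = requests.get('https://example.com/image.jpg', timeout=10)
            response.raise_for_status()  # fail early on an HTTP error
            bio = BytesIO(response.content)
            img = Image.open(bio)
            img.show()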

            # 数据压缩
            import gzip
            bio = BytesIO()
            with gzip.GzipFile(fileobj=bio, mode='wb') as gz:
                gz.write(b'compress this data')
            compressed = bio.getvalue()
            print(f'压缩后: {len(compressed)}字节')
            ---

02.StringIO
    a.Basic operations
        a.Description
            StringIO holds text in memory behind a file-like interface and supports reading, writing and seeking.
        b.Code example
            ---
            from io import StringIO

            # 创建StringIO对象
            sio = StringIO()
            sio.write('第一行\n')
            sio.write('第二行\n')
            print(f'当前位置: {sio.tell()}')

            # 读取内容
            sio.seek(0)
            content = sio.read()
            print(content)

            # 从已有字符串创建
            sio = StringIO('初始内容\n更多内容')
            for line in sio:
                print(line.strip())

            # 获取所有内容
            sio = StringIO()
            sio.write('测试数据')
            text = sio.getvalue()
            print(f'内容: {text}')
            ---
    b.Use cases
        a.Description
            StringIO suits temporary text, log buffering, CSV data, unit tests and similar cases.
        b.Code example
            ---
            from io import StringIO
            import csv

            # CSV处理
            sio = StringIO()
            writer = csv.writer(sio)
            writer.writerow(['姓名', '年龄', '城市'])
            writer.writerow(['张三', 25, '北京'])
            writer.writerow(['李四', 30, '上海'])
            csv_data = sio.getvalue()
            print(csv_data)

            # 日志缓冲
            import logging
            log_stream = StringIO()
            handler = logging.StreamHandler(log_stream)
            logger = logging.getLogger('test')
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
            logger.info('测试日志')
            logs = log_stream.getvalue()
            print(f'日志内容: {logs}')

            # Unit testing: capture stdout and always restore it, even on error
            import sys
            old_stdout = sys.stdout
            sys.stdout = StringIO()
            try:
                print('captured output')
                output = sys.stdout.getvalue()
            finally:
                sys.stdout = old_stdout
            print(f'Captured: {output}')
            ---
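For capturing output in tests, the standard library also offers `contextlib.redirect_stdout`, which restores `sys.stdout` automatically. The sketch below is one possible way to use it; the function `greet` is a hypothetical stand-in for the code under test.

```python
import contextlib
import io

def greet(name):
    # Hypothetical function under test: it only prints
    print(f'hello, {name}')

buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    greet('world')          # output lands in buffer, not on the console

captured = buffer.getvalue()
print(f'captured: {captured!r}')
```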

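03.In-memory file operations
    a.Reading and writing
        a.Description
            In-memory files support the standard file operations seek, tell, read and write, and are usually faster than disk files because no device I/O is involved.
        b.Code example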
            ---
            from io import BytesIO, StringIO

            # BytesIO读写
            bio = BytesIO()
            bio.write(b'0123456789')
            bio.seek(5)
            bio.write(b'ABCDE')
            bio.seek(0)
            print(bio.read())  # b'01234ABCDE'

            # StringIO读写
            sio = StringIO()
            sio.write('Hello World')
            sio.seek(6)
            sio.write('Python')
            sio.seek(0)
            print(sio.read())  # Hello Python

            # 分块读取
            bio = BytesIO(b'x' * 1000)
            chunks = []
            while True:
                chunk = bio.read(100)
                if not chunk:
                    break
                chunks.append(chunk)
            print(f'读取{len(chunks)}块')
            ---
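    b.Performance optimization
        a.Description
            In-memory files avoid disk I/O and suit temporary data that is read and written often, but the whole content stays in RAM, so watch memory usage.
        b.Code example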
            ---
            import time
            from io import BytesIO

            # 性能对比
            data = b'x' * 1000000

            # 磁盘文件
            start = time.time()
            with open('temp.bin', 'wb') as f:
                for _ in range(100):
                    f.write(data)
            disk_time = time.time() - start

            # 内存文件
            start = time.time()
            bio = BytesIO()
            for _ in range(100):
                bio.write(data)
            memory_time = time.time() - start

            print(f'磁盘: {disk_time:.3f}秒')
            print(f'内存: {memory_time:.3f}秒')
            print(f'提升: {disk_time/memory_time:.1f}倍')

            # 清理
            import os
            os.remove('temp.bin')
            ---

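04.Practical tips
    a.Data conversion
        a.Description
            BytesIO holds bytes and StringIO holds str; moving data between them goes through encode() and decode() with an explicit encoding.
        b.Code example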
            ---
            from io import BytesIO, StringIO

            # 字符串转字节
            sio = StringIO('中文内容')
            text = sio.getvalue()
            bio = BytesIO(text.encode('utf-8'))
            print(f'字节数: {len(bio.getvalue())}')

            # 字节转字符串
            bio = BytesIO('测试数据'.encode('utf-8'))
            data = bio.getvalue()
            sio = StringIO(data.decode('utf-8'))
            print(sio.getvalue())

            # 编码转换
            bio = BytesIO('中文'.encode('gbk'))
            gbk_data = bio.getvalue()
            utf8_data = gbk_data.decode('gbk').encode('utf-8')
            print(f'GBK: {len(gbk_data)}字节')
            print(f'UTF-8: {len(utf8_data)}字节')
            ---
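    b.Context management
        a.Description
            BytesIO and StringIO support the with statement. Leaving the block closes the buffer and releases its memory, so getvalue() must be called before the block ends.
        b.Code example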
            ---
            from io import BytesIO, StringIO

            # BytesIO上下文
            with BytesIO() as bio:
                bio.write(b'temporary data')
                data = bio.getvalue()
                print(f'数据: {data}')

            # StringIO上下文
            with StringIO() as sio:
                sio.write('临时文本')
                text = sio.getvalue()
                print(f'文本: {text}')

            # 嵌套使用
            with BytesIO() as bio:
                with StringIO() as sio:
                    sio.write('text')
                    bio.write(sio.getvalue().encode())
                    print(bio.getvalue())

            # 函数返回
            def create_buffer():
                bio = BytesIO()
                bio.write(b'data')
                return bio.getvalue()

            result = create_buffer()
            print(f'结果: {result}')
            ---
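The examples above all call `getvalue()` inside the `with` block. The short sketch below shows why: once the block exits the buffer is closed, and further access raises `ValueError`. The variable names are illustrative only.

```python
from io import BytesIO

with BytesIO() as bio:
    bio.write(b'temporary data')
    snapshot = bio.getvalue()   # copy the bytes out while the buffer is open

closed_after_with = bio.closed  # the with block closed the buffer

try:
    bio.getvalue()              # operating on a closed buffer
    error_raised = False
except ValueError:
    error_raised = True

print(snapshot, closed_after_with, error_raised)
```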
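    c.Stream copying
        a.Description
            shutil.copyfileobj copies data from one file-like object to another. Both streams must be of the same kind (both binary or both text).
        b.Code example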
            ---
            from io import BytesIO, StringIO
            import shutil

            # 文件到内存
            with open('data.txt', 'rb') as f:
                bio = BytesIO()
                shutil.copyfileobj(f, bio)
                print(f'复制{bio.tell()}字节')

            # 内存到文件
            bio = BytesIO(b'save this data')
            bio.seek(0)
            with open('output.bin', 'wb') as f:
                shutil.copyfileobj(bio, f)

            # 内存间复制
            src = BytesIO(b'source data')
            dst = BytesIO()
            src.seek(0)
            shutil.copyfileobj(src, dst)
            print(dst.getvalue())

            # 分块复制
            def copy_stream(src, dst, chunk_size=8192):
                while True:
                    chunk = src.read(chunk_size)
                    if not chunk:
                        break
                    dst.write(chunk)

            src = BytesIO(b'x' * 100000)
            dst = BytesIO()
            src.seek(0)
            copy_stream(src, dst)
            print(f'复制{dst.tell()}字节')
            ---
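`shutil.copyfileobj` simply reads from one object and writes to the other, so the two sides have to agree on bytes versus text. The following sketch, using only in-memory streams, shows a text-to-text copy, the `TypeError` raised by a mismatched pair, and one way to bridge the gap with `io.TextIOWrapper`. It is an illustration under those assumptions, not the only approach.

```python
import io
import shutil

# Text to text works
text_src = io.StringIO('line 1\nline 2\n')
text_dst = io.StringIO()
shutil.copyfileobj(text_src, text_dst)

# Binary to text fails because StringIO.write() rejects bytes
try:
    shutil.copyfileobj(io.BytesIO(b'raw bytes'), io.StringIO())
    mismatch_error = None
except TypeError as exc:
    mismatch_error = exc

# Bridge the two kinds by decoding the binary source on the fly
decoded_src = io.TextIOWrapper(io.BytesIO('中文'.encode('utf-8')), encoding='utf-8')
bridged_dst = io.StringIO()
shutil.copyfileobj(decoded_src, bridged_dst)

print(text_dst.getvalue(), type(mismatch_error).__name__, bridged_dst.getvalue())
```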

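3.2 Encoding and decoding

01.Character encodings
    a.Common encodings
        a.Description
            Python supports many character encodings. UTF-8 is the default for str.encode() and bytes.decode() and can represent every Unicode character; GBK covers Simplified Chinese; ASCII covers only 7-bit characters such as basic English letters. Note that open() without an encoding argument has traditionally used the locale's preferred encoding, which is not always UTF-8, so pass encoding explicitly.
        b.Code example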
            ---
            # UTF-8编码
            text = '中文English123'
            utf8_bytes = text.encode('utf-8')
            print(f'UTF-8: {utf8_bytes}')
            print(f'长度: {len(utf8_bytes)}字节')

            # GBK编码
            gbk_bytes = text.encode('gbk')
            print(f'GBK: {gbk_bytes}')
            print(f'长度: {len(gbk_bytes)}字节')

            # ASCII编码
            ascii_text = 'Hello123'
            ascii_bytes = ascii_text.encode('ascii')
            print(f'ASCII: {ascii_bytes}')

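            # Count the distinct codecs reachable through the alias table
            # (the table itself has one entry per alias, not per codec)
            import encodings.aliases
            codec_names = set(encodings.aliases.aliases.values())
            print(f'Distinct codecs in the alias table: {len(codec_names)}')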
            ---
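To make the size differences concrete, the sketch below encodes the same sample string with three codecs and shows that ASCII cannot represent it at all. The choice of `utf-16-le` as a third codec is an addition for comparison and does not come from the notes above.

```python
text = '中文English123'   # 2 CJK characters + 10 ASCII characters

# Byte length of the same text under different encodings
sizes = {name: len(text.encode(name)) for name in ('utf-8', 'gbk', 'utf-16-le')}
print(sizes)

# ASCII has no code points for the CJK characters
try:
    text.encode('ascii')
    ascii_ok = True
except UnicodeEncodeError:
    ascii_ok = False
print(f'ASCII can encode it: {ascii_ok}')
```

UTF-8 spends 3 bytes on each of these CJK characters and 1 byte per ASCII character, GBK spends 2 and 1, and UTF-16 spends 2 on every character in this sample.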
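    b.Encoding detection
        a.Description
            The third-party chardet library guesses a file's encoding from its bytes, which helps with files of unknown origin. The result is a heuristic with a confidence score, not a guarantee.
        b.Code example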
            ---
            import chardet

            # 检测文件编码
            with open('unknown.txt', 'rb') as f:
                raw_data = f.read()
                result = chardet.detect(raw_data)
                encoding = result['encoding']
                confidence = result['confidence']
                print(f'编码: {encoding}')
                print(f'置信度: {confidence:.2%}')

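            # Read with the detected encoding; chardet reports None when it cannot
            # decide, so fall back to UTF-8 in that case
            with open('unknown.txt', 'r', encoding=encoding or 'utf-8') as f:
                content = f.read()
                print(content)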

            # 批量检测
            import os
            for filename in os.listdir('.'):
                if filename.endswith('.txt'):
                    with open(filename, 'rb') as f:
                        data = f.read(1000)  # 只读前1000字节
                        result = chardet.detect(data)
                        print(f'{filename}: {result["encoding"]}')
            ---

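02.Encoding conversion
    a.Encoding strings
        a.Description
            The encode method converts a string to bytes in a given encoding; its errors parameter controls what happens to characters that cannot be encoded.
        b.Code example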
            ---
            # 基本编码
            text = 'Python编程'
            bytes_utf8 = text.encode('utf-8')
            bytes_gbk = text.encode('gbk')
            print(f'UTF-8: {len(bytes_utf8)}字节')
            print(f'GBK: {len(bytes_gbk)}字节')

            # Error handling
            text = 'Hello\ud800World'  # contains a lone surrogate, which UTF-8 cannot encode
            # Drop the offending character
            safe_bytes = text.encode('utf-8', errors='ignore')
            # Replace it with '?'
            replace_bytes = text.encode('utf-8', errors='replace')
            # Replace it with an XML character reference
            xml_bytes = text.encode('utf-8', errors='xmlcharrefreplace')
            print(f'ignore: {safe_bytes}')
            print(f'replace: {replace_bytes}')
            print(f'xmlcharrefreplace: {xml_bytes}')

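            # Compare encodings (the list is not named 'encodings' to avoid
            # shadowing the standard-library module of that name)
            text = '中文'
            codec_names = ['utf-8', 'gbk', 'gb2312', 'big5']
            for enc in codec_names:
                try:
                    data = text.encode(enc)
                    print(f'{enc}: {len(data)} bytes {data.hex()}')
                except UnicodeEncodeError:
                    print(f'{enc}: cannot represent this text')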
            ---
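    b.Decoding bytes
        a.Description
            The decode method converts bytes to a string; the encoding passed in must match the one the bytes were written with.
        b.Code example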
            ---
            # 基本解码
            utf8_bytes = b'\xe4\xb8\xad\xe6\x96\x87'
            text = utf8_bytes.decode('utf-8')
            print(f'解码结果: {text}')

            # 错误处理
            invalid_bytes = b'\xff\xfe'
            # 忽略错误
            text1 = invalid_bytes.decode('utf-8', errors='ignore')
            # 替换错误
            text2 = invalid_bytes.decode('utf-8', errors='replace')
            # 使用反斜杠转义
            text3 = invalid_bytes.decode('utf-8', errors='backslashreplace')
            print(f'忽略: {text1}')
            print(f'替换: {text2}')
            print(f'转义: {text3}')

            # Try several encodings
            data = b'\xd6\xd0\xce\xc4'
            codec_names = ['utf-8', 'gbk', 'gb2312']
            for enc in codec_names:
                try:
                    text = data.decode(enc)
                    print(f'{enc}: {text}')
                except UnicodeDecodeError:
                    print(f'{enc}: decoding failed')
            ---

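03.File encoding handling
    a.Reading different encodings
        a.Description
            Pass the encoding parameter when opening a file and Python decodes and encodes transparently, which avoids mojibake.
        b.Code example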
            ---
            # UTF-8文件
            with open('utf8.txt', 'r', encoding='utf-8') as f:
                content = f.read()
                print(content)

            # GBK文件
            with open('gbk.txt', 'r', encoding='gbk') as f:
                content = f.read()
                print(content)

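            # Auto-detect the encoding (chardet is a third-party package)
            import chardet
            def read_file_auto(filename):
                with open(filename, 'rb') as f:
                    raw = f.read()
                # Detection is a guess; fall back to UTF-8 when chardet returns None
                encoding = chardet.detect(raw)['encoding'] or 'utf-8'
                with open(filename, 'r', encoding=encoding) as f:
                    return f.read()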

            content = read_file_auto('unknown.txt')
            print(content)

            # 处理BOM
            with open('utf8_bom.txt', 'r', encoding='utf-8-sig') as f:
                content = f.read()  # 自动去除BOM
                print(content)
            ---
    b.Encoding conversion
        a.Description
            Read a file in one encoding and save it in another to convert the file's encoding.
        b.Code example
            ---
            # GBK转UTF-8
            with open('gbk.txt', 'r', encoding='gbk') as f:
                content = f.read()
            with open('utf8.txt', 'w', encoding='utf-8') as f:
                f.write(content)

            # Batch conversion. Caution: each file is overwritten in place, so run this
            # only on files known to be in src_enc; a second run would misread the
            # already-converted files
            import os
            def convert_encoding(src_dir, src_enc, dst_enc):
                for filename in os.listdir(src_dir):
                    if filename.endswith('.txt'):
                        src_path = os.path.join(src_dir, filename)
                        with open(src_path, 'r', encoding=src_enc) as f:
                            content = f.read()
                        with open(src_path, 'w', encoding=dst_enc) as f:
                            f.write(content)
                        print(f'Converted: {filename}')

            convert_encoding('.', 'gbk', 'utf-8')

            # 二进制方式转换
            with open('gbk.txt', 'rb') as f:
                gbk_data = f.read()
            text = gbk_data.decode('gbk')
            utf8_data = text.encode('utf-8')
            with open('utf8.txt', 'wb') as f:
                f.write(utf8_data)
            ---
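Converting a file in place is risky if decoding fails halfway, because the original may already be truncated. One possible safeguard, sketched below, is to write the converted text to a temporary file in the same directory and swap it in with `os.replace` only after everything succeeded. The helper name `convert_file` and the sample file are assumptions made for this illustration.

```python
import os
import tempfile

def convert_file(path, src_enc, dst_enc):
    """Re-encode a text file, replacing it only after a full successful pass."""
    folder = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=folder, suffix='.tmp')
    try:
        # newline='' keeps the file's original line endings untouched
        with open(fd, 'w', encoding=dst_enc, newline='') as dst:
            with open(path, 'r', encoding=src_enc, newline='') as src:
                for line in src:     # a decode error aborts before the swap
                    dst.write(line)
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)          # drop the partial temporary file
        raise

# Usage with a throwaway GBK file
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = os.path.join(tmp_dir, 'sample.txt')
    with open(sample, 'w', encoding='gbk', newline='') as f:
        f.write('中文\n')
    convert_file(sample, 'gbk', 'utf-8')
    with open(sample, 'rb') as f:
        converted = f.read()
    print(converted)
```

Processing line by line also keeps memory usage flat for large files.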

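04.Encoding best practices
    a.Use UTF-8 everywhere
        a.Description
            UTF-8 is the most widely supported encoding and covers every language, so using it for all files is recommended.
        b.Code example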
            ---
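            # Inspect the default str/bytes codec (always 'utf-8' in Python 3).
            # It cannot be changed here and is not the default used by open()
            import sys
            print(f'Default encoding: {sys.getdefaultencoding()}')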

            # 文件操作统一UTF-8
            with open('data.txt', 'w', encoding='utf-8') as f:
                f.write('中文English日本語')

            with open('data.txt', 'r', encoding='utf-8') as f:
                content = f.read()
                print(content)

            # 网络传输UTF-8
            import json
            data = {'name': '张三', 'age': 25}
            json_str = json.dumps(data, ensure_ascii=False)
            json_bytes = json_str.encode('utf-8')
            print(f'JSON: {json_bytes}')

            # Database with UTF-8 (the PRAGMA only takes effect before the database is created)
            import sqlite3
            conn = sqlite3.connect('test.db')
            conn.execute('PRAGMA encoding = "UTF-8"')
            conn.execute('CREATE TABLE IF NOT EXISTS users (name TEXT)')
            conn.execute('INSERT INTO users VALUES (?)', ('张三',))
            conn.commit()
            conn.close()
            ---
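    b.Error handling
        a.Description
            Handle encoding errors deliberately: use the errors parameter to control the behaviour and log what went wrong.
        b.Code example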
            ---
            # 安全读取文件
            def safe_read(filename, encoding='utf-8'):
                try:
                    with open(filename, 'r', encoding=encoding) as f:
                        return f.read()
                except UnicodeDecodeError as e:
                    print(f'解码错误: {e}')
                    # 使用替换模式重试
                    with open(filename, 'r', encoding=encoding, errors='replace') as f:
                        return f.read()

            content = safe_read('data.txt')

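            # Log encoding problems
            import logging
            def read_with_log(filename):
                # GBK is a superset of GB2312, so a separate gb2312 attempt adds nothing.
                # latin1 maps every byte to a character and never fails; keep it last
                # as a catch-all, knowing its output may be mojibake
                encodings = ['utf-8', 'gbk', 'latin1']
                for enc in encodings:
                    try:
                        with open(filename, 'r', encoding=enc) as f:
                            content = f.read()
                        logging.info(f'Decoded {filename} with {enc}')
                        return content
                    except UnicodeDecodeError:
                        logging.warning(f'{enc} failed for {filename}')
                # Only reachable if the catch-all entry is removed from the list
                logging.error(f'All encodings failed: {filename}')
                return None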

            # 验证编码
            def validate_encoding(data, encoding='utf-8'):
                try:
                    data.decode(encoding)
                    return True
                except UnicodeDecodeError:
                    return False

            with open('test.txt', 'rb') as f:
                data = f.read()
                if validate_encoding(data):
                    print('UTF-8编码有效')
                else:
                    print('UTF-8编码无效')
            ---
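    c.Performance optimization
        a.Description
            Encoding and decoding cost CPU time, which becomes noticeable when processing large files.
        b.Code example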
            ---
            import time

            # 性能测试
            text = '中文' * 100000

            # 编码性能
            start = time.time()
            for _ in range(100):
                data = text.encode('utf-8')
            utf8_time = time.time() - start

            start = time.time()
            for _ in range(100):
                data = text.encode('gbk')
            gbk_time = time.time() - start

            print(f'UTF-8编码: {utf8_time:.3f}秒')
            print(f'GBK编码: {gbk_time:.3f}秒')

            # 批量处理优化
            def process_large_file(filename):
                chunk_size = 1024 * 1024  # 1MB
                with open(filename, 'rb') as f:
                    while True:
                        chunk = f.read(chunk_size)
                        if not chunk:
                            break
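                        # Work on bytes to avoid repeated decoding. Do not call
                        # chunk.decode() here: a chunk boundary can split a character
                        process_bytes(chunk)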

            def process_bytes(data):
                pass

            # 缓存编码结果
            from functools import lru_cache
            @lru_cache(maxsize=1000)
            def cached_encode(text):
                return text.encode('utf-8')

            for i in range(1000):
                data = cached_encode('重复文本')
            ---
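When a large file has to be decoded chunk by chunk, a fixed-size read can cut a multi-byte character in half. The sketch below uses `codecs.getincrementaldecoder`, which keeps the incomplete tail between calls. The helper name `decode_in_chunks` and the deliberately tiny `chunk_size` are assumptions chosen to force splits in a short sample.

```python
import codecs
import io

def decode_in_chunks(stream, encoding='utf-8', chunk_size=4):
    """Decode a binary stream piece by piece without breaking characters."""
    decoder = codecs.getincrementaldecoder(encoding)()
    parts = []
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        parts.append(decoder.decode(chunk))        # incomplete bytes are held back
    parts.append(decoder.decode(b'', final=True))  # raises if a character is truncated
    return ''.join(parts)

original = '中文编码测试'
raw = original.encode('utf-8')   # 3 bytes per character, so 4-byte chunks split them
result = decode_in_chunks(io.BytesIO(raw))
print(result)
```

In real use the chunk size would be far larger, for example the 1 MB used above; opening the file in text mode does the same bookkeeping internally.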

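3.3 Text streams and binary streams

01.TextIOWrapper
    a.Wrapping as a text stream
        a.Description
            TextIOWrapper wraps a binary stream as a text stream and handles encoding, decoding and newline translation.
        b.Code example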
            ---
            import io

            # 创建文本流
            binary_stream = io.BytesIO(b'Hello World')
            text_stream = io.TextIOWrapper(binary_stream, encoding='utf-8')
            content = text_stream.read()
            print(f'内容: {content}')

            # 自定义换行符
            binary_stream = io.BytesIO()
            text_stream = io.TextIOWrapper(
                binary_stream,
                encoding='utf-8',
                newline='\r\n'  # Windows换行符
            )
            text_stream.write('第一行\n第二行\n')
            text_stream.flush()
            print(f'二进制: {binary_stream.getvalue()}')

            # 设置缓冲
            binary_stream = io.BytesIO()
            text_stream = io.TextIOWrapper(
                binary_stream,
                encoding='utf-8',
                line_buffering=True
            )
            text_stream.write('行缓冲\n')
            ---
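A `TextIOWrapper` closes the stream it wraps when the wrapper itself is closed or garbage-collected. If the underlying `BytesIO` should outlive the text layer, `detach()` separates the two. The sketch below is a minimal illustration with made-up variable names.

```python
import io

binary_stream = io.BytesIO()
# newline='\n' disables newline translation on write
text_stream = io.TextIOWrapper(binary_stream, encoding='utf-8', newline='\n')
text_stream.write('第一行\n')
text_stream.flush()                # push the encoded bytes down to binary_stream

detached = text_stream.detach()    # the wrapper is unusable from here on
payload = binary_stream.getvalue() # the BytesIO is still open and readable

print(detached is binary_stream, binary_stream.closed, payload)
```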
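    b.Stream attributes
        a.Description
            TextIOWrapper exposes attributes such as buffer, encoding and errors, giving access to the underlying stream and encoding settings.
        b.Code example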
            ---
            # 查看流属性
            with open('data.txt', 'r', encoding='utf-8') as f:
                print(f'编码: {f.encoding}')
                print(f'错误处理: {f.errors}')
                print(f'换行符: {f.newlines}')
                print(f'行缓冲: {f.line_buffering}')
                print(f'缓冲区: {f.buffer}')

            # 访问底层二进制流
            with open('data.txt', 'r', encoding='utf-8') as f:
                binary_stream = f.buffer
                raw_data = binary_stream.read(10)
                print(f'原始数据: {raw_data}')

            # 修改流属性
            import sys
            print(f'stdout编码: {sys.stdout.encoding}')
            if hasattr(sys.stdout, 'reconfigure'):
                sys.stdout.reconfigure(encoding='utf-8')
            ---

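02.BufferedIOBase
    a.Base class for buffered streams
        a.Description
            BufferedIOBase is the base class for buffered binary streams. It defines read, readinto, write and detach, and inherits seek and tell from IOBase. BufferedReader, BufferedWriter and BufferedRandom are its concrete subclasses.
        b.Code example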
            ---
            import io

            # BufferedReader
            binary_data = b'x' * 1000
            raw_stream = io.BytesIO(binary_data)
            buffered_reader = io.BufferedReader(raw_stream, buffer_size=100)
            chunk = buffered_reader.read(50)
            print(f'读取: {len(chunk)}字节')

            # BufferedWriter
            raw_stream = io.BytesIO()
            buffered_writer = io.BufferedWriter(raw_stream, buffer_size=100)
            buffered_writer.write(b'buffered data')
            buffered_writer.flush()
            print(f'写入: {raw_stream.getvalue()}')

            # BufferedRandom
            raw_stream = io.BytesIO(b'initial data')
            buffered_random = io.BufferedRandom(raw_stream)
            buffered_random.seek(0)
            buffered_random.write(b'modified')
            buffered_random.seek(0)
            print(f'内容: {buffered_random.read()}')
            ---
    b.Buffer control
        a.Description
            Control the buffer size and flush policy to tune I/O performance.
        b.Code example
            ---
            import io

            # 自定义缓冲大小
            raw_stream = io.BytesIO()
            writer = io.BufferedWriter(raw_stream, buffer_size=8192)
            for i in range(100):
                writer.write(b'data\n')
            writer.flush()
            print(f'总大小: {len(raw_stream.getvalue())}')

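            # Observe when buffered data actually reaches the raw stream
            raw_stream = io.BytesIO()
            writer = io.BufferedWriter(raw_stream, buffer_size=100)
            writer.write(b'x' * 50)
            print(f'After 50 bytes: {len(raw_stream.getvalue())} bytes in raw stream')
            writer.write(b'x' * 60)  # 110 bytes in total exceed the 100-byte buffer
            print(f'After 110 bytes: {len(raw_stream.getvalue())} bytes in raw stream')
            writer.flush()
            print(f'After flush: {len(raw_stream.getvalue())} bytes in raw stream')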

            # 禁用缓冲
            with open('output.txt', 'wb', buffering=0) as f:
                f.write(b'no buffer\n')
            ---
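The effect of the buffer can be checked by inspecting the raw stream between writes. The sketch below records how many bytes have arrived at each step. How much is passed down when the buffer overflows is an implementation detail, so the code only relies on what must hold: nothing arrives while the data fits, and everything arrives after `flush()`.

```python
import io

raw_stream = io.BytesIO()
writer = io.BufferedWriter(raw_stream, buffer_size=100)

writer.write(b'x' * 50)
after_small_write = len(raw_stream.getvalue())   # 50 bytes fit, so still buffered

writer.write(b'x' * 60)
after_overflow = len(raw_stream.getvalue())      # part of the data was forced out

writer.flush()
after_flush = len(raw_stream.getvalue())         # all 110 bytes delivered

print(after_small_write, after_overflow, after_flush)
```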

03.Stream conversion
    a.Binary to text
        a.Description
            Use TextIOWrapper to turn a binary stream into a text stream with a given encoding.
        b.Code example
            ---
            import io

            # Wrap a BytesIO in a text layer (the result is a TextIOWrapper, not a StringIO)
            bio = io.BytesIO('中文内容'.encode('utf-8'))
            text_wrapper = io.TextIOWrapper(bio, encoding='utf-8')
            text = text_wrapper.read()
            print(f'文本: {text}')

            # 文件流转换
            with open('data.bin', 'rb') as binary_file:
                text_file = io.TextIOWrapper(binary_file, encoding='utf-8')
                for line in text_file:
                    print(line.strip())

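            # Wrap a socket in a text stream (needs network access)
            import socket
            sock = socket.socket()
            sock.connect(('example.com', 80))
            sock_file = sock.makefile('rwb')
            # newline='' keeps '\r\n' intact; the default would rewrite '\n' on Windows
            text_sock = io.TextIOWrapper(sock_file, encoding='utf-8', newline='')
            text_sock.write('GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
            text_sock.flush()
            response = text_sock.read()
            text_sock.close()
            sock.close()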
            ---
    b.Text to binary
        a.Description
            The buffer attribute of a text stream gives access to its underlying binary stream.
        b.Code example
            ---
            import io

            # 获取底层二进制流
            sio = io.StringIO('text data')
            # StringIO没有buffer属性,需要手动转换
            text = sio.getvalue()
            bio = io.BytesIO(text.encode('utf-8'))
            print(f'二进制: {bio.getvalue()}')

            # 文件流转换
            with open('data.txt', 'r', encoding='utf-8') as text_file:
                binary_stream = text_file.buffer
                raw_bytes = binary_stream.read(10)
                print(f'原始字节: {raw_bytes}')

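            # Mixed reading (demonstration only): the text layer reads ahead, so after
            # f.buffer is used directly, further f.read() calls are unreliable
            with open('data.txt', 'r', encoding='utf-8') as f:
                text = f.read(10)  # read 10 characters
                f.buffer.seek(0)
                binary = f.buffer.read(10)  # read 10 bytes
                print(f'Text: {text}')
                print(f'Binary: {binary}')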
            ---

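04.Stream techniques
    a.Sharing a stream
        a.Description
            Several wrappers can be placed over the same underlying stream, but they share its position while each keeps a private buffer, so this is rarely safe outside of demonstrations.
        b.Code example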
            ---
            import io

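            # Two wrappers over one raw stream
            raw_stream = io.BytesIO(b'shared data')
            reader1 = io.BufferedReader(raw_stream)
            reader2 = io.BufferedReader(raw_stream)
            # Caution: the wrappers share the raw position but not their buffers.
            # reader1 prefetches all 11 bytes, so reader2 finds nothing left
            data1 = reader1.read(5)
            data2 = reader2.read(5)
            print(f'Reader1: {data1}')  # b'share'
            print(f'Reader2: {data2}')  # b'' in CPython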

            # 不同编码访问
            raw_stream = io.BytesIO('中文'.encode('utf-8'))
            utf8_reader = io.TextIOWrapper(raw_stream, encoding='utf-8')
            text = utf8_reader.read()
            print(f'UTF-8: {text}')

            # 流链
            raw = io.BytesIO(b'x' * 1000)
            buffered = io.BufferedReader(raw, buffer_size=100)
            text = io.TextIOWrapper(buffered, encoding='utf-8')
            content = text.read()
            ---
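The pitfall of stacking two buffered readers on one stream can be made visible by checking the raw position after the first read. The sketch below assumes CPython's default buffer size, which is far larger than the 11-byte sample, so the first reader drains the whole stream into its private buffer.

```python
import io

raw_stream = io.BytesIO(b'shared data')      # 11 bytes
reader1 = io.BufferedReader(raw_stream)
reader2 = io.BufferedReader(raw_stream)

first = reader1.read(5)            # reader1 fills its buffer from raw_stream
raw_position = raw_stream.tell()   # how far the shared stream really advanced
second = reader2.read(5)           # the raw stream is already at its end

print(first, raw_position, second)
```

If two consumers need the same data, giving each its own `BytesIO` over the same bytes avoids the shared position.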
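    b.Performance optimization
        a.Description
            Pick the stream type and buffer size to match the workload to improve I/O performance.
        b.Code example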
            ---
            import io
            import time

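            # Performance comparison. Both targets are in-memory BytesIO objects, so this
            # measures the overhead of the BufferedWriter layer, not disk buffering gains
            data = b'x' * 1000000

            # Direct writes to BytesIO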
            start = time.time()
            raw = io.BytesIO()
            for _ in range(100):
                raw.write(data)
            no_buffer_time = time.time() - start

            # 有缓冲
            start = time.time()
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=65536)
            for _ in range(100):
                buffered.write(data)
            buffered.flush()
            buffer_time = time.time() - start

            print(f'无缓冲: {no_buffer_time:.3f}秒')
            print(f'有缓冲: {buffer_time:.3f}秒')

            # 大缓冲区
            def write_with_buffer(filename, data, buffer_size):
                with open(filename, 'wb', buffering=buffer_size) as f:
                    f.write(data)

            sizes = [1024, 8192, 65536]
            for size in sizes:
                start = time.time()
                write_with_buffer('test.bin', data, size)
                elapsed = time.time() - start
                print(f'缓冲{size}: {elapsed:.3f}秒')

            import os
            os.remove('test.bin')
            ---
    c.Error handling
        a.Description
            Handle encoding errors, I/O errors and other exceptions raised by stream operations.
        b.Code example
            ---
            import io

            # 编码错误处理
            binary_data = b'\xff\xfe invalid utf-8'
            text_stream = io.TextIOWrapper(
                io.BytesIO(binary_data),
                encoding='utf-8',
                errors='replace'
            )
            try:
                content = text_stream.read()
                print(f'内容: {content}')
            except Exception as e:
                print(f'错误: {e}')

            # IO错误处理
            def safe_stream_read(stream):
                try:
                    return stream.read()
                except io.UnsupportedOperation:
                    print('流不支持读取')
                except ValueError:
                    print('流已关闭')
                return None

            # 流状态检查
            stream = io.BytesIO(b'data')
            print(f'可读: {stream.readable()}')
            print(f'可写: {stream.writable()}')
            print(f'可定位: {stream.seekable()}')
            stream.close()
            print(f'已关闭: {stream.closed}')
            ---

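3.4 Byte order handling

01.Byte order concepts
    a.Big-endian and little-endian
        a.Description
            Big-endian stores the most significant byte first; little-endian stores the least significant byte first. Network transmission uses big-endian.
        b.Code example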
            ---
            import struct
            import sys

            # 查看系统字节序
            print(f'系统字节序: {sys.byteorder}')

            # 整数转字节
            num = 0x12345678
            # 大端序
            big_endian = num.to_bytes(4, byteorder='big')
            print(f'大端: {big_endian.hex()}')  # 12345678
            # 小端序
            little_endian = num.to_bytes(4, byteorder='little')
            print(f'小端: {little_endian.hex()}')  # 78563412

            # 字节转整数
            big_num = int.from_bytes(big_endian, byteorder='big')
            little_num = int.from_bytes(little_endian, byteorder='little')
            print(f'大端解析: {hex(big_num)}')
            print(f'小端解析: {hex(little_num)}')
            ---
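The examples above use non-negative values only. For negative numbers `to_bytes` and `from_bytes` need `signed=True`, and a value that does not fit the requested width raises `OverflowError`. The sample values below are arbitrary and serve only as an illustration.

```python
value = -2

# Two's-complement representation in both byte orders
big = value.to_bytes(2, byteorder='big', signed=True)
little = value.to_bytes(2, byteorder='little', signed=True)

# The same bytes read back as signed and as unsigned
as_signed = int.from_bytes(big, byteorder='big', signed=True)
as_unsigned = int.from_bytes(big, byteorder='big')   # signed defaults to False

# 300 does not fit into a single byte
try:
    (300).to_bytes(1, byteorder='big')
    overflowed = False
except OverflowError:
    overflowed = True

print(big.hex(), little.hex(), as_signed, as_unsigned, overflowed)
```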
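    b.Network byte order
        a.Description
            Most network protocols use big-endian (network byte order) for multi-byte integer fields, so binary fields must be converted when the host is little-endian.
        b.Code example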
            ---
            import socket
            import struct

            # 主机序转网络序
            host_long = 0x12345678
            net_long = socket.htonl(host_long)
            print(f'主机序: {hex(host_long)}')
            print(f'网络序: {hex(net_long)}')

            # 网络序转主机序
            host_back = socket.ntohl(net_long)
            print(f'转回: {hex(host_back)}')

            # 短整数转换
            host_short = 0x1234
            net_short = socket.htons(host_short)
            print(f'短整数网络序: {hex(net_short)}')

            # 打包网络数据
            data = struct.pack('!I', 0x12345678)  # ! 表示网络序
            print(f'网络数据: {data.hex()}')
            value = struct.unpack('!I', data)[0]
            print(f'解包: {hex(value)}')
            ---
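A common use of network byte order is a length prefix in front of each message. The sketch below frames payloads with a 4-byte big-endian length and reads them back from a `BytesIO` that stands in for a connection. The names `frame` and `read_frame` and the 4-byte header are assumptions for this illustration, not a protocol defined in these notes.

```python
import io
import struct

HEADER = struct.Struct('!I')   # 4-byte unsigned length, network byte order

def frame(payload: bytes) -> bytes:
    """Prefix a payload with its length."""
    return HEADER.pack(len(payload)) + payload

def read_frame(stream) -> bytes:
    """Read one length-prefixed payload from a binary stream."""
    header = stream.read(HEADER.size)
    if len(header) < HEADER.size:
        raise EOFError('stream ended before a full header arrived')
    (length,) = HEADER.unpack(header)
    payload = stream.read(length)
    if len(payload) < length:
        raise EOFError('stream ended before the full payload arrived')
    return payload

wire = frame(b'hello') + frame('中文'.encode('utf-8'))
stream = io.BytesIO(wire)
messages = [read_frame(stream), read_frame(stream)]
print(wire[:4].hex(), messages)
```

With a real socket, a buffered file object from `sock.makefile('rb')` could play the role of `stream`, since a bare `recv` may return fewer bytes than requested.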

02.The struct module
    a.Packing data
        a.Description
            struct.pack converts Python values to bytes and supports many data types and byte orders.
        b.Code example
            ---
            import struct

            # 基本类型打包
            # 整数
            data = struct.pack('i', 12345)  # 有符号整数
            print(f'整数: {data.hex()}')
            # 浮点数
            data = struct.pack('f', 3.14)
            print(f'浮点: {data.hex()}')
            # 字符串
            data = struct.pack('10s', b'hello')
            print(f'字符串: {data}')

            # 多个值打包
            data = struct.pack('i f 10s', 100, 3.14, b'test')
            print(f'组合: {data.hex()}')

            # 指定字节序
            # 大端序
            data = struct.pack('>i', 0x12345678)
            print(f'大端: {data.hex()}')
            # 小端序
            data = struct.pack('<i', 0x12345678)
            print(f'小端: {data.hex()}')
            # 网络序(大端)
            data = struct.pack('!i', 0x12345678)
            print(f'网络: {data.hex()}')
            ---
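A format string without a byte-order prefix uses the platform's native sizes and alignment, which can insert padding bytes. The sketch below compares the native and the standard little-endian size of the same format; the native size depends on the platform, so it is printed rather than assumed.

```python
import struct

native_size = struct.calcsize('@bi')     # native alignment; may include padding
standard_size = struct.calcsize('<bi')   # standard sizes: 1 + 4, no padding
packed = struct.pack('<bi', 1, 2)

print(f'native: {native_size} bytes, standard: {standard_size} bytes')
print(packed.hex())
```

For file formats and network data an explicit prefix (`<`, `>` or `!`) keeps the layout identical on every machine.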
    b.Unpacking data
        a.Description
            struct.unpack converts bytes to Python values according to a format string.
        b.Code example
            ---
            import struct

            # Basic unpacking ('<' fixes little-endian so the result is the same on every host)
            data = b'\x39\x30\x00\x00'
            value = struct.unpack('<i', data)[0]
            print(f'Integer: {value}')  # 12345

            # 多值解包
            data = struct.pack('i f 10s', 100, 3.14, b'test')
            num, pi, text = struct.unpack('i f 10s', data)
            print(f'整数: {num}')
            print(f'浮点: {pi}')
            print(f'文本: {text}')

            # 解包文件头
            # PNG文件头
            png_header = b'\x89PNG\r\n\x1a\n'
            magic = struct.unpack('8s', png_header)[0]
            print(f'PNG标识: {magic}')

            # Unpack a network packet (msg_type avoids shadowing the built-in type)
            packet = struct.pack('!HHI', 1, 2, 0x12345678)
            ver, msg_type, seq = struct.unpack('!HHI', packet)
            print(f'Version: {ver}, type: {msg_type}, sequence: {hex(seq)}')
            ---
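When the same format is applied many times, a precompiled `struct.Struct` avoids re-parsing the format string and offers `iter_unpack` and `unpack_from` for walking a buffer. The record layout below (a 2-byte id followed by a 4-byte value) is hypothetical and chosen only for the example.

```python
import struct

RECORD = struct.Struct('<HI')   # hypothetical record: 2-byte id + 4-byte value

# Build a buffer holding three consecutive records
blob = b''.join(RECORD.pack(i, i * 1000) for i in range(3))

records = list(RECORD.iter_unpack(blob))                # every record in order
second = RECORD.unpack_from(blob, offset=RECORD.size)   # one record at an offset

print(RECORD.size, records, second)
```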

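03.Binary file formats
    a.Reading binary files
        a.Description
            Use struct to parse binary file formats, such as the headers of image, audio and video files.
        b.Code example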
            ---
            import struct

            # 读取BMP文件头
            def read_bmp_header(filename):
                with open(filename, 'rb') as f:
                    # 文件头(14字节)
                    header = f.read(14)
                    magic, size, _, _, offset = struct.unpack('<2sIHHI', header)
                    print(f'标识: {magic}')
                    print(f'文件大小: {size}字节')
                    print(f'数据偏移: {offset}')

                    # Info header (40 bytes); width and height are signed, and a
                    # negative height marks a top-down bitmap
                    info = f.read(40)
                    info_size, width, height = struct.unpack('<Iii', info[:12])
                    print(f'Width: {width}')
                    print(f'Height: {height}')

            # 读取WAV文件头
            def read_wav_header(filename):
                with open(filename, 'rb') as f:
                    # RIFF头
                    riff = f.read(12)
                    magic, size, wave = struct.unpack('<4sI4s', riff)
                    print(f'格式: {magic.decode()}')
                    print(f'WAVE: {wave.decode()}')

                    # fmt块
                    fmt = f.read(24)
                    fmt_id, fmt_size, audio_fmt, channels, rate = \
                        struct.unpack('<4sIHHI', fmt[:16])
                    print(f'声道: {channels}')
                    print(f'采样率: {rate}Hz')

            # 读取自定义格式
            def read_custom_file(filename):
                with open(filename, 'rb') as f:
                    # 文件头:版本(2) + 记录数(4) + 保留(10)
                    header = f.read(16)
                    version, count = struct.unpack('<HI10x', header)
                    print(f'版本: {version}')
                    print(f'记录数: {count}')

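                    # Read the records (4 + 4 + 12 = 20 bytes each)
                    for i in range(count):
                        record = f.read(20)
                        rec_id, value, name = struct.unpack('<I f 12s', record)
                        # 's' fields are NUL-padded; str.strip() would not remove b'\x00'
                        name = name.rstrip(b'\x00').decode()
                        print(f'ID: {rec_id}, value: {value}, name: {name}')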
            ---
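
        As a cross-check of the custom layout described above, the following sketch writes a few records and reads them back from an in-memory stream. The helper names `write_records` and `read_records` are hypothetical, and the names are assumed to be ASCII (truncating UTF-8 at 12 bytes could split a multi-byte character).

```python
import io
import struct

# Hypothetical layout mirroring the custom format above:
# header = version (uint16) + record count (uint32) + 10 reserved bytes
HEADER = struct.Struct('<HI10x')
# record = id (uint32) + value (float32) + name (12 bytes, NUL-padded)
RECORD = struct.Struct('<If12s')

def write_records(stream, version, records):
    stream.write(HEADER.pack(version, len(records)))
    for rec_id, value, name in records:
        # The 's' code truncates or NUL-pads the bytes to exactly 12
        stream.write(RECORD.pack(rec_id, value, name.encode('utf-8')))

def read_records(stream):
    version, count = HEADER.unpack(stream.read(HEADER.size))
    records = []
    for _ in range(count):
        chunk = stream.read(RECORD.size)
        if len(chunk) != RECORD.size:
            raise ValueError('truncated record')
        rec_id, value, raw_name = RECORD.unpack(chunk)
        # Strip the NUL padding before decoding
        records.append((rec_id, value, raw_name.rstrip(b'\x00').decode('utf-8')))
    return version, records

buf = io.BytesIO()
write_records(buf, 1, [(1, 1.5, 'alpha'), (2, 2.5, 'beta')])
buf.seek(0)
version, records = read_records(buf)
print(version, records)
```

        Keeping the two `struct.Struct` objects at module level means the reader and the writer cannot drift apart on the format string.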
    b.写入二进制文件
        a.功能说明
            使用struct创建二进制文件,按照特定格式写入数据。
        b.代码示例
            ---
            import struct

            # 创建自定义二进制文件
            def create_binary_file(filename):
                with open(filename, 'wb') as f:
                    # 写入文件头
                    header = struct.pack('<4sHHI', b'MYFT', 1, 0, 100)
                    f.write(header)

                    # 写入数据记录
                    for i in range(10):
                        record = struct.pack('<I f 20s',
                            i,
                            i * 1.5,
                            f'Record{i}'.encode()
                        )
                        f.write(record)

            create_binary_file('data.bin')

            # 创建配置文件
            def save_config(filename, config):
                with open(filename, 'wb') as f:
                    # 魔数和版本
                    f.write(struct.pack('<4sH', b'CONF', 1))
                    # 配置项数量
                    f.write(struct.pack('<I', len(config)))
                    # 写入每个配置项
                    for key, value in config.items():
                        key_bytes = key.encode()[:32]
                        val_bytes = str(value).encode()[:64]
                        f.write(struct.pack('<32s64s', key_bytes, val_bytes))

            config = {'host': 'localhost', 'port': '8080'}
            save_config('config.bin', config)

            # Append a binary record
            import time  # imported before the function that uses it

            def append_record(filename, record_id, data):
                with open(filename, 'ab') as f:
                    timestamp = int(time.time())
                    record = struct.pack('<I I 100s',
                        record_id,
                        timestamp,
                        data.encode()
                    )
                    f.write(record)

            append_record('log.bin', 1, 'Log message')
            ---

04.实用技巧
    a.calcsize计算大小
        a.功能说明
            struct.calcsize计算格式字符串对应的字节数,用于预分配空间。
        b.代码示例
            ---
            import struct

            # 计算结构大小
            fmt = 'i f 10s'
            size = struct.calcsize(fmt)
            print(f'结构大小: {size}字节')

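            # Size under different prefixes
            # No prefix (same as '@'): native size and alignment
            print(f'native (@): {struct.calcsize("i")}')
            # '=': native byte order, standard size, no alignment
            print(f'standard (=): {struct.calcsize("=i")}')
            # '!': network (big-endian) byte order, standard size
            print(f'network (!): {struct.calcsize("!i")}')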

            # 计算文件头大小
            header_fmt = '<4sHHI'
            header_size = struct.calcsize(header_fmt)
            print(f'文件头: {header_size}字节')

            # 预分配缓冲区
            record_fmt = '<I f 20s'
            record_size = struct.calcsize(record_fmt)
            buffer = bytearray(record_size * 100)
            print(f'缓冲区: {len(buffer)}字节')
            ---
    b.对齐与填充
        a.功能说明
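            In native mode (no prefix or '@'), struct inserts padding bytes so that each field is aligned as the C compiler would align it. The '=', '<', '>' and '!' prefixes use standard sizes with no padding. The 'x' code adds one pad byte explicitly.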
        b.代码示例
            ---
            import struct

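            # Native mode: pad bytes are inserted before the int
            data1 = struct.pack('cI', b'A', 100)
            print(f'native (aligned): {len(data1)} bytes')    # typically 8

            # Standard mode: no padding at all
            data2 = struct.pack('=cI', b'A', 100)
            print(f'standard (packed): {len(data2)} bytes')   # 5

            # Standard mode with explicit padding
            data3 = struct.pack('=c3xI', b'A', 100)
            print(f'manual padding: {len(data3)} bytes')      # 8

            # Layout comparison
            # Packed: 1 + 4 + 1 = 6 bytes
            fmt1 = '=c I c'
            size1 = struct.calcsize(fmt1)
            # Native: typically 1 + 3 (padding) + 4 + 1 = 9 bytes. struct adds
            # no trailing padding unless the format ends with a zero count
            # such as '0I'.
            fmt2 = 'c I c'
            size2 = struct.calcsize(fmt2)
            print(f'packed: {size1}')
            print(f'native: {size2}')

            # Read back a manually padded structure
            data = struct.pack('=c3xI', b'X', 12345)
            char, num = struct.unpack('=c3xI', data)
            print(f'char: {char}, number: {num}')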
            ---
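
        The relationship between prefixes and padding can be checked with `struct.calcsize` alone. This is a minimal sketch; the native results depend on the platform's C ABI, so only the standard-mode sizes are fixed.

```python
import struct

# No prefix is the same as '@': native byte order, sizes and alignment.
native = struct.calcsize('cI')
explicit_native = struct.calcsize('@cI')

# '=', '<', '>' and '!' use standard sizes and never insert padding.
standard = struct.calcsize('=cI')      # 1 + 4
little = struct.calcsize('<cIc')       # 1 + 4 + 1

# In native mode, a trailing zero-count code pads the end of the
# structure to that type's alignment, as a C compiler would.
native_tail = struct.calcsize('cIc0I')

print(native, standard, little, native_tail)
```

        On a platform where `unsigned int` is 4 bytes with 4-byte alignment, the native results are 8 and 12. Use an explicit prefix such as `<` for file formats so the layout does not vary between machines.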
    c.性能优化
        a.功能说明
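            A precompiled struct.Struct object skips the per-call format lookup. The module-level functions already cache recently used formats, so the gain is usually modest and worth measuring.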
        b.代码示例
            ---
            import struct
            import time

            # Module-level pack (the format is looked up in an internal cache on each call)
            start = time.perf_counter()
            for i in range(100000):
                data = struct.pack('i f 10s', i, i*1.5, b'test')
            time1 = time.perf_counter() - start

            # Precompiled Struct object (created outside the timed region)
            s = struct.Struct('i f 10s')
            start = time.perf_counter()
            for i in range(100000):
                data = s.pack(i, i*1.5, b'test')
            time2 = time.perf_counter() - start

            print(f'pack: {time1:.3f}s')
            print(f'Struct: {time2:.3f}s')
            print(f'ratio: {time1/time2:.1f}x')

            # 批量处理
            s = struct.Struct('<I f 20s')
            records = []
            for i in range(1000):
                records.append(s.pack(i, i*1.5, f'rec{i}'.encode()))
            total_size = len(records) * s.size
            print(f'总大小: {total_size}字节')

            # Pack directly into a preallocated buffer with pack_into (no intermediate bytes objects)
            buffer = bytearray(s.size * 1000)
            for i in range(1000):
                offset = i * s.size
                s.pack_into(buffer, offset, i, i*1.5, f'r{i}'.encode())
            print(f'缓冲区: {len(buffer)}字节')
            ---
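
        For the reverse direction of the batch case, a `Struct` object can also decode many records from one buffer. The sketch below uses `iter_unpack` and `unpack_from`; the 16-byte record layout is an assumption made for the example.

```python
import struct

record = struct.Struct('<If8s')   # uint32 id, float32 value, 8-byte name

# Pack three records back to back into one bytes object.
blob = b''.join(record.pack(i, i * 0.5, f'r{i}'.encode()) for i in range(3))

# iter_unpack walks the buffer in record-sized steps; the buffer length
# must be an exact multiple of record.size.
rows = [(rid, val, name.rstrip(b'\x00').decode())
        for rid, val, name in record.iter_unpack(blob)]

# unpack_from reads one record at an offset without slicing the buffer.
second = record.unpack_from(blob, offset=record.size)

print(rows)
print(second)
```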

4. 缓冲IO

4.1 BufferedReader与BufferedWriter

01.BufferedReader
    a.基本用法
        a.功能说明
            BufferedReader为原始流添加缓冲功能,减少系统调用次数,提高读取性能。
        b.代码示例
            ---
            import io

            # 创建BufferedReader
            raw_stream = io.BytesIO(b'x' * 10000)
            buffered = io.BufferedReader(raw_stream, buffer_size=1024)
            data = buffered.read(500)
            print(f'读取: {len(data)}字节')

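            # Wrap a raw file. buffering=0 makes open() return an unbuffered FileIO;
            # a plain open(..., 'rb') already returns a BufferedReader.
            with open('data.bin', 'rb', buffering=0) as raw:
                buffered = io.BufferedReader(raw, buffer_size=8192)
                chunk = buffered.read(1000)
                print(f'buffered read: {len(chunk)} bytes')

            # Default buffer size
            raw = io.BytesIO(b'test data')
            buffered = io.BufferedReader(raw)  # uses io.DEFAULT_BUFFER_SIZE
            print(f'default buffer: {io.DEFAULT_BUFFER_SIZE}')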
            ---
    b.读取方法
        a.功能说明
            BufferedReader提供read、read1、peek等方法,支持不同的读取策略。
        b.代码示例
            ---
            import io

            # read方法
            raw = io.BytesIO(b'Hello World')
            buffered = io.BufferedReader(raw, buffer_size=100)
            data = buffered.read(5)
            print(f'read: {data}')

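            # read1: at most one read on the raw stream, so it may return fewer bytes than requested
            raw = io.BytesIO(b'x' * 1000)
            buffered = io.BufferedReader(raw, buffer_size=100)
            data = buffered.read1(200)
            print(f'read1: {len(data)} bytes')

            # peek: returns buffered bytes without advancing the position.
            # The result may be shorter or longer than the requested size,
            # so slice it to the length you need.
            raw = io.BytesIO(b'preview data')
            buffered = io.BufferedReader(raw)
            preview = buffered.peek(5)[:5]
            print(f'peek: {preview}')
            actual = buffered.read(5)
            print(f'read: {actual}')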

            # readline方法
            raw = io.BytesIO(b'line1\nline2\nline3\n')
            buffered = io.BufferedReader(raw)
            line = buffered.readline()
            print(f'行: {line}')
            ---
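
        The difference between `peek` and `read` is easiest to see through `tell()`. A small sketch on an in-memory stream:

```python
import io

reader = io.BufferedReader(io.BytesIO(b'preview data'), buffer_size=64)

peeked = reader.peek(5)          # may return more than 5 bytes
pos_after_peek = reader.tell()   # unchanged: peek does not consume data
head = reader.read(5)            # consumes exactly 5 bytes here
pos_after_read = reader.tell()

print(peeked[:5], pos_after_peek, head, pos_after_read)
```

        This makes `peek` suitable for sniffing a file signature before handing the stream to a parser that expects to start at the beginning.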

02.BufferedWriter
    a.基本用法
        a.功能说明
            BufferedWriter为原始流添加写缓冲,累积数据到缓冲区满或flush时才写入。
        b.代码示例
            ---
            import io

            # 创建BufferedWriter
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            buffered.write(b'buffered data')
            print(f'缓冲区: {len(raw.getvalue())}')  # 0,未刷新
            buffered.flush()
            print(f'刷新后: {len(raw.getvalue())}')

            # 自动刷新
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=10)
            buffered.write(b'x' * 5)  # 未满
            print(f'写入5字节: {len(raw.getvalue())}')
            buffered.write(b'x' * 10)  # 超过缓冲区
            print(f'写入15字节: {len(raw.getvalue())}')

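            # Writing to a file; buffering=0 gives the raw FileIO to wrap
            # (a plain open(..., 'wb') is already buffered)
            with open('output.bin', 'wb', buffering=0) as raw:
                buffered = io.BufferedWriter(raw, buffer_size=8192)
                for i in range(1000):
                    buffered.write(b'data\n')
                buffered.flush()  # flush before raw is closed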
            ---
    b.写入方法
        a.功能说明
            BufferedWriter提供write、writelines、flush等方法,控制数据写入。
        b.代码示例
            ---
            import io

            # write方法
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            n = buffered.write(b'test data')
            print(f'写入{n}字节')

            # writelines方法
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            lines = [b'line1\n', b'line2\n', b'line3\n']
            buffered.writelines(lines)
            buffered.flush()
            print(f'总共: {raw.getvalue()}')

            # flush方法
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            buffered.write(b'important')
            buffered.flush()  # 立即写入
            print(f'已刷新: {raw.getvalue()}')

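            # Context manager: leaving the block flushes and closes the writer.
            # Closing a BufferedWriter also closes the wrapped stream, so calling
            # getvalue() on a wrapped BytesIO afterwards raises ValueError.
            # A file is used here so the result can be read back.
            with open('ctx.bin', 'wb') as f:   # open() returns a BufferedWriter
                f.write(b'auto flush on exit')
            with open('ctx.bin', 'rb') as f:
                print(f'flushed on exit: {f.read()}')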
            ---
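
        Closing a `BufferedWriter` also closes the stream it wraps. When the underlying stream has to stay open, `detach()` is one option; the sketch below assumes an in-memory `BytesIO` as the underlying stream.

```python
import io

raw = io.BytesIO()
writer = io.BufferedWriter(raw, buffer_size=100)
writer.write(b'kept after detach')

# detach() flushes pending data and separates the wrapper from raw
# without closing raw; the wrapper itself is unusable afterwards.
same_raw = writer.detach()

print(same_raw is raw)
print(raw.closed)
print(raw.getvalue())
```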

03.性能对比
    a.读取性能
        a.功能说明
            缓冲读取显著减少系统调用,提高小块读取的性能。
        b.代码示例
            ---
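            import os
            import tempfile
            import time

            # Test data in a real file: BytesIO lives in memory and makes no system calls
            with tempfile.NamedTemporaryFile(delete=False) as tmp:
                tmp.write(b'x' * 1000000)
                path = tmp.name

            def read_in_chunks(f, size=100):
                total = 0
                while True:
                    chunk = f.read(size)
                    if not chunk:
                        break
                    total += len(chunk)
                return total

            # Unbuffered: every read(100) is a system call
            start = time.perf_counter()
            with open(path, 'rb', buffering=0) as raw:
                read_in_chunks(raw)
            no_buffer_time = time.perf_counter() - start

            # Buffered: the file is read 8192 bytes at a time
            start = time.perf_counter()
            with open(path, 'rb', buffering=8192) as buffered:
                read_in_chunks(buffered)
            buffer_time = time.perf_counter() - start

            os.remove(path)
            print(f'unbuffered: {no_buffer_time:.3f}s')
            print(f'buffered: {buffer_time:.3f}s')
            print(f'speedup: {no_buffer_time/buffer_time:.1f}x')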
            ---
    b.写入性能
        a.功能说明
            缓冲写入合并小块数据,减少系统调用,提高写入效率。
        b.代码示例
            ---
            import os
            import tempfile
            import time

            def timed_write(path, buffering):
                start = time.perf_counter()
                with open(path, 'wb', buffering=buffering) as f:
                    for i in range(10000):
                        f.write(b'data\n')
                return time.perf_counter() - start

            # A real file is used because BytesIO makes no system calls
            with tempfile.TemporaryDirectory() as tmp:
                path = os.path.join(tmp, 'bench.bin')

                # Unbuffered: one system call per write
                no_buffer_time = timed_write(path, 0)
                # Buffered: small writes are merged into 8192-byte blocks
                buffer_time = timed_write(path, 8192)

                print(f'unbuffered: {no_buffer_time:.3f}s')
                print(f'buffered: {buffer_time:.3f}s')
                print(f'speedup: {no_buffer_time/buffer_time:.1f}x')

                # Compare buffer sizes
                for size in [512, 1024, 4096, 8192, 16384]:
                    elapsed = timed_write(path, size)
                    print(f'buffer {size}: {elapsed:.3f}s')
            ---
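
        Timings vary between machines, but the reduction in raw reads can be counted directly. The sketch below uses a hypothetical `CountingRaw` stream whose `readinto` stands in for a system call; it is an illustration of the mechanism, not a benchmark.

```python
import io

class CountingRaw(io.RawIOBase):
    """Hypothetical raw stream that counts how often it is asked for data."""

    def __init__(self, data):
        super().__init__()
        self._data = data
        self._pos = 0
        self.calls = 0

    def readable(self):
        return True

    def readinto(self, b):
        # Each call here corresponds to one read on the underlying device.
        self.calls += 1
        n = min(len(b), len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n

def drain(stream, chunk_size=100):
    """Read the stream to the end in small chunks and return the byte count."""
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return total
        total += len(chunk)

payload = b'x' * 1_000_000

direct = CountingRaw(payload)
direct_total = drain(direct)

wrapped = CountingRaw(payload)
wrapped_total = drain(io.BufferedReader(wrapped, buffer_size=8192))

print(direct.calls, wrapped.calls)
```

        Both runs return the same number of bytes, but the buffered run asks the raw stream for data far less often because each raw read fills a whole buffer.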

04.实用技巧
    a.缓冲区管理
        a.功能说明
            合理设置缓冲区大小,根据数据特点选择最优配置。
        b.代码示例
            ---
            import io

            # 查看缓冲区状态
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            buffered.write(b'x' * 50)
            # The buffer contents cannot be inspected directly, but raw shows what has been flushed so far
            print(f'raw大小: {len(raw.getvalue())}')

            # 强制刷新
            buffered.flush()
            print(f'刷新后: {len(raw.getvalue())}')

            # 自适应缓冲
            def create_buffered_writer(raw, data_size):
                if data_size < 1024:
                    buffer_size = 512
                elif data_size < 1024 * 1024:
                    buffer_size = 8192
                else:
                    buffer_size = 65536
                return io.BufferedWriter(raw, buffer_size=buffer_size)

            # 使用示例
            raw = io.BytesIO()
            writer = create_buffered_writer(raw, 100000)
            writer.write(b'data')
            ---
    b.错误处理
        a.功能说明
            处理缓冲IO中的异常,确保数据完整性。
        b.代码示例
            ---
            import io

            # Write with error handling
            def safe_buffered_write(filename, data):
                try:
                    # buffering=0 yields the raw FileIO that BufferedWriter wraps
                    with open(filename, 'wb', buffering=0) as raw:
                        buffered = io.BufferedWriter(raw, buffer_size=8192)
                        buffered.write(data)
                        buffered.flush()
                        return True
                except OSError as e:  # IOError is an alias of OSError
                    print(f'write failed: {e}')
                    return False

            # Read with error handling
            def safe_buffered_read(filename):
                try:
                    with open(filename, 'rb', buffering=0) as raw:
                        buffered = io.BufferedReader(raw, buffer_size=8192)
                        return buffered.read()
                except OSError as e:
                    print(f'read failed: {e}')
                    return None

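            # Make sure buffered data is flushed even if processing fails.
            # process_data must be defined before the code that calls it.
            def process_data():
                pass  # placeholder for work that may raise

            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=100)
            try:
                buffered.write(b'critical data')
                result = process_data()
            finally:
                buffered.flush()  # runs whether or not an exception occurred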
            ---
    c.组合使用
        a.功能说明
            BufferedReader和BufferedWriter可以组合使用,实现高效的数据处理。
        b.代码示例
            ---
            import io

            # Data transformation. open() in binary mode already returns
            # BufferedReader/BufferedWriter objects, so the buffer size is set
            # through the buffering argument instead of wrapping them again.
            def transform_data(input_file, output_file):
                with open(input_file, 'rb', buffering=8192) as reader, \
                     open(output_file, 'wb', buffering=8192) as writer:
                    while True:
                        chunk = reader.read(1024)
                        if not chunk:
                            break
                        # process the chunk
                        processed = chunk.upper()
                        writer.write(processed)

            # 管道处理
            def pipeline_process(data):
                # 阶段1:缓冲读取
                input_stream = io.BytesIO(data)
                reader = io.BufferedReader(input_stream, buffer_size=1024)

                # 阶段2:处理
                temp_stream = io.BytesIO()
                writer = io.BufferedWriter(temp_stream, buffer_size=1024)
                while True:
                    chunk = reader.read(100)
                    if not chunk:
                        break
                    writer.write(chunk.upper())
                writer.flush()

                # 阶段3:输出
                return temp_stream.getvalue()

            result = pipeline_process(b'hello world')
            print(f'结果: {result}')
            ---

4.2 缓冲区大小设置

01.默认缓冲大小
    a.系统默认值
        a.功能说明
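            For binary files, open() chooses the buffer size from the device block size when it can determine one and falls back to io.DEFAULT_BUFFER_SIZE otherwise. On many systems this gives 4096 or 8192 bytes, though the value depends on the Python version and platform. Text files are line-buffered only when they are interactive (isatty() returns True); other text files follow the same policy as binary files.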
        b.代码示例
            ---
            import io

            # 查看默认缓冲大小
            print(f'默认: {io.DEFAULT_BUFFER_SIZE}字节')

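            # Buffered objects do not expose their buffer size;
            # io.DEFAULT_BUFFER_SIZE is the fallback used by open()
            with open('test.txt', 'w') as f:
                print(f'text layer: {type(f).__name__}')
                print(f'buffer layer: {type(f.buffer).__name__}')

            # BufferedReader without buffer_size uses io.DEFAULT_BUFFER_SIZE
            raw = io.BytesIO(b'data')
            buffered = io.BufferedReader(raw)
            print(f'Reader default: {io.DEFAULT_BUFFER_SIZE}')

            # Buffering by mode
            # Text mode: line-buffered only for interactive streams (isatty)
            with open('test.txt', 'w', encoding='utf-8') as f:
                print(f'text mode line buffering: {f.line_buffering}')
            # Binary mode: fixed-size block buffering
            with open('test.bin', 'wb') as f:
                print(f'binary mode object: {type(f).__name__}')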
            ---
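
        Which layers `open()` builds for each `buffering` value can be inspected at run time. The sketch below relies only on public attributes (`line_buffering`, `buffer`) and uses a temporary directory for its scratch file.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'probe.txt')

    # Default text mode on a regular file: block-buffered, not line-buffered.
    with open(path, 'w', encoding='utf-8') as text_file:
        default_line_buffered = text_file.line_buffering
        text_layers = (type(text_file).__name__,
                       type(text_file.buffer).__name__)

    # buffering=1 requests line buffering explicitly (text mode only).
    with open(path, 'w', encoding='utf-8', buffering=1) as line_file:
        explicit_line_buffered = line_file.line_buffering

    # buffering=0 (binary mode only) returns the raw, unbuffered object.
    with open(path, 'rb', buffering=0) as raw_file:
        raw_type = type(raw_file).__name__

print(default_line_buffered, text_layers, explicit_line_buffered, raw_type)
```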
    b.平台差异
        a.功能说明
            不同操作系统的默认缓冲大小可能不同,需要考虑跨平台兼容性。
        b.代码示例
            ---
            import io
            import sys
            import os

            # 查看平台信息
            print(f'平台: {sys.platform}')
            print(f'默认缓冲: {io.DEFAULT_BUFFER_SIZE}')

            # 文件系统块大小
            if hasattr(os, 'statvfs'):
                stat = os.statvfs('.')
                print(f'块大小: {stat.f_bsize}')

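            # Platform switch (illustrative values only, not measured recommendations)
            if sys.platform == 'win32':
                buffer_size = 8192
            else:
                buffer_size = 4096
            print(f'chosen buffer: {buffer_size}')

            # Derive a size hint from the file system block size when it is
            # available (st_blksize is missing on some platforms)
            def get_optimal_buffer_size(path='.'):
                blksize = getattr(os.stat(path), 'st_blksize', 0)
                if blksize > 0:
                    return blksize
                return io.DEFAULT_BUFFER_SIZE

            optimal = get_optimal_buffer_size()
            print(f'buffer size hint: {optimal}')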
            ---

02.自定义缓冲大小
    a.小缓冲区
        a.功能说明
            小缓冲区(512B-2KB)适合实时性要求高的场景,减少延迟但增加系统调用。
        b.代码示例
            ---
            import io

            # 512字节缓冲
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=512)
            buffered.write(b'real-time data')
            buffered.flush()

            # 实时日志
            with open('realtime.log', 'wb', buffering=1024) as f:
                for i in range(10):
                    f.write(f'Log {i}\n'.encode())
                    f.flush()  # 立即写入

            # 网络数据包
            def send_packet(data):
                buffer = io.BytesIO()
                writer = io.BufferedWriter(buffer, buffer_size=512)
                writer.write(data)
                writer.flush()
                return buffer.getvalue()

            packet = send_packet(b'small packet')
            print(f'数据包: {len(packet)}字节')
            ---
    b.大缓冲区
        a.功能说明
            大缓冲区(64KB-1MB)适合批量处理,减少系统调用提高吞吐量。
        b.代码示例
            ---
            import io

            # 64KB缓冲
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=65536)
            for i in range(10000):
                buffered.write(b'data\n')
            buffered.flush()

            # 大文件处理
            def process_large_file(input_file, output_file):
                with open(input_file, 'rb', buffering=1024*1024) as f_in:
                    with open(output_file, 'wb', buffering=1024*1024) as f_out:
                        while True:
                            chunk = f_in.read(65536)
                            if not chunk:
                                break
                            f_out.write(chunk)

            # Batch write: open() already provides the 512 KiB buffer,
            # so wrapping f in another BufferedWriter would only buffer twice
            def batch_write(filename, data_list):
                with open(filename, 'wb', buffering=524288) as f:
                    for data in data_list:
                        f.write(data)

            data = [b'x' * 1000 for _ in range(1000)]
            batch_write('batch.bin', data)
            ---

03.性能调优
    a.基准测试
        a.功能说明
            通过基准测试找到最优缓冲大小,平衡性能和资源消耗。
        b.代码示例
            ---
            import io
            import time

            # 测试不同缓冲大小
            def benchmark_buffer_size(data, sizes):
                results = {}
                for size in sizes:
                    start = time.time()
                    raw = io.BytesIO()
                    buffered = io.BufferedWriter(raw, buffer_size=size)
                    for chunk in data:
                        buffered.write(chunk)
                    buffered.flush()
                    elapsed = time.time() - start
                    results[size] = elapsed
                return results

            # 准备测试数据
            test_data = [b'x' * 100 for _ in range(10000)]
            sizes = [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]

            results = benchmark_buffer_size(test_data, sizes)
            for size, time_taken in results.items():
                print(f'{size:6d}字节: {time_taken:.4f}秒')

            # 找出最优大小
            optimal_size = min(results, key=results.get)
            print(f'最优缓冲: {optimal_size}字节')
            ---
    b.动态调整
        a.功能说明
            根据数据特征动态调整缓冲大小,适应不同的工作负载。
        b.代码示例
            ---
            import io

            # 根据数据大小调整
            def adaptive_buffer_size(data_size):
                if data_size < 1024:
                    return 512
                elif data_size < 10240:
                    return 2048
                elif data_size < 102400:
                    return 8192
                elif data_size < 1048576:
                    return 32768
                else:
                    return 65536

            # 使用自适应缓冲
            def write_with_adaptive_buffer(filename, data):
                buffer_size = adaptive_buffer_size(len(data))
                with open(filename, 'wb', buffering=buffer_size) as f:
                    f.write(data)
                print(f'使用缓冲: {buffer_size}字节')

            # 测试不同大小数据
            small_data = b'x' * 500
            medium_data = b'x' * 50000
            large_data = b'x' * 5000000

            write_with_adaptive_buffer('small.bin', small_data)
            write_with_adaptive_buffer('medium.bin', medium_data)
            write_with_adaptive_buffer('large.bin', large_data)

            # 根据操作类型调整
            class AdaptiveBuffer:
                def __init__(self):
                    self.read_size = 8192
                    self.write_size = 8192

                def adjust_for_sequential(self):
                    self.read_size = 65536
                    self.write_size = 65536

                def adjust_for_random(self):
                    self.read_size = 4096
                    self.write_size = 4096

                def get_reader(self, raw):
                    return io.BufferedReader(raw, buffer_size=self.read_size)

                def get_writer(self, raw):
                    return io.BufferedWriter(raw, buffer_size=self.write_size)

            buffer = AdaptiveBuffer()
            buffer.adjust_for_sequential()
            ---

04.最佳实践
    a.场景选择
        a.功能说明
            根据应用场景选择合适的缓冲大小,考虑实时性、吞吐量和内存占用。
        b.代码示例
            ---
            import io

            # 实时日志:小缓冲
            def realtime_logger(filename):
                return open(filename, 'wb', buffering=1024)

            # 批量处理:大缓冲
            def batch_processor(filename):
                return open(filename, 'wb', buffering=65536)

            # 网络传输:中等缓冲
            def network_buffer():
                return io.BytesIO()  # 使用默认

            # 配置文件:小文件无需大缓冲
            def config_file(filename):
                return open(filename, 'w', buffering=512, encoding='utf-8')

            # 数据库导出:大缓冲
            def database_export(filename):
                return open(filename, 'wb', buffering=524288)

            # 使用示例
            with realtime_logger('app.log') as log:
                log.write(b'Real-time log entry\n')
                log.flush()

            with batch_processor('export.dat') as batch:
                for i in range(100000):
                    batch.write(f'Record {i}\n'.encode())
            ---
    b.内存考虑
        a.功能说明
            大缓冲区占用更多内存,需要在性能和内存之间权衡。
        b.代码示例
            ---
            import io
            import sys

            # 计算缓冲区内存占用
            def calculate_buffer_memory(buffer_size, num_streams):
                total = buffer_size * num_streams
                return total

            # 示例:100个流
            sizes = [4096, 8192, 16384, 32768, 65536]
            num_streams = 100

            for size in sizes:
                memory = calculate_buffer_memory(size, num_streams)
                print(f'{size}字节缓冲 x {num_streams}流 = {memory/1024:.1f}KB')

            # 内存受限环境
            def create_memory_efficient_buffer(raw, available_memory):
                # 假设可用内存的10%用于缓冲
                max_buffer = int(available_memory * 0.1)
                buffer_size = min(max_buffer, 8192)
                return io.BufferedWriter(raw, buffer_size=buffer_size)

            # BufferedWriter does not expose its buffer size, so keep track
            # of the value that was passed in
            buffer_size = 16384
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=buffer_size)
            print(f'buffer size: {buffer_size}')
            ---
    c.性能监控
        a.功能说明
            监控缓冲IO性能,及时发现和解决性能问题。
        b.代码示例
            ---
            import io
            import time

            # Monitoring wrapper (a class that wraps write and flush, not a decorator)
            class BufferedIOMonitor:
                def __init__(self, buffer_size):
                    self.buffer_size = buffer_size
                    self.write_count = 0
                    self.write_bytes = 0
                    self.flush_count = 0
                    self.start_time = time.time()

                def create_writer(self, raw):
                    writer = io.BufferedWriter(raw, buffer_size=self.buffer_size)
                    # 包装write方法
                    original_write = writer.write
                    def monitored_write(data):
                        self.write_count += 1
                        self.write_bytes += len(data)
                        return original_write(data)
                    writer.write = monitored_write

                    # 包装flush方法
                    original_flush = writer.flush
                    def monitored_flush():
                        self.flush_count += 1
                        return original_flush()
                    writer.flush = monitored_flush

                    return writer

                def report(self):
                    elapsed = time.time() - self.start_time
                    print(f'缓冲大小: {self.buffer_size}')
                    print(f'写入次数: {self.write_count}')
                    print(f'写入字节: {self.write_bytes}')
                    print(f'刷新次数: {self.flush_count}')
                    print(f'总时间: {elapsed:.3f}秒')
                    if elapsed > 0:
                        print(f'吞吐量: {self.write_bytes/elapsed/1024:.1f}KB/s')

            # 使用监控
            monitor = BufferedIOMonitor(buffer_size=8192)
            raw = io.BytesIO()
            writer = monitor.create_writer(raw)

            for i in range(1000):
                writer.write(b'data\n')
            writer.flush()

            monitor.report()
            ---

4.3 flush操作

01.flush时机
    a.自动flush
        a.功能说明
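            The buffer is flushed automatically when it fills up and when the file is closed. CPython normally also flushes open files at interpreter shutdown, but this is not guaranteed (for example after os._exit() or a crash), so close files explicitly.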
        b.代码示例
            ---
            import io

            # 缓冲区满自动flush
            raw = io.BytesIO()
            buffered = io.BufferedWriter(raw, buffer_size=10)
            buffered.write(b'12345')  # 未满
            print(f'写入5字节: {len(raw.getvalue())}')
            buffered.write(b'67890ABCDE')  # 超过10字节
            print(f'自动flush: {len(raw.getvalue())}')

            # 文件关闭自动flush
            with open('auto.txt', 'w') as f:
                f.write('data')
            # 退出with时自动flush和关闭

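            # Flush at interpreter exit (not guaranteed)
            f = open('exit.txt', 'w')
            f.write('data')
            # CPython usually flushes this on a normal exit, but not after
            # os._exit() or a crash; prefer with or an explicit close()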
            ---
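
        What flushing changes can be observed by reading the file through a second handle. This is a minimal sketch; the helper `read_back` is hypothetical, and the write is assumed to be smaller than the buffer.

```python
import os
import tempfile

def read_back(path):
    # A second, independent handle sees only what has reached the OS.
    with open(path, 'rb') as other:
        return other.read()

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'visible.txt')
    f = open(path, 'w', encoding='utf-8')
    try:
        f.write('data')                  # small write stays in Python's buffer
        before_flush = read_back(path)
        f.flush()                        # hand the bytes to the operating system
        after_flush = read_back(path)
    finally:
        f.close()

print(before_flush, after_flush)
```

        Until `flush()` or `close()` runs, other readers of the file (another process, `tail -f`) do not see the buffered data.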
    b.手动flush
        a.功能说明
            关键数据、实时日志、进度显示等场景需要手动flush确保数据立即写入。
        b.代码示例
            ---
            import time

            # 实时进度显示
            import sys
            for i in range(101):
                sys.stdout.write(f'\r进度: {i}%')
                sys.stdout.flush()  # 立即显示
                time.sleep(0.02)
            print()

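            # Critical data: flush() only hands the data to the operating system;
            # os.fsync() asks the OS to write it to the storage device
            import os
            with open('critical.txt', 'w') as f:
                f.write('important data')
                f.flush()             # Python buffer -> operating system
                os.fsync(f.fileno())  # operating system -> disk
                # continue with other work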

            # 实时日志
            with open('realtime.log', 'w') as f:
                for i in range(10):
                    f.write(f'[{time.time()}] Event {i}\n')
                    f.flush()  # 立即写入
                    time.sleep(0.5)
            ---
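
        `flush()` moves data out of Python's buffer but not necessarily onto the storage device. A sketch of a write that pairs it with `os.fsync` follows; `durable_write` is a hypothetical helper name.

```python
import os
import tempfile

def durable_write(path, text):
    """Write text, then ask the OS to commit it to the storage device."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)
        f.flush()                 # Python buffer -> OS page cache
        os.fsync(f.fileno())      # OS page cache -> storage device

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'critical.txt')
    durable_write(target, 'important data')
    with open(target, encoding='utf-8') as check:
        stored = check.read()

print(stored)
```

        `os.fsync` is much slower than `flush()`, so it is usually reserved for data that must survive a power loss.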

02.flush策略
    a.定时flush
        a.功能说明
            按时间间隔定期flush,平衡实时性和性能。
        b.代码示例
            ---
            import time
            import threading

            # 定时flush类
            class TimedFlushWriter:
                def __init__(self, file, interval=1.0):
                    self.file = file
                    self.interval = interval
                    self.last_flush = time.time()

                def write(self, data):
                    self.file.write(data)
                    now = time.time()
                    if now - self.last_flush >= self.interval:
                        self.file.flush()
                        self.last_flush = now

                def flush(self):
                    self.file.flush()
                    self.last_flush = time.time()

            # 使用定时flush
            with open('timed.log', 'w') as f:
                writer = TimedFlushWriter(f, interval=2.0)
                for i in range(20):
                    writer.write(f'Log {i}\n')
                    time.sleep(0.5)
                writer.flush()  # 最后确保flush

            # Background periodic flush.
            # Text-mode file objects (TextIOWrapper) are not thread-safe,
            # so writes and flushes share a lock.
            class BackgroundFlusher:
                def __init__(self, file, interval=1.0):
                    self.file = file
                    self.interval = interval
                    self.lock = threading.Lock()
                    self.stopped = threading.Event()
                    self.thread = threading.Thread(target=self._flush_loop, daemon=True)
                    self.thread.start()

                def write(self, data):
                    with self.lock:
                        self.file.write(data)

                def _flush_loop(self):
                    # wait() returns True as soon as stop() is called
                    while not self.stopped.wait(self.interval):
                        with self.lock:
                            self.file.flush()

                def stop(self):
                    self.stopped.set()
                    self.thread.join()
                    with self.lock:
                        self.file.flush()

            with open('background.log', 'w') as f:
                flusher = BackgroundFlusher(f, interval=1.0)
                for i in range(10):
                    flusher.write(f'Entry {i}\n')
                    time.sleep(0.3)
                flusher.stop()
            ---
    b.条件flush
        a.功能说明
            根据数据量、重要性等条件决定是否flush。
        b.代码示例
            ---
            # 按数据量flush
            class SizeBasedFlusher:
                def __init__(self, file, flush_size=1024):
                    self.file = file
                    self.flush_size = flush_size
                    self.written = 0

                def write(self, data):
                    self.file.write(data)
                    self.written += len(data)
                    if self.written >= self.flush_size:
                        self.file.flush()
                        self.written = 0

            with open('size_based.txt', 'w') as f:
                writer = SizeBasedFlusher(f, flush_size=100)
                for i in range(50):
                    writer.write(f'Line {i}\n')

            # 按优先级flush
            class PriorityFlusher:
                def __init__(self, file):
                    self.file = file

                def write(self, data, priority='normal'):
                    self.file.write(data)
                    if priority == 'high':
                        self.file.flush()

            with open('priority.log', 'w') as f:
                writer = PriorityFlusher(f)
                writer.write('Normal log\n', priority='normal')
                writer.write('Critical error!\n', priority='high')
                writer.write('Info message\n', priority='normal')
                f.flush()
            ---

03.flush性能
    a.性能影响
        a.功能说明
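            Every flush() pushes the buffered data to the OS with a system call, so frequent flushing lowers throughput; balance how quickly data must appear against efficiency.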
        b.代码示例
            ---
            import time

            # 测试flush性能影响
            def test_flush_performance(flush_freq):
                start = time.time()
                with open('perf_test.txt', 'w') as f:
                    for i in range(10000):
                        f.write(f'Line {i}\n')
                        if i % flush_freq == 0:
                            f.flush()
                return time.time() - start

            # 不同flush频率对比
            frequencies = [1, 10, 100, 1000, 10000]
            for freq in frequencies:
                elapsed = test_flush_performance(freq)
                print(f'每{freq}行flush: {elapsed:.3f}秒')

            # No flush vs. flush after every line
            # perf_counter() has a finer resolution than time.time() for short intervals
            start = time.perf_counter()
            with open('no_flush.txt', 'w') as f:
                for i in range(10000):
                    f.write(f'Line {i}\n')
            no_flush_time = time.perf_counter() - start

            start = time.perf_counter()
            with open('with_flush.txt', 'w') as f:
                for i in range(10000):
                    f.write(f'Line {i}\n')
                    f.flush()
            with_flush_time = time.perf_counter() - start

            print(f'No flush: {no_flush_time:.3f}s')
            print(f'Flush every line: {with_flush_time:.3f}s')
            print(f'Slowdown: {with_flush_time/no_flush_time:.1f}x')
            ---
    b.优化策略
        a.功能说明
            批量flush、异步flush等策略优化性能。
        b.代码示例
            ---
            # 批量flush
            class BatchFlusher:
                def __init__(self, file, batch_size=100):
                    self.file = file
                    self.batch_size = batch_size
                    self.count = 0

                def write(self, data):
                    self.file.write(data)
                    self.count += 1
                    if self.count >= self.batch_size:
                        self.file.flush()
                        self.count = 0

                def close(self):
                    if self.count > 0:
                        self.file.flush()

            with open('batch.txt', 'w') as f:
                writer = BatchFlusher(f, batch_size=50)
                for i in range(200):
                    writer.write(f'Line {i}\n')
                writer.close()

            # Smart flush: flush by size or by elapsed time
            import time

            class SmartFlusher:
                def __init__(self, file):
                    self.file = file
                    self.buffer_size = 0
                    self.last_flush = time.time()

                def write(self, data):
                    self.file.write(data)
                    self.buffer_size += len(data)
                    now = time.time()
                    # Condition: more than 1 KB written, or more than 1 second since the last flush
                    if self.buffer_size >= 1024 or now - self.last_flush >= 1.0:
                        self.file.flush()
                        self.buffer_size = 0
                        self.last_flush = now

            with open('smart.txt', 'w') as f:
                writer = SmartFlusher(f)
                for i in range(100):
                    writer.write(f'Data {i}\n')
                    time.sleep(0.1)
            ---

04.最佳实践
    a.场景选择
        a.功能说明
            根据应用场景选择合适的flush策略。
        b.代码示例
            ---
            # 高性能批处理:最少flush
            def batch_processing(filename, data):
                with open(filename, 'w') as f:
                    for item in data:
                        f.write(f'{item}\n')
                # 只在关闭时flush

            # 实时监控:频繁flush
            def realtime_monitor(filename):
                with open(filename, 'w') as f:
                    while True:
                        data = get_monitor_data()
                        f.write(f'{data}\n')
                        f.flush()  # 立即写入
                        time.sleep(1)

            def get_monitor_data():
                return f'[{time.time()}] Status OK'

            # 日志系统:定时flush
            class Logger:
                def __init__(self, filename):
                    self.file = open(filename, 'w')
                    self.buffer = []
                    self.flush_interval = 5.0
                    self.last_flush = time.time()

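                def log(self, message):
                    self.buffer.append(f'[{time.time()}] {message}\n')
                    # The interval is only checked here, so entries stay in memory until
                    # the first log() call after it has elapsed, or until close()
                    if time.time() - self.last_flush >= self.flush_interval:
                        self.flush()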

                def flush(self):
                    self.file.writelines(self.buffer)
                    self.file.flush()
                    self.buffer = []
                    self.last_flush = time.time()

                def close(self):
                    self.flush()
                    self.file.close()

            logger = Logger('app.log')
            for i in range(20):
                logger.log(f'Event {i}')
                time.sleep(0.5)
            logger.close()
            ---
    b.错误处理
        a.功能说明
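            flush() can fail (for example with OSError when the disk is full, or ValueError when the file is already closed), so handle the error explicitly.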
        b.代码示例
            ---
            # 安全flush
            def safe_flush(file):
                try:
                    file.flush()
                    return True
                except IOError as e:
                    print(f'Flush失败: {e}')
                    return False

            # 重试flush
            def retry_flush(file, max_retries=3):
                for i in range(max_retries):
                    try:
                        file.flush()
                        return True
                    except IOError as e:
                        print(f'Flush失败(尝试{i+1}): {e}')
                        time.sleep(0.1)
                return False

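            # Flush, then ask the OS to commit the data to disk
            import os

            def ensure_flush(file):
                try:
                    file.flush()
                    # flush() only hands the data to the OS; fsync() requests a write to the device
                    os.fsync(file.fileno())
                    return True
                except (OSError, ValueError) as e:
                    # OSError: I/O failure or no real file descriptor; ValueError: file already closed
                    print(f'Flush error: {e}')
                    return False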

            # 使用示例
            with open('safe.txt', 'w') as f:
                f.write('important data')
                if not safe_flush(f):
                    print('警告:数据可能未写入')
            ---
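        c.Durable write sketch
            The helpers above stop at flush(), which only moves data from Python's buffer to the operating system. A minimal sketch of a write that also asks the OS to commit the data follows. The function name durable_write and the temporary file are illustrative assumptions, and how strong the guarantee of os.fsync() is depends on the platform and the storage hardware.

```python
import os
import tempfile

def durable_write(path, text):
    """Write text, flush Python's buffer, then ask the OS to commit it to disk."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)
        f.flush()             # Python buffer -> operating system
        os.fsync(f.fileno())  # OS cache -> storage device, as far as the OS can promise

# Usage: write into a temporary directory so nothing is left behind
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, 'durable.txt')
    durable_write(demo_path, 'important data')
    with open(demo_path, encoding='utf-8') as f:
        durable_result = f.read()

print(durable_result)
```

            The order matters: calling os.fsync() before flush() would sync a file that does not yet contain the buffered data.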

4.4 无缓冲IO

01.无缓冲模式
    a.启用无缓冲
        a.功能说明
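            Pass buffering=0 to turn off Python's buffer: every write() goes straight to the operating system as a system call. Only binary mode supports this. The data can still sit in the OS cache afterwards; call os.fsync() if it has to reach the disk.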
        b.代码示例
            ---
            # 无缓冲写入
            with open('unbuffered.bin', 'wb', buffering=0) as f:
                f.write(b'data1\n')  # 立即写入
                f.write(b'data2\n')  # 立即写入

            # 文本模式不支持无缓冲
            try:
                f = open('test.txt', 'w', buffering=0)
            except ValueError as e:
                print(f'错误: {e}')  # 文本模式不能无缓冲

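            # Unbuffered standard output
            import sys
            import os
            # Reopen stdout as an unbuffered binary stream. From here on sys.stdout
            # only accepts bytes, so print() with a str raises TypeError.
            sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb', buffering=0)
            sys.stdout.write(b'Unbuffered output\n')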
            ---
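        c.What open() returns sketch
            To make the difference concrete, the sketch below inspects the object that open() returns with and without buffering=0 in CPython. The file name raw.bin and the variable names are illustrative assumptions. With buffering=0 the raw io.FileIO object is returned directly, and its write() reports how many bytes the OS accepted.

```python
import io
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'raw.bin')

    with open(path, 'wb', buffering=0) as raw:
        raw_type = type(raw)                       # raw file object, no buffer layer
        written = raw.write(b'data1\n')            # number of bytes accepted by the OS
        size_before_close = os.path.getsize(path)  # already visible before close()

    with open(path, 'wb') as buffered:
        buffered_type = type(buffered)             # buffered wrapper around a raw file

print(raw_type.__name__, written, size_before_close, buffered_type.__name__)
```

            Because a raw write() may in principle accept fewer bytes than requested, code that relies on buffering=0 should check the return value.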
    b.应用场景
        a.功能说明
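            Unbuffered IO suits real-time logs, critical data, inter-process communication and similar cases where each write must reach the OS immediately. For data that must survive a power loss, combine it with os.fsync().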
        b.代码示例
            ---
            # 实时日志
            import time
            with open('realtime.log', 'wb', buffering=0) as f:
                for i in range(10):
                    timestamp = time.time()
                    f.write(f'[{timestamp}] Event {i}\n'.encode())
                    time.sleep(0.5)

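            # Critical data
            import os

            def save_critical_data(filename, data):
                with open(filename, 'wb', buffering=0) as f:
                    f.write(data)
                    # buffering=0 only bypasses Python's buffer; fsync asks the OS to write to disk
                    os.fsync(f.fileno())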

            save_critical_data('critical.dat', b'important')

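            # Inter-process communication (Unix only: os.mkfifo is not available on Windows)
            import os
            # Create a named pipe
            fifo_path = '/tmp/myfifo'
            if not os.path.exists(fifo_path):
                os.mkfifo(fifo_path)

            # Writer side (unbuffered); open() blocks until another process opens the FIFO for reading
            with open(fifo_path, 'wb', buffering=0) as f:
                f.write(b'message\n')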
            ---

02.性能特点
    a.性能开销
        a.功能说明
            无缓冲每次写入都是系统调用,性能较低,但保证数据实时性。
        b.代码示例
            ---
            import time

            # 性能对比
            data = b'x' * 100

            # Unbuffered
            # perf_counter() has a finer resolution than time.time() for short intervals
            start = time.perf_counter()
            with open('unbuffered.bin', 'wb', buffering=0) as f:
                for _ in range(1000):
                    f.write(data)
            unbuffered_time = time.perf_counter() - start

            # Default buffering
            start = time.perf_counter()
            with open('buffered.bin', 'wb') as f:
                for _ in range(1000):
                    f.write(data)
            buffered_time = time.perf_counter() - start

            print(f'Unbuffered: {unbuffered_time:.3f}s')
            print(f'Buffered: {buffered_time:.3f}s')
            print(f'Slowdown: {unbuffered_time/buffered_time:.1f}x')

            # 不同数据大小的影响
            sizes = [10, 100, 1000, 10000]
            for size in sizes:
                data = b'x' * size
                start = time.time()
                with open('test.bin', 'wb', buffering=0) as f:
                    for _ in range(100):
                        f.write(data)
                elapsed = time.time() - start
                print(f'{size}字节: {elapsed:.3f}秒')
            ---
    b.实时性保证
        a.功能说明
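            Unbuffered IO hands every write to the OS immediately, so other processes can see it at once and it survives a crash of the Python process. It does not by itself guarantee that the data has reached the disk.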
        b.代码示例
            ---
            import time

            # 实时监控
            def realtime_monitor(filename):
                with open(filename, 'wb', buffering=0) as f:
                    while True:
                        timestamp = time.time()
                        status = get_system_status()
                        f.write(f'[{timestamp}] {status}\n'.encode())
                        time.sleep(1)

            def get_system_status():
                return 'OK'

            # 崩溃安全日志
            class CrashSafeLogger:
                def __init__(self, filename):
                    self.file = open(filename, 'wb', buffering=0)

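                def log(self, message):
                    timestamp = time.time()
                    self.file.write(f'[{timestamp}] {message}\n'.encode())
                    # The entry is already with the OS, so it survives a crash of this process
                    # (surviving a power loss would additionally need os.fsync)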

                def close(self):
                    self.file.close()

            logger = CrashSafeLogger('crash_safe.log')
            logger.log('Application started')
            logger.log('Processing data')
            logger.close()

            # Database WAL
            import os

            def write_wal_entry(wal_file, entry):
                with open(wal_file, 'ab', buffering=0) as f:
                    f.write(entry)
                    # fsync is what makes the entry durable; buffering=0 alone is not enough
                    os.fsync(f.fileno())

            write_wal_entry('db.wal', b'transaction_data')
            ---
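        c.Complete raw write sketch
            A raw (unbuffered) write() returns the number of bytes the OS accepted, which may be fewer than requested. The sketch below loops until everything is written and then syncs. The helper names write_all and append_entry are illustrative assumptions, and the code assumes a regular, blocking file.

```python
import os
import tempfile

def write_all(raw, data):
    """Call write() repeatedly until every byte has been accepted by the OS."""
    view = memoryview(data)
    total = 0
    while total < len(view):
        # write() on a raw file may accept only part of the remaining data
        total += raw.write(view[total:])
    return total

def append_entry(path, entry):
    """Append one entry without Python buffering and request a sync to disk."""
    with open(path, 'ab', buffering=0) as raw:
        count = write_all(raw, entry)
        os.fsync(raw.fileno())
    return count

# Usage with a temporary file
with tempfile.TemporaryDirectory() as tmp:
    wal_path = os.path.join(tmp, 'demo.wal')
    first = append_entry(wal_path, b'entry-1\n')
    second = append_entry(wal_path, b'entry-2\n')
    with open(wal_path, 'rb') as f:
        wal_contents = f.read()

print(first, second, wal_contents)
```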

03.替代方案
    a.行缓冲
        a.功能说明
            In text mode, buffering=1 selects line buffering: the buffer is flushed automatically whenever a write contains a newline.
        b.代码示例
            ---
            # 行缓冲
            with open('line_buffered.txt', 'w', buffering=1) as f:
                f.write('Line 1\n')  # 自动刷新
                f.write('Line 2')    # 不刷新
                f.write('\n')        # 刷新

            # Line-buffered standard output (Python 3.7+), without replacing sys.stdout
            import sys
            sys.stdout.reconfigure(line_buffering=True)
            print('This line is flushed immediately')

            # 实时日志(行缓冲)
            import time
            with open('log.txt', 'w', buffering=1) as f:
                for i in range(10):
                    f.write(f'[{time.time()}] Event {i}\n')
                    time.sleep(0.5)
            ---
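        c.Observing line buffering sketch
            One way to check the behaviour described above is to read the file through a second handle while the writer is still open. This is a sketch for CPython; the helper read_now and the file name are illustrative assumptions.

```python
import os
import tempfile

def read_now(path):
    """Read the file's current on-disk content through a separate handle."""
    with open(path, encoding='utf-8') as f:
        return f.read()

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'line_buffered.txt')
    with open(path, 'w', buffering=1, encoding='utf-8') as f:
        is_line_buffered = f.line_buffering
        f.write('Line 1\n')             # contains a newline, so it is flushed
        after_newline = read_now(path)
        f.write('partial')              # no newline, typically still in the buffer
        after_partial = read_now(path)
    after_close = read_now(path)        # close() flushes whatever is left

print(is_line_buffered, repr(after_newline), repr(after_partial), repr(after_close))
```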
    b.小缓冲+频繁flush
        a.功能说明
            使用小缓冲区配合频繁flush,平衡性能和实时性。
        b.代码示例
            ---
            # 小缓冲区
            with open('small_buffer.bin', 'wb', buffering=512) as f:
                for i in range(100):
                    f.write(b'data\n')
                    if i % 10 == 0:
                        f.flush()

            # 定时flush
            import time
            class TimedFlushFile:
                def __init__(self, filename, flush_interval=1.0):
                    self.file = open(filename, 'wb', buffering=4096)
                    self.flush_interval = flush_interval
                    self.last_flush = time.time()

                def write(self, data):
                    self.file.write(data)
                    now = time.time()
                    if now - self.last_flush >= self.flush_interval:
                        self.file.flush()
                        self.last_flush = now

                def close(self):
                    self.file.flush()
                    self.file.close()

            f = TimedFlushFile('timed.bin', flush_interval=0.5)
            for i in range(20):
                f.write(b'data\n')
                time.sleep(0.2)
            f.close()

            # 智能缓冲
            class SmartBuffer:
                def __init__(self, filename):
                    self.file = open(filename, 'wb', buffering=8192)
                    self.pending = 0

                def write(self, data, critical=False):
                    self.file.write(data)
                    self.pending += len(data)
                    if critical or self.pending >= 4096:
                        self.file.flush()
                        self.pending = 0

                def close(self):
                    self.file.flush()
                    self.file.close()

            buf = SmartBuffer('smart.bin')
            buf.write(b'normal data\n', critical=False)
            buf.write(b'critical data\n', critical=True)
            buf.close()
            ---

04.最佳实践
    a.使用建议
        a.功能说明
            根据需求选择合适的缓冲策略,避免过度使用无缓冲。
        b.代码示例
            ---
            # 场景1:高性能批处理 - 使用大缓冲
            def batch_process(input_file, output_file):
                with open(input_file, 'rb', buffering=65536) as f_in:
                    with open(output_file, 'wb', buffering=65536) as f_out:
                        while True:
                            chunk = f_in.read(8192)
                            if not chunk:
                                break
                            f_out.write(process(chunk))

            def process(data):
                return data.upper()

            # 场景2:实时日志 - 使用行缓冲
            def realtime_log(filename):
                with open(filename, 'w', buffering=1) as f:
                    import time
                    for i in range(10):
                        f.write(f'[{time.time()}] Log {i}\n')
                        time.sleep(0.5)

            # Scenario 3: critical data - unbuffered write plus fsync
            def save_transaction(filename, data):
                import os
                with open(filename, 'ab', buffering=0) as f:
                    f.write(data)
                    os.fsync(f.fileno())

            # 场景4:配置文件 - 使用默认缓冲
            def save_config(filename, config):
                with open(filename, 'w') as f:
                    import json
                    json.dump(config, f)
            ---
    b.性能优化
        a.功能说明
            在保证需求的前提下,尽量使用缓冲提高性能。
        b.代码示例
            ---
            # 批量写入优化
            def optimized_write(filename, data_list):
                # 使用大缓冲区
                with open(filename, 'wb', buffering=65536) as f:
                    # 批量写入
                    for chunk in data_list:
                        f.write(chunk)
                # 关闭时自动flush

            # Hybrid strategy
            class HybridWriter:
                def __init__(self, filename):
                    self.file = open(filename, 'wb', buffering=8192)
                    self.critical_mode = False

                def set_critical(self, critical):
                    if critical and not self.critical_mode:
                        # Push out anything still buffered before entering critical mode
                        self.file.flush()
                    self.critical_mode = critical

                def write(self, data):
                    self.file.write(data)
                    if self.critical_mode:
                        # Critical writes are flushed immediately; no second file handle is needed
                        self.file.flush()

                def close(self):
                    self.file.flush()
                    self.file.close()

            writer = HybridWriter('hybrid.bin')
            writer.write(b'normal data\n')
            writer.set_critical(True)
            writer.write(b'critical data\n')
            writer.set_critical(False)
            writer.write(b'normal data again\n')
            writer.close()

            # 自适应缓冲
            def adaptive_write(filename, data, realtime=False):
                if realtime:
                    # 实时模式:行缓冲
                    with open(filename, 'w', buffering=1) as f:
                        f.write(data)
                else:
                    # 批处理模式:大缓冲
                    with open(filename, 'w', buffering=65536) as f:
                        f.write(data)

            adaptive_write('batch.txt', 'large data', realtime=False)
            adaptive_write('realtime.txt', 'urgent data', realtime=True)
            ---
    c.错误处理
        a.功能说明
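            With unbuffered IO every write() talks to the OS directly, so errors such as a full disk surface at the write() call itself rather than at a later flush() or close(). Handle them at that point.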
        b.代码示例
            ---
            # 安全的无缓冲写入
            def safe_unbuffered_write(filename, data):
                try:
                    with open(filename, 'wb', buffering=0) as f:
                        f.write(data)
                    return True
                except IOError as e:
                    print(f'写入失败: {e}')
                    return False

            # 重试机制
            def retry_write(filename, data, max_retries=3):
                for i in range(max_retries):
                    try:
                        with open(filename, 'wb', buffering=0) as f:
                            f.write(data)
                        return True
                    except IOError as e:
                        print(f'尝试{i+1}失败: {e}')
                        import time
                        time.sleep(0.1)
                return False

            # 降级策略
            def write_with_fallback(filename, data):
                # 先尝试无缓冲
                try:
                    with open(filename, 'wb', buffering=0) as f:
                        f.write(data)
                    return 'unbuffered'
                except IOError:
                    # 降级到小缓冲+flush
                    try:
                        with open(filename, 'wb', buffering=512) as f:
                            f.write(data)
                            f.flush()
                        return 'buffered'
                    except IOError as e:
                        print(f'写入完全失败: {e}')
                        return 'failed'

            result = write_with_fallback('test.bin', b'data')
            print(f'写入模式: {result}')
            ---

5. 对象模型

5.1 PyObject基础

01.对象结构
    a.PyObject头部
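        a.ob_refcnt and ob_type
            Every object begins with ob_refcnt, the reference counter, and ob_type, a pointer to the object's type object.
        b.Inspecting the header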
            ---
            import sys

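            x = 42
            print(f"Type: {type(x)}")
            # getrefcount() counts its own argument too; small ints are shared across the
            # interpreter (and immortal on CPython 3.12+), so the number can be very large
            print(f"Reference count: {sys.getrefcount(x)}")
            print(f"Object ID: {id(x)}")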
            ---
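        c.Reference count changes sketch
            The counter is easier to observe on a fresh object than on a shared small integer. The sketch below is CPython-specific and compares counts relative to a baseline instead of relying on absolute values. The variable names are illustrative.

```python
import sys

obj = object()
base = sys.getrefcount(obj)    # includes the temporary reference held by the call itself

holders = [obj, obj, obj]      # three more references stored in a list
with_holders = sys.getrefcount(obj)

del holders                    # the list is freed and drops its three references
after_del = sys.getrefcount(obj)

header_type = type(obj)        # the type object that ob_type points to

print(with_holders - base, after_del - base, header_type)
```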
    b.PyVarObject
        a.可变长度
            列表、元组等可变长度对象包含ob_size字段。
        b.大小查看
            ---
            import sys

            lst = [1, 2, 3]
            tup = (1, 2, 3)

            print(f"列表大小: {sys.getsizeof(lst)} bytes")
            print(f"元组大小: {sys.getsizeof(tup)} bytes")
            print(f"列表长度: {len(lst)}")
            ---

02.类型对象
    a.PyTypeObject
        a.类型信息
            每个对象的ob_type指向类型对象,包含类型名称、大小、方法等。
        b.类型查看
            ---
            class MyClass:
                pass

            obj = MyClass()
            print(f"对象类型: {type(obj)}")
            print(f"类型的类型: {type(type(obj))}")
            print(f"类型名称: {type(obj).__name__}")
            print(f"类型基类: {type(obj).__bases__}")
            ---
    b.元类
        a.type元类
            type是所有类的元类,控制类的创建。
        b.元类示例
            ---
            class Meta(type):
                def __new__(cls, name, bases, attrs):
                    print(f"创建类: {name}")
                    return super().__new__(cls, name, bases, attrs)

            class MyClass(metaclass=Meta):
                pass

            obj = MyClass()
            print(f"类型: {type(obj)}")
            print(f"元类: {type(type(obj))}")
            ---

03.对象属性
    a.__dict__
        a.属性字典
            实例属性存储在__dict__字典中。
        b.属性访问
            ---
            class Person:
                def __init__(self, name):
                    self.name = name

            p = Person("Alice")
            print(f"__dict__: {p.__dict__}")

            p.age = 30
            print(f"添加属性: {p.__dict__}")
            ---
    b.__slots__
        a.固定属性
            __slots__限制属性,节省内存。
        b.slots示例
            ---
            import sys

            class WithDict:
                def __init__(self, x):
                    self.x = x

            class WithSlots:
                __slots__ = ('x',)
                def __init__(self, x):
                    self.x = x

            obj1 = WithDict(42)
            obj2 = WithSlots(42)

            print(f"WithDict: {sys.getsizeof(obj1) + sys.getsizeof(obj1.__dict__)}")
            print(f"WithSlots: {sys.getsizeof(obj2)}")
            ---
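        c.Restriction sketch
            Besides saving memory, __slots__ removes the per-instance __dict__, so attributes outside the declared names are rejected. A minimal sketch with a hypothetical PointWithSlots class:

```python
class PointWithSlots:
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x = x
        self.y = y

p = PointWithSlots(1, 2)
has_dict = hasattr(p, '__dict__')   # no per-instance dictionary is created

try:
    p.z = 3                         # 'z' is not listed in __slots__
    rejected = False
except AttributeError:
    rejected = True

print(has_dict, rejected)
```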

04.特殊方法
    a.构造析构
        a.__new__和__init__
            __new__创建对象,__init__初始化对象。
        b.示例
            ---
            class Singleton:
                _instance = None

                def __new__(cls):
                    if cls._instance is None:
                        cls._instance = super().__new__(cls)
                    return cls._instance

            s1 = Singleton()
            s2 = Singleton()
            print(f"相同对象: {s1 is s2}")
            ---
    b.运算符重载
        a.魔术方法
            __add__、__eq__等方法重载运算符。
        b.重载示例
            ---
            class Vector:
                def __init__(self, x, y):
                    self.x = x
                    self.y = y

                def __add__(self, other):
                    return Vector(self.x + other.x, self.y + other.y)

                def __repr__(self):
                    return f"Vector({self.x}, {self.y})"

            v1 = Vector(1, 2)
            v2 = Vector(3, 4)
            v3 = v1 + v2
            print(v3)
            ---

05.对象协议
    a.序列协议
        a.__len__和__getitem__
            实现序列协议支持len()和索引访问。
        b.序列示例
            ---
            class MyList:
                def __init__(self, data):
                    self.data = data

                def __len__(self):
                    return len(self.data)

                def __getitem__(self, index):
                    return self.data[index]

            lst = MyList([1, 2, 3])
            print(f"长度: {len(lst)}")
            print(f"索引: {lst[1]}")
            ---
    b.迭代器协议
        a.__iter__和__next__
            实现迭代器协议支持for循环。
        b.迭代器示例
            ---
            class Counter:
                def __init__(self, max):
                    self.max = max
                    self.current = 0

                def __iter__(self):
                    return self

                def __next__(self):
                    if self.current >= self.max:
                        raise StopIteration
                    self.current += 1
                    return self.current

            for i in Counter(5):
                print(i, end=' ')
            ---

06.对象生命周期
    a.创建
        a.内存分配
            对象创建时分配内存,初始化引用计数。
        b.创建过程
            ---
            class Tracked:
                def __new__(cls):
                    print("__new__: 分配内存")
                    return super().__new__(cls)

                def __init__(self):
                    print("__init__: 初始化")

            obj = Tracked()
            ---
    b.销毁
        a.__del__方法
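            In CPython, __del__ is called when the reference count drops to 0. Objects caught in reference cycles are finalized later by the garbage collector, and __del__ is not guaranteed to run for objects that still exist when the interpreter exits.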
        b.销毁示例
            ---
            class Resource:
                def __init__(self, name):
                    self.name = name
                    print(f"{name} 创建")

                def __del__(self):
                    print(f"{self.name} 销毁")

            r = Resource("资源")
            del r
            ---

5.2 内置类型实现

01.整数对象
    a.PyLongObject
        a.任意精度
            Python整数支持任意精度,自动扩展。
        b.大整数
            ---
            x = 2 ** 1000
            print(f"位数: {x.bit_length()}")
            print(f"字节数: {(x.bit_length() + 7) // 8}")
            ---
    b.小整数池
        a.缓存范围
            CPython pre-creates the integers from -5 to 256 and reuses the same objects for them.
        b.缓存测试
            ---
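            a = 256
            b = 256
            print(f"256 cached: {a is b}")

            # Build 257 at runtime: two identical literals in one script can be merged
            # by the compiler and would then be the same object regardless of the cache
            x = int("257")
            y = int("257")
            print(f"257 cached: {x is y}")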
            ---

02.字符串对象
    a.PyUnicodeObject
        a.Unicode编码
            Python 3字符串使用Unicode。
        b.编码示例
            ---
            s = "Hello世界"
            print(f"长度: {len(s)}")
            print(f"UTF-8: {s.encode('utf-8')}")
            print(f"字节数: {len(s.encode('utf-8'))}")
            ---
    b.字符串驻留
        a.intern机制
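            CPython automatically interns identifiers and string constants that look like identifiers; other strings can be interned explicitly with sys.intern().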
        b.驻留示例
            ---
            import sys

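            s1 = "python"
            s2 = "python"
            print(f"Interned automatically: {s1 is s2}")

            # Build the strings at runtime so the compiler cannot share a single constant
            s3 = " ".join(["hello", "world"])
            s4 = " ".join(["hello", "world"])
            print(f"Same object before interning: {s3 is s4}")

            s3 = sys.intern(s3)
            s4 = sys.intern(s4)
            print(f"Same object after interning: {s3 is s4}")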
            ---

03.列表对象
    a.PyListObject
        a.动态数组
            列表使用动态数组实现,支持快速索引。
        b.扩容策略
            ---
            import sys

            lst = []
            for i in range(10):
                lst.append(i)
                print(f"长度{len(lst)}: {sys.getsizeof(lst)} bytes")
            ---
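        c.Growth points sketch
            The loop above prints every size; recording only the lengths at which the size changes makes the over-allocation easier to see. This is a CPython-specific sketch, the exact growth pattern differs between versions, and growth_points is an illustrative name.

```python
import sys

def growth_points(n):
    """Return the lengths at which an append changed the list's allocated size."""
    lst = []
    last = sys.getsizeof(lst)
    points = []
    for i in range(n):
        lst.append(i)
        size = sys.getsizeof(lst)
        if size != last:
            # The list had to reallocate its internal array for this append
            points.append(len(lst))
            last = size
    return points

points = growth_points(64)
print(points)
```

            Far fewer reallocations than appends is what keeps append amortized O(1).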
    b.列表操作
        a.append和extend
            append添加单个元素,extend添加多个。
        b.性能对比
            ---
            import timeit

            def use_append():
                lst = []
                for i in range(1000):
                    lst.append(i)

            def use_extend():
                lst = []
                lst.extend(range(1000))

            time1 = timeit.timeit(use_append, number=1000)
            time2 = timeit.timeit(use_extend, number=1000)

            print(f"append: {time1:.4f}秒")
            print(f"extend: {time2:.4f}秒")
            ---

04.字典对象
    a.PyDictObject
        a.哈希表
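            Dictionaries are implemented as hash tables with O(1) average lookup; many colliding hashes degrade this toward O(n).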
        b.哈希冲突
            ---
            class BadHash:
                def __init__(self, value):
                    self.value = value

                def __hash__(self):
                    return 1  # 所有对象相同哈希

                def __eq__(self, other):
                    return self.value == other.value

            d = {}
            for i in range(5):
                d[BadHash(i)] = i

            print(f"字典大小: {len(d)}")
            ---
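        c.Collision cost sketch
            When all keys share one hash value, the dictionary still works, but it has to fall back on __eq__ to tell keys apart. The sketch below counts those comparisons; CountingKey is a hypothetical class, and the exact counts depend on the CPython version.

```python
class CountingKey:
    eq_calls = 0  # class-wide counter of __eq__ invocations

    def __init__(self, value):
        self.value = value

    def __hash__(self):
        return 1  # every key collides

    def __eq__(self, other):
        CountingKey.eq_calls += 1
        return isinstance(other, CountingKey) and self.value == other.value

d = {CountingKey(i): i for i in range(5)}
calls_during_insert = CountingKey.eq_calls

CountingKey.eq_calls = 0
found = d[CountingKey(4)]            # lookup with a new, equal key object
calls_during_lookup = CountingKey.eq_calls

print(len(d), found, calls_during_insert, calls_during_lookup)
```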
    b.字典优化
        a.紧凑字典
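            CPython 3.6+ uses a compact dict layout that saves memory; since Python 3.7, preserving insertion order is a language guarantee.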
        b.内存对比
            ---
            import sys

            d1 = {i: i for i in range(100)}
            d2 = dict.fromkeys(range(100))

            print(f"普通字典: {sys.getsizeof(d1)}")
            print(f"fromkeys: {sys.getsizeof(d2)}")
            ---

05.集合对象
    a.PySetObject
        a.哈希集合
            集合使用哈希表,元素唯一。
        b.集合操作
            ---
            s1 = {1, 2, 3}
            s2 = {2, 3, 4}

            print(f"并集: {s1 | s2}")
            print(f"交集: {s1 & s2}")
            print(f"差集: {s1 - s2}")
            ---
    b.frozenset
        a.不可变集合
            frozenset不可变,可作为字典键。
        b.frozenset示例
            ---
            fs = frozenset([1, 2, 3])
            d = {fs: "value"}

            print(f"字典: {d}")
            print(f"可哈希: {hash(fs)}")
            ---

06.类型转换
    a.隐式转换
        a.数值提升
            整数和浮点数运算自动转换。
        b.转换示例
            ---
            x = 10
            y = 3.14

            result = x + y
            print(f"类型: {type(result)}")
            print(f"结果: {result}")
            ---
    b.显式转换
        a.类型构造
            int()、str()等显式转换。
        b.转换方法
            ---
            s = "123"
            n = int(s)
            print(f"字符串转整数: {n}")

            f = 3.14
            i = int(f)
            print(f"浮点转整数: {i}")

            lst = [1, 2, 3]
            tup = tuple(lst)
            print(f"列表转元组: {tup}")
            ---

5.3 描述符协议

01.描述符基础
    a.__get__/__set__/__delete__
        a.描述符方法
            实现这些方法的对象是描述符。
        b.基础示例
            ---
            class Descriptor:
                def __get__(self, obj, objtype=None):
                    print("__get__调用")
                    return 42

                def __set__(self, obj, value):
                    print(f"__set__调用: {value}")

                def __delete__(self, obj):
                    print("__delete__调用")

            class MyClass:
                attr = Descriptor()

            obj = MyClass()
            print(obj.attr)
            obj.attr = 100
            del obj.attr
            ---
    b.数据描述符vs非数据描述符
        a.优先级
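            A data descriptor (one that defines __set__ or __delete__) takes precedence over the instance __dict__; a non-data descriptor (only __get__) is overridden by an entry in the instance __dict__.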
        b.优先级示例
            ---
            class DataDescriptor:
                def __get__(self, obj, objtype=None):
                    return "数据描述符"

                def __set__(self, obj, value):
                    pass

            class NonDataDescriptor:
                def __get__(self, obj, objtype=None):
                    return "非数据描述符"

            class MyClass:
                data_desc = DataDescriptor()
                non_data_desc = NonDataDescriptor()

            obj = MyClass()
            obj.__dict__['data_desc'] = "实例属性"
            obj.__dict__['non_data_desc'] = "实例属性"

            print(f"数据描述符: {obj.data_desc}")
            print(f"非数据描述符: {obj.non_data_desc}")
            ---
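        c.Lookup order sketch
            The precedence can be written out as a simplified pure-Python version of instance attribute lookup. This is an approximation for illustration only: it ignores __getattr__, __slots__ and other details of the real object.__getattribute__, and the names lookup, find_in_mro and Demo are hypothetical.

```python
_MISSING = object()

def find_in_mro(cls, name):
    """Return the first class-level attribute called name, or _MISSING."""
    for klass in cls.__mro__:
        if name in vars(klass):
            return vars(klass)[name]
    return _MISSING

def lookup(obj, name):
    cls = type(obj)
    attr = find_in_mro(cls, name)
    attr_type = type(attr)
    has_get = attr is not _MISSING and hasattr(attr_type, '__get__')
    is_data = has_get and (hasattr(attr_type, '__set__') or hasattr(attr_type, '__delete__'))

    if is_data:                                   # 1. data descriptor on the class
        return attr_type.__get__(attr, obj, cls)
    instance_dict = getattr(obj, '__dict__', {})
    if name in instance_dict:                     # 2. instance __dict__
        return instance_dict[name]
    if has_get:                                   # 3. non-data descriptor on the class
        return attr_type.__get__(attr, obj, cls)
    if attr is not _MISSING:                      # 4. plain class attribute
        return attr
    raise AttributeError(name)

class DataDesc:
    def __get__(self, obj, objtype=None):
        return 'data descriptor'
    def __set__(self, obj, value):
        pass

class NonDataDesc:
    def __get__(self, obj, objtype=None):
        return 'non-data descriptor'

class Demo:
    data = DataDesc()
    non_data = NonDataDesc()

demo = Demo()
demo.__dict__['data'] = 'instance value'
demo.__dict__['non_data'] = 'instance value'

print(lookup(demo, 'data'), '|', lookup(demo, 'non_data'))
```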

02.property装饰器
    a.属性访问控制
        a.getter/setter
            property创建托管属性。
        b.property示例
            ---
            class Temperature:
                def __init__(self, celsius):
                    self._celsius = celsius

                @property
                def celsius(self):
                    return self._celsius

                @celsius.setter
                def celsius(self, value):
                    if value < -273.15:
                        raise ValueError("温度过低")
                    self._celsius = value

                @property
                def fahrenheit(self):
                    return self._celsius * 9/5 + 32

            t = Temperature(25)
            print(f"摄氏度: {t.celsius}")
            print(f"华氏度: {t.fahrenheit}")

            t.celsius = 30
            print(f"新温度: {t.celsius}")
            ---
    b.只读属性
        a.无setter
            只定义getter创建只读属性。
        b.只读示例
            ---
            import math

            class Circle:
                def __init__(self, radius):
                    self._radius = radius

                @property
                def radius(self):
                    return self._radius

                @property
                def area(self):
                    # math.pi is more precise than the literal 3.14
                    return math.pi * self._radius ** 2

            c = Circle(5)
            print(f"半径: {c.radius}")
            print(f"面积: {c.area}")

            try:
                c.area = 100
            except AttributeError as e:
                print(f"错误: {e}")
            ---

03.方法描述符
    a.函数对象
        a.函数是描述符
            函数实现__get__,绑定到实例。
        b.绑定方法
            ---
            class MyClass:
                def method(self):
                    return "实例方法"

            obj = MyClass()

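            # Python 3 has no unbound methods: access through the class yields the plain function
            print(f"Via the class (plain function): {MyClass.method}")
            print(f"Via the instance (bound method): {obj.method}")
            print(f"Call: {obj.method()}")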
            ---
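        c.Manual binding sketch
            Binding can be reproduced by calling the function's __get__ directly, which is what attribute lookup on an instance does. A minimal sketch with a hypothetical Greeter class:

```python
class Greeter:
    def hello(self):
        return 'hello'

g = Greeter()

plain_function = Greeter.__dict__['hello']          # the function stored on the class
bound_manually = plain_function.__get__(g, Greeter)  # descriptor protocol by hand
bound_by_lookup = g.hello                            # the same thing via attribute access

print(bound_manually())
print(bound_by_lookup.__func__ is plain_function, bound_by_lookup.__self__ is g)
```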
    b.classmethod和staticmethod
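        a.Class methods and static methods
            classmethod receives the class as its first argument; staticmethod receives no implicit first argument and behaves like a plain function stored on the class.
        b.Example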
            ---
            class MyClass:
                @classmethod
                def class_method(cls):
                    return f"类方法: {cls.__name__}"

                @staticmethod
                def static_method():
                    return "静态方法"

            print(MyClass.class_method())
            print(MyClass.static_method())

            obj = MyClass()
            print(obj.class_method())
            print(obj.static_method())
            ---

04.自定义描述符
    a.类型检查
        a.验证描述符
            描述符实现类型检查。
        b.类型检查示例
            ---
            class TypedProperty:
                def __init__(self, name, expected_type):
                    self.name = name
                    self.expected_type = expected_type

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    return obj.__dict__.get(self.name)

                def __set__(self, obj, value):
                    if not isinstance(value, self.expected_type):
                        raise TypeError(f"期望{self.expected_type},得到{type(value)}")
                    obj.__dict__[self.name] = value

            class Person:
                name = TypedProperty('name', str)
                age = TypedProperty('age', int)

            p = Person()
            p.name = "Alice"
            p.age = 30

            try:
                p.age = "thirty"
            except TypeError as e:
                print(f"错误: {e}")
            ---
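    b.Lazy attributes
        a.Deferred computation
            The value is computed on first access and then cached. This works because LazyProperty defines only __get__ (a non-data descriptor): once the result is stored in the instance __dict__ under the same name, later lookups find the instance attribute first and never reach the descriptor.
        b.Lazy example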
            ---
            class LazyProperty:
                def __init__(self, func):
                    self.func = func

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    value = self.func(obj)
                    setattr(obj, self.func.__name__, value)
                    return value

            class DataSet:
                def __init__(self, filename):
                    self.filename = filename

                @LazyProperty
                def data(self):
                    print("加载数据...")
                    return [1, 2, 3, 4, 5]

            ds = DataSet("data.txt")
            print("数据集创建")
            print(f"数据: {ds.data}")
            print(f"再次访问: {ds.data}")
            ---
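            The standard library ships the same idea as functools.cached_property (Python 3.8+). The sketch below uses a hypothetical Report class and a call counter, added only to make the caching visible.

```python
from functools import cached_property


class Report:
    def __init__(self, values):
        self.values = values
        self.compute_calls = 0  # counts how often the method body actually runs

    @cached_property
    def total(self):
        self.compute_calls += 1
        return sum(self.values)


r = Report([1, 2, 3])
first = r.total    # runs the method and stores the result on the instance
second = r.total   # served from the instance __dict__
calls_after_two_reads = r.compute_calls
print(first, second, calls_after_two_reads)

# Deleting the cached entry forces a recomputation on the next access.
del r.total
third = r.total
print(third, r.compute_calls)
```

            Like the hand-written LazyProperty, cached_property needs a writable instance __dict__, so it does not work on classes that define __slots__ without a __dict__ slot.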

05.描述符应用
    a.ORM字段
        a.数据库字段
            ORM使用描述符定义字段。
        b.字段示例
            ---
            class Field:
                def __init__(self, name, field_type):
                    self.name = name
                    self.field_type = field_type

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    return obj.__dict__.get(self.name)

                def __set__(self, obj, value):
                    if not isinstance(value, self.field_type):
                        raise TypeError(f"字段{self.name}类型错误")
                    obj.__dict__[self.name] = value

            class Model:
                id = Field('id', int)
                name = Field('name', str)

            m = Model()
            m.id = 1
            m.name = "记录"
            print(f"ID: {m.id}, Name: {m.name}")
            ---
    b.单位转换
        a.自动转换
            描述符实现单位自动转换。
        b.转换示例
            ---
            class Meter:
                def __init__(self, name):
                    self.name = name

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    return obj.__dict__.get(self.name, 0)

                def __set__(self, obj, value):
                    obj.__dict__[self.name] = value

            class Kilometer:
                def __init__(self, meter_attr):
                    self.meter_attr = meter_attr

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    meters = getattr(obj, self.meter_attr)
                    return meters / 1000

                def __set__(self, obj, value):
                    setattr(obj, self.meter_attr, value * 1000)

            class Distance:
                meters = Meter('_meters')
                kilometers = Kilometer('_meters')

            d = Distance()
            d.meters = 5000
            print(f"米: {d.meters}")
            print(f"千米: {d.kilometers}")

            d.kilometers = 10
            print(f"米: {d.meters}")
            ---

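06.Descriptor protocol details
    a.__set_name__
        a.Automatic naming
            Since Python 3.6, __set_name__(owner, name) is called on each descriptor when the owning class is created, so the descriptor learns its attribute name without it being passed to __init__.
        b.Naming example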
            ---
            class NamedDescriptor:
                def __set_name__(self, owner, name):
                    self.name = name

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    return obj.__dict__.get(self.name)

                def __set__(self, obj, value):
                    print(f"设置{self.name} = {value}")
                    obj.__dict__[self.name] = value

            class MyClass:
                attr1 = NamedDescriptor()
                attr2 = NamedDescriptor()

            obj = MyClass()
            obj.attr1 = 10
            obj.attr2 = 20
            ---
    b.__delete__
        a.删除属性
            __delete__处理属性删除。
        b.删除示例
            ---
            class ManagedAttribute:
                def __init__(self, name):
                    self.name = name

                def __get__(self, obj, objtype=None):
                    if obj is None:
                        return self
                    return obj.__dict__.get(self.name)

                def __set__(self, obj, value):
                    obj.__dict__[self.name] = value

                def __delete__(self, obj):
                    print(f"删除{self.name}")
                    del obj.__dict__[self.name]

            class MyClass:
                attr = ManagedAttribute('attr')

            obj = MyClass()
            obj.attr = 42
            print(f"值: {obj.attr}")
            del obj.attr
            ---

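5.4 Metaclass programming

01.The type metaclass
    a.Creating classes dynamically
        a.The type() function
            type(name, bases, dict) creates a class at runtime; it is what a class statement calls behind the scenes.
        b.Dynamic class example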
            ---
            # 普通方式
            class MyClass:
                x = 10

            # 动态创建
            DynamicClass = type('DynamicClass', (), {'x': 10})

            print(f"类名: {DynamicClass.__name__}")
            print(f"属性: {DynamicClass.x}")

            obj = DynamicClass()
            print(f"实例: {obj.x}")
            ---
    b.类的类
        a.元类概念
            类是type的实例,type是自己的实例。
        b.元类关系
            ---
            class MyClass:
                pass

            obj = MyClass()

            print(f"obj类型: {type(obj)}")
            print(f"MyClass类型: {type(MyClass)}")
            print(f"type类型: {type(type)}")

            print(f"\nobj是MyClass实例: {isinstance(obj, MyClass)}")
            print(f"MyClass是type实例: {isinstance(MyClass, type)}")
            ---

02.自定义元类
    a.元类定义
        a.继承type
            自定义元类继承type。
        b.元类示例
            ---
            class Meta(type):
                def __new__(cls, name, bases, attrs):
                    print(f"创建类: {name}")
                    attrs['created_by'] = 'Meta'
                    return super().__new__(cls, name, bases, attrs)

            class MyClass(metaclass=Meta):
                pass

            print(f"创建者: {MyClass.created_by}")
            ---
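    b.__init_subclass__
        a.Simpler than a metaclass
            __init_subclass__ (Python 3.6+) is called on the base class each time a subclass is defined, which covers many customizations that used to require a metaclass.
        b.Subclass hook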
            ---
            class Base:
                def __init_subclass__(cls, **kwargs):
                    super().__init_subclass__(**kwargs)
                    print(f"子类创建: {cls.__name__}")
                    cls.subclass_name = cls.__name__

            class Derived(Base):
                pass

            print(f"子类名: {Derived.subclass_name}")
            ---
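            __init_subclass__ also receives keyword arguments written in the class statement. The sketch below uses that for a simple registry; Plugin, the key keyword, and the exporter classes are hypothetical names chosen for illustration.

```python
class Plugin:
    registry = {}  # maps a plugin key to its class

    def __init_subclass__(cls, key=None, **kwargs):
        super().__init_subclass__(**kwargs)
        # Fall back to the lowercase class name when no key is given.
        cls.key = key or cls.__name__.lower()
        Plugin.registry[cls.key] = cls


class CsvExporter(Plugin, key="csv"):
    pass


class JsonExporter(Plugin):
    pass


print(sorted(Plugin.registry))
print(Plugin.registry["csv"] is CsvExporter)
```

            Forwarding the remaining **kwargs to super() keeps the hook cooperative when several base classes define it.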

03.元类应用
    a.单例模式
        a.元类单例
            元类控制实例创建。
        b.单例示例
            ---
            class Singleton(type):
                _instances = {}

                def __call__(cls, *args, **kwargs):
                    if cls not in cls._instances:
                        cls._instances[cls] = super().__call__(*args, **kwargs)
                    return cls._instances[cls]

            class Database(metaclass=Singleton):
                def __init__(self):
                    print("数据库连接创建")

            db1 = Database()
            db2 = Database()
            print(f"相同实例: {db1 is db2}")
            ---
    b.ORM框架
        a.模型定义
            ORM使用元类注册模型。
        b.ORM示例
            ---
            class ModelMeta(type):
                def __new__(cls, name, bases, attrs):
                    if name != 'Model':
                        print(f"注册模型: {name}")
                        fields = {k: v for k, v in attrs.items()
                                if not k.startswith('_')}
                        attrs['_fields'] = fields
                    return super().__new__(cls, name, bases, attrs)

            class Model(metaclass=ModelMeta):
                pass

            class User(Model):
                name = str
                age = int

            print(f"User字段: {User._fields}")
            ---

04.类装饰器
    a.装饰器vs元类
        a.类装饰器
            装饰器修改已创建的类。
        b.装饰器示例
            ---
            def add_method(cls):
                def new_method(self):
                    return "新方法"
                cls.new_method = new_method
                return cls

            @add_method
            class MyClass:
                pass

            obj = MyClass()
            print(obj.new_method())
            ---
    b.组合使用
        a.元类+装饰器
            元类控制创建,装饰器修改类。
        b.组合示例
            ---
            class Meta(type):
                def __new__(cls, name, bases, attrs):
                    attrs['from_meta'] = True
                    return super().__new__(cls, name, bases, attrs)

            def decorator(cls):
                cls.from_decorator = True
                return cls

            @decorator
            class MyClass(metaclass=Meta):
                pass

            print(f"元类: {MyClass.from_meta}")
            print(f"装饰器: {MyClass.from_decorator}")
            ---

05.抽象基类
    a.ABC模块
        a.抽象方法
            abc.abstractmethod定义抽象方法。
        b.ABC示例
            ---
            from abc import ABC, abstractmethod

            class Shape(ABC):
                @abstractmethod
                def area(self):
                    pass

            class Circle(Shape):
                def __init__(self, radius):
                    self.radius = radius

                def area(self):
                    return 3.14 * self.radius ** 2

            c = Circle(5)
            print(f"面积: {c.area()}")

            try:
                s = Shape()
            except TypeError as e:
                print(f"错误: {e}")
            ---
    b.虚拟子类
        a.register
            register注册虚拟子类。
        b.虚拟子类示例
            ---
            from abc import ABC

            class MyABC(ABC):
                pass

            class MyClass:
                pass

            MyABC.register(MyClass)

            obj = MyClass()
            print(f"是子类: {isinstance(obj, MyABC)}")
            print(f"真实类型: {type(obj)}")
            ---

06.元编程技巧
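    a.__prepare__
        a.Custom namespace
            __prepare__ returns the mapping that serves as the class namespace while the class body executes. Since Python 3.7 a plain dict already preserves insertion order, so OrderedDict is used here only to illustrate the hook.
        b.Ordered dictionary
            ---
            from collections import OrderedDict

            class OrderedMeta(type):
                @classmethod
                def __prepare__(mcs, name, bases, **kwargs):
                    return OrderedDict()

                def __new__(mcs, name, bases, attrs):
                    # Skip implicit entries such as __module__ and __qualname__
                    attrs['_order'] = [k for k in attrs if not k.startswith('__')]
                    return super().__new__(mcs, name, bases, attrs)

            class MyClass(metaclass=OrderedMeta):
                x = 1
                y = 2
                z = 3

            print(f"Definition order: {MyClass._order}")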
            ---
    b.类属性验证
        a.元类验证
            元类验证类定义。
        b.验证示例
            ---
            class ValidateMeta(type):
                def __new__(cls, name, bases, attrs):
                    if 'required_method' not in attrs:
                        raise TypeError(f"{name}必须定义required_method")
                    return super().__new__(cls, name, bases, attrs)

            try:
                class BadClass(metaclass=ValidateMeta):
                    pass
            except TypeError as e:
                print(f"错误: {e}")

            class GoodClass(metaclass=ValidateMeta):
                def required_method(self):
                    pass

            print("GoodClass创建成功")
            ---

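5.5 Object memory layout

01.Object header
    a.PyObject structure
        a.ob_refcnt
            The reference count, 8 bytes on 64-bit builds. Together with the 8-byte ob_type pointer it forms the 16-byte header carried by every object in a default (GIL) build.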
        b.ob_type
            ---
            import sys

            x = 42
            print(f"对象大小: {sys.getsizeof(x)} bytes")
            print(f"类型: {type(x)}")
            ---
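            The ob_refcnt field can be observed from Python with sys.getrefcount. A minimal sketch follows; absolute values vary between interpreter versions, so only the differences are meaningful.

```python
import sys

payload = ["some", "data"]            # a fresh object with one name bound to it
before = sys.getrefcount(payload)     # includes the temporary reference held by the call itself

alias = payload                       # a second name for the same object
after_alias = sys.getrefcount(payload)

del alias                             # dropping the name releases that reference
after_del = sys.getrefcount(payload)

print(before, after_alias, after_del)
```

            A list is used rather than a small integer because some objects (small ints, None) are shared or immortal in recent CPython versions and report counts that do not change this way.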
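    b.Alignment requirements
        a.Memory alignment
            CPython's small-object allocator hands out blocks aligned to 8 bytes (16 bytes on 64-bit builds of recent versions). sys.getsizeof reports the object's own size before this rounding, so the values printed below need not be multiples of the alignment.
        b.Alignment example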
            ---
            import sys

            objects = [
                True,
                42,
                3.14,
                "hello",
                [],
                {}
            ]

            for obj in objects:
                print(f"{type(obj).__name__}: {sys.getsizeof(obj)} bytes")
            ---

02.实例布局
    a.__dict__存储
        a.属性字典
            实例属性存储在__dict__。
        b.内存占用
            ---
            import sys

            class MyClass:
                def __init__(self):
                    self.x = 1
                    self.y = 2

            obj = MyClass()
            print(f"对象: {sys.getsizeof(obj)} bytes")
            print(f"__dict__: {sys.getsizeof(obj.__dict__)} bytes")
            print(f"总计: {sys.getsizeof(obj) + sys.getsizeof(obj.__dict__)} bytes")
            ---
    b.__slots__布局
        a.固定槽位
            __slots__使用固定槽位。
        b.内存节省
            ---
            import sys

            class WithDict:
                def __init__(self):
                    self.x = 1
                    self.y = 2

            class WithSlots:
                __slots__ = ('x', 'y')
                def __init__(self):
                    self.x = 1
                    self.y = 2

            obj1 = WithDict()
            obj2 = WithSlots()

            size1 = sys.getsizeof(obj1) + sys.getsizeof(obj1.__dict__)
            size2 = sys.getsizeof(obj2)

            print(f"WithDict: {size1} bytes")
            print(f"WithSlots: {size2} bytes")
            print(f"节省: {size1 - size2} bytes")
            ---

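03.Container layout
    a.List memory
        a.Dynamic array
            A list is a dynamic array of object pointers. When it runs out of room, append allocates more slots than immediately needed, so most appends do not trigger a reallocation.
        b.Capacity growth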
            ---
            import sys

            lst = []
            for i in range(20):
                lst.append(i)
                size = sys.getsizeof(lst)
                print(f"长度{len(lst)}: {size} bytes")
            ---
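            To see the over-allocation without reading twenty lines of output, the lengths at which the reported size changes can be collected. This is a sketch; growth_points is a hypothetical helper, and the exact lengths depend on the CPython version.

```python
import sys


def growth_points(n):
    """Return the list lengths at which sys.getsizeof() changed while appending n items."""
    lst = []
    last_size = sys.getsizeof(lst)
    points = []
    for i in range(n):
        lst.append(i)
        size = sys.getsizeof(lst)
        if size != last_size:
            points.append(len(lst))
            last_size = size
    return points


points = growth_points(100)
print(f"Reallocations while appending 100 items: {len(points)}")
print(f"Lengths at which the list grew: {points}")
```

            The number of reallocations is far smaller than the number of appends, which is what keeps append cheap on average.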
    b.Dictionary memory
        a.Hash table
            A dictionary is a hash table that is resized once it becomes two-thirds full.
        b.Dictionary resizing
            ---
            import sys

            d = {}
            for i in range(20):
                d[i] = i
                size = sys.getsizeof(d)
                print(f"键数{len(d)}: {size} bytes")
            ---

04.内存优化
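    a.Object pools
        a.Small-object pool
            In default builds, CPython's pymalloc allocator serves requests of up to 512 bytes from pre-allocated pools, which keeps creating and freeing small objects cheap.
        b.Pooling effect
            ---
            import timeit

            def create_objects():
                for _ in range(1000):
                    obj = object()

            # Named elapsed rather than time to avoid shadowing the time module
            elapsed = timeit.timeit(create_objects, number=1000)
            print(f"Creation time: {elapsed:.4f}s")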
            ---
    b.紧凑存储
        a.压缩技巧
            使用__slots__、tuple等紧凑类型。
        b.对比测试
            ---
            import sys

            # 列表
            lst = [1, 2, 3, 4, 5]
            # 元组
            tup = (1, 2, 3, 4, 5)
            # array
            import array
            arr = array.array('i', [1, 2, 3, 4, 5])

            print(f"列表: {sys.getsizeof(lst)} bytes")
            print(f"元组: {sys.getsizeof(tup)} bytes")
            print(f"array: {sys.getsizeof(arr)} bytes")
            ---

05.内存分析
    a.对象大小
        a.sys.getsizeof
            获取对象占用内存。
        b.递归计算
            ---
            import sys

            def total_size(obj, seen=None):
                size = sys.getsizeof(obj)
                if seen is None:
                    seen = set()

                obj_id = id(obj)
                if obj_id in seen:
                    return 0

                seen.add(obj_id)

                if isinstance(obj, dict):
                    size += sum(total_size(v, seen) for v in obj.values())
                    size += sum(total_size(k, seen) for k in obj.keys())
                elif hasattr(obj, '__dict__'):
                    size += total_size(obj.__dict__, seen)
                elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
                    size += sum(total_size(i, seen) for i in obj)

                return size

            data = {'a': [1, 2, 3], 'b': {'x': 10}}
            print(f"总大小: {total_size(data)} bytes")
            ---
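    b.Memory profiling
        a.tracemalloc
            The standard-library tracemalloc module records where memory was allocated, line by line.
        b.Profiling example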
            ---
            import tracemalloc

            tracemalloc.start()

            # 分配内存
            data = [list(range(100)) for _ in range(100)]

            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics('lineno')

            print("Top 3内存使用:")
            for stat in top_stats[:3]:
                print(stat)

            tracemalloc.stop()
            ---
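            When only the total matters, tracemalloc.get_traced_memory() is enough. The sketch below wraps it in a hypothetical measure helper; the figures include only allocations made while tracing was active.

```python
import tracemalloc


def measure(func):
    """Run func() under tracemalloc and return (result, net_bytes, peak_bytes)."""
    tracemalloc.start()
    try:
        before, _ = tracemalloc.get_traced_memory()
        result = func()
        after, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, after - before, peak


def build_table():
    # 100 lists of 100 integers, kept alive by the returned reference
    return [list(range(100)) for _ in range(100)]


table, net_bytes, peak_bytes = measure(build_table)
print(f"Net allocation: {net_bytes / 1024:.1f} KiB, peak: {peak_bytes / 1024:.1f} KiB")
```

            Returning the result from measure keeps the allocated data alive until after the second reading, so the net figure reflects what build_table actually retained.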

06.内存泄漏检测
    a.引用循环
        a.检测工具
            gc模块检测循环引用。
        b.检测示例
            ---
            import gc

            class Node:
                def __init__(self):
                    self.ref = None

            # 创建循环
            n1 = Node()
            n2 = Node()
            n1.ref = n2
            n2.ref = n1

            del n1, n2

            # 检测
            collected = gc.collect()
            print(f"回收对象: {collected}个")
            ---
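            A weak reference makes it possible to confirm that a cycle survives del and disappears only after the collector runs. This is a minimal sketch; automatic collection is switched off for the duration so the result does not depend on timing.

```python
import gc
import weakref


class Node:
    def __init__(self):
        self.ref = None


gc.disable()                 # keep the automatic collector from running mid-example
try:
    a, b = Node(), Node()
    a.ref, b.ref = b, a      # reference cycle
    probe = weakref.ref(a)   # observes `a` without keeping it alive

    del a, b
    alive_before = probe() is not None   # the cycle still holds both objects

    gc.collect()
    alive_after = probe() is not None    # the collector has reclaimed the cycle
finally:
    gc.enable()

print(f"Alive before collect: {alive_before}, after collect: {alive_after}")
```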
    b.弱引用检测
        a.weakref模块
            使用弱引用避免循环。
        b.弱引用示例
            ---
            import weakref
            import gc

            class Node:
                def __init__(self):
                    self.ref = None

            n1 = Node()
            n2 = Node()
            n1.ref = weakref.ref(n2)
            n2.ref = weakref.ref(n1)

            print(f"n1引用: {n1.ref()}")
            print(f"n2引用: {n2.ref()}")

            del n2
            gc.collect()

            print(f"n2删除后: {n1.ref()}")
            ---

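6. GIL (Global Interpreter Lock)

6.1 How the GIL works

01.GIL concept
    a.Global lock
        a.Mutex
            The GIL is a process-wide mutex: only one thread executes Python bytecode at any moment. It does not make compound operations such as counter += 1 atomic, so, depending on the CPython version, the count printed below may fall short of 2000000.
        b.GIL example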
            ---
            import threading
            import time

            counter = 0

            def increment():
                global counter
                for _ in range(1000000):
                    counter += 1

            threads = [threading.Thread(target=increment) for _ in range(2)]

            start = time.time()
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            print(f"计数: {counter}")
            print(f"时间: {time.time() - start:.2f}秒")
            ---
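            Because the GIL does not protect the read-modify-write in counter += 1, shared counters still need an explicit lock. A minimal sketch with smaller, hypothetical iteration counts:

```python
import threading

ITERATIONS = 100_000
NUM_THREADS = 4

counter = 0
counter_lock = threading.Lock()


def safe_increment():
    global counter
    for _ in range(ITERATIONS):
        # The lock makes the read-modify-write sequence indivisible.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=safe_increment) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Count: {counter} (expected {ITERATIONS * NUM_THREADS})")
```

            The lock adds overhead on every iteration; where possible, accumulating a local total per thread and adding it once under the lock is cheaper.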
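    b.Design rationale
        a.Memory management
            The GIL keeps reference counting thread-safe without per-object locks.
        b.C extension compatibility
            ---
            import sys
            import threading
            import time  # needed for the timing code below

            print(f"GIL switch interval: {sys.getswitchinterval()}s")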

            def cpu_bound():
                total = 0
                for i in range(10000000):
                    total += i
                return total

            # 单线程
            start = time.time()
            cpu_bound()
            single_time = time.time() - start

            # 多线程
            start = time.time()
            threads = [threading.Thread(target=cpu_bound) for _ in range(2)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            multi_time = time.time() - start

            print(f"单线程: {single_time:.2f}秒")
            print(f"多线程: {multi_time:.2f}秒")
            ---

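02.Acquiring and releasing the GIL
    a.Switch interval
        a.Time-based switching
            Since Python 3.2, the running thread is asked to release the GIL after a fixed time interval (5 ms by default) when another thread is waiting. Earlier versions counted executed bytecode instructions instead.
        b.Setting the interval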
            ---
            import sys

            print(f"默认间隔: {sys.getswitchinterval()}")

            sys.setswitchinterval(0.001)
            print(f"新间隔: {sys.getswitchinterval()}")
            ---
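    b.I/O operations
        a.Automatic release
            Blocking I/O calls and time.sleep release the GIL while they wait, so other threads can run in the meantime.
        b.I/O example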
            ---
            import threading
            import time

            def io_bound():
                time.sleep(1)

            threads = [threading.Thread(target=io_bound) for _ in range(4)]

            start = time.time()
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            print(f"4个线程sleep 1秒: {time.time() - start:.2f}秒")
            ---
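            The same overlap can be written with concurrent.futures, which also collects return values. In this sketch fake_io is a hypothetical stand-in for a blocking call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(task_id, delay=0.2):
    """Stand-in for a blocking I/O call; sleeping releases the GIL."""
    time.sleep(delay)
    return task_id


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_io, range(4)))
elapsed = time.perf_counter() - start

print(f"Results: {results}")
print(f"Elapsed: {elapsed:.2f}s for 4 tasks of 0.2s each")
```

            pool.map returns results in submission order, and leaving the with block waits for all workers to finish.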

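03.Impact of the GIL
    a.CPU-bound work
        a.No speedup
            For CPU-bound pure-Python code, threads give no speedup and often run slightly slower than a single thread because of contention for the GIL.
        b.Performance test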
            ---
            import threading
            import time

            def cpu_task():
                total = sum(i*i for i in range(1000000))
                return total

            # 单线程
            start = time.time()
            for _ in range(4):
                cpu_task()
            single = time.time() - start

            # 多线程
            start = time.time()
            threads = [threading.Thread(target=cpu_task) for _ in range(4)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            multi = time.time() - start

            print(f"单线程: {single:.2f}秒")
            print(f"多线程: {multi:.2f}秒")
            print(f"加速比: {single/multi:.2f}x")
            ---
    b.IO密集型
        a.性能提升
            IO密集型任务多线程有效。
        b.IO测试
            ---
            import threading
            import time
            import requests

            urls = ['http://httpbin.org/delay/1'] * 4

            def fetch(url):
                try:
                    requests.get(url, timeout=5)
                except requests.RequestException:
                    # Ignore network errors so the timing comparison can continue
                    pass

            # 单线程
            start = time.time()
            for url in urls:
                fetch(url)
            single = time.time() - start

            # 多线程
            start = time.time()
            threads = [threading.Thread(target=fetch, args=(url,)) for url in urls]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            multi = time.time() - start

            print(f"单线程: {single:.2f}秒")
            print(f"多线程: {multi:.2f}秒")
            ---

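04.Alternatives to the GIL
    a.Multiple processes
        a.multiprocessing
            Each process runs its own interpreter with its own GIL, so CPU-bound work can use several cores.
        b.Multiprocessing example
            ---
            from multiprocessing import Process, cpu_count
            import time

            def cpu_task():
                total = sum(i*i for i in range(10000000))

            # The guard is required where child processes are spawned (Windows, macOS)
            if __name__ == '__main__':
                start = time.time()
                processes = [Process(target=cpu_task) for _ in range(cpu_count())]
                for p in processes:
                    p.start()
                for p in processes:
                    p.join()

                print(f"Multiprocess time: {time.time() - start:.2f}s")
                print(f"CPU cores: {cpu_count()}")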
            ---
    b.异步IO
        a.asyncio
            单线程异步处理IO。
        b.异步示例
            ---
            import asyncio
            import time

            async def io_task():
                await asyncio.sleep(1)

            async def main():
                tasks = [io_task() for _ in range(4)]
                await asyncio.gather(*tasks)

            start = time.time()
            asyncio.run(main())
            print(f"异步时间: {time.time() - start:.2f}秒")
            ---

05.GIL调试
    a.线程状态
        a.threading模块
            查看线程状态。
        b.状态查看
            ---
            import threading
            import time

            def worker():
                time.sleep(2)

            threads = [threading.Thread(target=worker) for _ in range(3)]
            for t in threads:
                t.start()

            print(f"活动线程: {threading.active_count()}")
            print(f"线程列表: {[t.name for t in threading.enumerate()]}")

            for t in threads:
                t.join()
            ---
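    b.Deadlock detection
        a.Lock timeout
            acquire(timeout=...) lets a thread give up instead of waiting forever. It does not prevent the deadlock; it turns it into a failure the code can detect and handle.
        b.Timeout example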
            ---
            import threading
            import time

            lock1 = threading.Lock()
            lock2 = threading.Lock()

            def task1():
                with lock1:
                    time.sleep(0.1)
                    if lock2.acquire(timeout=1):
                        print("task1获取lock2")
                        lock2.release()
                    else:
                        print("task1超时")

            def task2():
                with lock2:
                    time.sleep(0.1)
                    if lock1.acquire(timeout=1):
                        print("task2获取lock1")
                        lock1.release()
                    else:
                        print("task2超时")

            t1 = threading.Thread(target=task1)
            t2 = threading.Thread(target=task2)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
            ---
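            Timeouts detect a deadlock after the fact. A common way to avoid it altogether is to acquire locks in one fixed global order. The sketch below orders them by id(); ordered_locks and transfer are hypothetical helpers.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
completed = []


def ordered_locks(*locks):
    """Return the locks sorted by id() so every thread acquires them in the same order."""
    return sorted(locks, key=id)


def transfer(name, first, second):
    low, high = ordered_locks(first, second)
    with low:
        with high:
            completed.append(name)


# The two threads name the locks in opposite order, yet acquire them identically.
t1 = threading.Thread(target=transfer, args=("task1", lock_a, lock_b))
t2 = threading.Thread(target=transfer, args=("task2", lock_b, lock_a))
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)

print(f"Completed: {sorted(completed)}")
```

            Any stable ordering works (for example an explicit rank per lock); id() is used here only because it needs no extra bookkeeping.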

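06.Python without the GIL
    a.The nogil project
        a.Experimental implementation
            nogil began as an experimental CPython fork that removes the GIL. The work continued as PEP 703, and CPython 3.13 offers an optional free-threaded build that is still marked experimental.
        b.Performance comparison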
            ---
            # 标准CPython
            import threading
            import time

            def benchmark():
                total = 0
                for i in range(10000000):
                    total += i

            start = time.time()
            threads = [threading.Thread(target=benchmark) for _ in range(4)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            print(f"CPython with threads: {time.time() - start:.2f}s")
            print("With 4 cores, a GIL-free build could approach a 4x speedup in theory")
            ---
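            Whether the running interpreter is a free-threaded build can be checked at runtime. The sketch below assumes CPython 3.13 conventions: the Py_GIL_DISABLED build variable and the private sys._is_gil_enabled() function, neither of which exists on older versions, hence the defensive lookups. gil_status is a hypothetical helper.

```python
import sys
import sysconfig


def gil_status():
    """Describe whether this interpreter is built for, and running with, a disabled GIL."""
    # Py_GIL_DISABLED is set to 1 only for free-threaded builds; it is missing or 0 otherwise.
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    if not free_threaded_build:
        return "standard build (GIL always enabled)"
    # On free-threaded builds the GIL can still be re-enabled at runtime.
    checker = getattr(sys, "_is_gil_enabled", None)
    if checker is not None and checker():
        return "free-threaded build, GIL currently enabled"
    return "free-threaded build, GIL disabled"


print(f"Python {sys.version_info.major}.{sys.version_info.minor}: {gil_status()}")
```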
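    b.PyPy STM
        a.Software transactional memory
            PyPy-STM is an experimental PyPy variant that replaces the GIL with software transactional memory; it remains a research project rather than a production interpreter.
        b.STM summary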
            ---
            print("PyPy STM特性:")
            print("- 无GIL")
            print("- 事务内存")
            print("- 多核并行")
            print("- 实验阶段")
            ---

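6.3 Temporary file handling

01.Temporary files
    a.TemporaryFile
        a.Description
            Creates a temporary file that is deleted automatically as soon as it is closed.
        b.Code example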
            ---
            import tempfile

            # 创建临时文件
            with tempfile.TemporaryFile(mode='w+t') as f:
                f.write('临时数据\n')
                f.seek(0)
                print(f'内容: {f.read()}')
            # 文件自动删除

            # 二进制模式
            with tempfile.TemporaryFile(mode='w+b') as f:
                f.write(b'\x00\x01\x02')
                f.seek(0)
                data = f.read()
                print(f'字节: {data.hex()}')
            ---
    b.NamedTemporaryFile
        a.功能说明
            创建有名称的临时文件。
        b.代码示例
            ---
            import tempfile
            import os

            # 命名临时文件
            with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
                f.write('临时内容')
                temp_path = f.name

            with open(temp_path) as f:
                print(f'读取: {f.read()}')

            os.unlink(temp_path)
            ---

02.临时目录
    a.TemporaryDirectory
        a.功能说明
            创建临时目录,退出时自动删除。
        b.代码示例
            ---
            import tempfile
            from pathlib import Path

            with tempfile.TemporaryDirectory() as tmpdir:
                print(f'临时目录: {tmpdir}')
                file_path = Path(tmpdir) / 'data.txt'
                file_path.write_text('内容')
            # 目录自动删除
            ---
    b.mkdtemp
        a.功能说明
            创建临时目录,需手动删除。
        b.代码示例
            ---
            import tempfile
            import shutil

            tmpdir = tempfile.mkdtemp()
            try:
                file_path = f'{tmpdir}/data.txt'
                with open(file_path, 'w') as f:
                    f.write('数据')
            finally:
                shutil.rmtree(tmpdir)
            ---

03.临时文件配置
    a.gettempdir
        a.功能说明
            获取系统临时目录路径。
        b.代码示例
            ---
            import tempfile

            tmpdir = tempfile.gettempdir()
            print(f'系统临时目录: {tmpdir}')
            print(f'默认前缀: {tempfile.gettempprefix()}')
            ---
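    b.SpooledTemporaryFile
        a.Description
            Data is kept in memory until it exceeds max_size, then it is moved to a file on disk.
        b.Code example
            ---
            import tempfile

            with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+') as f:
                f.write('small data')
                # _rolled is a private CPython attribute, read here only for demonstration
                print(f'In memory: {not f._rolled}')

                f.write('x' * 2000)
                print(f'On disk: {f._rolled}')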
            ---

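04.Secure temporary files
    a.mkstemp
        a.Description
            mkstemp creates the file securely and returns an OS-level file descriptor together with the path. The caller is responsible for closing the descriptor and deleting the file.
        b.Code example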
            ---
            import tempfile
            import os

            fd, path = tempfile.mkstemp()
            try:
                os.write(fd, b'secure data')
                os.close(fd)

                with open(path, 'rb') as f:
                    print(f'内容: {f.read()}')
            finally:
                os.unlink(path)
            ---
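            The raw descriptor returned by mkstemp can be wrapped with os.fdopen to get a regular file object, which avoids calling os.write and os.close by hand. A minimal sketch; the .txt suffix is an arbitrary choice.

```python
import os
import tempfile

fd, path = tempfile.mkstemp(suffix=".txt")
try:
    # os.fdopen takes ownership of fd; leaving the with block closes it.
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write("secure data\n")

    with open(path, encoding="utf-8") as f:
        content = f.read()
    print(f"Content: {content!r}")
finally:
    os.unlink(path)
```

            Once the descriptor has been handed to os.fdopen it must not be closed separately, otherwise the second close fails or closes an unrelated descriptor.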
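    b.Permission control
        a.Description
            On POSIX systems, files made by mkstemp and NamedTemporaryFile are created with mode 0600, readable and writable only by the creating user.
        b.Code example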
            ---
            import tempfile
            import os
            import stat

            with tempfile.NamedTemporaryFile(delete=False) as f:
                path = f.name
                f.write(b'data')

            st = os.stat(path)
            print(f'权限: {stat.filemode(st.st_mode)}')
            os.unlink(path)
            ---

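6.4 File locking and concurrency

01.File locks
    a.fcntl locks
        a.Description
            On Unix, fcntl.flock places an advisory lock on a file: it coordinates only those processes that also call flock. Buffered writes must be flushed before unlocking, otherwise the data reaches the file after the lock is gone.
        b.Code example
            ---
            import fcntl
            import time

            # Exclusive lock (note: mode 'w' truncates the file before the lock is taken)
            with open('data.txt', 'w') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                f.write('exclusive access')
                f.flush()  # push buffered data to the file while the lock is held
                time.sleep(2)
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)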

            # 共享锁
            with open('data.txt', 'r') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
                content = f.read()
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

            # 非阻塞锁
            with open('data.txt', 'w') as f:
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                    f.write('数据')
                except BlockingIOError:
                    print('文件已被锁定')
            ---
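            The lock, flush, unlock sequence is easy to get wrong when repeated by hand, so it can be packaged as a context manager. This is a sketch for Unix only; locked is a hypothetical helper, and the demo runs on a throwaway file.

```python
import fcntl
import os
import shutil
import tempfile
from contextlib import contextmanager


@contextmanager
def locked(path, mode="a", lock_type=fcntl.LOCK_EX):
    """Open path, hold an flock on it for the duration of the block, then flush and unlock."""
    with open(path, mode, encoding="utf-8") as f:
        fcntl.flock(f.fileno(), lock_type)
        try:
            yield f
            f.flush()  # write buffered data while the lock is still held
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)


demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "log.txt")
try:
    with locked(demo_path) as f:
        f.write("first entry\n")
    with locked(demo_path) as f:
        f.write("second entry\n")

    # Readers take a shared lock so they can overlap with each other.
    with locked(demo_path, mode="r", lock_type=fcntl.LOCK_SH) as f:
        lines = f.read().splitlines()
finally:
    shutil.rmtree(demo_dir)

print(lines)
```

            Append mode is the default here because it does not truncate the file before the lock is acquired.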
    b.跨平台锁
        a.功能说明
            使用portalocker实现跨平台文件锁。
        b.代码示例
            ---
            import portalocker
            import time

            # 独占锁
            with open('data.txt', 'w') as f:
                portalocker.lock(f, portalocker.LOCK_EX)
                f.write('数据')
                time.sleep(1)

            # 共享锁
            with open('data.txt', 'r') as f:
                portalocker.lock(f, portalocker.LOCK_SH)
                content = f.read()

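            # Lock with timeout: the portalocker.Lock class opens the file itself
            # and retries until the timeout expires (portalocker.lock takes no timeout)
            try:
                with portalocker.Lock('data.txt', 'w', timeout=5) as f:
                    f.write('data')
            except portalocker.LockException:
                print('Timed out waiting for the lock')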
            ---

02.并发读写
    a.多进程写入
        a.功能说明
            多进程安全写入文件。
        b.代码示例
            ---
            import multiprocessing
            import fcntl

            def write_log(msg):
                with open('log.txt', 'a') as f:
                    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                    f.write(f'{msg}\n')
                    f.flush()  # write the buffered line before releasing the lock
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)

            if __name__ == '__main__':
                processes = []
                for i in range(5):
                    p = multiprocessing.Process(target=write_log, args=(f'进程{i}',))
                    processes.append(p)
                    p.start()

                for p in processes:
                    p.join()
            ---
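    b.Thread safety
        a.Description
            A threading.Lock shared by all threads ensures that only one thread writes to the file at a time.
        b.Code example
            ---
            import threading

            file_lock = threading.Lock()

            def write_data(data):
                # Only one thread at a time may open and append to the file
                with file_lock:
                    with open('output.txt', 'a') as f:
                        f.write(f'{data}\n')

            threads = []
            for i in range(10):
                t = threading.Thread(target=write_data, args=(f'thread {i}',))
                threads.append(t)
                t.start()

            for t in threads:
                t.join()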
            ---
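            An alternative to locking around every write is to give one thread sole ownership of the file and have the others hand it lines through a queue. The sketch below swaps in that technique; `writer_loop`, `line_queue`, and the temporary path are names assumed for illustration.

```python
import os
import queue
import tempfile
import threading

def writer_loop(path, q):
    """Drain the queue and append each item to the file; stop on None."""
    with open(path, 'a', encoding='utf-8') as f:
        while True:
            item = q.get()
            if item is None:  # sentinel: no more work
                break
            f.write(f'{item}\n')

out_path = os.path.join(tempfile.mkdtemp(), 'output.txt')
line_queue = queue.Queue()

# The single writer thread is the only code that touches the file
writer = threading.Thread(target=writer_loop, args=(out_path, line_queue))
writer.start()

# Producers only enqueue; queue.Queue is thread-safe
producers = [
    threading.Thread(target=line_queue.put, args=(f'thread {i}',))
    for i in range(10)
]
for t in producers:
    t.start()
for t in producers:
    t.join()

line_queue.put(None)  # all producers are done; tell the writer to exit
writer.join()

with open(out_path, encoding='utf-8') as f:
    written = f.read().splitlines()
print(len(written))
```

            The file is opened once instead of once per write, at the cost of an extra thread and a shutdown sentinel.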

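03.Concurrent memory mapping
    a.Shared memory
        a.Description
            Processes can share data by mapping the same file with mmap. Each process creates its own mapping, because mmap objects cannot be pickled and so cannot be passed to a child under the spawn start method.
        b.Code example
            ---
            import mmap
            import multiprocessing

            PATH = 'shared.dat'

            def writer(path):
                # Open and map the file inside the child process
                with open(path, 'r+b') as f:
                    with mmap.mmap(f.fileno(), 0) as mm:
                        mm.write(b'Hello from writer')
                        mm.flush()

            def reader(path):
                with open(path, 'r+b') as f:
                    with mmap.mmap(f.fileno(), 0) as mm:
                        # Strip the zero padding left from file creation
                        text = mm.read(20).rstrip(b'\x00').decode()
                        print(f'Read: {text}')

            if __name__ == '__main__':
                # Create a 100-byte backing file filled with zeros
                with open(PATH, 'wb') as f:
                    f.write(b'\x00' * 100)

                p1 = multiprocessing.Process(target=writer, args=(PATH,))
                p1.start()
                p1.join()

                p2 = multiprocessing.Process(target=reader, args=(PATH,))
                p2.start()
                p2.join()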
            ---
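            The sharing behaviour can be checked without spawning processes: two independent mappings of the same file see each other's writes. This is a minimal single-process sketch, assuming the default shared mapping mode; the variable names and the temporary path are illustrative.

```python
import mmap
import os
import tempfile

shared_path = os.path.join(tempfile.mkdtemp(), 'shared.dat')
with open(shared_path, 'wb') as f:
    f.write(b'\x00' * 100)  # backing file must be non-empty before mapping

message = b'Hello from writer'

# Two separate file handles and two separate mappings of the same file
with open(shared_path, 'r+b') as f1, open(shared_path, 'r+b') as f2:
    with mmap.mmap(f1.fileno(), 0) as writer_map, \
         mmap.mmap(f2.fileno(), 0) as reader_map:
        writer_map[:len(message)] = message   # write through the first mapping
        seen = reader_map[:len(message)]      # read through the second mapping

print(seen)
```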
    b.Synchronization primitives
        a.Description
            A multiprocessing.Lock serializes the read-modify-write on the mapped counter so that no increment is lost.
        b.Code example
            ---
            import multiprocessing
            import mmap

            PATH = 'counter.dat'

            def update_counter(path, lock, value):
                # Map the file inside the worker; mmap objects cannot be pickled
                with open(path, 'r+b') as f, mmap.mmap(f.fileno(), 4) as mm:
                    with lock:
                        current = int.from_bytes(mm[:4], 'little')
                        mm[:4] = (current + value).to_bytes(4, 'little')

            if __name__ == '__main__':
                lock = multiprocessing.Lock()

                # Initialize the 4-byte little-endian counter to 0
                with open(PATH, 'wb') as f:
                    f.write((0).to_bytes(4, 'little'))

                processes = []
                for i in range(10):
                    p = multiprocessing.Process(target=update_counter, args=(PATH, lock, 1))
                    processes.append(p)
                    p.start()

                for p in processes:
                    p.join()

                with open(PATH, 'r+b') as f, mmap.mmap(f.fileno(), 4) as mm:
                    result = int.from_bytes(mm[:4], 'little')
                print(f'Counter: {result}')
            ---
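            The fixed-width counter layout can also be expressed with the struct module, which reads and writes directly in the mapped buffer. The sketch below runs in a single process and leaves locking to the caller; `COUNTER` and `increment` are hypothetical names.

```python
import mmap
import os
import struct
import tempfile

COUNTER = struct.Struct('<I')  # one unsigned 32-bit little-endian integer

counter_path = os.path.join(tempfile.mkdtemp(), 'counter.dat')
with open(counter_path, 'wb') as f:
    f.write(COUNTER.pack(0))

def increment(mm, amount=1):
    """Read-modify-write the counter at offset 0 (caller must hold any lock)."""
    (current,) = COUNTER.unpack_from(mm, 0)
    COUNTER.pack_into(mm, 0, current + amount)
    return current + amount

with open(counter_path, 'r+b') as f, mmap.mmap(f.fileno(), COUNTER.size) as mm:
    for _ in range(10):
        increment(mm)
    (final_value,) = COUNTER.unpack_from(mm, 0)

print(final_value)
```

            A struct format keeps the byte order and width in one place, which helps once the mapped region holds more than a single field.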

04.Asynchronous file operations
    a.aiofiles
        a.Description
            aiofiles is a third-party library that exposes an async file API. It delegates the blocking calls to a thread pool so the event loop stays responsive.
        b.Code example
            ---
            import asyncio
            import aiofiles

            async def write_async(filename, data):
                async with aiofiles.open(filename, 'w') as f:
                    await f.write(data)

            async def read_async(filename):
                async with aiofiles.open(filename, 'r') as f:
                    return await f.read()

            async def main():
                await write_async('async.txt', 'Hello Async')
                content = await read_async('async.txt')
                print(f'Content: {content}')

            asyncio.run(main())
            ---
    b.Concurrent async IO
        a.Description
            asyncio.gather runs several asynchronous file operations concurrently and returns their results in input order.
        b.Code example
            ---
            import asyncio
            import aiofiles

            async def process_file(filename):
                async with aiofiles.open(filename, 'r') as f:
                    content = await f.read()
                    return len(content)  # number of characters, not bytes

            async def main():
                files = ['file1.txt', 'file2.txt', 'file3.txt']
                tasks = [process_file(f) for f in files]
                results = await asyncio.gather(*tasks)
                print(f'Character counts: {results}')

            asyncio.run(main())
            ---
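            When adding a dependency is not an option, a similar effect can be had with the standard library alone by running ordinary blocking file calls in worker threads. This sketch uses asyncio.to_thread (Python 3.9+) rather than aiofiles; the helper names and file names are assumptions.

```python
import asyncio
import os
import tempfile

def write_text(path, text):
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)

def read_text(path):
    with open(path, encoding='utf-8') as f:
        return f.read()

async def main(paths):
    # Each blocking call runs in a worker thread; gather keeps input order
    await asyncio.gather(
        *(asyncio.to_thread(write_text, p, f'content of {os.path.basename(p)}')
          for p in paths)
    )
    return await asyncio.gather(*(asyncio.to_thread(read_text, p) for p in paths))

tmp_dir = tempfile.mkdtemp()
paths = [os.path.join(tmp_dir, f'file{i}.txt') for i in range(1, 4)]
contents = asyncio.run(main(paths))
print(contents)
```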

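6.5 Performance optimization

01.Buffer tuning
    a.Buffer size
        a.Description
            The buffering argument of open() sets the buffer size. A larger buffer means fewer system calls for many small writes, while buffering=0 (binary mode only) sends every write straight to the OS.
        b.Code example
            ---
            import time

            # Default buffering
            start = time.perf_counter()
            with open('large.txt', 'w') as f:
                for i in range(100000):
                    f.write(f'line {i}\n')
            print(f'Default: {time.perf_counter() - start:.2f}s')

            # Large buffer (1 MiB)
            start = time.perf_counter()
            with open('large.txt', 'w', buffering=1024*1024) as f:
                for i in range(100000):
                    f.write(f'line {i}\n')
            print(f'Large buffer: {time.perf_counter() - start:.2f}s')

            # Unbuffered (binary mode only); same line count for a fair comparison
            start = time.perf_counter()
            with open('large.txt', 'wb', buffering=0) as f:
                for i in range(100000):
                    f.write(f'line {i}\n'.encode())
            print(f'Unbuffered: {time.perf_counter() - start:.2f}s')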
            ---
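            Timings vary from machine to machine, so it can help to count how often the buffer actually hands data to the layer below. The sketch here wraps a counting in-memory raw stream in io.BufferedWriter; `CountingRaw` and `raw_write_calls` are hypothetical names, and the buffer sizes are arbitrary.

```python
import io

class CountingRaw(io.RawIOBase):
    """In-memory raw stream that counts how many times write() is called."""

    def __init__(self):
        super().__init__()
        self.calls = 0
        self.total = 0

    def writable(self):
        return True

    def write(self, b):
        self.calls += 1
        self.total += len(b)
        return len(b)  # report that every byte was accepted

def raw_write_calls(buffer_size, lines=1000):
    """Write `lines` 12-byte records through a buffer; return (calls, bytes)."""
    raw = CountingRaw()
    with io.BufferedWriter(raw, buffer_size=buffer_size) as buffered:
        for i in range(lines):
            buffered.write(f'line {i:06d}\n'.encode())
    return raw.calls, raw.total

small_calls, small_total = raw_write_calls(512)
large_calls, large_total = raw_write_calls(8192)
print(f'512-byte buffer: {small_calls} raw writes')
print(f'8192-byte buffer: {large_calls} raw writes')
```

            On a real file each raw write corresponds to a system call, which is where the larger buffer saves time.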
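    b.Batch operations
        a.Description
            Batching writes cuts per-call overhead. With a buffered file the number of system calls is governed by the buffer size, so the gain comes mostly from making fewer Python-level calls.
        b.Code example
            ---
            import time

            # Write line by line
            start = time.perf_counter()
            with open('output.txt', 'w') as f:
                for i in range(10000):
                    f.write(f'line {i}\n')
            print(f'Line by line: {time.perf_counter() - start:.2f}s')

            # Write in one batch
            start = time.perf_counter()
            lines = [f'line {i}\n' for i in range(10000)]
            with open('output.txt', 'w') as f:
                f.writelines(lines)
            print(f'Batch: {time.perf_counter() - start:.2f}s')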
            ---

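02.Memory-mapping optimization
    a.Large files
        a.Description
            mmap maps a file into memory so that pages are loaded on demand. For large files this can avoid reading the whole file into a bytes object.
        b.Code example
            ---
            import mmap
            import time

            # Plain read: loads the entire file into memory
            start = time.perf_counter()
            with open('large.bin', 'rb') as f:
                data = f.read()
                count = data.count(b'\x00')
            print(f'Plain: {time.perf_counter() - start:.2f}s, count: {count}')

            # mmap read: count in 1 MiB slices so the whole file is never copied
            # (mm[:] would copy the entire mapping into a bytes object)
            start = time.perf_counter()
            chunk = 1024 * 1024
            with open('large.bin', 'rb') as f:
                mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                count = sum(mm[i:i + chunk].count(b'\x00') for i in range(0, len(mm), chunk))
                mm.close()
            print(f'mmap: {time.perf_counter() - start:.2f}s, count: {count}')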
            ---
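            When the positions of a pattern matter rather than just the count, mmap.find can scan the mapping in place without building any copy. A minimal sketch on a tiny throwaway file; `find_all` is a hypothetical helper.

```python
import mmap
import os
import tempfile

def find_all(mm, pattern):
    """Yield the offset of every non-overlapping occurrence of pattern."""
    pos = mm.find(pattern)
    while pos != -1:
        yield pos
        pos = mm.find(pattern, pos + len(pattern))

bin_path = os.path.join(tempfile.mkdtemp(), 'large.bin')
with open(bin_path, 'wb') as f:
    f.write(b'abc\x00def\x00\x00ghi')  # zero bytes at offsets 3, 7 and 8

with open(bin_path, 'rb') as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        offsets = list(find_all(mm, b'\x00'))

print(offsets)  # [3, 7, 8]
```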
    b.Random access
        a.Description
            With mmap, each update is a plain memory store rather than a seek() and write() call pair, which usually speeds up scattered small accesses.
        b.Code example
            ---
            import mmap
            import random
            import time

            # Create a 10 MB test file
            with open('random.dat', 'wb') as f:
                f.write(b'\x00' * 10000000)

            positions = [random.randint(0, 9999999) for _ in range(1000)]

            # Random access with seek() and write()
            start = time.perf_counter()
            with open('random.dat', 'r+b') as f:
                for pos in positions:
                    f.seek(pos)
                    f.write(b'\xff')
            print(f'Plain: {time.perf_counter() - start:.2f}s')

            # Random access through mmap
            start = time.perf_counter()
            with open('random.dat', 'r+b') as f:
                mm = mmap.mmap(f.fileno(), 0)
                for pos in positions:
                    mm[pos] = 0xff
                mm.close()
            print(f'mmap: {time.perf_counter() - start:.2f}s')
            ---

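03.Async IO optimization
    a.Concurrent reads
        a.Description
            Read several files concurrently with asyncio.gather. Because aiofiles runs each operation in a thread pool, the gain over synchronous reads depends on file size and storage latency; for small local files the async version may be no faster.
        b.Code example
            ---
            import asyncio
            import aiofiles
            import time

            async def read_file(filename):
                async with aiofiles.open(filename, 'r') as f:
                    return await f.read()

            async def concurrent_read(files):
                tasks = [read_file(f) for f in files]
                return await asyncio.gather(*tasks)

            # Synchronous reads
            start = time.perf_counter()
            contents = []
            for f in ['file1.txt', 'file2.txt', 'file3.txt']:
                with open(f) as file:
                    contents.append(file.read())
            print(f'Sync: {time.perf_counter() - start:.2f}s')

            # Asynchronous reads
            start = time.perf_counter()
            contents = asyncio.run(concurrent_read(['file1.txt', 'file2.txt', 'file3.txt']))
            print(f'Async: {time.perf_counter() - start:.2f}s')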
            ---
    b.Streaming
        a.Description
            Process a large file line by line asynchronously, so that only one line is held in memory at a time.
        b.Code example
            ---
            import asyncio
            import aiofiles

            async def process_large_file(input_file, output_file):
                async with aiofiles.open(input_file, 'r') as fin:
                    async with aiofiles.open(output_file, 'w') as fout:
                        # Read, transform, and write one line at a time
                        async for line in fin:
                            processed = line.upper()
                            await fout.write(processed)

            asyncio.run(process_large_file('input.txt', 'output.txt'))
            ---

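04.Compression optimization
    a.Compression level
        a.Description
            compresslevel ranges from 1 (fastest, largest output) to 9 (slowest, smallest output); pick the level that balances speed against size for your data.
        b.Code example
            ---
            import gzip
            import time

            data = b'x' * 1000000

            # Fast compression
            start = time.perf_counter()
            with gzip.open('fast.gz', 'wb', compresslevel=1) as f:
                f.write(data)
            print(f'Fast: {time.perf_counter() - start:.2f}s')

            # Best compression
            start = time.perf_counter()
            with gzip.open('best.gz', 'wb', compresslevel=9) as f:
                f.write(data)
            print(f'Best: {time.perf_counter() - start:.2f}s')

            # Default level (gzip.open has historically defaulted to
            # compresslevel=9, so this may match the run above)
            start = time.perf_counter()
            with gzip.open('default.gz', 'wb') as f:
                f.write(data)
            print(f'Default: {time.perf_counter() - start:.2f}s')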
            ---
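            Timing is only half of the trade-off; the resulting sizes matter as well. The sketch below compares output sizes in memory with gzip.compress. The sample payload is made up for illustration, and real ratios depend heavily on the data.

```python
import gzip

# Synthetic, moderately repetitive payload (an assumption for the demo)
payload = b''.join(
    f'row {i}: status=ok value={i * 7 % 1000}\n'.encode() for i in range(20000)
)

# Compress the same bytes at three levels and record the output sizes
sizes = {
    level: len(gzip.compress(payload, compresslevel=level))
    for level in (1, 6, 9)
}
for level, size in sizes.items():
    print(f'level {level}: {size} bytes ({size / len(payload):.1%} of original)')

# Every level must decompress back to the original bytes
restored = gzip.decompress(gzip.compress(payload, compresslevel=1))
```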
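    b.Streaming compression
        a.Description
            Compressing in fixed-size chunks keeps memory use bounded regardless of file size.
        b.Code example
            ---
            import gzip

            # Streaming compression: read and compress 8 KiB at a time
            with open('input.txt', 'rb') as fin:
                with gzip.open('output.gz', 'wb') as fout:
                    while chunk := fin.read(8192):
                        fout.write(chunk)

            # Streaming decompression
            with gzip.open('output.gz', 'rb') as fin:
                with open('output.txt', 'wb') as fout:
                    while chunk := fin.read(8192):
                        fout.write(chunk)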
            ---
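            The same chunked copy loop is available as shutil.copyfileobj, which also works on Python versions without the walrus operator. A minimal round-trip sketch; `gzip_file`, `gunzip_file`, and the temporary paths are assumptions.

```python
import gzip
import os
import shutil
import tempfile

def gzip_file(src, dst, chunk_size=64 * 1024):
    """Compress src into dst, copying chunk_size bytes at a time."""
    with open(src, 'rb') as fin, gzip.open(dst, 'wb') as fout:
        shutil.copyfileobj(fin, fout, chunk_size)

def gunzip_file(src, dst, chunk_size=64 * 1024):
    """Decompress src into dst, copying chunk_size bytes at a time."""
    with gzip.open(src, 'rb') as fin, open(dst, 'wb') as fout:
        shutil.copyfileobj(fin, fout, chunk_size)

work_dir = tempfile.mkdtemp()
original = os.path.join(work_dir, 'input.txt')
packed = os.path.join(work_dir, 'output.gz')
unpacked = os.path.join(work_dir, 'output.txt')

with open(original, 'wb') as f:
    f.write(b'some text\n' * 10000)

gzip_file(original, packed)
gunzip_file(packed, unpacked)

with open(original, 'rb') as a, open(unpacked, 'rb') as b:
    round_trip_ok = a.read() == b.read()
print(round_trip_ok, os.path.getsize(packed) < os.path.getsize(original))
```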
