01.整数对象
a.PyLongObject
a.任意精度
Python整数支持任意精度,自动扩展。
b.大整数
---
# A 1001-bit integer: far wider than a machine word, yet handled transparently.
x = 1 << 1000
bits = x.bit_length()
print(f"位数: {bits}")
# Round the bit count up to whole bytes.
print(f"字节数: {(bits + 7) // 8}")
---
b.小整数池
a.缓存范围
-5到256的整数预创建。
b.缓存测试
---
# Integers in [-5, 256] are preallocated singletons, so both names refer to
# the very same object.
a = 256
b = 256
print(f"256缓存: {a is b}")

# Two `257` literals in one compilation unit are merged into a single
# constant, which would make `x is y` True and hide the effect being shown.
# Parsing the value at runtime yields two separately allocated int objects.
x = int("257")
y = int("257")
print(f"257不缓存: {x is y}")
---
02.字符串对象
a.PyUnicodeObject
a.Unicode编码
Python 3字符串使用Unicode。
b.编码示例
---
s = "Hello世界"
print(f"长度: {len(s)}")
print(f"UTF-8: {s.encode('utf-8')}")
print(f"字节数: {len(s.encode('utf-8'))}")
---
b.字符串驻留
a.intern机制
标识符自动驻留。
b.驻留示例
---
import sys

# Identifier-like literals are interned by the compiler, so both names share
# one string object.
s1 = "python"
s2 = "python"
print(f"自动驻留: {s1 is s2}")

# Equal literals inside one compilation unit are merged into one constant, so
# two "hello world" literals would compare identical as well. Building the
# strings at runtime produces two distinct, non-interned objects.
s3 = " ".join(["hello", "world"])
s4 = " ".join(["hello", "world"])
runtime_pair_shared = s3 is s4
print(f"非驻留: {runtime_pair_shared}")

# sys.intern maps equal strings onto one canonical object.
s3 = sys.intern(s3)
s4 = sys.intern(s4)
print(f"手动驻留: {s3 is s4}")
---
03.列表对象
a.PyListObject
a.动态数组
列表使用动态数组实现,支持快速索引。
b.扩容策略
---
import sys
lst = []
for i in range(10):
lst.append(i)
print(f"长度{len(lst)}: {sys.getsizeof(lst)} bytes")
---
b.列表操作
a.append和extend
append添加单个元素,extend添加多个。
b.性能对比
---
import timeit
def use_append():
lst = []
for i in range(1000):
lst.append(i)
def use_extend():
lst = []
lst.extend(range(1000))
time1 = timeit.timeit(use_append, number=1000)
time2 = timeit.timeit(use_extend, number=1000)
print(f"append: {time1:.4f}秒")
print(f"extend: {time2:.4f}秒")
---
04.字典对象
a.PyDictObject
a.哈希表
字典使用哈希表实现,O(1)查找。
b.哈希冲突
---
class BadHash:
    """Key type whose instances all hash alike, so every insert collides."""

    def __init__(self, value):
        self.value = value

    def __hash__(self):
        return 1  # every object gets the same hash

    def __eq__(self, other):
        # Let Python fall back to the other operand for foreign types instead
        # of failing on a missing `.value` attribute.
        if not isinstance(other, BadHash):
            return NotImplemented
        return self.value == other.value


# All five keys land in the same hash slot; equality keeps them distinct.
d = {}
for i in range(5):
    d[BadHash(i)] = i
print(f"字典大小: {len(d)}")
---
b.字典优化
a.紧凑字典
Python 3.6+使用紧凑字典,节省内存。
b.内存对比
---
import sys
d1 = {i: i for i in range(100)}
d2 = dict.fromkeys(range(100))
print(f"普通字典: {sys.getsizeof(d1)}")
print(f"fromkeys: {sys.getsizeof(d2)}")
---
05.集合对象
a.PySetObject
a.哈希集合
集合使用哈希表,元素唯一。
b.集合操作
---
# Two overlapping sets used to show the basic set-algebra operations.
s1 = {1, 2, 3}
s2 = {2, 3, 4}

combined = s1.union(s2)
common = s1.intersection(s2)
only_first = s1.difference(s2)

print(f"并集: {combined}")
print(f"交集: {common}")
print(f"差集: {only_first}")
---
b.frozenset
a.不可变集合
frozenset不可变,可作为字典键。
b.frozenset示例
---
fs = frozenset([1, 2, 3])
d = {fs: "value"}
print(f"字典: {d}")
print(f"可哈希: {hash(fs)}")
---
06.类型转换
a.隐式转换
a.数值提升
整数和浮点数运算自动转换。
b.转换示例
---
x = 10
y = 3.14
result = x + y
print(f"类型: {type(result)}")
print(f"结果: {result}")
---
b.显式转换
a.类型构造
int()、str()等显式转换。
b.转换方法
---
s = "123"
n = int(s)
print(f"字符串转整数: {n}")
f = 3.14
i = int(f)
print(f"浮点转整数: {i}")
lst = [1, 2, 3]
tup = tuple(lst)
print(f"列表转元组: {tup}")
---
5.3 描述符协议
01.描述符基础
a.__get__/__set__/__delete__
a.描述符方法
实现这些方法的对象是描述符。
b.基础示例
---
class Descriptor:
def __get__(self, obj, objtype=None):
print("__get__调用")
return 42
def __set__(self, obj, value):
print(f"__set__调用: {value}")
def __delete__(self, obj):
print("__delete__调用")
class MyClass:
attr = Descriptor()
obj = MyClass()
print(obj.attr)
obj.attr = 100
del obj.attr
---
b.数据描述符vs非数据描述符
a.优先级
数据描述符优先于实例__dict__。
b.优先级示例
---
class DataDescriptor:
def __get__(self, obj, objtype=None):
return "数据描述符"
def __set__(self, obj, value):
pass
class NonDataDescriptor:
def __get__(self, obj, objtype=None):
return "非数据描述符"
class MyClass:
data_desc = DataDescriptor()
non_data_desc = NonDataDescriptor()
obj = MyClass()
obj.__dict__['data_desc'] = "实例属性"
obj.__dict__['non_data_desc'] = "实例属性"
print(f"数据描述符: {obj.data_desc}")
print(f"非数据描述符: {obj.non_data_desc}")
---
02.property装饰器
a.属性访问控制
a.getter/setter
property创建托管属性。
b.property示例
---
class Temperature:
    """Temperature in Celsius with a validated setter and a Fahrenheit view."""

    def __init__(self, celsius):
        # Assign through the property so construction enforces the same lower
        # bound as later assignments.
        self.celsius = celsius

    @property
    def celsius(self):
        """Current temperature in degrees Celsius."""
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # Raises ValueError for values below absolute zero.
        if value < -273.15:
            raise ValueError("温度过低")
        self._celsius = value

    @property
    def fahrenheit(self):
        """Temperature converted to degrees Fahrenheit (read-only)."""
        return self._celsius * 9/5 + 32


t = Temperature(25)
print(f"摄氏度: {t.celsius}")
print(f"华氏度: {t.fahrenheit}")
t.celsius = 30
print(f"新温度: {t.celsius}")
---
b.只读属性
a.无setter
只定义getter创建只读属性。
b.只读示例
---
class Circle:
def __init__(self, radius):
self._radius = radius
@property
def radius(self):
return self._radius
@property
def area(self):
return 3.14 * self._radius ** 2
c = Circle(5)
print(f"半径: {c.radius}")
print(f"面积: {c.area}")
try:
c.area = 100
except AttributeError as e:
print(f"错误: {e}")
---
03.方法描述符
a.函数对象
a.函数是描述符
函数实现__get__,绑定到实例。
b.绑定方法
---
class MyClass:
def method(self):
return "实例方法"
obj = MyClass()
print(f"未绑定: {MyClass.method}")
print(f"绑定: {obj.method}")
print(f"调用: {obj.method()}")
---
b.classmethod和staticmethod
a.类方法
classmethod接收类作为第一个参数。
b.静态方法
---
class MyClass:
@classmethod
def class_method(cls):
return f"类方法: {cls.__name__}"
@staticmethod
def static_method():
return "静态方法"
print(MyClass.class_method())
print(MyClass.static_method())
obj = MyClass()
print(obj.class_method())
print(obj.static_method())
---
04.自定义描述符
a.类型检查
a.验证描述符
描述符实现类型检查。
b.类型检查示例
---
class TypedProperty:
def __init__(self, name, expected_type):
self.name = name
self.expected_type = expected_type
def __get__(self, obj, objtype=None):
if obj is None:
return self
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
if not isinstance(value, self.expected_type):
raise TypeError(f"期望{self.expected_type},得到{type(value)}")
obj.__dict__[self.name] = value
class Person:
name = TypedProperty('name', str)
age = TypedProperty('age', int)
p = Person()
p.name = "Alice"
p.age = 30
try:
p.age = "thirty"
except TypeError as e:
print(f"错误: {e}")
---
b.惰性属性
a.延迟计算
首次访问时计算,之后缓存。
b.惰性示例
---
class LazyProperty:
    """Non-data descriptor that computes a value once and caches it per instance.

    On first access the wrapped function runs and its result is stored on the
    instance under the attribute's own name. Because this class defines no
    `__set__`, the instance attribute then shadows the descriptor, so later
    reads never reach `__get__` again.
    """

    def __init__(self, func):
        self.func = func
        # Fallback name for descriptors created outside a class body.
        self.attr_name = func.__name__

    def __set_name__(self, owner, name):
        # Cache under the name the descriptor is actually bound to; it can
        # differ from the function's name (e.g. `data = LazyProperty(load)`).
        self.attr_name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        value = self.func(obj)
        setattr(obj, self.attr_name, value)
        return value


class DataSet:
    """Example owner whose `data` is loaded on first use."""

    def __init__(self, filename):
        self.filename = filename

    @LazyProperty
    def data(self):
        print("加载数据...")
        return [1, 2, 3, 4, 5]


ds = DataSet("data.txt")
print("数据集创建")
print(f"数据: {ds.data}")
print(f"再次访问: {ds.data}")
---
05.描述符应用
a.ORM字段
a.数据库字段
ORM使用描述符定义字段。
b.字段示例
---
class Field:
def __init__(self, name, field_type):
self.name = name
self.field_type = field_type
def __get__(self, obj, objtype=None):
if obj is None:
return self
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
if not isinstance(value, self.field_type):
raise TypeError(f"字段{self.name}类型错误")
obj.__dict__[self.name] = value
class Model:
id = Field('id', int)
name = Field('name', str)
m = Model()
m.id = 1
m.name = "记录"
print(f"ID: {m.id}, Name: {m.name}")
---
b.单位转换
a.自动转换
描述符实现单位自动转换。
b.转换示例
---
class Meter:
    """Descriptor storing a length in meters in the instance dict; unset reads as 0."""

    def __init__(self, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return obj.__dict__.get(self.name, 0)

    def __set__(self, obj, value):
        obj.__dict__[self.name] = value


class Kilometer:
    """Descriptor exposing the attribute named `meter_attr` in kilometers."""

    def __init__(self, meter_attr):
        self.meter_attr = meter_attr

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        # Default to 0 like Meter does, so reading an instance that was never
        # assigned does not raise AttributeError.
        meters = getattr(obj, self.meter_attr, 0)
        return meters / 1000

    def __set__(self, obj, value):
        setattr(obj, self.meter_attr, value * 1000)


class Distance:
    # Both descriptors read and write the same '_meters' instance entry.
    meters = Meter('_meters')
    kilometers = Kilometer('_meters')


d = Distance()
d.meters = 5000
print(f"米: {d.meters}")
print(f"千米: {d.kilometers}")
d.kilometers = 10
print(f"米: {d.meters}")
---
06.描述符协议细节
a.__set_name__
a.自动命名
Python 3.6+支持__set_name__。
b.命名示例
---
class NamedDescriptor:
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, objtype=None):
if obj is None:
return self
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
print(f"设置{self.name} = {value}")
obj.__dict__[self.name] = value
class MyClass:
attr1 = NamedDescriptor()
attr2 = NamedDescriptor()
obj = MyClass()
obj.attr1 = 10
obj.attr2 = 20
---
b.__delete__
a.删除属性
__delete__处理属性删除。
b.删除示例
---
class ManagedAttribute:
    """Data descriptor that keeps its value in the instance dict under `name`."""

    def __init__(self, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return obj.__dict__.get(self.name)

    def __set__(self, obj, value):
        obj.__dict__[self.name] = value

    def __delete__(self, obj):
        print(f"删除{self.name}")
        try:
            del obj.__dict__[self.name]
        except KeyError:
            # Attribute deletion is expected to fail with AttributeError, not
            # with the KeyError of the underlying dict.
            raise AttributeError(self.name) from None


class MyClass:
    attr = ManagedAttribute('attr')


obj = MyClass()
obj.attr = 42
print(f"值: {obj.attr}")
del obj.attr
---
5.4 元类编程
01.type元类
a.动态创建类
a.type()函数
type(name, bases, dict)动态创建类。
b.动态类示例
---
# 普通方式
class MyClass:
x = 10
# 动态创建
DynamicClass = type('DynamicClass', (), {'x': 10})
print(f"类名: {DynamicClass.__name__}")
print(f"属性: {DynamicClass.x}")
obj = DynamicClass()
print(f"实例: {obj.x}")
---
b.类的类
a.元类概念
类是type的实例,type是自己的实例。
b.元类关系
---
class MyClass:
pass
obj = MyClass()
print(f"obj类型: {type(obj)}")
print(f"MyClass类型: {type(MyClass)}")
print(f"type类型: {type(type)}")
print(f"\nobj是MyClass实例: {isinstance(obj, MyClass)}")
print(f"MyClass是type实例: {isinstance(MyClass, type)}")
---
02.自定义元类
a.元类定义
a.继承type
自定义元类继承type。
b.元类示例
---
class Meta(type):
def __new__(cls, name, bases, attrs):
print(f"创建类: {name}")
attrs['created_by'] = 'Meta'
return super().__new__(cls, name, bases, attrs)
class MyClass(metaclass=Meta):
pass
print(f"创建者: {MyClass.created_by}")
---
b.__init_subclass__
a.简化元类
__init_subclass__简化类定制。
b.子类钩子
---
class Base:
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
print(f"子类创建: {cls.__name__}")
cls.subclass_name = cls.__name__
class Derived(Base):
pass
print(f"子类名: {Derived.subclass_name}")
---
03.元类应用
a.单例模式
a.元类单例
元类控制实例创建。
b.单例示例
---
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=Singleton):
def __init__(self):
print("数据库连接创建")
db1 = Database()
db2 = Database()
print(f"相同实例: {db1 is db2}")
---
b.ORM框架
a.模型定义
ORM使用元类注册模型。
b.ORM示例
---
class ModelMeta(type):
def __new__(cls, name, bases, attrs):
if name != 'Model':
print(f"注册模型: {name}")
fields = {k: v for k, v in attrs.items()
if not k.startswith('_')}
attrs['_fields'] = fields
return super().__new__(cls, name, bases, attrs)
class Model(metaclass=ModelMeta):
pass
class User(Model):
name = str
age = int
print(f"User字段: {User._fields}")
---
04.类装饰器
a.装饰器vs元类
a.类装饰器
装饰器修改已创建的类。
b.装饰器示例
---
def add_method(cls):
def new_method(self):
return "新方法"
cls.new_method = new_method
return cls
@add_method
class MyClass:
pass
obj = MyClass()
print(obj.new_method())
---
b.组合使用
a.元类+装饰器
元类控制创建,装饰器修改类。
b.组合示例
---
class Meta(type):
def __new__(cls, name, bases, attrs):
attrs['from_meta'] = True
return super().__new__(cls, name, bases, attrs)
def decorator(cls):
cls.from_decorator = True
return cls
@decorator
class MyClass(metaclass=Meta):
pass
print(f"元类: {MyClass.from_meta}")
print(f"装饰器: {MyClass.from_decorator}")
---
05.抽象基类
a.ABC模块
a.抽象方法
abc.abstractmethod定义抽象方法。
b.ABC示例
---
from abc import ABC, abstractmethod
class Shape(ABC):
@abstractmethod
def area(self):
pass
class Circle(Shape):
def __init__(self, radius):
self.radius = radius
def area(self):
return 3.14 * self.radius ** 2
c = Circle(5)
print(f"面积: {c.area()}")
try:
s = Shape()
except TypeError as e:
print(f"错误: {e}")
---
b.虚拟子类
a.register
register注册虚拟子类。
b.虚拟子类示例
---
from abc import ABC
class MyABC(ABC):
pass
class MyClass:
pass
MyABC.register(MyClass)
obj = MyClass()
print(f"是子类: {isinstance(obj, MyABC)}")
print(f"真实类型: {type(obj)}")
---
06.元编程技巧
a.__prepare__
a.自定义命名空间
__prepare__返回类命名空间。
b.有序字典
---
from collections import OrderedDict


class OrderedMeta(type):
    """Metaclass that records the definition order of a class's own attributes."""

    @classmethod
    def __prepare__(cls, name, bases, **kwargs):
        # The returned mapping becomes the namespace the class body runs in.
        return OrderedDict()

    def __new__(cls, name, bases, attrs, **kwargs):
        # Skip the dunder entries the interpreter inserts itself
        # (__module__, __qualname__, ...) so only user-defined names remain.
        attrs['_order'] = [
            key for key in attrs
            if not (key.startswith('__') and key.endswith('__'))
        ]
        return super().__new__(cls, name, bases, attrs, **kwargs)


class MyClass(metaclass=OrderedMeta):
    x = 1
    y = 2
    z = 3


print(f"定义顺序: {MyClass._order}")
---
b.类属性验证
a.元类验证
元类验证类定义。
b.验证示例
---
class ValidateMeta(type):
    """Metaclass that rejects classes lacking a `required_method`."""

    def __new__(cls, name, bases, attrs):
        # A method inherited from a base satisfies the requirement too;
        # checking only `attrs` would reject valid subclasses.
        inherited = any(hasattr(base, 'required_method') for base in bases)
        if 'required_method' not in attrs and not inherited:
            raise TypeError(f"{name}必须定义required_method")
        return super().__new__(cls, name, bases, attrs)


try:
    class BadClass(metaclass=ValidateMeta):
        pass
except TypeError as e:
    print(f"错误: {e}")


class GoodClass(metaclass=ValidateMeta):
    def required_method(self):
        pass


print("GoodClass创建成功")
---
5.5 对象内存布局
01.对象头部
a.PyObject结构
a.ob_refcnt
8字节引用计数。
b.ob_type
---
import sys
x = 42
print(f"对象大小: {sys.getsizeof(x)} bytes")
print(f"类型: {type(x)}")
---
b.对齐要求
a.内存对齐
对象按8字节对齐(64位平台上,CPython 3.8+的pymalloc按16字节对齐)。
b.对齐示例
---
import sys
objects = [
True,
42,
3.14,
"hello",
[],
{}
]
for obj in objects:
print(f"{type(obj).__name__}: {sys.getsizeof(obj)} bytes")
---
02.实例布局
a.__dict__存储
a.属性字典
实例属性存储在__dict__。
b.内存占用
---
import sys
class MyClass:
def __init__(self):
self.x = 1
self.y = 2
obj = MyClass()
print(f"对象: {sys.getsizeof(obj)} bytes")
print(f"__dict__: {sys.getsizeof(obj.__dict__)} bytes")
print(f"总计: {sys.getsizeof(obj) + sys.getsizeof(obj.__dict__)} bytes")
---
b.__slots__布局
a.固定槽位
__slots__使用固定槽位。
b.内存节省
---
import sys
class WithDict:
def __init__(self):
self.x = 1
self.y = 2
class WithSlots:
__slots__ = ('x', 'y')
def __init__(self):
self.x = 1
self.y = 2
obj1 = WithDict()
obj2 = WithSlots()
size1 = sys.getsizeof(obj1) + sys.getsizeof(obj1.__dict__)
size2 = sys.getsizeof(obj2)
print(f"WithDict: {size1} bytes")
print(f"WithSlots: {size2} bytes")
print(f"节省: {size1 - size2} bytes")
---
03.容器布局
a.列表内存
a.动态数组
列表预分配额外空间。
b.容量增长
---
import sys
lst = []
for i in range(20):
lst.append(i)
size = sys.getsizeof(lst)
print(f"长度{len(lst)}: {size} bytes")
---
b.字典内存
a.哈希表
字典使用哈希表,负载因子2/3。
b.字典扩容
---
import sys
d = {}
for i in range(20):
d[i] = i
size = sys.getsizeof(d)
print(f"键数{len(d)}: {size} bytes")
---
04.内存优化
a.对象池
a.小对象池
小对象使用内存池。
b.池化效果
---
import timeit
def create_objects():
for _ in range(1000):
obj = object()
time = timeit.timeit(create_objects, number=1000)
print(f"创建时间: {time:.4f}秒")
---
b.紧凑存储
a.压缩技巧
使用__slots__、tuple等紧凑类型。
b.对比测试
---
import sys
# 列表
lst = [1, 2, 3, 4, 5]
# 元组
tup = (1, 2, 3, 4, 5)
# array
import array
arr = array.array('i', [1, 2, 3, 4, 5])
print(f"列表: {sys.getsizeof(lst)} bytes")
print(f"元组: {sys.getsizeof(tup)} bytes")
print(f"array: {sys.getsizeof(arr)} bytes")
---
05.内存分析
a.对象大小
a.sys.getsizeof
获取对象占用内存。
b.递归计算
---
import sys


def total_size(obj, seen=None):
    """Return the approximate deep size of `obj` in bytes.

    `sys.getsizeof` is shallow, so this walks into the object: dict keys and
    values, the `__dict__` of objects that have one, and the items of other
    iterables (str and bytes are treated as leaves).

    Args:
        obj: Object to measure.
        seen: Set of `id()` values already counted; callers normally omit it.

    Returns:
        Size in bytes. An object reachable through several paths is counted
        once; repeat visits contribute 0.
    """
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    # Mark before recursing so reference cycles terminate.
    seen.add(obj_id)

    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(total_size(v, seen) for v in obj.values())
        size += sum(total_size(k, seen) for k in obj.keys())
    elif hasattr(obj, '__dict__'):
        size += total_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        size += sum(total_size(i, seen) for i in obj)
    return size


data = {'a': [1, 2, 3], 'b': {'x': 10}}
print(f"总大小: {total_size(data)} bytes")
---
b.内存剖析
a.tracemalloc(标准库;memory_profiler为第三方替代工具)
分析内存使用。
b.剖析示例
---
import tracemalloc
tracemalloc.start()
# 分配内存
data = [list(range(100)) for _ in range(100)]
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("Top 3内存使用:")
for stat in top_stats[:3]:
print(stat)
tracemalloc.stop()
---
06.内存泄漏检测
a.引用循环
a.检测工具
gc模块检测循环引用。
b.检测示例
---
import gc
class Node:
def __init__(self):
self.ref = None
# 创建循环
n1 = Node()
n2 = Node()
n1.ref = n2
n2.ref = n1
del n1, n2
# 检测
collected = gc.collect()
print(f"回收对象: {collected}个")
---
b.弱引用检测
a.weakref模块
使用弱引用避免循环。
b.弱引用示例
---
import weakref
import gc
class Node:
def __init__(self):
self.ref = None
n1 = Node()
n2 = Node()
n1.ref = weakref.ref(n2)
n2.ref = weakref.ref(n1)
print(f"n1引用: {n1.ref()}")
print(f"n2引用: {n2.ref()}")
del n2
gc.collect()
print(f"n2删除后: {n1.ref()}")
---
6. GIL全局解释器锁
6.1 GIL原理
01.GIL概念
a.全局锁
a.互斥锁
GIL是全局互斥锁,同一时刻只有一个线程执行Python字节码。
b.GIL示例
---
import threading
import time
counter = 0
def increment():
global counter
for _ in range(1000000):
counter += 1
threads = [threading.Thread(target=increment) for _ in range(2)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print(f"计数: {counter}")
print(f"时间: {time.time() - start:.2f}秒")
---
b.设计原因
a.内存管理
GIL简化引用计数的线程安全。
b.C扩展兼容
---
import sys
import threading
import time  # required by the timing code below; the snippet used it without importing it


def cpu_bound(n=10_000_000):
    """Sum the integers in range(n) in pure Python to keep one thread busy.

    Args:
        n: Number of loop iterations; the default matches the benchmark size.

    Returns:
        The sum 0 + 1 + ... + (n - 1).
    """
    total = 0
    for i in range(n):
        total += i
    return total


def main():
    """Time one call on a single thread against two concurrent threads."""
    print(f"GIL检查间隔: {sys.getswitchinterval()}秒")

    # Single thread
    start = time.time()
    cpu_bound()
    single_time = time.time() - start

    # Two threads running the same workload concurrently
    start = time.time()
    threads = [threading.Thread(target=cpu_bound) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    multi_time = time.time() - start

    print(f"单线程: {single_time:.2f}秒")
    print(f"多线程: {multi_time:.2f}秒")


# Guarded so that importing the snippet does not run the benchmark.
if __name__ == "__main__":
    main()
---
02.GIL获取释放
a.字节码计数
a.检查间隔
Python 3.2起,GIL不再按字节码条数切换,而是按时间间隔(默认5毫秒,可用sys.setswitchinterval调整)请求持有线程释放GIL。
b.间隔设置
---
import sys
print(f"默认间隔: {sys.getswitchinterval()}")
sys.setswitchinterval(0.001)
print(f"新间隔: {sys.getswitchinterval()}")
---
b.IO操作
a.自动释放
IO操作自动释放GIL。
b.IO示例
---
import threading
import time
def io_bound():
time.sleep(1)
threads = [threading.Thread(target=io_bound) for _ in range(4)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print(f"4个线程sleep 1秒: {time.time() - start:.2f}秒")
---
03.GIL影响
a.CPU密集型
a.性能下降
多线程CPU密集型任务性能不升反降。
b.性能测试
---
import threading
import time
def cpu_task():
total = sum(i*i for i in range(1000000))
return total
# 单线程
start = time.time()
for _ in range(4):
cpu_task()
single = time.time() - start
# 多线程
start = time.time()
threads = [threading.Thread(target=cpu_task) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
multi = time.time() - start
print(f"单线程: {single:.2f}秒")
print(f"多线程: {multi:.2f}秒")
print(f"加速比: {single/multi:.2f}x")
---
b.IO密集型
a.性能提升
IO密集型任务多线程有效。
b.IO测试
---
import threading
import time
import requests

urls = ['http://httpbin.org/delay/1'] * 4


def fetch(url):
    """GET `url`, ignoring request failures so the timing run keeps going."""
    try:
        requests.get(url, timeout=5)
    except requests.RequestException:
        # Best effort: network errors and timeouts are irrelevant to the
        # timing comparison. A bare `except:` would also swallow
        # KeyboardInterrupt and SystemExit.
        pass


# Single thread: requests run one after another
start = time.time()
for url in urls:
    fetch(url)
single = time.time() - start

# One thread per URL: the waits overlap
start = time.time()
threads = [threading.Thread(target=fetch, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
multi = time.time() - start

print(f"单线程: {single:.2f}秒")
print(f"多线程: {multi:.2f}秒")
---
04.GIL替代方案
a.多进程
a.multiprocessing
每个进程独立GIL。
b.多进程示例
---
from multiprocessing import Process, cpu_count
import time


def cpu_task(n=10000000):
    """Return the sum of squares of range(n); a pure-Python CPU-bound workload."""
    total = sum(i*i for i in range(n))
    return total


def main():
    """Run one `cpu_task` per CPU core in separate processes and time it."""
    start = time.time()
    processes = [Process(target=cpu_task) for _ in range(cpu_count())]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"多进程时间: {time.time() - start:.2f}秒")
    print(f"CPU核心数: {cpu_count()}")


# Required for multiprocessing: under the spawn start method every child
# re-imports this module, and unguarded top-level code would start processes
# again from inside each child.
if __name__ == "__main__":
    main()
---
b.异步IO
a.asyncio
单线程异步处理IO。
b.异步示例
---
import asyncio
import time
async def io_task():
await asyncio.sleep(1)
async def main():
tasks = [io_task() for _ in range(4)]
await asyncio.gather(*tasks)
start = time.time()
asyncio.run(main())
print(f"异步时间: {time.time() - start:.2f}秒")
---
05.GIL调试
a.线程状态
a.threading模块
查看线程状态。
b.状态查看
---
import threading
import time
def worker():
time.sleep(2)
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
t.start()
print(f"活动线程: {threading.active_count()}")
print(f"线程列表: {[t.name for t in threading.enumerate()]}")
for t in threads:
t.join()
---
b.死锁检测
a.Lock超时
使用超时避免死锁。
b.超时示例
---
import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def task1():
with lock1:
time.sleep(0.1)
if lock2.acquire(timeout=1):
print("task1获取lock2")
lock2.release()
else:
print("task1超时")
def task2():
with lock2:
time.sleep(0.1)
if lock1.acquire(timeout=1):
print("task2获取lock1")
lock1.release()
else:
print("task2超时")
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
---
06.无GIL Python
a.Nogil项目
a.实验性实现
移除GIL的Python实现。
b.性能对比
---
# 标准CPython
import threading
import time
def benchmark():
total = 0
for i in range(10000000):
total += i
start = time.time()
threads = [threading.Thread(target=benchmark) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"CPython多线程: {time.time() - start:.2f}秒")
print("Nogil Python理论上可获得4x加速")
---
b.PyPy STM
a.软件事务内存
PyPy的STM实现无GIL。
b.STM说明
---
print("PyPy STM特性:")
print("- 无GIL")
print("- 事务内存")
print("- 多核并行")
print("- 实验阶段")
---
6.3 临时文件处理
01.临时文件
a.TemporaryFile
a.功能说明
创建临时文件,关闭后自动删除。
b.代码示例
---
import tempfile
# 创建临时文件
with tempfile.TemporaryFile(mode='w+t') as f:
f.write('临时数据\n')
f.seek(0)
print(f'内容: {f.read()}')
# 文件自动删除
# 二进制模式
with tempfile.TemporaryFile(mode='w+b') as f:
f.write(b'\x00\x01\x02')
f.seek(0)
data = f.read()
print(f'字节: {data.hex()}')
---
b.NamedTemporaryFile
a.功能说明
创建有名称的临时文件。
b.代码示例
---
import tempfile
import os
# 命名临时文件
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write('临时内容')
temp_path = f.name
with open(temp_path) as f:
print(f'读取: {f.read()}')
os.unlink(temp_path)
---
02.临时目录
a.TemporaryDirectory
a.功能说明
创建临时目录,退出时自动删除。
b.代码示例
---
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
print(f'临时目录: {tmpdir}')
file_path = Path(tmpdir) / 'data.txt'
file_path.write_text('内容')
# 目录自动删除
---
b.mkdtemp
a.功能说明
创建临时目录,需手动删除。
b.代码示例
---
import tempfile
import shutil
tmpdir = tempfile.mkdtemp()
try:
file_path = f'{tmpdir}/data.txt'
with open(file_path, 'w') as f:
f.write('数据')
finally:
shutil.rmtree(tmpdir)
---
03.临时文件配置
a.gettempdir
a.功能说明
获取系统临时目录路径。
b.代码示例
---
import tempfile
tmpdir = tempfile.gettempdir()
print(f'系统临时目录: {tmpdir}')
print(f'默认前缀: {tempfile.gettempprefix()}')
---
b.SpooledTemporaryFile
a.功能说明
小文件在内存,超过阈值写入磁盘。
b.代码示例
---
import tempfile
with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+') as f:
f.write('小数据')
print(f'在内存: {not f._rolled}')
f.write('x' * 2000)
print(f'在磁盘: {f._rolled}')
---
04.安全临时文件
a.mkstemp
a.功能说明
安全创建临时文件,返回文件描述符。
b.代码示例
---
import tempfile
import os

# mkstemp returns an already-open OS-level descriptor plus the file's path.
fd, path = tempfile.mkstemp()
try:
    # os.fdopen takes ownership of the descriptor; the with-block closes it
    # even when the write fails, so the descriptor cannot leak.
    with os.fdopen(fd, 'wb') as f:
        f.write(b'secure data')
    with open(path, 'rb') as f:
        print(f'内容: {f.read()}')
finally:
    # mkstemp never deletes the file itself.
    os.unlink(path)
---
b.权限控制
a.功能说明
临时文件默认只有创建者可访问。
b.代码示例
---
import tempfile
import os
import stat
with tempfile.NamedTemporaryFile(delete=False) as f:
path = f.name
f.write(b'data')
st = os.stat(path)
print(f'权限: {stat.filemode(st.st_mode)}')
os.unlink(path)
---
6.4 文件锁与并发
01.文件锁
a.fcntl锁
a.功能说明
Unix系统使用fcntl实现文件锁。
b.代码示例
---
import fcntl
import time

# Exclusive lock.
# Open in append mode and truncate only after the lock is held: mode 'w'
# would wipe the file before this process owns the lock.
with open('data.txt', 'a') as f:
    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
    try:
        f.truncate(0)
        f.write('独占访问')
        # Push buffered data to the file while the lock is still held.
        f.flush()
        time.sleep(2)
    finally:
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)

# Shared lock
with open('data.txt', 'r') as f:
    fcntl.flock(f.fileno(), fcntl.LOCK_SH)
    try:
        content = f.read()
    finally:
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)

# Non-blocking lock: fail immediately instead of waiting.
with open('data.txt', 'a') as f:
    try:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        print('文件已被锁定')
    else:
        # The file is touched only once the lock was actually obtained;
        # closing the file flushes the data and then drops the lock.
        f.truncate(0)
        f.write('数据')
---
b.跨平台锁
a.功能说明
使用portalocker实现跨平台文件锁。
b.代码示例
---
import portalocker
import time

# Exclusive lock.
# Open in append mode and truncate only after the lock is held: mode 'w'
# would wipe the file before this process owns the lock.
with open('data.txt', 'a') as f:
    portalocker.lock(f, portalocker.LOCK_EX)
    f.truncate(0)
    f.write('数据')
    time.sleep(1)

# Shared lock
with open('data.txt', 'r') as f:
    portalocker.lock(f, portalocker.LOCK_SH)
    content = f.read()

# Lock with timeout.
# portalocker.lock() takes only (file, flags); waiting with a timeout is
# provided by the portalocker.Lock context manager, which opens the file
# itself and raises LockException when the lock cannot be obtained in time.
try:
    with portalocker.Lock('data.txt', mode='w', timeout=5) as f:
        f.write('数据')
except portalocker.LockException:
    print('获取锁超时')
---
02.并发读写
a.多进程写入
a.功能说明
多进程安全写入文件。
b.代码示例
---
import multiprocessing
import fcntl


def write_log(msg):
    """Append `msg` as one line to log.txt while holding an exclusive flock."""
    with open('log.txt', 'a') as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            f.write(f'{msg}\n')
            # The write is buffered; without flushing here the data would
            # reach the file only at close(), after the lock was released.
            f.flush()
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)


if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=write_log, args=(f'进程{i}',))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
---
b.线程安全
a.功能说明
使用锁保证线程安全。
b.代码示例
---
import threading
file_lock = threading.Lock()
def write_data(data):
with file_lock:
with open('output.txt', 'a') as f:
f.write(f'{data}\n')
threads = []
for i in range(10):
t = threading.Thread(target=write_data, args=(f'线程{i}',))
threads.append(t)
t.start()
for t in threads:
t.join()
---
03.内存映射并发
a.共享内存
a.功能说明
多进程通过mmap共享数据。
b.代码示例
---
import mmap
import multiprocessing

SHARED_PATH = 'shared.dat'
SHARED_SIZE = 100


def writer(mm):
    """Write a greeting at the start of the mapping `mm`."""
    mm.seek(0)
    mm.write(b'Hello from writer')


def reader(mm):
    """Print the text at the start of the mapping `mm`, without NUL padding."""
    mm.seek(0)
    # The file is pre-filled with zero bytes; drop the ones read past the text.
    text = mm.read(20).rstrip(b'\x00').decode()
    print(f'读取: {text}')


def _run_with_mapping(func, path, size):
    """Child-process entry point: map `path` locally and pass the mapping to `func`."""
    with open(path, 'r+b') as f, mmap.mmap(f.fileno(), size) as mm:
        func(mm)


if __name__ == '__main__':
    with open(SHARED_PATH, 'w+b') as f:
        f.write(b'\x00' * SHARED_SIZE)
    # An mmap object cannot be pickled, so it cannot be passed as a Process
    # argument under the spawn/forkserver start methods. Each child receives
    # the path and creates its own mapping of the same file instead.
    for func in (writer, reader):
        p = multiprocessing.Process(
            target=_run_with_mapping, args=(func, SHARED_PATH, SHARED_SIZE)
        )
        p.start()
        p.join()
---
b.同步原语
a.功能说明
使用multiprocessing.Lock同步。
b.代码示例
---
import multiprocessing
import mmap

COUNTER_PATH = 'counter.dat'
COUNTER_SIZE = 4


def update_counter(mm, lock, value):
    """Add `value` to the 4-byte little-endian counter at the start of `mm`.

    The read-modify-write runs while `lock` is held so concurrent updates do
    not overwrite each other.
    """
    with lock:
        mm.seek(0)
        current = int.from_bytes(mm.read(4), 'little')
        mm.seek(0)
        mm.write((current + value).to_bytes(4, 'little'))


def _update_in_child(path, lock, value):
    """Child-process entry point: map `path` locally, then update the counter."""
    with open(path, 'r+b') as f, mmap.mmap(f.fileno(), COUNTER_SIZE) as mm:
        update_counter(mm, lock, value)


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    with open(COUNTER_PATH, 'w+b') as f:
        f.write((0).to_bytes(4, 'little'))
    # An mmap object cannot be pickled, so it cannot be passed as a Process
    # argument under the spawn/forkserver start methods. Each child receives
    # the path and maps the file itself; the Lock can be passed directly.
    processes = []
    for i in range(10):
        p = multiprocessing.Process(
            target=_update_in_child, args=(COUNTER_PATH, lock, 1)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    with open(COUNTER_PATH, 'r+b') as f, mmap.mmap(f.fileno(), COUNTER_SIZE) as mm:
        result = int.from_bytes(mm.read(4), 'little')
    print(f'计数器: {result}')
---
04.异步文件操作
a.aiofiles
a.功能说明
异步文件IO库。
b.代码示例
---
import asyncio
import aiofiles
async def write_async(filename, data):
async with aiofiles.open(filename, 'w') as f:
await f.write(data)
async def read_async(filename):
async with aiofiles.open(filename, 'r') as f:
return await f.read()
async def main():
await write_async('async.txt', 'Hello Async')
content = await read_async('async.txt')
print(f'内容: {content}')
asyncio.run(main())
---
b.并发异步IO
a.功能说明
并发执行多个异步文件操作。
b.代码示例
---
import asyncio
import aiofiles
async def process_file(filename):
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
return len(content)
async def main():
files = ['file1.txt', 'file2.txt', 'file3.txt']
tasks = [process_file(f) for f in files]
results = await asyncio.gather(*tasks)
print(f'文件大小: {results}')
asyncio.run(main())
---
6.5 性能优化
01.缓冲优化
a.缓冲区大小
a.功能说明
调整缓冲区大小提升性能。
b.代码示例
---
import time

# Every variant writes the same number of lines so the printed timings are
# directly comparable.
LINE_COUNT = 100000

# Default buffering
start = time.time()
with open('large.txt', 'w') as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n')
print(f'默认: {time.time() - start:.2f}秒')

# Large (1 MiB) buffer
start = time.time()
with open('large.txt', 'w', buffering=1024*1024) as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n')
print(f'大缓冲: {time.time() - start:.2f}秒')

# Unbuffered: buffering=0 is only allowed in binary mode, hence the encode()
start = time.time()
with open('large.txt', 'wb', buffering=0) as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n'.encode())
print(f'无缓冲: {time.time() - start:.2f}秒')
---
b.批量操作
a.功能说明
批量读写减少系统调用。
b.代码示例
---
import time
# 逐行写入
start = time.time()
with open('output.txt', 'w') as f:
for i in range(10000):
f.write(f'行{i}\n')
print(f'逐行: {time.time() - start:.2f}秒')
# 批量写入
start = time.time()
lines = [f'行{i}\n' for i in range(10000)]
with open('output.txt', 'w') as f:
f.writelines(lines)
print(f'批量: {time.time() - start:.2f}秒')
---
02.内存映射优化
a.大文件处理
a.功能说明
mmap处理大文件更高效。
b.代码示例
---
import mmap
import time

# Bytes examined per step when scanning the mapping.
CHUNK_SIZE = 1 << 20

# Plain read: the whole file is loaded into memory
start = time.time()
with open('large.bin', 'rb') as f:
    data = f.read()
    count = data.count(b'\x00')
print(f'普通: {time.time() - start:.2f}秒, 计数: {count}')

# mmap read
start = time.time()
with open('large.bin', 'rb') as f:
    # The with-block unmaps the file even if counting fails.
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # `mm[:]` would copy the entire file into a bytes object, which is
        # exactly what mapping is meant to avoid; scan bounded slices instead.
        count = sum(
            mm[offset:offset + CHUNK_SIZE].count(b'\x00')
            for offset in range(0, len(mm), CHUNK_SIZE)
        )
print(f'mmap: {time.time() - start:.2f}秒, 计数: {count}')
---
b.随机访问
a.功能说明
mmap优化随机访问性能。
b.代码示例
---
import mmap
import random
import time
# 创建测试文件
with open('random.dat', 'wb') as f:
f.write(b'\x00' * 10000000)
positions = [random.randint(0, 9999999) for _ in range(1000)]
# 普通随机访问
start = time.time()
with open('random.dat', 'r+b') as f:
for pos in positions:
f.seek(pos)
f.write(b'\xff')
print(f'普通: {time.time() - start:.2f}秒')
# mmap随机访问
start = time.time()
with open('random.dat', 'r+b') as f:
mm = mmap.mmap(f.fileno(), 0)
for pos in positions:
mm[pos] = 0xff
mm.close()
print(f'mmap: {time.time() - start:.2f}秒')
---
03.异步IO优化
a.并发读取
a.功能说明
异步并发读取多个文件。
b.代码示例
---
import asyncio
import aiofiles
import time
async def read_file(filename):
async with aiofiles.open(filename, 'r') as f:
return await f.read()
async def concurrent_read(files):
tasks = [read_file(f) for f in files]
return await asyncio.gather(*tasks)
# 同步读取
start = time.time()
contents = []
for f in ['file1.txt', 'file2.txt', 'file3.txt']:
with open(f) as file:
contents.append(file.read())
print(f'同步: {time.time() - start:.2f}秒')
# 异步读取
start = time.time()
contents = asyncio.run(concurrent_read(['file1.txt', 'file2.txt', 'file3.txt']))
print(f'异步: {time.time() - start:.2f}秒')
---
b.流式处理
a.功能说明
异步流式处理大文件。
b.代码示例
---
import asyncio
import aiofiles
async def process_large_file(input_file, output_file):
async with aiofiles.open(input_file, 'r') as fin:
async with aiofiles.open(output_file, 'w') as fout:
async for line in fin:
processed = line.upper()
await fout.write(processed)
asyncio.run(process_large_file('input.txt', 'output.txt'))
---
04.压缩优化
a.压缩级别
a.功能说明
选择合适的压缩级别平衡速度和大小。
b.代码示例
---
import gzip
import time
data = b'x' * 1000000
# 快速压缩
start = time.time()
with gzip.open('fast.gz', 'wb', compresslevel=1) as f:
f.write(data)
print(f'快速: {time.time() - start:.2f}秒')
# 最佳压缩
start = time.time()
with gzip.open('best.gz', 'wb', compresslevel=9) as f:
f.write(data)
print(f'最佳: {time.time() - start:.2f}秒')
# 默认压缩
start = time.time()
with gzip.open('default.gz', 'wb') as f:
f.write(data)
print(f'默认: {time.time() - start:.2f}秒')
---
b.流式压缩
a.功能说明
流式压缩节省内存。
b.代码示例
---
import gzip
# 流式压缩
with open('input.txt', 'rb') as fin:
with gzip.open('output.gz', 'wb') as fout:
while chunk := fin.read(8192):
fout.write(chunk)
# 流式解压
with gzip.open('output.gz', 'rb') as fin:
with open('output.txt', 'wb') as fout:
while chunk := fin.read(8192):
fout.write(chunk)
---
6.5 性能优化
01.缓冲优化
a.缓冲区大小
a.功能说明
调整缓冲区大小提升性能。
b.代码示例
---
import time

# Every variant writes the same number of lines so the printed timings are
# directly comparable.
LINE_COUNT = 100000

# Default buffering
start = time.time()
with open('large.txt', 'w') as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n')
print(f'默认: {time.time() - start:.2f}秒')

# Large (1 MiB) buffer
start = time.time()
with open('large.txt', 'w', buffering=1024*1024) as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n')
print(f'大缓冲: {time.time() - start:.2f}秒')

# Unbuffered: buffering=0 is only allowed in binary mode, hence the encode()
start = time.time()
with open('large.txt', 'wb', buffering=0) as f:
    for i in range(LINE_COUNT):
        f.write(f'行{i}\n'.encode())
print(f'无缓冲: {time.time() - start:.2f}秒')
---
b.批量操作
a.功能说明
批量读写减少系统调用。
b.代码示例
---
import time
# 逐行写入
start = time.time()
with open('output.txt', 'w') as f:
for i in range(10000):
f.write(f'行{i}\n')
print(f'逐行: {time.time() - start:.2f}秒')
# 批量写入
start = time.time()
lines = [f'行{i}\n' for i in range(10000)]
with open('output.txt', 'w') as f:
f.writelines(lines)
print(f'批量: {time.time() - start:.2f}秒')
---
02.内存映射优化
a.大文件处理
a.功能说明
mmap处理大文件更高效。
b.代码示例
---
import mmap
import time

# Bytes examined per step when scanning the mapping.
CHUNK_SIZE = 1 << 20

# Plain read: the whole file is loaded into memory
start = time.time()
with open('large.bin', 'rb') as f:
    data = f.read()
    count = data.count(b'\x00')
print(f'普通: {time.time() - start:.2f}秒, 计数: {count}')

# mmap read
start = time.time()
with open('large.bin', 'rb') as f:
    # The with-block unmaps the file even if counting fails.
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # `mm[:]` would copy the entire file into a bytes object, which is
        # exactly what mapping is meant to avoid; scan bounded slices instead.
        count = sum(
            mm[offset:offset + CHUNK_SIZE].count(b'\x00')
            for offset in range(0, len(mm), CHUNK_SIZE)
        )
print(f'mmap: {time.time() - start:.2f}秒, 计数: {count}')
---
b.随机访问
a.功能说明
mmap优化随机访问性能。
b.代码示例
---
import mmap
import random
import time
# 创建测试文件
with open('random.dat', 'wb') as f:
f.write(b'\x00' * 10000000)
positions = [random.randint(0, 9999999) for _ in range(1000)]
# 普通随机访问
start = time.time()
with open('random.dat', 'r+b') as f:
for pos in positions:
f.seek(pos)
f.write(b'\xff')
print(f'普通: {time.time() - start:.2f}秒')
# mmap随机访问
start = time.time()
with open('random.dat', 'r+b') as f:
mm = mmap.mmap(f.fileno(), 0)
for pos in positions:
mm[pos] = 0xff
mm.close()
print(f'mmap: {time.time() - start:.2f}秒')
---
03.异步IO优化
a.并发读取
a.功能说明
异步并发读取多个文件。
b.代码示例
---
import asyncio
import aiofiles
import time
async def read_file(filename):
async with aiofiles.open(filename, 'r') as f:
return await f.read()
async def concurrent_read(files):
tasks = [read_file(f) for f in files]
return await asyncio.gather(*tasks)
# 同步读取
start = time.time()
contents = []
for f in ['file1.txt', 'file2.txt', 'file3.txt']:
with open(f) as file:
contents.append(file.read())
print(f'同步: {time.time() - start:.2f}秒')
# 异步读取
start = time.time()
contents = asyncio.run(concurrent_read(['file1.txt', 'file2.txt', 'file3.txt']))
print(f'异步: {time.time() - start:.2f}秒')
---
b.流式处理
a.功能说明
异步流式处理大文件。
b.代码示例
---
import asyncio
import aiofiles
async def process_large_file(input_file, output_file):
async with aiofiles.open(input_file, 'r') as fin:
async with aiofiles.open(output_file, 'w') as fout:
async for line in fin:
processed = line.upper()
await fout.write(processed)
asyncio.run(process_large_file('input.txt', 'output.txt'))
---
04.压缩优化
a.压缩级别
a.功能说明
选择合适的压缩级别平衡速度和大小。
b.代码示例
---
import gzip
import time
data = b'x' * 1000000
# 快速压缩
start = time.time()
with gzip.open('fast.gz', 'wb', compresslevel=1) as f:
f.write(data)
print(f'快速: {time.time() - start:.2f}秒')
# 最佳压缩
start = time.time()
with gzip.open('best.gz', 'wb', compresslevel=9) as f:
f.write(data)
print(f'最佳: {time.time() - start:.2f}秒')
# 默认压缩
start = time.time()
with gzip.open('default.gz', 'wb') as f:
f.write(data)
print(f'默认: {time.time() - start:.2f}秒')
---
b.流式压缩
a.功能说明
流式压缩节省内存。
b.代码示例
---
import gzip
# 流式压缩
with open('input.txt', 'rb') as fin:
with gzip.open('output.gz', 'wb') as fout:
while chunk := fin.read(8192):
fout.write(chunk)
# 流式解压
with gzip.open('output.gz', 'rb') as fin:
with open('output.txt', 'wb') as fout:
while chunk := fin.read(8192):
fout.write(chunk)
---