1 事务基础
1.1 定义与ACID特性
01.事务定义
a.基本概念
事务是数据库管理系统执行过程中的一个逻辑单位,由一个或多个操作序列组成,这些操作要么全部执行成功,要么全部不执行。在Rust中,事务不仅应用于数据库操作,还可以通过STM等机制应用于内存操作,确保并发环境下的数据一致性。
b.事务边界
事务具有明确的开始和结束边界,通过BEGIN、COMMIT和ROLLBACK等操作来控制事务的生命周期,确保操作的原子性。
02.ACID特性详解
a.原子性:Atomicity
a.定义
原子性保证事务中的所有操作要么全部成功提交,要么全部失败回滚,不存在部分执行的中间状态。
b.Rust实现示例
---
use sqlx::{PgPool, Postgres, Transaction};
use anyhow::Result;
// 银行转账示例:展示原子性
async fn transfer_money(
pool: &PgPool,
from_account: i32,
to_account: i32,
amount: i64
) -> Result<()> {
// 开始事务
let mut tx: Transaction<Postgres> = pool.begin().await?;
// 操作1:从源账户扣款
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(amount)
.bind(from_account)
.execute(&mut *tx)
.await?;
// 操作2:向目标账户加款
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(amount)
.bind(to_account)
.execute(&mut *tx)
.await?;
// 提交事务:两个操作都成功才提交
tx.commit().await?;
Ok(())
}
---
b.一致性:Consistency
a.定义
一致性确保事务执行前后,数据库从一个一致性状态转换到另一个一致性状态,所有约束和规则都得到满足。
b.约束检查示例
---
use sqlx::PgPool;
use anyhow::{Result, anyhow};
// 确保账户余额不为负:一致性约束
async fn withdraw_with_check(
pool: &PgPool,
account_id: i32,
amount: i64
) -> Result<()> {
let mut tx = pool.begin().await?;
// 查询当前余额
let balance: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = $1 FOR UPDATE"
)
.bind(account_id)
.fetch_one(&mut *tx)
.await?;
// 一致性检查:余额不能为负
if balance.0 < amount {
return Err(anyhow!("余额不足,违反一致性约束"));
}
// 执行扣款
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(amount)
.bind(account_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
c.隔离性:Isolation
a.定义
隔离性确保并发执行的多个事务之间相互隔离,一个事务的中间状态对其他事务不可见,避免脏读、不可重复读和幻读等问题。
b.隔离级别示例
---
use sqlx::{PgPool, Postgres, Transaction, IsolationLevel};
use anyhow::Result;
// 使用可序列化隔离级别
async fn serializable_transaction(pool: &PgPool) -> Result<()> {
// 设置隔离级别为SERIALIZABLE
let mut tx = pool.begin().await?;
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut *tx)
.await?;
// 读取数据
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM orders WHERE status = 'pending'"
)
.fetch_one(&mut *tx)
.await?;
// 基于读取的数据进行操作
if count.0 > 100 {
sqlx::query("UPDATE orders SET priority = 'high' WHERE status = 'pending'")
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
---
d.持久性:Durability
a.定义
持久性保证事务一旦提交,其对数据库的修改就是永久性的,即使系统发生故障也不会丢失。
b.WAL机制
数据库通过预写日志WAL机制实现持久性,在修改数据前先将操作记录到日志中,确保故障恢复时可以重放操作。Rust应用通过数据库驱动自动享受这一保障。
03.ACID在Rust中的实践意义
a.类型安全保障
Rust的类型系统和所有权机制与ACID特性天然契合,编译期即可防止许多违反一致性的操作,如空指针、数据竞争等。
b.错误处理
Rust的Result类型强制开发者处理事务可能的失败情况,确保原子性和一致性不被忽视。
c.异步事务
Rust的async/await机制使得事务操作可以高效地与异步IO结合,在保证ACID特性的同时实现高并发性能。
1.2 事务的必要性
01.并发控制问题
a.数据竞争
在多线程或多进程环境中,多个操作同时访问和修改共享数据时,如果没有事务保护,会导致数据不一致、丢失更新等问题。
b.无事务的并发问题示例
---
use std::sync::Arc;
use tokio::sync::Mutex;
// 错误示例:无事务保护的并发操作
async fn unsafe_concurrent_update(counter: Arc<Mutex<i32>>) {
let mut value = counter.lock().await;
// 读取当前值
let current = *value;
// 模拟业务处理延迟
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// 基于旧值更新:可能导致丢失更新
*value = current + 1;
}
// 正确示例:使用事务保护
use sqlx::PgPool;
async fn safe_concurrent_update(pool: &PgPool, id: i32) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
// 使用FOR UPDATE锁定行,防止并发修改
sqlx::query("UPDATE counters SET value = value + 1 WHERE id = $1")
.bind(id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
02.数据完整性保障
a.业务逻辑原子性
复杂的业务操作通常包含多个步骤,这些步骤必须作为一个整体成功或失败,否则会导致数据处于不一致的中间状态。
b.订单处理示例
---
use sqlx::{PgPool, Postgres, Transaction};
use anyhow::Result;
// 订单处理:库存扣减、订单创建、支付记录必须原子执行
async fn process_order(
pool: &PgPool,
user_id: i32,
product_id: i32,
quantity: i32,
amount: i64
) -> Result<i32> {
let mut tx = pool.begin().await?;
// 步骤1:检查并扣减库存
let stock: (i32,) = sqlx::query_as(
"SELECT stock FROM products WHERE id = $1 FOR UPDATE"
)
.bind(product_id)
.fetch_one(&mut *tx)
.await?;
if stock.0 < quantity {
return Err(anyhow::anyhow!("库存不足"));
}
sqlx::query("UPDATE products SET stock = stock - $1 WHERE id = $2")
.bind(quantity)
.bind(product_id)
.execute(&mut *tx)
.await?;
// 步骤2:创建订单记录
let order_id: (i32,) = sqlx::query_as(
"INSERT INTO orders (user_id, product_id, quantity, amount, status)
VALUES ($1, $2, $3, $4, 'pending') RETURNING id"
)
.bind(user_id)
.bind(product_id)
.bind(quantity)
.bind(amount)
.fetch_one(&mut *tx)
.await?;
// 步骤3:创建支付记录
sqlx::query(
"INSERT INTO payments (order_id, amount, status) VALUES ($1, $2, 'pending')"
)
.bind(order_id.0)
.bind(amount)
.execute(&mut *tx)
.await?;
// 所有步骤成功才提交
tx.commit().await?;
Ok(order_id.0)
}
---
03.故障恢复能力
a.系统崩溃保护
事务的持��性和原子性确保系统在崩溃后可以恢复到一致状态,未提交的事务会自动回滚,已提交的事务不会丢失。
b.错误处理与回滚
---
use sqlx::PgPool;
use anyhow::{Result, Context};
// 带错误处理的事务:任何步骤失败都会自动回滚
async fn transfer_with_error_handling(
pool: &PgPool,
from_id: i32,
to_id: i32,
amount: i64
) -> Result<()> {
let mut tx = pool.begin().await
.context("开始事务失败")?;
// 扣款操作
let rows_affected = sqlx::query(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2 AND balance >= $1"
)
.bind(amount)
.bind(from_id)
.execute(&mut *tx)
.await
.context("扣款操作失败")?
.rows_affected();
if rows_affected == 0 {
// 余额不足,事务会自动回滚
return Err(anyhow::anyhow!("余额不足或账户不存在"));
}
// 加款操作
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(amount)
.bind(to_id)
.execute(&mut *tx)
.await
.context("加款操作失败")?;
// 提交事务
tx.commit().await
.context("提交事务失败")?;
Ok(())
}
---
04.分布式系统中的必要性
a.跨服务一致性
在微服务架构中,事务机制确保跨多个服务的操作保持一致性,避免部分成功部分失败的情况。
b.最终一致性
通过事务消息、Saga等模式,事务机制帮助分布式系统实现最终一致性,确保系统整体的数据正确性。
1.3 Rust中的事务模型
01.数据库事务模型
a.基于连接的事务
Rust数据库驱动如sqlx、diesel等提供基于连接的事务模型,通过Transaction对象封装事务操作,利用RAII机制自动管理事务生命周期。
b.Transaction对象示例
---
use sqlx::{PgPool, Postgres, Transaction};
use anyhow::Result;
// Transaction对象自动管理事务生命周期
async fn transaction_lifecycle_demo(pool: &PgPool) -> Result<()> {
// begin()创建Transaction对象
let mut tx: Transaction<Postgres> = pool.begin().await?;
// 在事务中执行操作
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("事务开始")
.execute(&mut *tx)
.await?;
// 显式提交
tx.commit().await?;
// Transaction对象被消费,事务结束
Ok(())
}
// 自动回滚:Transaction对象被drop时未commit则自动回滚
async fn auto_rollback_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("这条记录不会被保存")
.execute(&mut *tx)
.await?;
// 函数返回时tx被drop,事务自动回滚
Ok(())
}
---
02.STM软件事务内存模型
a.乐观并发控制
STM采用乐观并发控制策略,事务执行时不加锁,提交时检测冲突,冲突则自动重试,适合读多写少的场景。
b.STM基础示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
// STM事务:转账操作
fn stm_transfer(
from: Arc<TVar<i64>>,
to: Arc<TVar<i64>>,
amount: i64
) -> Result<(), stm::StmError> {
// atomically确保操作的原子性
atomically(|trans| {
// 读取源账户余额
let from_balance = from.read(trans)?;
// 检查余额
if from_balance < amount {
return Err(stm::StmError::Retry);
}
// 扣款
from.write(trans, from_balance - amount)?;
// 加款
let to_balance = to.read(trans)?;
to.write(trans, to_balance + amount)?;
Ok(())
})
}
// 使用示例
fn main() {
let account_a = Arc::new(TVar::new(1000));
let account_b = Arc::new(TVar::new(500));
// 执行转账:STM自动处理并发冲突
stm_transfer(account_a.clone(), account_b.clone(), 200).unwrap();
}
---
03.类型系统与事务安全
a.所有权保证
Rust的所有权系统确保Transaction对象不会被意外复制或共享,防止事务状态混乱,编译期即可发现事务使用错误。
b.生命周期约束
---
use sqlx::{PgPool, Postgres, Transaction};
// 编译器强制Transaction生命周期管理
async fn transaction_lifetime_demo(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
// tx的可变借用确保同一时刻只有一个操作使用事务
let result = sqlx::query("SELECT * FROM users")
.fetch_all(&mut *tx) // 可变借用tx
.await?;
// 不能同时进行另一个操作,编译器会报错
// let result2 = sqlx::query("SELECT * FROM orders")
// .fetch_all(&mut *tx) // 错误:tx已被借用
// .await?;
tx.commit().await?;
Ok(())
}
---
04.异步事务模型
a.async/await集成
Rust的异步运行时与事务无缝集成,事务操作可以与其他异步IO操作并发执行,提高系统吞吐量。
b.异步事务示例
---
use sqlx::PgPool;
use tokio::try_join;
use anyhow::Result;
// 并发执行多个独立事务
async fn concurrent_transactions(pool: &PgPool) -> Result<()> {
let pool1 = pool.clone();
let pool2 = pool.clone();
// 两个事务并发执行
let (result1, result2) = try_join!(
async {
let mut tx = pool1.begin().await?;
sqlx::query("UPDATE counters SET value = value + 1 WHERE id = 1")
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok::<_, sqlx::Error>(())
},
async {
let mut tx = pool2.begin().await?;
sqlx::query("UPDATE counters SET value = value + 1 WHERE id = 2")
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok::<_, sqlx::Error>(())
}
)?;
Ok(())
}
---
05.错误处理模型
a.Result类型强制处理
Rust的Result类型强制开发者显式处理事务可能的失败情况,避免忽略错误导致的数据不一致。
b.错误传播与恢复
---
use sqlx::PgPool;
use anyhow::{Result, Context};
// 分层错误处理
async fn layered_error_handling(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await
.context("数据库连接失败")?;
// 业务操作1
sqlx::query("INSERT INTO orders (user_id, amount) VALUES ($1, $2)")
.bind(1)
.bind(100)
.execute(&mut *tx)
.await
.context("创建订单失败")?;
// 业务操作2
sqlx::query("UPDATE users SET order_count = order_count + 1 WHERE id = $1")
.bind(1)
.execute(&mut *tx)
.await
.context("更新用户订单数失败")?;
// 提交事务
tx.commit().await
.context("提交事务失败")?;
Ok(())
}
---
1.4 数据库事务
01.关系型数据库事务
a.ACID保证
关系型数据库如PostgreSQL、MySQL通过WAL日志、MVCC等机制提供完整的ACID保证,是最常用的事务实现方式。
b.PostgreSQL事务示例
---
use sqlx::PgPool;
use anyhow::Result;
// PostgreSQL完整事务示例
async fn postgres_transaction_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 创建用户
let user_id: (i32,) = sqlx::query_as(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id"
)
.bind("张三")
.bind("[email protected]")
.fetch_one(&mut *tx)
.await?;
// 创建用户配置
sqlx::query(
"INSERT INTO user_settings (user_id, theme, language)
VALUES ($1, $2, $3)"
)
.bind(user_id.0)
.bind("dark")
.bind("zh-CN")
.execute(&mut *tx)
.await?;
// 初始化用户钱包
sqlx::query(
"INSERT INTO wallets (user_id, balance) VALUES ($1, $2)"
)
.bind(user_id.0)
.bind(0)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
02.NoSQL数据库事务
a.MongoDB事务
MongoDB 4.0+支持多文档事务,通过会话机制实现ACID特性,适合需要跨文档一致性的场景。
b.MongoDB事务示例
---
use mongodb::{Client, options::ClientOptions};
use mongodb::bson::doc;
use anyhow::Result;
// MongoDB事务示例
async fn mongodb_transaction_demo() -> Result<()> {
let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
let client = Client::with_options(client_options)?;
// 开始会话
let mut session = client.start_session(None).await?;
// 开始事务
session.start_transaction(None).await?;
let db = client.database("mydb");
let users = db.collection::<mongodb::bson::Document>("users");
let orders = db.collection::<mongodb::bson::Document>("orders");
// 操作1:插入用户
users.insert_one_with_session(
doc! { "name": "李四", "email": "[email protected]" },
None,
&mut session
).await?;
// 操作2:创建订单
orders.insert_one_with_session(
doc! { "user_id": 1, "amount": 100, "status": "pending" },
None,
&mut session
).await?;
// 提交事务
session.commit_transaction().await?;
Ok(())
}
---
03.分布式数据库事务
a.TiDB分布式事务
TiDB提供分布式ACID事务支持,使用Percolator模型实现跨节点事务一致性,对应用透明。
b.TiDB事务示例
---
use sqlx::MySqlPool;
use anyhow::Result;
// TiDB分布式事务:与MySQL协议兼容
async fn tidb_distributed_transaction(pool: &MySqlPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 跨分区操作:TiDB自动处理分布式一致性
sqlx::query(
"INSERT INTO orders (user_id, product_id, amount)
VALUES (?, ?, ?)"
)
.bind(1001) // 可能在节点A
.bind(2001) // 可能在节点B
.bind(500)
.execute(&mut *tx)
.await?;
sqlx::query(
"UPDATE inventory SET stock = stock - ?
WHERE product_id = ?"
)
.bind(1)
.bind(2001) // 可能在节点C
.execute(&mut *tx)
.await?;
// TiDB保证跨节点的ACID特性
tx.commit().await?;
Ok(())
}
---
04.事务与连接池
a.连接池管理
数据库连接池管理多个数据库连接,事务从池中获取连接,使用完毕后归还,提高资源利用率。
b.连接池配置示例
---
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;
use anyhow::Result;
// 配置连接池
async fn setup_connection_pool() -> Result<sqlx::PgPool> {
let pool = PgPoolOptions::new()
.max_connections(20) // 最大连接数
.min_connections(5) // 最小连接数
.acquire_timeout(Duration::from_secs(30)) // 获取连接超时
.idle_timeout(Duration::from_secs(600)) // 空闲连接超时
.max_lifetime(Duration::from_secs(1800)) // 连接最大生命周期
.connect("postgres://user:pass@localhost/db")
.await?;
Ok(pool)
}
// 使用连接池执行事务
async fn use_pooled_transaction(pool: &sqlx::PgPool) -> Result<()> {
// 从池中获取连接并开始事务
let mut tx = pool.begin().await?;
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("使用连接池的事务")
.execute(&mut *tx)
.await?;
tx.commit().await?;
// 连接自动归还到池中
Ok(())
}
---
1.5 内存事务
01.STM软件事务内存
a.基本原理
STM通过版本控制和冲突检测实现内存级别的事务操作,事务执行时记录读写集合,提交时检测冲突,冲突则自动重试。
b.STM基础操作
---
use stm::{atomically, TVar, StmResult};
use std::sync::Arc;
// 创建事务变量
fn create_tvars() -> (Arc<TVar<i32>>, Arc<TVar<i32>>) {
let var1 = Arc::new(TVar::new(100));
let var2 = Arc::new(TVar::new(200));
(var1, var2)
}
// 原子性读写操作
fn atomic_swap(
var1: Arc<TVar<i32>>,
var2: Arc<TVar<i32>>
) -> StmResult<()> {
atomically(|trans| {
// 读取两个变量的值
let val1 = var1.read(trans)?;
let val2 = var2.read(trans)?;
// 交换值
var1.write(trans, val2)?;
var2.write(trans, val2)?;
Ok(())
})
}
// 条件等待:retry机制
fn wait_for_condition(counter: Arc<TVar<i32>>) -> StmResult<i32> {
atomically(|trans| {
let value = counter.read(trans)?;
// 如果条件不满足,retry会阻塞直到变量被修改
if value < 10 {
return Err(stm::StmError::Retry);
}
Ok(value)
})
}
---
02.Arc与Mutex组合
a.粗粒度锁
使用Arc和Mutex可以实现简单的内存事务,但粒度较粗,并发性能受限,适合简单场景。
b.Mutex事务示例
---
use std::sync::{Arc, Mutex};
use anyhow::Result;
// 使用Mutex实现简单事务
struct Account {
balance: i64,
}
fn transfer_with_mutex(
from: Arc<Mutex<Account>>,
to: Arc<Mutex<Account>>,
amount: i64
) -> Result<()> {
// 按固定顺序获取锁,避免死锁
let mut from_guard = from.lock().unwrap();
let mut to_guard = to.lock().unwrap();
// 检查余额
if from_guard.balance < amount {
return Err(anyhow::anyhow!("余额不足"));
}
// 执行转账
from_guard.balance -= amount;
to_guard.balance += amount;
// 锁自动释放
Ok(())
}
// 使用示例
fn main() {
let account_a = Arc::new(Mutex::new(Account { balance: 1000 }));
let account_b = Arc::new(Mutex::new(Account { balance: 500 }));
transfer_with_mutex(account_a.clone(), account_b.clone(), 200).unwrap();
}
---
03.RwLock读写锁
a.读写分离
RwLock允许多个读者或一个写者,提高读多写少场景的并发性能,适合缓存等场景。
b.RwLock事务示例
---
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use anyhow::Result;
// 使用RwLock实现缓存事务
struct Cache {
data: Arc<RwLock<HashMap<String, String>>>,
}
impl Cache {
fn new() -> Self {
Cache {
data: Arc::new(RwLock::new(HashMap::new())),
}
}
// 读操作:多个线程可以同时读
fn get(&self, key: &str) -> Option<String> {
let read_guard = self.data.read().unwrap();
read_guard.get(key).cloned()
}
// 写操作:独占访问
fn set(&self, key: String, value: String) -> Result<()> {
let mut write_guard = self.data.write().unwrap();
write_guard.insert(key, value);
Ok(())
}
// 事务性更新:读-修改-写
fn update_if_exists(&self, key: &str, new_value: String) -> Result<bool> {
let mut write_guard = self.data.write().unwrap();
if write_guard.contains_key(key) {
write_guard.insert(key.to_string(), new_value);
Ok(true)
} else {
Ok(false)
}
}
}
---
04.无锁数据结构
a.原子操作
使用原子类型如AtomicI64、AtomicBool等实现无锁的内存事务,性能最高但功能受限。
b.原子操作示例
---
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
// 使用原子操作实现计数器
struct AtomicCounter {
value: Arc<AtomicI64>,
}
impl AtomicCounter {
fn new(initial: i64) -> Self {
AtomicCounter {
value: Arc::new(AtomicI64::new(initial)),
}
}
// 原子性增加
fn increment(&self) -> i64 {
self.value.fetch_add(1, Ordering::SeqCst)
}
// 原子性比较并交换:CAS操作
fn compare_and_swap(&self, expected: i64, new: i64) -> Result<i64, i64> {
self.value
.compare_exchange(expected, new, Ordering::SeqCst, Ordering::SeqCst)
}
// 原子性条件更新
fn update_if_greater(&self, new_value: i64) {
loop {
let current = self.value.load(Ordering::SeqCst);
if new_value <= current {
break;
}
// CAS操作:如果成功则退出,失败则重试
match self.value.compare_exchange(
current,
new_value,
Ordering::SeqCst,
Ordering::SeqCst
) {
Ok(_) => break,
Err(_) => continue, // 冲突,重试
}
}
}
}
---
1.6 分布式事务
01.分布式事务挑战
a.CAP定理
分布式系统中无法同时满足一致性、可用性和分区容错性,必须在三者之间权衡,事务设计需要根据业务需求选择合适的策略。
b.网络分区问题
网络故障可能导致节点间通信中断,事务协调变得困难,需要特殊机制处理分区场景下的一致性问题。
02.两阶段提交:2PC
a.协议流程
协调者向所有参与者发送准备请求,收到所有确认后发送提交请求,任一参与者失败则全部回滚,保证强一致性但性能较差。
b.2PC模拟实现
---
use std::sync::Arc;
use tokio::sync::Mutex;
use anyhow::{Result, anyhow};
// 参与者接口
#[async_trait::async_trait]
trait Participant {
async fn prepare(&self) -> Result<()>;
async fn commit(&self) -> Result<()>;
async fn rollback(&self) -> Result<()>;
}
// 协调者
struct Coordinator {
participants: Vec<Arc<dyn Participant + Send + Sync>>,
}
impl Coordinator {
// 执行两阶段提交
async fn execute_2pc(&self) -> Result<()> {
// 阶段1:准备阶段
for participant in &self.participants {
if let Err(e) = participant.prepare().await {
// 任一参与者准备失败,通知所有参与者回滚
self.rollback_all().await;
return Err(anyhow!("准备阶段失败: {}", e));
}
}
// 阶段2:提交阶段
for participant in &self.participants {
if let Err(e) = participant.commit().await {
// 提交失败,记录日志但无法回滚
eprintln!("提交失败: {}", e);
return Err(anyhow!("提交阶段失败: {}", e));
}
}
Ok(())
}
async fn rollback_all(&self) {
for participant in &self.participants {
let _ = participant.rollback().await;
}
}
}
---
03.Saga模式
a.补偿机制
Saga将长事务拆分为多个本地事务,每个本地事务都有对应的补偿操作,失败时通过补偿操作回滚已完成的步骤,实现最终一致性。
b.Saga实现示例
---
use anyhow::Result;
// Saga步骤定义
struct SagaStep {
name: String,
action: Box<dyn Fn() -> Result<()> + Send + Sync>,
compensation: Box<dyn Fn() -> Result<()> + Send + Sync>,
}
// Saga编排器
struct SagaOrchestrator {
steps: Vec<SagaStep>,
}
impl SagaOrchestrator {
fn new() -> Self {
SagaOrchestrator { steps: Vec::new() }
}
fn add_step(
&mut self,
name: String,
action: Box<dyn Fn() -> Result<()> + Send + Sync>,
compensation: Box<dyn Fn() -> Result<()> + Send + Sync>,
) {
self.steps.push(SagaStep { name, action, compensation });
}
// 执行Saga
fn execute(&self) -> Result<()> {
let mut completed_steps = Vec::new();
// 顺序执行所有步骤
for step in &self.steps {
match (step.action)() {
Ok(_) => {
println!("步骤 {} 执行成功", step.name);
completed_steps.push(step);
}
Err(e) => {
println!("步骤 {} 执行失败: {}", step.name, e);
// 逆序执行补偿操作
self.compensate(&completed_steps);
return Err(e);
}
}
}
Ok(())
}
fn compensate(&self, completed_steps: &[&SagaStep]) {
for step in completed_steps.iter().rev() {
if let Err(e) = (step.compensation)() {
eprintln!("补偿操作 {} 失败: {}", step.name, e);
} else {
println!("补偿操作 {} 执行成功", step.name);
}
}
}
}
---
04.分布式事务在Rust中的实践
a.gRPC跨服务事务
使用gRPC实现跨服务的分布式事务协调,通过Tonic等库提供高性能的RPC通信。
b.gRPC事务示例
---
use tonic::{Request, Response, Status};
use anyhow::Result;
// 定义事务服务接口
#[tonic::async_trait]
trait TransactionService {
async fn prepare(&self, tx_id: String) -> Result<bool, Status>;
async fn commit(&self, tx_id: String) -> Result<(), Status>;
async fn rollback(&self, tx_id: String) -> Result<(), Status>;
}
// 分布式事务协调器
struct DistributedCoordinator {
services: Vec<Box<dyn TransactionService + Send + Sync>>,
}
impl DistributedCoordinator {
async fn execute_distributed_tx(&self, tx_id: String) -> Result<()> {
// 准备阶段:调用所有服务的prepare
for service in &self.services {
if !service.prepare(tx_id.clone()).await
.map_err(|e| anyhow::anyhow!("准备失败: {}", e))? {
// 准备失败,回滚
self.rollback_all(tx_id).await;
return Err(anyhow::anyhow!("分布式事务准备失败"));
}
}
// 提交阶段:调用所有服务的commit
for service in &self.services {
service.commit(tx_id.clone()).await
.map_err(|e| anyhow::anyhow!("提交失败: {}", e))?;
}
Ok(())
}
async fn rollback_all(&self, tx_id: String) {
for service in &self.services {
let _ = service.rollback(tx_id.clone()).await;
}
}
}
---
1.7 事务隔离级别
01.隔离级别概述
a.并发问题
多个事务并发执行时可能出现脏读、不可重复读、幻读等问题,隔离级别定义了事务之间的可见性规则,平衡一致性和性能。
b.SQL标准隔离级别
SQL标准定义了四个隔离级别:读未提交、读已提交、可重复读、可序列化,隔离级别越高一致性越强但性能越差。
02.读未提交:Read Uncommitted
a.特性
最低隔离级别,事务可以读取其他未提交事务的修改,可能出现脏读、不可重复读和幻读,实际应用中很少使用。
b.示例说明
---
use sqlx::PgPool;
use anyhow::Result;
// 读未提交示例:可能读到脏数据
async fn read_uncommitted_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 设置隔离级别为READ UNCOMMITTED
// 注意:PostgreSQL不支持此级别,会自动升级为READ COMMITTED
sqlx::query("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
.execute(&mut *tx)
.await?;
// 读取数据:可能读到其他事务未提交的修改
let balance: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = $1"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
println!("读取到的余额(可能是脏数据): {}", balance.0);
tx.commit().await?;
Ok(())
}
---
03.读已提交:Read Committed
a.特性
事务只能读取已提交的数据,避免脏读,但仍可能出现不可重复读和幻读,是PostgreSQL和Oracle的默认隔离级别。
b.实现示例
---
use sqlx::PgPool;
use anyhow::Result;
// 读已提交示例:避免脏读
async fn read_committed_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// PostgreSQL默认就是READ COMMITTED
sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
.execute(&mut *tx)
.await?;
// 第一次读取
let balance1: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = $1"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
println!("第一次读取: {}", balance1.0);
// 模拟其他事务提交了修改
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// 第二次读取:可能得到不同的值(不可重复读)
let balance2: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = $1"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
println!("第二次读取: {}", balance2.0);
tx.commit().await?;
Ok(())
}
---
04.可重复读:Repeatable Read
a.特性
事务执行期间多次读取同一数据得到相同结果,避免脏读和不可重复读,但仍可能出现幻读,是MySQL InnoDB的默认隔离级别。
b.实现示例
---
use sqlx::MySqlPool;
use anyhow::Result;
// 可重复读示例:保证读取一致性
async fn repeatable_read_demo(pool: &MySqlPool) -> Result<()> {
let mut tx = pool.begin().await?;
// MySQL InnoDB默认就是REPEATABLE READ
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(&mut *tx)
.await?;
// 第一次读取
let balance1: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = ?"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
println!("第一次读取: {}", balance1.0);
// 即使其他事务提交了修改,本事务仍读到相同的值
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// 第二次读取:得到相同的值(可重复读)
let balance2: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = ?"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
println!("第二次读取: {}", balance2.0);
assert_eq!(balance1.0, balance2.0);
tx.commit().await?;
Ok(())
}
---
05.可序列化:Serializable
a.特性
最高隔离级别,事务串行执行,完全避免脏读、不可重复读和幻读,通过锁或MVCC实现,性能最差但一致性最强。
b.实现示例
---
use sqlx::PgPool;
use anyhow::Result;
// 可序列化示例:最强一致性保证
async fn serializable_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 设置为SERIALIZABLE
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut *tx)
.await?;
// 读取订单总数
let count1: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM orders WHERE status = 'pending'"
)
.fetch_one(&mut *tx)
.await?;
// 基于读取的数据进行决策
if count1.0 > 100 {
sqlx::query("UPDATE orders SET priority = 'high' WHERE status = 'pending'")
.execute(&mut *tx)
.await?;
}
// 再次读取:保证没有幻读
let count2: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM orders WHERE status = 'pending'"
)
.fetch_one(&mut *tx)
.await?;
// 如果有其他事务插入了新订单,本事务会检测到冲突并回滚
tx.commit().await?;
Ok(())
}
---
06.隔离级别选择策略
a.性能与一致性权衡
读已提交适合大多数场景,可重复读适合需要读取一致性的报表查询,可序列化适合关键业务操作如金融交易。
b.实际应用建议
---
use sqlx::PgPool;
use anyhow::Result;
// 根据业务场景选择隔离级别
async fn choose_isolation_level(pool: &PgPool) -> Result<()> {
// 场景1:普通查询 - 使用READ COMMITTED
let mut tx1 = pool.begin().await?;
sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
.execute(&mut *tx1)
.await?;
// 执行查询...
tx1.commit().await?;
// 场景2:报表生成 - 使用REPEATABLE READ
let mut tx2 = pool.begin().await?;
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(&mut *tx2)
.await?;
// 生成报表...
tx2.commit().await?;
// 场景3:金融交易 - 使用SERIALIZABLE
let mut tx3 = pool.begin().await?;
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut *tx3)
.await?;
// 执行转账...
tx3.commit().await?;
Ok(())
}
---
07.MVCC多版本并发控制
a.实现原理
PostgreSQL和MySQL InnoDB通过MVCC实现高隔离级别,每个事务看到数据的快照版本,读写不阻塞,提高并发性能。
b.版本可见性
MVCC为每行数据维护多个版本,事务根据自己的快照时间戳决定哪个版本可见,已提交的旧版本会被垃圾回收。
2 数据库事务
2.1 diesel事务
01.Diesel ORM简介
a.特点
Diesel是Rust生态中最成熟的ORM框架,提供类型安全的查询构建器和强大的事务支持,编译期检查SQL正确性。
b.事务模型
Diesel通过Connection trait提供事务支持,使用闭包封装事务逻辑,自动处理提交和回滚。
02.基础事务操作
a.transaction方法
Diesel的transaction方法接受一个闭包,闭包内的所有操作在同一事务中执行,返回Ok则提交,返回Err则回滚。
b.基础示例
---
use diesel::prelude::*;
use diesel::pg::PgConnection;
use anyhow::Result;
// Diesel事务基础用法
fn create_user_with_profile(conn: &mut PgConnection) -> Result<i32> {
conn.transaction(|conn| {
// 插入用户
let user_id: i32 = diesel::insert_into(users::table)
.values((
users::name.eq("张三"),
users::email.eq("[email protected]"),
))
.returning(users::id)
.get_result(conn)?;
// 插入用户配置
diesel::insert_into(user_profiles::table)
.values((
user_profiles::user_id.eq(user_id),
user_profiles::bio.eq("这是个人简介"),
))
.execute(conn)?;
Ok(user_id)
})
}
---
03.嵌套事务
a.Savepoint支持
Diesel支持嵌套事务,通过Savepoint实现,内层事务失败不影响外层事务。
b.嵌套事务示例
---
use diesel::prelude::*;
use diesel::pg::PgConnection;
use diesel::result::Error;
// 嵌套事务示例
fn nested_transaction_demo(conn: &mut PgConnection) -> Result<(), Error> {
conn.transaction(|conn| {
// 外层事务:创建订单
let order_id: i32 = diesel::insert_into(orders::table)
.values(orders::total.eq(100))
.returning(orders::id)
.get_result(conn)?;
// 内层事务:尝试扣减库存
let stock_result = conn.transaction(|conn| {
diesel::update(products::table.find(1))
.set(products::stock.eq(products::stock - 1))
.execute(conn)?;
Ok(())
});
// 内层事务失败不影响外层
if stock_result.is_err() {
println!("库存扣减失败,但订单已创建");
}
Ok(())
})
}
---
04.查询构建器与事务
a.类型安全查询
Diesel的查询构建器在编译期检查SQL正确性,与事务结合使用可以确保类型安全的数据库操作。
b.复杂查询示例
---
use diesel::prelude::*;
use diesel::pg::PgConnection;
use anyhow::Result;
// 使用查询构建器的事务
fn transfer_with_query_builder(
conn: &mut PgConnection,
from_id: i32,
to_id: i32,
amount: i64
) -> Result<()> {
conn.transaction(|conn| {
// 查询源账户余额
let from_balance: i64 = accounts::table
.find(from_id)
.select(accounts::balance)
.for_update() // 行锁
.first(conn)?;
if from_balance < amount {
return Err(diesel::result::Error::RollbackTransaction);
}
// 扣款
diesel::update(accounts::table.find(from_id))
.set(accounts::balance.eq(accounts::balance - amount))
.execute(conn)?;
// 加款
diesel::update(accounts::table.find(to_id))
.set(accounts::balance.eq(accounts::balance + amount))
.execute(conn)?;
Ok(())
})
}
---
05.连接池集成
a.r2d2连接池
Diesel通常与r2d2连接池配合使用,提供高效的连接管理和事务支持。
b.连接池配置示例
---
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use std::time::Duration;
// 配置Diesel连接池
type DbPool = r2d2::Pool<ConnectionManager<PgConnection>>;
fn create_pool(database_url: &str) -> DbPool {
let manager = ConnectionManager::<PgConnection>::new(database_url);
r2d2::Pool::builder()
.max_size(15)
.min_idle(Some(5))
.connection_timeout(Duration::from_secs(30))
.build(manager)
.expect("Failed to create pool")
}
// 使用连接池执行事务
fn execute_with_pool(pool: &DbPool) -> Result<(), diesel::result::Error> {
let mut conn = pool.get().expect("Failed to get connection");
conn.transaction(|conn| {
diesel::insert_into(logs::table)
.values(logs::message.eq("使用连接池的事务"))
.execute(conn)?;
Ok(())
})
}
---
2.2 sqlx事务
01.sqlx特性
a.异步优先
sqlx是纯异步的数据库驱动,与tokio等异步运行时深度集成,提供高性能的异步事务支持。
b.编译期SQL检查
通过宏在编译期验证SQL语句正确性,结合事务使用可以确保类型安全和SQL正确性。
02.基础事务操作
a.begin方法
使用pool.begin()或connection.begin()创建事务,返回Transaction对象,通过commit或rollback结束事务。
b.基础示例
---
use sqlx::{PgPool, Postgres, Transaction};
use anyhow::Result;
// sqlx基础事务
async fn create_order_with_items(pool: &PgPool) -> Result<i64> {
let mut tx: Transaction<Postgres> = pool.begin().await?;
// 创建订单
let order_id: (i64,) = sqlx::query_as(
"INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id"
)
.bind(1)
.bind(100)
.fetch_one(&mut *tx)
.await?;
// 创建订单项
sqlx::query(
"INSERT INTO order_items (order_id, product_id, quantity)
VALUES ($1, $2, $3)"
)
.bind(order_id.0)
.bind(101)
.bind(2)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(order_id.0)
}
---
03.编译期SQL验证
a.query宏
使用query!宏在编译期检查SQL语句,与数据库schema对比,确保类型匹配和字段存在。
b.编译期验证示例
---
use sqlx::PgPool;
use anyhow::Result;
// 编译期SQL检查
async fn compile_time_checked_transaction(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// query!宏在编译期验证SQL
let user = sqlx::query!(
"SELECT id, name, email FROM users WHERE id = $1",
1
)
.fetch_one(&mut *tx)
.await?;
// 类型安全:user.name是String类型
println!("User: {} <{}>", user.name, user.email);
// 更新操作也会被检查
sqlx::query!(
"UPDATE users SET last_login = NOW() WHERE id = $1",
user.id
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
04.事务隔离级别
a.设置隔离级别
sqlx支持通过SQL语句设置事务隔离级别,可以根据业务需求选择合适的隔离级别。
b.隔离级别示例
---
use sqlx::PgPool;
use anyhow::Result;
// 设置事务隔离级别
async fn transaction_with_isolation(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 设置为SERIALIZABLE隔离级别
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut *tx)
.await?;
// 执行需要强一致性的操作
let balance: (i64,) = sqlx::query_as(
"SELECT balance FROM accounts WHERE id = $1 FOR UPDATE"
)
.bind(1)
.fetch_one(&mut *tx)
.await?;
if balance.0 >= 100 {
sqlx::query("UPDATE accounts SET balance = balance - 100 WHERE id = $1")
.bind(1)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
---
05.批量操作与事务
a.批量插入
在事务中执行批量操作可以提高性能,同时保证原子性。
b.批量操作示例
---
use sqlx::PgPool;
use anyhow::Result;
// 批量插入示例
async fn batch_insert_in_transaction(
pool: &PgPool,
users: Vec<(String, String)>
) -> Result<()> {
let mut tx = pool.begin().await?;
// 批量插入用户
for (name, email) in users {
sqlx::query(
"INSERT INTO users (name, email) VALUES ($1, $2)"
)
.bind(&name)
.bind(&email)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
// 使用COPY优化批量插入
async fn optimized_batch_insert(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// PostgreSQL的COPY命令性能更高
sqlx::query(
"COPY users (name, email) FROM STDIN WITH (FORMAT csv)"
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
06.错误处理与重试
a.事务重试机制
对于可重试的错误如序列化失败,可以实现自动重试机制。
b.重试示例
---
use sqlx::PgPool;
use anyhow::Result;
use std::time::Duration;
// 带重试的事务
async fn transaction_with_retry(pool: &PgPool) -> Result<()> {
let max_retries = 3;
let mut attempts = 0;
loop {
attempts += 1;
let mut tx = pool.begin().await?;
let result = async {
sqlx::query("UPDATE counters SET value = value + 1 WHERE id = 1")
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok::<_, sqlx::Error>(())
}.await;
match result {
Ok(_) => return Ok(()),
Err(e) if attempts < max_retries => {
// 序列化失败,重试
eprintln!("事务失败,重试 {}/{}", attempts, max_retries);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => return Err(e.into()),
}
}
}
---
2.3 tokio-postgres事务
01.tokio-postgres特性
a.纯异步驱动
tokio-postgres是PostgreSQL的原生异步驱动,提供底层的事务控制能力,性能优异。
b.Pipeline支持
支持请求管道化,可以在一个往返中发送多个命令,提高事务性能。
02.事务基础操作
a.transaction方法
使用Client的transaction方法创建事务,接受闭包执行事务逻辑。
b.基础示例
---
use tokio_postgres::{Client, NoTls, Error};
// tokio-postgres基础事务
async fn basic_transaction(client: &Client) -> Result<(), Error> {
let transaction = client.transaction().await?;
// 插入数据
transaction.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
&[&"王五", &"[email protected]"]
).await?;
// 更新数据
transaction.execute(
"UPDATE users SET active = true WHERE email = $1",
&[&"[email protected]"]
).await?;
// 提交事务
transaction.commit().await?;
Ok(())
}
---
03.Prepared Statement与事务
a.预编译语句
在事务中使用预编译语句可以提高性能,特别是执行多次相同结构的查询时。
b.预编译示例
---
use tokio_postgres::{Client, Error};
// 使用预编译语句的事务
async fn transaction_with_prepared(client: &Client) -> Result<(), Error> {
let transaction = client.transaction().await?;
// 预编译插入语句
let stmt = transaction.prepare(
"INSERT INTO logs (level, message) VALUES ($1, $2)"
).await?;
// 批量插入
for i in 0..100 {
transaction.execute(
&stmt,
&[&"INFO", &format!("Log message {}", i)]
).await?;
}
transaction.commit().await?;
Ok(())
}
---
04.COPY操作与事务
a.高性能批量导入
PostgreSQL的COPY命令提供最高性能的批量数据导入,tokio-postgres完整支持。
b.COPY示例
---
use tokio_postgres::{Client, Error};
use tokio_postgres::types::ToSql;
// 使用COPY批量导入
async fn bulk_copy_transaction(client: &Client) -> Result<(), Error> {
let transaction = client.transaction().await?;
// 开始COPY操作
let sink = transaction.copy_in(
"COPY users (name, email) FROM STDIN WITH (FORMAT csv)"
).await?;
// 写入数据
let data = b"Alice,[email protected]\nBob,[email protected]\n";
let mut writer = tokio::io::BufWriter::new(sink);
use tokio::io::AsyncWriteExt;
writer.write_all(data).await?;
writer.flush().await?;
let rows_inserted = writer.into_inner().finish().await?;
println!("Inserted {} rows", rows_inserted);
transaction.commit().await?;
Ok(())
}
---
05.Pipeline管道化
a.批量请求
Pipeline允许在单个网络往返中发送多个请求,显著提高事务吞吐量。
b.Pipeline示例
---
use tokio_postgres::{Client, Error};
// 使用Pipeline优化事务
async fn pipelined_transaction(client: &Client) -> Result<(), Error> {
let transaction = client.transaction().await?;
// 创建Pipeline
let mut pipeline = transaction.pipeline();
// 批量添加查询
for i in 0..10 {
pipeline.query(
"INSERT INTO counters (name, value) VALUES ($1, $2)",
&[&format!("counter_{}", i), &i]
);
}
// 一次性发送所有请求
pipeline.execute().await?;
transaction.commit().await?;
Ok(())
}
---
06.通知与事务
a.LISTEN/NOTIFY
PostgreSQL的LISTEN/NOTIFY机制可以与事务结合,实现事件驱动的数据库应用。
b.通知示例
---
use tokio_postgres::{Client, Error};
// 事务中发送通知
async fn transaction_with_notify(client: &Client) -> Result<(), Error> {
let transaction = client.transaction().await?;
// 插入数据
transaction.execute(
"INSERT INTO events (type, data) VALUES ($1, $2)",
&[&"user_created", &"{\"user_id\": 123}"]
).await?;
// 发送通知
transaction.execute(
"NOTIFY events, 'user_created'",
&[]
).await?;
transaction.commit().await?;
Ok(())
}
// 监听通知
async fn listen_for_events(mut client: Client) -> Result<(), Error> {
client.execute("LISTEN events", &[]).await?;
loop {
let notification = client.notifications().next().await;
if let Some(notif) = notification {
println!("Received: {} on channel {}", notif.payload(), notif.channel());
}
}
}
---
2.4 sea-orm事务
01.SeaORM特性
a.异步ORM
SeaORM是Rust生态中的现代异步ORM框架,支持MySQL、PostgreSQL、SQLite,提供类型安全的查询构建和完整的事务支持。
b.实体驱动
通过实体定义数据模型,自动生成类型安全的查询接口,与事务无缝集成。
02.基础事务操作
a.begin方法
使用DatabaseConnection的begin方法创建事务,返回DatabaseTransaction对象。
b.基础示例
---
use sea_orm::{Database, DatabaseConnection, DatabaseTransaction};
use sea_orm::{entity::*, query::*};
use anyhow::Result;
// SeaORM基础事务
async fn create_user_transaction(db: &DatabaseConnection) -> Result<()> {
let txn: DatabaseTransaction = db.begin().await?;
// 插入用户实体
let user = user::ActiveModel {
name: Set("赵六".to_owned()),
email: Set("[email protected]".to_owned()),
..Default::default()
};
let user_result = user.insert(&txn).await?;
// 插入关联的配置
let profile = user_profile::ActiveModel {
user_id: Set(user_result.id),
bio: Set("个人简介".to_owned()),
..Default::default()
};
profile.insert(&txn).await?;
txn.commit().await?;
Ok(())
}
---
03.实体操作与事务
a.CRUD操作
SeaORM的实体CRUD操作可以在事务中执行,保证数据一致性。
b.实体操作示例
---
use sea_orm::{DatabaseConnection, DatabaseTransaction, EntityTrait, Set};
use anyhow::Result;
// 实体更新事务
async fn update_user_transaction(
db: &DatabaseConnection,
user_id: i32,
new_email: String
) -> Result<()> {
let txn = db.begin().await?;
// 查询用户
let user = User::find_by_id(user_id)
.one(&txn)
.await?
.ok_or_else(|| anyhow::anyhow!("用户不存在"))?;
// 更新用户
let mut user: user::ActiveModel = user.into();
user.email = Set(new_email);
user.updated_at = Set(chrono::Utc::now().naive_utc());
user.update(&txn).await?;
// 记录操作日志
let log = audit_log::ActiveModel {
user_id: Set(user_id),
action: Set("update_email".to_owned()),
timestamp: Set(chrono::Utc::now().naive_utc()),
..Default::default()
};
log.insert(&txn).await?;
txn.commit().await?;
Ok(())
}
---
04.关联查询与事务
a.关联加载
SeaORM支持在事务中进行关联查询,通过find_also_related等方法加载关联数据。
b.关联查询示例
---
use sea_orm::{DatabaseConnection, EntityTrait, Related};
use anyhow::Result;
// 关联查询事务
async fn query_user_with_orders(
db: &DatabaseConnection,
user_id: i32
) -> Result<(user::Model, Vec<order::Model>)> {
let txn = db.begin().await?;
// 查询用户及其订单
let user = User::find_by_id(user_id)
.one(&txn)
.await?
.ok_or_else(|| anyhow::anyhow!("用户不存在"))?;
let orders = user
.find_related(Order)
.all(&txn)
.await?;
txn.commit().await?;
Ok((user, orders))
}
// 复杂关联操作
async fn transfer_order_ownership(
db: &DatabaseConnection,
order_id: i32,
new_user_id: i32
) -> Result<()> {
let txn = db.begin().await?;
// 查询订单
let order = Order::find_by_id(order_id)
.one(&txn)
.await?
.ok_or_else(|| anyhow::anyhow!("订单不存在"))?;
// 更新订单所有者
let mut order: order::ActiveModel = order.into();
order.user_id = Set(new_user_id);
order.update(&txn).await?;
// 更新用户订单计数
User::update_many()
.col_expr(user::Column::OrderCount, Expr::col(user::Column::OrderCount).sub(1))
.filter(user::Column::Id.eq(order.user_id.unwrap()))
.exec(&txn)
.await?;
User::update_many()
.col_expr(user::Column::OrderCount, Expr::col(user::Column::OrderCount).add(1))
.filter(user::Column::Id.eq(new_user_id))
.exec(&txn)
.await?;
txn.commit().await?;
Ok(())
}
---
05.批量操作与事务
a.批量插入
SeaORM支持在事务中进行批量插入操作,提高性能。
b.批量操作示例
---
use sea_orm::{DatabaseConnection, EntityTrait, Set};
use anyhow::Result;
// 批量插入事务
async fn batch_insert_users(
db: &DatabaseConnection,
users: Vec<(String, String)>
) -> Result<()> {
let txn = db.begin().await?;
// 构建批量插入数据
let user_models: Vec<user::ActiveModel> = users
.into_iter()
.map(|(name, email)| user::ActiveModel {
name: Set(name),
email: Set(email),
created_at: Set(chrono::Utc::now().naive_utc()),
..Default::default()
})
.collect();
// 批量插入
User::insert_many(user_models)
.exec(&txn)
.await?;
txn.commit().await?;
Ok(())
}
// 批量更新事务
async fn batch_update_status(
db: &DatabaseConnection,
order_ids: Vec<i32>,
new_status: String
) -> Result<()> {
let txn = db.begin().await?;
Order::update_many()
.col_expr(order::Column::Status, Expr::value(new_status))
.col_expr(order::Column::UpdatedAt, Expr::current_timestamp())
.filter(order::Column::Id.is_in(order_ids))
.exec(&txn)
.await?;
txn.commit().await?;
Ok(())
}
---
06.迁移与事务
a.Schema迁移
SeaORM的迁移系统自动在事务中执行DDL操作,确保schema变更的原子性。
b.迁移示例
---
use sea_orm_migration::prelude::*;
// 定义迁移
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// 在事务中执行多个DDL操作
manager
.create_table(
Table::create()
.table(User::Table)
.if_not_exists()
.col(
ColumnDef::new(User::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(User::Name).string().not_null())
.col(ColumnDef::new(User::Email).string().not_null())
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_user_email")
.table(User::Table)
.col(User::Email)
.unique()
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(User::Table).to_owned())
.await
}
}
---
2.5 事务提交与回滚
01.提交机制
a.显式提交
调用commit方法显式提交事务,将所有修改持久化到数据库,事务成功结束。
b.提交示例
---
use sqlx::PgPool;
use anyhow::Result;
// 显式提交事务
async fn explicit_commit_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 执行多个操作
sqlx::query("INSERT INTO users (name) VALUES ($1)")
.bind("用户A")
.execute(&mut *tx)
.await?;
sqlx::query("INSERT INTO users (name) VALUES ($1)")
.bind("用户B")
.execute(&mut *tx)
.await?;
// 显式提交:所有操作持久化
tx.commit().await?;
println!("事务已提交");
Ok(())
}
// 条件提交
async fn conditional_commit(pool: &PgPool, should_commit: bool) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query("UPDATE counters SET value = value + 1")
.execute(&mut *tx)
.await?;
if should_commit {
tx.commit().await?;
println!("事务已提交");
} else {
// 不提交,事务会在drop时自动回滚
println!("事务将被回滚");
}
Ok(())
}
---
02.回滚机制
a.显式回滚
调用rollback方法显式回滚事务,撤销所有未提交的修改,事务终止。
b.回滚示例
---
use sqlx::PgPool;
use anyhow::Result;
// 显式回滚事务
async fn explicit_rollback_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 执行操作
sqlx::query("INSERT INTO temp_data (value) VALUES ($1)")
.bind(100)
.execute(&mut *tx)
.await?;
// 检查某个条件
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM temp_data")
.fetch_one(&mut *tx)
.await?;
if count.0 > 1000 {
// 显式回滚
tx.rollback().await?;
return Err(anyhow::anyhow!("数据量超限,事务已回滚"));
}
tx.commit().await?;
Ok(())
}
// 错误时回滚
async fn rollback_on_error(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
let result = async {
sqlx::query("INSERT INTO orders (amount) VALUES ($1)")
.bind(100)
.execute(&mut *tx)
.await?;
// 模拟业务逻辑错误
if true {
return Err(anyhow::anyhow!("业务逻辑错误"));
}
Ok::<_, anyhow::Error>(())
}.await;
match result {
Ok(_) => tx.commit().await?,
Err(e) => {
tx.rollback().await?;
return Err(e);
}
}
Ok(())
}
---
03.自动回滚
a.Drop时回滚
Transaction对象被drop时,如果未调用commit,会自动回滚事务,这是Rust RAII机制的体现。
b.自动回滚示例
---
use sqlx::PgPool;
use anyhow::Result;
// 自动回滚:函数返回时tx被drop
async fn auto_rollback_on_return(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("这条记录不会被保存")
.execute(&mut *tx)
.await?;
// 函数返回,tx被drop,自动回滚
Ok(())
}
// 自动回滚:提前返回
async fn auto_rollback_on_early_return(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query("INSERT INTO users (name) VALUES ($1)")
.bind("临时用户")
.execute(&mut *tx)
.await?;
// 检查条件,提前返回
let exists: (bool,) = sqlx::query_as(
"SELECT EXISTS(SELECT 1 FROM users WHERE name = $1)"
)
.bind("临时用户")
.fetch_one(&mut *tx)
.await?;
if exists.0 {
// 提前返回,tx被drop,自动回滚
return Ok(());
}
tx.commit().await?;
Ok(())
}
// 自动回滚:panic时
async fn auto_rollback_on_panic(pool: &PgPool) {
let mut tx = pool.begin().await.unwrap();
sqlx::query("INSERT INTO critical_data (value) VALUES ($1)")
.bind(999)
.execute(&mut *tx)
.await
.unwrap();
// panic发生,tx被drop,自动回滚
panic!("发生panic,事务会自动回滚");
}
---
04.部分提交与Savepoint
a.Savepoint概念
Savepoint允许在事务中设置保存点,可以回滚到保存点而不是回滚整个事务。
b.Savepoint示例
---
use sqlx::PgPool;
use anyhow::Result;
// 使用Savepoint实现部分回滚
async fn savepoint_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 操作1:创建用户
sqlx::query("INSERT INTO users (name) VALUES ($1)")
.bind("主用户")
.execute(&mut *tx)
.await?;
// 设置保存点
sqlx::query("SAVEPOINT sp1")
.execute(&mut *tx)
.await?;
// 操作2:尝试创建配置
let config_result = sqlx::query("INSERT INTO user_configs (user_id, key) VALUES ($1, $2)")
.bind(1)
.bind("invalid_key")
.execute(&mut *tx)
.await;
if config_result.is_err() {
// 回滚到保存点,操作1保留
sqlx::query("ROLLBACK TO SAVEPOINT sp1")
.execute(&mut *tx)
.await?;
println!("配置创建失败,已回滚到保存点");
}
// 提交事务:操作1被保留
tx.commit().await?;
Ok(())
}
// 嵌套Savepoint
async fn nested_savepoint_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 第一层操作
sqlx::query("INSERT INTO orders (amount) VALUES ($1)")
.bind(100)
.execute(&mut *tx)
.await?;
sqlx::query("SAVEPOINT sp1").execute(&mut *tx).await?;
// 第二层操作
sqlx::query("INSERT INTO order_items (order_id, product_id) VALUES ($1, $2)")
.bind(1)
.bind(101)
.execute(&mut *tx)
.await?;
sqlx::query("SAVEPOINT sp2").execute(&mut *tx).await?;
// 第三层操作
let result = sqlx::query("INSERT INTO payments (order_id, amount) VALUES ($1, $2)")
.bind(1)
.bind(100)
.execute(&mut *tx)
.await;
if result.is_err() {
// 只回滚第三层
sqlx::query("ROLLBACK TO SAVEPOINT sp2").execute(&mut *tx).await?;
}
tx.commit().await?;
Ok(())
}
---
05.两阶段提交
a.Prepare阶段
在分布式事务中,prepare阶段准备提交但不实际提交,等待协调者的最终决定。
b.两阶段提交示例
---
use sqlx::PgPool;
use anyhow::Result;
// 模拟两阶段提交
async fn two_phase_commit_demo(pool: &PgPool, tx_id: &str) -> Result<()> {
let mut tx = pool.begin().await?;
// 执行业务操作
sqlx::query("INSERT INTO distributed_orders (tx_id, amount) VALUES ($1, $2)")
.bind(tx_id)
.bind(100)
.execute(&mut *tx)
.await?;
// Prepare阶段:准备提交
sqlx::query(&format!("PREPARE TRANSACTION '{}'", tx_id))
.execute(&mut *tx)
.await?;
// 此时事务处于prepared状态,等待commit或rollback
Ok(())
}
// 提交prepared事务
async fn commit_prepared(pool: &PgPool, tx_id: &str) -> Result<()> {
sqlx::query(&format!("COMMIT PREPARED '{}'", tx_id))
.execute(pool)
.await?;
Ok(())
}
// 回滚prepared事务
async fn rollback_prepared(pool: &PgPool, tx_id: &str) -> Result<()> {
sqlx::query(&format!("ROLLBACK PREPARED '{}'", tx_id))
.execute(pool)
.await?;
Ok(())
}
---
2.6 保存点:Savepoint
01.Savepoint概念
a.定义
Savepoint是事务中的命名检查点,允许部分回滚事务而不影响整个事务,提供更细粒度的事务控制。
b.使用场景
适用于复杂业务流程,某些步骤失败时只回滚该步骤,保留其他已完成的操作,避免重复执行。
02.创建Savepoint
a.SAVEPOINT命令
使用SAVEPOINT命令创建保存点,为保存点指定唯一名称。
b.创建示例
---
use sqlx::PgPool;
use anyhow::Result;
// 创建和使用Savepoint
async fn create_savepoint_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 第一步:创建订单
let order_id: (i32,) = sqlx::query_as(
"INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id"
)
.bind(1)
.bind(100)
.fetch_one(&mut *tx)
.await?;
// 创建保存点sp1
sqlx::query("SAVEPOINT sp1")
.execute(&mut *tx)
.await?;
// 第二步:添加订单项
sqlx::query("INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)")
.bind(order_id.0)
.bind(101)
.bind(2)
.execute(&mut *tx)
.await?;
// 创建保存点sp2
sqlx::query("SAVEPOINT sp2")
.execute(&mut *tx)
.await?;
// 第三步:扣减库存
let stock_result = sqlx::query("UPDATE products SET stock = stock - $1 WHERE id = $2 AND stock >= $1")
.bind(2)
.bind(101)
.execute(&mut *tx)
.await;
if stock_result.is_err() || stock_result.unwrap().rows_affected() == 0 {
// 库存不足,回滚到sp2
sqlx::query("ROLLBACK TO SAVEPOINT sp2")
.execute(&mut *tx)
.await?;
println!("库存不足,已回滚库存扣减操作");
}
tx.commit().await?;
Ok(())
}
---
03.回滚到Savepoint
a.ROLLBACK TO命令
使用ROLLBACK TO SAVEPOINT命令回滚到指定保存点,撤销该保存点之后的所有操作。
b.回滚示例
---
use sqlx::PgPool;
use anyhow::Result;
// 回滚到Savepoint
async fn rollback_to_savepoint_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 插入主记录
sqlx::query("INSERT INTO main_records (data) VALUES ($1)")
.bind("主数据")
.execute(&mut *tx)
.await?;
sqlx::query("SAVEPOINT before_details").execute(&mut *tx).await?;
// 尝试插入详细记录
for i in 0..5 {
let result = sqlx::query("INSERT INTO detail_records (main_id, value) VALUES ($1, $2)")
.bind(1)
.bind(i)
.execute(&mut *tx)
.await;
if result.is_err() {
// 某条详细记录失败,回滚所有详细记录
sqlx::query("ROLLBACK TO SAVEPOINT before_details")
.execute(&mut *tx)
.await?;
println!("详细记录插入失败,已回滚");
break;
}
}
// 主记录仍然保留
tx.commit().await?;
Ok(())
}
---
04.释放Savepoint
a.RELEASE命令
使用RELEASE SAVEPOINT命令释放保存点,之后无法再回滚到该保存点,但不影响事务继续执行。
b.释放示例
---
use sqlx::PgPool;
use anyhow::Result;
// 释放Savepoint
async fn release_savepoint_demo(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("步骤1")
.execute(&mut *tx)
.await?;
sqlx::query("SAVEPOINT step1").execute(&mut *tx).await?;
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("步骤2")
.execute(&mut *tx)
.await?;
// 步骤2成功,释放保存点
sqlx::query("RELEASE SAVEPOINT step1")
.execute(&mut *tx)
.await?;
// 此后无法回滚到step1
sqlx::query("INSERT INTO logs (message) VALUES ($1)")
.bind("步骤3")
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
---
05.嵌套Savepoint
a.多层保存点
可以创建多个嵌套的保存点,形成保存点栈,支持多层次的部分回滚。
b.嵌套示例
---
use sqlx::PgPool;
use anyhow::Result;
// 嵌套Savepoint处理复杂业务
async fn nested_savepoint_business(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 层级1:创建订单
let order_id: (i32,) = sqlx::query_as(
"INSERT INTO orders (user_id, status) VALUES ($1, $2) RETURNING id"
)
.bind(1)
.bind("pending")
.fetch_one(&mut *tx)
.await?;
sqlx::query("SAVEPOINT order_created").execute(&mut *tx).await?;
// 层级2:添加订单项
let mut items_added = 0;
for product_id in vec![101, 102, 103] {
sqlx::query(&format!("SAVEPOINT item_{}", product_id))
.execute(&mut *tx)
.await?;
let result = sqlx::query(
"INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)"
)
.bind(order_id.0)
.bind(product_id)
.bind(1)
.execute(&mut *tx)
.await;
if result.is_ok() {
items_added += 1;
// 成功,释放该项的保存点
sqlx::query(&format!("RELEASE SAVEPOINT item_{}", product_id))
.execute(&mut *tx)
.await?;
} else {
// 失败,回滚该项
sqlx::query(&format!("ROLLBACK TO SAVEPOINT item_{}", product_id))
.execute(&mut *tx)
.await?;
}
}
if items_added == 0 {
// 没有任何订单项,回滚整个订单
sqlx::query("ROLLBACK TO SAVEPOINT order_created")
.execute(&mut *tx)
.await?;
return Err(anyhow::anyhow!("订单创建失败:无有效商品"));
}
tx.commit().await?;
Ok(())
}
---
06.Savepoint最佳实践
a.命名规范
使用有意义的保存点名称,反映业务逻辑,便于理解和维护。
b.及时释放
成功完成的步骤应及时释放保存点,减少内存开销。
c.错误处理
---
use sqlx::PgPool;
use anyhow::Result;
// Savepoint错误处理最佳实践
async fn savepoint_best_practice(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 主操作
sqlx::query("INSERT INTO main_table (data) VALUES ($1)")
.bind("主数据")
.execute(&mut *tx)
.await?;
// 可选操作:使用Savepoint保护
let optional_result = async {
sqlx::query("SAVEPOINT optional_op").execute(&mut *tx).await?;
let result = sqlx::query("INSERT INTO optional_table (data) VALUES ($1)")
.bind("可选数据")
.execute(&mut *tx)
.await;
match result {
Ok(_) => {
// 成功,释放保存点
sqlx::query("RELEASE SAVEPOINT optional_op").execute(&mut *tx).await?;
Ok(true)
}
Err(_) => {
// 失败,回滚到保存点
sqlx::query("ROLLBACK TO SAVEPOINT optional_op").execute(&mut *tx).await?;
Ok(false)
}
}
}.await?;
if optional_result {
println!("可选操作成功");
} else {
println!("可选操作失败,但主操作保留");
}
tx.commit().await?;
Ok(())
}
---
2.7 嵌套事务
01.嵌套事务概念
a.定义
嵌套事务是在一个事务内部开启另一个事务,形成父子事务关系,子事务可以独立提交或回滚。
b.实现机制
大多数数据库通过Savepoint机制实现嵌套事务,子事务实际上是父事务中的保存点。
02.数据库层面的嵌套事务
a.PostgreSQL实现
PostgreSQL不支持真正的嵌套事务,但通过Savepoint可以模拟嵌套事务行为。
b.PostgreSQL示例
---
use sqlx::PgPool;
use anyhow::Result;
// PostgreSQL嵌套事务(通过Savepoint实现)
async fn postgres_nested_transaction(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 外层事务:创建用户
let user_id: (i32,) = sqlx::query_as(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id"
)
.bind("外层用户")
.bind("[email protected]")
.fetch_one(&mut *tx)
.await?;
// 模拟嵌套事务:创建保存点
sqlx::query("SAVEPOINT nested_tx").execute(&mut *tx).await?;
// 内层事务:创建用户配置
let config_result = sqlx::query(
"INSERT INTO user_configs (user_id, theme) VALUES ($1, $2)"
)
.bind(user_id.0)
.bind("dark")
.execute(&mut *tx)
.await;
if config_result.is_err() {
// 内层事务失败,回滚到保存点
sqlx::query("ROLLBACK TO SAVEPOINT nested_tx")
.execute(&mut *tx)
.await?;
println!("内层事务回滚,外层事务继续");
} else {
// 内层事务成功,释放保存点
sqlx::query("RELEASE SAVEPOINT nested_tx")
.execute(&mut *tx)
.await?;
}
// 外层事务提交
tx.commit().await?;
Ok(())
}
---
03.应用层面的嵌套事务
a.手动管理
在应用层通过函数调用栈实现嵌套事务逻辑,每层函数负责自己的事务边界。
b.应用层示例
---
use sqlx::{PgPool, Postgres, Transaction};
use anyhow::Result;
// 内层事务函数
async fn inner_transaction(tx: &mut Transaction<'_, Postgres>) -> Result<()> {
// 创建保存点
sqlx::query("SAVEPOINT inner").execute(&mut **tx).await?;
let result = sqlx::query("INSERT INTO inner_table (data) VALUES ($1)")
.bind("内层数据")
.execute(&mut **tx)
.await;
match result {
Ok(_) => {
sqlx::query("RELEASE SAVEPOINT inner").execute(&mut **tx).await?;
Ok(())
}
Err(e) => {
sqlx::query("ROLLBACK TO SAVEPOINT inner").execute(&mut **tx).await?;
Err(e.into())
}
}
}
// 外层事务函数
async fn outer_transaction(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 外层操作
sqlx::query("INSERT INTO outer_table (data) VALUES ($1)")
.bind("外层数据")
.execute(&mut *tx)
.await?;
// 调用内层事务
let inner_result = inner_transaction(&mut tx).await;
if inner_result.is_err() {
println!("内层事务失败,但外层继续");
}
// 外层提交
tx.commit().await?;
Ok(())
}
---
04.Diesel嵌套事务
a.自动Savepoint
Diesel支持嵌套事务,内层transaction调用自动创建Savepoint。
b.Diesel示例
---
use diesel::prelude::*;
use diesel::pg::PgConnection;
use anyhow::Result;
// Diesel嵌套事务
fn diesel_nested_transaction(conn: &mut PgConnection) -> Result<()> {
conn.transaction(|conn| {
// 外层事务操作
diesel::insert_into(users::table)
.values(users::name.eq("外层用户"))
.execute(conn)?;
// 内层事务:自动创建Savepoint
let inner_result = conn.transaction(|conn| {
diesel::insert_into(user_profiles::table)
.values((
user_profiles::user_id.eq(1),
user_profiles::bio.eq("内层数据"),
))
.execute(conn)?;
Ok::<_, diesel::result::Error>(())
});
// 内层事务失败不影响外层
if inner_result.is_err() {
println!("内层事务失��");
}
Ok(())
})
}
// 多层嵌套
fn multi_level_nested(conn: &mut PgConnection) -> Result<()> {
conn.transaction(|conn| {
// 第一层
diesel::insert_into(level1::table)
.values(level1::data.eq("层级1"))
.execute(conn)?;
conn.transaction(|conn| {
// 第二层
diesel::insert_into(level2::table)
.values(level2::data.eq("层级2"))
.execute(conn)?;
conn.transaction(|conn| {
// 第三层
diesel::insert_into(level3::table)
.values(level3::data.eq("层级3"))
.execute(conn)?;
Ok(())
})?;
Ok(())
})?;
Ok(())
})
}
---
05.嵌套事务的错误处理
a.异常传播
内层事务的错误可以选择性地传播到外层,或在内层捕获处理。
b.错误处理示例
---
use sqlx::PgPool;
use anyhow::Result;
// 嵌套事务错误处理
async fn nested_error_handling(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 外层操作
sqlx::query("INSERT INTO orders (status) VALUES ($1)")
.bind("pending")
.execute(&mut *tx)
.await?;
// 内层事务1:必须成功
sqlx::query("SAVEPOINT critical").execute(&mut *tx).await?;
let critical_result = sqlx::query("INSERT INTO critical_data (value) VALUES ($1)")
.bind(100)
.execute(&mut *tx)
.await;
if critical_result.is_err() {
// 关键操作失败,回滚整个事务
tx.rollback().await?;
return Err(anyhow::anyhow!("关键操作失败"));
}
sqlx::query("RELEASE SAVEPOINT critical").execute(&mut *tx).await?;
// 内层事务2:可选操作
sqlx::query("SAVEPOINT optional").execute(&mut *tx).await?;
let optional_result = sqlx::query("INSERT INTO optional_data (value) VALUES ($1)")
.bind(200)
.execute(&mut *tx)
.await;
if optional_result.is_err() {
// 可选操作失败,只回滚该操作
sqlx::query("ROLLBACK TO SAVEPOINT optional").execute(&mut *tx).await?;
println!("可选操作失败,继续执行");
} else {
sqlx::query("RELEASE SAVEPOINT optional").execute(&mut *tx).await?;
}
tx.commit().await?;
Ok(())
}
---
06.嵌套事务最佳实践
a.避免过深嵌套
嵌套层级不宜过深,一般不超过3层,过深会增加复杂度和维护成本。
b.明确事务边界
每层事务的职责应清晰,避免跨层操作导致的混乱。
c.实践示例
---
use sqlx::PgPool;
use anyhow::Result;
// 嵌套事务最佳实践
async fn nested_best_practice(pool: &PgPool) -> Result<()> {
let mut tx = pool.begin().await?;
// 主业务逻辑
let order_id = create_order(&mut tx).await?;
// 子业务1:添加订单项(可失败)
let items_result = add_order_items(&mut tx, order_id).await;
if items_result.is_err() {
println!("订单项添加失败,订单仍然创建");
}
// 子业务2:发送通知(可失败)
let notify_result = send_notification(&mut tx, order_id).await;
if notify_result.is_err() {
println!("通知发送失败,不影响订单");
}
tx.commit().await?;
Ok(())
}
async fn create_order(tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<i32> {
let order_id: (i32,) = sqlx::query_as(
"INSERT INTO orders (status) VALUES ($1) RETURNING id"
)
.bind("pending")
.fetch_one(&mut **tx)
.await?;
Ok(order_id.0)
}
async fn add_order_items(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
order_id: i32
) -> Result<()> {
sqlx::query("SAVEPOINT add_items").execute(&mut **tx).await?;
let result = sqlx::query("INSERT INTO order_items (order_id, product_id) VALUES ($1, $2)")
.bind(order_id)
.bind(101)
.execute(&mut **tx)
.await;
match result {
Ok(_) => {
sqlx::query("RELEASE SAVEPOINT add_items").execute(&mut **tx).await?;
Ok(())
}
Err(e) => {
sqlx::query("ROLLBACK TO SAVEPOINT add_items").execute(&mut **tx).await?;
Err(e.into())
}
}
}
async fn send_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
order_id: i32
) -> Result<()> {
sqlx::query("SAVEPOINT notify").execute(&mut **tx).await?;
let result = sqlx::query("INSERT INTO notifications (order_id, message) VALUES ($1, $2)")
.bind(order_id)
.bind("订单已创建")
.execute(&mut **tx)
.await;
match result {
Ok(_) => {
sqlx::query("RELEASE SAVEPOINT notify").execute(&mut **tx).await?;
Ok(())
}
Err(e) => {
sqlx::query("ROLLBACK TO SAVEPOINT notify").execute(&mut **tx).await?;
Err(e.into())
}
}
}
---
3 STM软件事务内存
3.1 STM概念
01.STM基本原理
a.事务内存概念
STM将数据库事务的ACID特性应用到内存操作中,通过版本控制和冲突检测实现无锁并发,事务执行时记录读写集合,提交时检测冲突并自动重试。
b.核心机制
---
use stm::{atomically, TVar};
use std::sync::Arc;
// STM基础示例:银行转账
fn transfer_money(
from: Arc<TVar<i32>>,
to: Arc<TVar<i32>>,
amount: i32
) {
atomically(|trans| {
// 读取余额
let from_balance = from.read(trans)?;
let to_balance = to.read(trans)?;
// 检查余额
if from_balance < amount {
return Err(stm::StmError::Retry);
}
// 原子更新
from.write(trans, from_balance - amount)?;
to.write(trans, to_balance + amount)?;
Ok(())
});
}
---
02.STM与锁的对比
a.死锁问题
传统锁需要小心管理锁顺序避免死锁,STM通过自动重试机制完全避免死锁问题,简化并发编程。
b.组合性
---
use stm::{atomically, TVar};
use std::sync::{Arc, Mutex};
// 传统锁:难以组合
fn lock_based_transfer(
from: Arc<Mutex<i32>>,
to: Arc<Mutex<i32>>,
amount: i32
) -> Result<(), String> {
// 必须按固定顺序获取锁
let mut from_guard = from.lock().unwrap();
let mut to_guard = to.lock().unwrap();
if *from_guard < amount {
return Err("余额不足".to_string());
}
*from_guard -= amount;
*to_guard += amount;
Ok(())
}
// STM:易于组合
fn stm_based_transfer(
from: Arc<TVar<i32>>,
to: Arc<TVar<i32>>,
amount: i32
) {
atomically(|trans| {
let from_balance = from.read(trans)?;
if from_balance < amount {
return Err(stm::StmError::Retry);
}
let to_balance = to.read(trans)?;
from.write(trans, from_balance - amount)?;
to.write(trans, to_balance + amount)?;
Ok(())
});
}
---
03.ACID特性
a.原子性保证
STM事务中的所有操作要么全部成功,要么全部失败,不会出现部分更新的情况。
b.隔离性实现
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
// 多线程并发访问
fn concurrent_updates() {
let counter = Arc::new(TVar::new(0));
let mut handles = vec![];
// 启动10个线程同时更新
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..100 {
atomically(|trans| {
let value = counter_clone.read(trans)?;
counter_clone.write(trans, value + 1)?;
Ok(())
});
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 验证结果:应该是1000
let final_value = atomically(|trans| counter.read(trans));
println!("最终值: {}", final_value); // 输出: 1000
}
---
04.适用场景
a.复杂并发逻辑
需要协调多个共享变量更新的场景,如多账户转账、库存管理、配置更新等。
b.高竞争环境
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
// 高竞争场景:多生产者多消费者
struct SharedQueue {
items: Arc<TVar<Vec<i32>>>,
capacity: usize,
}
impl SharedQueue {
fn new(capacity: usize) -> Self {
SharedQueue {
items: Arc::new(TVar::new(Vec::new())),
capacity,
}
}
// 生产者:队列满时阻塞
fn push(&self, item: i32) {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
if queue.len() >= self.capacity {
return Err(stm::StmError::Retry);
}
queue.push(item);
self.items.write(trans, queue)?;
Ok(())
});
}
// 消费者:队列空时阻塞
fn pop(&self) -> i32 {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
if queue.is_empty() {
return Err(stm::StmError::Retry);
}
let item = queue.remove(0);
self.items.write(trans, queue)?;
Ok(item)
})
}
}
---
3.2 stm库
01.stm库简介
a.Rust生态STM实现
stm是Rust中最成熟的软件事务内存库,提供完整的TVar、atomically、retry和orElse等核心功能。
b.依赖配置
---
// Cargo.toml
[dependencies]
stm = "0.4"
---
02.基础API
a.TVar创建
TVar是事务变量,所有STM操作都基于TVar进行,支持任何实现Clone的类型。
b.创建示例
---
use stm::TVar;
// 创建基本类型TVar
let counter = TVar::new(0);
let message = TVar::new(String::from("Hello"));
// 创建复杂类型TVar
#[derive(Clone)]
struct User {
id: u32,
name: String,
balance: i64,
}
let user = TVar::new(User {
id: 1,
name: "Alice".to_string(),
balance: 1000,
});
---
03.atomically函数
a.事务执行
atomically函数接受一个闭包,在闭包中执行所有事务操作,自动处理冲突和重试。
b.使用示例
---
use stm::{atomically, TVar};
let balance = TVar::new(100);
// 读取操作
let value = atomically(|trans| {
balance.read(trans)
});
println!("余额: {}", value);
// 写入操作
atomically(|trans| {
balance.write(trans, 200)?;
Ok(())
});
// 读-修改-写操作
atomically(|trans| {
let current = balance.read(trans)?;
balance.write(trans, current + 50)?;
Ok(())
});
---
04.多变量事务
a.原子性保证
STM可以在一个事务中操作多个TVar,保证所有操作的原子性。
b.多变量示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
struct BankSystem {
accounts: Vec<Arc<TVar<i64>>>,
}
impl BankSystem {
fn new(num_accounts: usize, initial: i64) -> Self {
let accounts = (0..num_accounts)
.map(|_| Arc::new(TVar::new(initial)))
.collect();
BankSystem { accounts }
}
// 原子转账
fn transfer(&self, from: usize, to: usize, amount: i64) -> Result<(), String> {
if from >= self.accounts.len() || to >= self.accounts.len() {
return Err("无效账户".to_string());
}
atomically(|trans| {
let from_balance = self.accounts[from].read(trans)?;
if from_balance < amount {
return Err(stm::StmError::Retry);
}
let to_balance = self.accounts[to].read(trans)?;
self.accounts[from].write(trans, from_balance - amount)?;
self.accounts[to].write(trans, to_balance + amount)?;
Ok(())
});
Ok(())
}
// 获取总余额
fn total_balance(&self) -> i64 {
atomically(|trans| {
let mut total = 0;
for account in &self.accounts {
total += account.read(trans)?;
}
Ok(total)
})
}
}
---
05.错误处理
a.StmError类型
STM操作返回StmResult类型,可能返回Retry错误表示需要重试事务。
b.错误处理示例
---
use stm::{atomically, TVar, StmError, StmResult};
let balance = TVar::new(50);
let min_balance = 100;
// 处理事务错误
let result: StmResult<()> = atomically(|trans| {
let value = balance.read(trans)?;
if value < min_balance {
// 余额不足,重试
return Err(StmError::Retry);
}
balance.write(trans, value - 30)?;
Ok(())
});
match result {
Ok(_) => println!("扣款成功"),
Err(StmError::Retry) => println!("余额不足,等待充值"),
Err(e) => println!("其他错误: {:?}", e),
}
---
06.性能优化
a.减少事务范围
保持事务简短可以减少冲突概率,提高并发性能。
b.优化示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
// 不推荐:事务范围过大
fn bad_practice(data: Arc<TVar<Vec<i32>>>) {
atomically(|trans| {
let mut vec = data.read(trans)?;
// 耗时计算在事务内
for i in 0..1000 {
vec.push(i * i);
}
data.write(trans, vec)?;
Ok(())
});
}
// 推荐:缩小事务范围
fn good_practice(data: Arc<TVar<Vec<i32>>>) {
// 耗时计算在事务外
let mut new_data = Vec::new();
for i in 0..1000 {
new_data.push(i * i);
}
// 只在事务内更新
atomically(|trans| {
let mut vec = data.read(trans)?;
vec.extend(new_data.clone());
data.write(trans, vec)?;
Ok(())
});
}
---
3.3 TVar变量
01.TVar基础
a.事务变量定义
TVar是STM中的核心数据结构,代表可以在事务中安全读写的变量,支持任何实现Clone trait的类型。
b.创建TVar
---
use stm::TVar;
// 基本类型
let counter = TVar::new(0);
let flag = TVar::new(true);
let message = TVar::new(String::from("Hello"));
// 复杂类型
#[derive(Clone)]
struct Config {
host: String,
port: u16,
timeout: u64,
}
let config = TVar::new(Config {
host: "localhost".to_string(),
port: 8080,
timeout: 30,
});
---
02.读取操作
a.read方法
在事务中使用read方法读取TVar的值,返回值的克隆。
b.读取示例
---
use stm::{atomically, TVar};
let balance = TVar::new(1000);
// 单个读取
let value = atomically(|trans| {
balance.read(trans)
});
println!("余额: {}", value);
// 多个读取
let account1 = TVar::new(500);
let account2 = TVar::new(300);
let (val1, val2) = atomically(|trans| {
let v1 = account1.read(trans)?;
let v2 = account2.read(trans)?;
Ok((v1, v2))
});
println!("账户1: {}, 账户2: {}", val1, val2);
---
03.写入操作
a.write方法
在事务中使用write方法更新TVar的值,新值会在事务提交时生效。
b.写入示例
---
use stm::{atomically, TVar};
let counter = TVar::new(0);
// 直接写入
atomically(|trans| {
counter.write(trans, 42)?;
Ok(())
});
// 读-修改-写模式
atomically(|trans| {
let value = counter.read(trans)?;
counter.write(trans, value + 10)?;
Ok(())
});
let final_value = atomically(|trans| counter.read(trans));
println!("最终值: {}", final_value);
---
04.TVar共享
a.Arc包装
TVar通常使用Arc包装以在多个线程间共享,Arc提供线程安全的引用计数。
b.共享示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
let shared_counter = Arc::new(TVar::new(0));
let mut handles = vec![];
// 启动5个线程并发更新
for i in 0..5 {
let counter = Arc::clone(&shared_counter);
let handle = thread::spawn(move || {
for _ in 0..100 {
atomically(|trans| {
let value = counter.read(trans)?;
counter.write(trans, value + 1)?;
Ok(())
});
}
println!("线程 {} 完成", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let final_value = atomically(|trans| shared_counter.read(trans));
println!("最终计数: {}", final_value); // 输出: 500
---
05.复杂数据类型
a.结构体TVar
TVar可以存储复杂的结构体,但每次读写都会克隆整个结构体。
b.结构体示例
---
use stm::{atomically, TVar};
use std::collections::HashMap;
#[derive(Clone, Debug)]
struct Account {
id: u32,
owner: String,
balance: i64,
}
// 单个账户
let account = TVar::new(Account {
id: 1,
owner: "Alice".to_string(),
balance: 1000,
});
// 修改账户余额
atomically(|trans| {
let mut acc = account.read(trans)?;
acc.balance += 500;
account.write(trans, acc)?;
Ok(())
});
// HashMap存储多个账户
let accounts = TVar::new(HashMap::new());
atomically(|trans| {
let mut map = accounts.read(trans)?;
map.insert(1, Account {
id: 1,
owner: "Alice".to_string(),
balance: 1000,
});
map.insert(2, Account {
id: 2,
owner: "Bob".to_string(),
balance: 500,
});
accounts.write(trans, map)?;
Ok(())
});
// 查询账户
let alice_balance = atomically(|trans| {
let map = accounts.read(trans)?;
Ok(map.get(&1).map(|acc| acc.balance).unwrap_or(0))
});
println!("Alice余额: {}", alice_balance);
---
06.性能优化
a.减少克隆开销
对于大对象,使用Arc包装可以避免频繁克隆,只克隆Arc指针。
b.优化示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
// 方案1:直接存储大对象(每次读写都克隆)
#[derive(Clone)]
struct LargeData {
data: Vec<u8>, // 可能很大
}
let large = TVar::new(LargeData {
data: vec![0; 1_000_000],
});
// 方案2:使用Arc共享(只克隆指针)
#[derive(Clone)]
struct OptimizedData {
data: Arc<Vec<u8>>,
}
let optimized = TVar::new(OptimizedData {
data: Arc::new(vec![0; 1_000_000]),
});
// 方案2性能更好
atomically(|trans| {
let data = optimized.read(trans)?; // 只克隆Arc指针
println!("数据大小: {}", data.data.len());
Ok(())
});
---
3.4 原子操作
01.atomically执行
a.事务闭包
atomically函数接受一个闭包,在闭包中执行所有事务操作,保证原子性和隔离性。
b.基础示例
---
use stm::{atomically, TVar};
let x = TVar::new(10);
let y = TVar::new(20);
// 原子交换两个变量的值
atomically(|trans| {
let x_val = x.read(trans)?;
let y_val = y.read(trans)?;
x.write(trans, y_val)?;
y.write(trans, x_val)?;
Ok(())
});
let x_final = atomically(|trans| x.read(trans));
let y_final = atomically(|trans| y.read(trans));
println!("x = {}, y = {}", x_final, y_final); // x = 20, y = 10
---
02.多变量原子更新
a.原子性保证
即使多个线程同时修改相同变量,STM也会自动检测冲突并重试,保证原子性。
b.并发转账示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
let account1 = Arc::new(TVar::new(1000));
let account2 = Arc::new(TVar::new(500));
let mut handles = vec![];
// 线程1:A -> B 转账100
let acc1 = Arc::clone(&account1);
let acc2 = Arc::clone(&account2);
let h1 = thread::spawn(move || {
atomically(|trans| {
let balance1 = acc1.read(trans)?;
let balance2 = acc2.read(trans)?;
acc1.write(trans, balance1 - 100)?;
acc2.write(trans, balance2 + 100)?;
Ok(())
});
});
// 线程2:B -> A 转账50
let acc1 = Arc::clone(&account1);
let acc2 = Arc::clone(&account2);
let h2 = thread::spawn(move || {
atomically(|trans| {
let balance1 = acc1.read(trans)?;
let balance2 = acc2.read(trans)?;
acc2.write(trans, balance2 - 50)?;
acc1.write(trans, balance1 + 50)?;
Ok(())
});
});
h1.join().unwrap();
h2.join().unwrap();
// 验证总金额不变
let total = atomically(|trans| {
let b1 = account1.read(trans)?;
let b2 = account2.read(trans)?;
Ok(b1 + b2)
});
println!("总金额: {}", total); // 总是1500
---
03.复杂原子操作
a.库存管理
STM支持任意复杂的原子操作,如库存扣减、订单创建等多步骤操作。
b.库存示例
---
use stm::{atomically, TVar};
use std::collections::HashMap;
#[derive(Clone, Debug)]
struct Product {
id: u32,
name: String,
stock: i32,
}
let inventory = TVar::new(HashMap::new());
let total_value = TVar::new(0);
// 初始化库存
atomically(|trans| {
let mut inv = inventory.read(trans)?;
inv.insert(1, Product {
id: 1,
name: "笔记本".to_string(),
stock: 100,
});
inv.insert(2, Product {
id: 2,
name: "鼠标".to_string(),
stock: 200,
});
inventory.write(trans, inv)?;
total_value.write(trans, 300)?;
Ok(())
});
// 原子购买:减少库存并更新总值
let purchase = |product_id: u32, quantity: i32| {
atomically(|trans| {
let mut inv = inventory.read(trans)?;
let mut total = total_value.read(trans)?;
if let Some(product) = inv.get_mut(&product_id) {
if product.stock < quantity {
return Err(stm::StmError::Retry);
}
product.stock -= quantity;
total -= quantity;
inventory.write(trans, inv)?;
total_value.write(trans, total)?;
Ok(())
} else {
Err(stm::StmError::Retry)
}
})
};
purchase(1, 10);
purchase(2, 20);
---
04.条件原子操作
a.条件检查
在事务中可以进行条件判断,只有满足条件时才执行更新。
b.条件扣款示例
---
use stm::{atomically, TVar};
let balance = TVar::new(100);
let min_balance = 50;
// 条件扣款:只有余额足够时才扣款
let withdraw = |amount: i32| -> Result<(), String> {
atomically(|trans| {
let current = balance.read(trans)?;
if current - amount < min_balance {
return Err(stm::StmError::Retry);
}
balance.write(trans, current - amount)?;
Ok(())
});
Ok(())
};
match withdraw(30) {
Ok(_) => println!("扣款成功"),
Err(e) => println!("扣款失败: {}", e),
}
match withdraw(50) {
Ok(_) => println!("扣款成功"),
Err(e) => println!("扣款失败: {}", e),
}
---
05.嵌套事务
a.事务合并
atomically调用可以嵌套,内层事务会合并到外层事务中。
b.嵌套示例
---
use stm::{atomically, TVar};
fn increment(var: &TVar<i32>) {
atomically(|trans| {
let value = var.read(trans)?;
var.write(trans, value + 1)?;
Ok(())
});
}
let counter = TVar::new(0);
// 外层事务
atomically(|trans| {
let value = counter.read(trans)?;
counter.write(trans, value + 10)?;
// 内层事务会合并到外层
increment(&counter);
Ok(())
});
let final_value = atomically(|trans| counter.read(trans));
println!("最终值: {}", final_value); // 11
---
06.性能优化
a.减少事务范围
保持事务简短可以减少冲突概率,提高并发性能。
b.优化示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
let counter = Arc::new(TVar::new(0));
let num_threads = 4;
let iterations = 1000;
let start = Instant::now();
let mut handles = vec![];
for _ in 0..num_threads {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..iterations {
// 简短的原子操作
atomically(|trans| {
let value = counter_clone.read(trans)?;
counter_clone.write(trans, value + 1)?;
Ok(())
});
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let duration = start.elapsed();
let final_value = atomically(|trans| counter.read(trans));
println!("最终值: {}", final_value);
println!("耗时: {:?}", duration);
println!("吞吐量: {:.2} ops/sec",
(num_threads * iterations) as f64 / duration.as_secs_f64());
---
3.5 retry重试
01.retry机制
a.阻塞等待
retry会阻塞当前事务,直到相关的TVar被其他事务修改后再自动重试。
b.基础示例
---
use stm::{atomically, TVar, StmError};
let balance = TVar::new(50);
// 等待余额达到100
let result = atomically(|trans| {
let value = balance.read(trans)?;
if value < 100 {
// 条件不满足,阻塞等待
return Err(StmError::Retry);
}
Ok(value)
});
---
02.生产者消费者模式
a.队列实现
retry非常适合实现生产者-消费者模式,消费者在队列为空时自动阻塞。
b.队列示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
let queue = Arc::new(TVar::new(Vec::new()));
// 消费者:等待队列非空
let queue_clone = Arc::clone(&queue);
let consumer = thread::spawn(move || {
for i in 0..5 {
let item = atomically(|trans| {
let mut q = queue_clone.read(trans)?;
// 队列为空,阻塞等待
if q.is_empty() {
return Err(stm::StmError::Retry);
}
// 取出第一个元素
let item = q.remove(0);
queue_clone.write(trans, q)?;
Ok(item)
});
println!("消费者 {} 取出: {}", i, item);
}
});
// 生产者:向队列添加元素
let queue_clone = Arc::clone(&queue);
let producer = thread::spawn(move || {
for i in 0..5 {
thread::sleep(Duration::from_millis(500));
atomically(|trans| {
let mut q = queue_clone.read(trans)?;
q.push(i);
queue_clone.write(trans, q)?;
Ok(())
});
println!("生产者添加: {}", i);
}
});
consumer.join().unwrap();
producer.join().unwrap();
---
03.有界队列
a.容量限制
使用retry实现有界队列,队列满时生产者阻塞,队列空时消费者阻塞。
b.有界队列示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
struct BoundedQueue<T> {
items: TVar<Vec<T>>,
capacity: usize,
}
impl<T: Clone> BoundedQueue<T> {
fn new(capacity: usize) -> Self {
BoundedQueue {
items: TVar::new(Vec::new()),
capacity,
}
}
// 入队:队列满时阻塞
fn enqueue(&self, item: T) {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
if queue.len() >= self.capacity {
return Err(stm::StmError::Retry);
}
queue.push(item);
self.items.write(trans, queue)?;
Ok(())
});
}
// 出队:队列空时阻塞
fn dequeue(&self) -> T {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
if queue.is_empty() {
return Err(stm::StmError::Retry);
}
let item = queue.remove(0);
self.items.write(trans, queue)?;
Ok(item)
})
}
}
let queue = Arc::new(BoundedQueue::new(3));
// 生产者
let queue_clone = Arc::clone(&queue);
let producer = thread::spawn(move || {
for i in 0..10 {
println!("生产者尝试添加: {}", i);
queue_clone.enqueue(i);
println!("生产者成功添加: {}", i);
}
});
// 消费者
let queue_clone = Arc::clone(&queue);
let consumer = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(100));
for _ in 0..10 {
let item = queue_clone.dequeue();
println!("消费者取出: {}", item);
thread::sleep(std::time::Duration::from_millis(200));
}
});
producer.join().unwrap();
consumer.join().unwrap();
---
04.条件变量模式
a.事件通知
retry可以替代传统的条件变量,实现事件驱动的并发模式。
b.事件通知示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
let ready = Arc::new(TVar::new(false));
let data = Arc::new(TVar::new(0));
// 等待线程
let ready_clone = Arc::clone(&ready);
let data_clone = Arc::clone(&data);
let waiter = thread::spawn(move || {
println!("等待数据准备...");
// 等待ready变为true
atomically(|trans| {
let is_ready = ready_clone.read(trans)?;
if !is_ready {
return Err(stm::StmError::Retry);
}
Ok(())
});
// 读取数据
let value = atomically(|trans| data_clone.read(trans));
println!("数据已准备: {}", value);
});
// 准备线程
let ready_clone = Arc::clone(&ready);
let data_clone = Arc::clone(&data);
let preparer = thread::spawn(move || {
thread::sleep(std::time::Duration::from_secs(1));
println!("准备数据...");
atomically(|trans| {
data_clone.write(trans, 42)?;
ready_clone.write(trans, true)?;
Ok(())
});
});
waiter.join().unwrap();
preparer.join().unwrap();
---
05.信号量实现
a.资源计数
使用retry实现信号量,控制并发访问资源的数量。
b.信号量示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
struct Semaphore {
count: Arc<TVar<i32>>,
}
impl Semaphore {
fn new(initial: i32) -> Self {
Semaphore {
count: Arc::new(TVar::new(initial)),
}
}
// 获取资源:计数为0时阻塞
fn acquire(&self) {
atomically(|trans| {
let value = self.count.read(trans)?;
if value <= 0 {
return Err(stm::StmError::Retry);
}
self.count.write(trans, value - 1)?;
Ok(())
});
}
// 释放资源
fn release(&self) {
atomically(|trans| {
let value = self.count.read(trans)?;
self.count.write(trans, value + 1)?;
Ok(())
});
}
}
let sem = Arc::new(Semaphore::new(2));
let mut handles = vec![];
for i in 0..5 {
let sem_clone = Arc::clone(&sem);
let handle = thread::spawn(move || {
println!("线程 {} 等待资源", i);
sem_clone.acquire();
println!("线程 {} 获得资源", i);
thread::sleep(std::time::Duration::from_secs(1));
sem_clone.release();
println!("线程 {} 释放资源", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
---
06.最佳实践
a.避免忙等待
使用retry而不是循环检查,可以避免CPU忙等待。
b.实践示例
---
use stm::{atomically, TVar};
let balance = TVar::new(50);
// 不好的做法:忙等待
// loop {
// let value = atomically(|trans| balance.read(trans));
// if value >= 100 {
// break;
// }
// }
// 好的做法:使用retry
atomically(|trans| {
let value = balance.read(trans)?;
if value < 100 {
return Err(stm::StmError::Retry);
}
Ok(())
});
---
3.6 orElse组合
01.orElse概念
a.备选方案
orElse允许组合多个事务,如果第一个事务失败(retry),则尝试第二个事务。
b.基础示例
---
use stm::{atomically, TVar};
let account1 = TVar::new(50);
let account2 = TVar::new(100);
// 尝试从account1扣款,失败则从account2扣款
let result = atomically(|trans| {
// 尝试方案1
let balance1 = account1.read(trans)?;
if balance1 >= 80 {
account1.write(trans, balance1 - 80)?;
return Ok("从账户1扣款");
}
// 方案1失败,尝试方案2
let balance2 = account2.read(trans)?;
if balance2 >= 80 {
account2.write(trans, balance2 - 80)?;
return Ok("从账户2扣款");
}
// 两个方案都失败
Err(stm::StmError::Retry)
});
println!("结果: {}", result);
---
02.多账户扣款
a.优先级选择
orElse可以实现从多个账户中按优先级选择一个扣款。
b.多账户示例
---
use stm::{atomically, TVar};
struct Account {
name: String,
balance: TVar<i32>,
}
impl Account {
fn new(name: &str, balance: i32) -> Self {
Account {
name: name.to_string(),
balance: TVar::new(balance),
}
}
// 尝试扣款
fn try_withdraw(&self, amount: i32) -> Result<(), stm::StmError> {
atomically(|trans| {
let balance = self.balance.read(trans)?;
if balance < amount {
return Err(stm::StmError::Retry);
}
self.balance.write(trans, balance - amount)?;
Ok(())
})
}
}
let accounts = vec![
Account::new("支票账户", 30),
Account::new("储蓄账户", 100),
Account::new("信用账户", 500),
];
let amount = 50;
// 依次尝试从各个账户扣款
for account in &accounts {
match account.try_withdraw(amount) {
Ok(_) => {
println!("从{}扣款成功", account.name);
break;
}
Err(_) => {
println!("{}余额不足,尝试下一个", account.name);
}
}
}
---
03.多队列消费
a.优先级队列
orElse可以实现从多个队列中按优先级消费。
b.多队列示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
struct Queue<T> {
items: TVar<Vec<T>>,
name: String,
}
impl<T: Clone> Queue<T> {
fn new(name: &str) -> Self {
Queue {
items: TVar::new(Vec::new()),
name: name.to_string(),
}
}
fn enqueue(&self, item: T) {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
queue.push(item);
self.items.write(trans, queue)?;
Ok(())
});
}
fn try_dequeue(&self) -> Option<T> {
atomically(|trans| {
let mut queue = self.items.read(trans)?;
if queue.is_empty() {
return Ok(None);
}
let item = queue.remove(0);
self.items.write(trans, queue)?;
Ok(Some(item))
})
}
}
let queue1 = Arc::new(Queue::new("高优先级"));
let queue2 = Arc::new(Queue::new("低优先级"));
// 生产者:向两个队列添加任务
let q1 = Arc::clone(&queue1);
let q2 = Arc::clone(&queue2);
thread::spawn(move || {
for i in 0..5 {
if i % 2 == 0 {
q1.enqueue(i);
println!("添加到高优先级队列: {}", i);
} else {
q2.enqueue(i);
println!("添加到低优先级队列: {}", i);
}
thread::sleep(Duration::from_millis(100));
}
});
// 消费者:优先从高优先级队列消费
thread::sleep(Duration::from_millis(500));
for _ in 0..5 {
if let Some(item) = queue1.try_dequeue() {
println!("从高优先级队列取出: {}", item);
} else if let Some(item) = queue2.try_dequeue() {
println!("从低优先级队列取出: {}", item);
} else {
println!("所有队列都为空");
}
}
---
04.资源池
a.多池选择
orElse可以实现从多个资源池中获取资源。
b.资源池示例
---
use stm::{atomically, TVar};
struct ResourcePool<T> {
resources: TVar<Vec<T>>,
name: String,
}
impl<T: Clone> ResourcePool<T> {
fn new(name: &str, resources: Vec<T>) -> Self {
ResourcePool {
resources: TVar::new(resources),
name: name.to_string(),
}
}
// 尝试获取资源
fn try_acquire(&self) -> Option<T> {
atomically(|trans| {
let mut pool = self.resources.read(trans)?;
if pool.is_empty() {
return Ok(None);
}
let resource = pool.pop();
self.resources.write(trans, pool)?;
Ok(resource)
})
}
// 归还资源
fn release(&self, resource: T) {
atomically(|trans| {
let mut pool = self.resources.read(trans)?;
pool.push(resource);
self.resources.write(trans, pool)?;
Ok(())
});
}
}
// 创建两个连接池
let pool1 = ResourcePool::new("主池", vec![1, 2, 3]);
let pool2 = ResourcePool::new("备用池", vec![4, 5]);
// 尝试获取连接:优先从主池获取
for i in 0..6 {
let conn = pool1.try_acquire()
.or_else(|| pool2.try_acquire());
match conn {
Some(c) => println!("请求 {} 获得连接: {}", i, c),
None => println!("请求 {} 无可用连接", i),
}
}
---
05.支付方式选择
a.多支付方式
orElse可以实现按优先级尝试多种支付方式。
b.支付示例
---
use stm::{atomically, TVar};
struct PaymentMethod {
name: String,
balance: TVar<i32>,
limit: i32,
}
impl PaymentMethod {
fn new(name: &str, balance: i32, limit: i32) -> Self {
PaymentMethod {
name: name.to_string(),
balance: TVar::new(balance),
limit,
}
}
fn try_pay(&self, amount: i32) -> Result<String, stm::StmError> {
atomically(|trans| {
let balance = self.balance.read(trans)?;
// 检查限额
if amount > self.limit {
return Err(stm::StmError::Retry);
}
// 检查余额
if balance < amount {
return Err(stm::StmError::Retry);
}
// 扣款
self.balance.write(trans, balance - amount)?;
Ok(format!("使用{}支付", self.name))
})
}
}
let methods = vec![
PaymentMethod::new("信用卡", 100, 1000),
PaymentMethod::new("借记卡", 500, 500),
PaymentMethod::new("余额", 2000, 10000),
];
let amount = 300;
// 依次尝试各种支付方式
for method in &methods {
match method.try_pay(amount) {
Ok(msg) => {
println!("{}", msg);
break;
}
Err(_) => {
println!("{}不可用,尝试下一个", method.name);
}
}
}
---
06.最佳实践
a.优先级排序
按优先级排列备选方案,优先尝试最优方案。
b.实践示例
---
use stm::{atomically, TVar};
let option1 = TVar::new(10);
let option2 = TVar::new(50);
let option3 = TVar::new(100);
let amount = 30;
// 清晰的优先级和失败条件
let result = atomically(|trans| {
// 方案1:最优选择
let balance1 = option1.read(trans)?;
if balance1 >= amount {
option1.write(trans, balance1 - amount)?;
return Ok("方案1");
}
// 方案2:次优选择
let balance2 = option2.read(trans)?;
if balance2 >= amount {
option2.write(trans, balance2 - amount)?;
return Ok("方案2");
}
// 方案3:最后选择
let balance3 = option3.read(trans)?;
if balance3 >= amount {
option3.write(trans, balance3 - amount)?;
return Ok("方案3");
}
// 所有方案都失败
Err(stm::StmError::Retry)
});
match result {
Ok(plan) => println!("使用{}", plan),
Err(_) => println!("所有方案都不可行"),
}
---
3.7 STM最佳实践
01.事务范围最小化
a.减少冲突
保持事务简短可以减少冲突概率,将耗时计算移到事务外执行。
b.优化示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
// 不推荐:事务范围过大
fn bad_practice(data: Arc<TVar<Vec<i32>>>) {
atomically(|trans| {
let mut vec = data.read(trans)?;
// 耗时计算在事务内
for i in 0..10000 {
vec.push(i * i);
}
data.write(trans, vec)?;
Ok(())
});
}
// 推荐:缩小事务范围
fn good_practice(data: Arc<TVar<Vec<i32>>>) {
// 耗时计算在事务外
let mut new_data = Vec::new();
for i in 0..10000 {
new_data.push(i * i);
}
// 只在事务内更新
atomically(|trans| {
let mut vec = data.read(trans)?;
vec.extend(new_data.clone());
data.write(trans, vec)?;
Ok(())
});
}
---
02.避免副作用
a.事务可能重试
事务可能因冲突而重试多次,避免在事务中执行有副作用的操作如IO、打印等。
b.副作用示例
---
use stm::{atomically, TVar};
let counter = TVar::new(0);
// 不推荐:事务中有副作用
atomically(|trans| {
let value = counter.read(trans)?;
println!("当前值: {}", value); // 可能打印多次
counter.write(trans, value + 1)?;
Ok(())
});
// 推荐:副作用在事务外
let value = atomically(|trans| {
let v = counter.read(trans)?;
counter.write(trans, v + 1)?;
Ok(v)
});
println!("当前值: {}", value); // 只打印一次
---
03.合理使用Arc
a.减少克隆开销
对于大对象,使用Arc包装可以避免频繁克隆,提高性能。
b.Arc优化示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
// 方案1:直接存储大对象
#[derive(Clone)]
struct LargeData {
data: Vec<u8>,
}
let large = TVar::new(LargeData {
data: vec![0; 1_000_000],
});
// 方案2:使用Arc共享
#[derive(Clone)]
struct OptimizedData {
data: Arc<Vec<u8>>,
}
let optimized = TVar::new(OptimizedData {
data: Arc::new(vec![0; 1_000_000]),
});
// 方案2性能更好
atomically(|trans| {
let data = optimized.read(trans)?;
println!("数据大小: {}", data.data.len());
Ok(())
});
---
04.条件检查优化
a.提前检查
在事务开始时就检查条件,避免无效的事务执行。
b.检查示例
---
use stm::{atomically, TVar};
let balance = TVar::new(100);
let amount = 200;
// 不推荐:事务内才检查
atomically(|trans| {
let value = balance.read(trans)?;
if value < amount {
return Err(stm::StmError::Retry);
}
balance.write(trans, value - amount)?;
Ok(())
});
// 推荐:提前检查
let current = atomically(|trans| balance.read(trans));
if current >= amount {
atomically(|trans| {
let value = balance.read(trans)?;
if value < amount {
return Err(stm::StmError::Retry);
}
balance.write(trans, value - amount)?;
Ok(())
});
}
---
05.避免嵌套过深
a.扁平化事务
避免过深的事务嵌套,保持事务逻辑清晰简单。
b.扁平化示例
---
use stm::{atomically, TVar};
let var1 = TVar::new(0);
let var2 = TVar::new(0);
let var3 = TVar::new(0);
// 不推荐:嵌套过深
atomically(|trans| {
let v1 = var1.read(trans)?;
atomically(|trans| {
let v2 = var2.read(trans)?;
atomically(|trans| {
let v3 = var3.read(trans)?;
var3.write(trans, v1 + v2 + v3)?;
Ok(())
});
Ok(())
});
Ok(())
});
// 推荐:扁平化
atomically(|trans| {
let v1 = var1.read(trans)?;
let v2 = var2.read(trans)?;
let v3 = var3.read(trans)?;
var3.write(trans, v1 + v2 + v3)?;
Ok(())
});
---
06.性能监控
a.测量事务性能
监控事务执行时间和重试次数,识别性能瓶颈。
b.监控示例
---
use stm::{atomically, TVar};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
let counter = Arc::new(TVar::new(0));
let num_threads = 4;
let iterations = 1000;
let start = Instant::now();
let mut handles = vec![];
for _ in 0..num_threads {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..iterations {
atomically(|trans| {
let value = counter_clone.read(trans)?;
counter_clone.write(trans, value + 1)?;
Ok(())
});
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let duration = start.elapsed();
let final_value = atomically(|trans| counter.read(trans));
println!("性能指标:");
println!(" 最终值: {}", final_value);
println!(" 总耗时: {:?}", duration);
println!(" 吞吐量: {:.2} ops/sec",
(num_threads * iterations) as f64 / duration.as_secs_f64());
println!(" 平均延迟: {:?}",
duration / (num_threads * iterations));
---
4 事务模式
4.1 补偿事务:Saga
01.Saga模式概念
a.分布式事务解决方案
Saga模式将长事务拆分为多个本地事务,每个本地事务都有对应的补偿操作,当某个步骤失败时执行补偿操作回滚之前的更改。
b.基础示例
---
use std::error::Error;
use std::fmt;
#[derive(Debug)]
struct SagaError(String);
impl fmt::Display for SagaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Saga错误: {}", self.0)
}
}
impl Error for SagaError {}
// Saga步骤定义
struct SagaStep<T> {
name: String,
action: Box<dyn Fn(&mut T) -> Result<(), Box<dyn Error>>>,
compensation: Box<dyn Fn(&mut T) -> Result<(), Box<dyn Error>>>,
}
// Saga协调器
struct Saga<T> {
steps: Vec<SagaStep<T>>,
}
impl<T> Saga<T> {
fn new() -> Self {
Saga { steps: Vec::new() }
}
// 添加步骤
fn add_step(
mut self,
name: &str,
action: impl Fn(&mut T) -> Result<(), Box<dyn Error>> + 'static,
compensation: impl Fn(&mut T) -> Result<(), Box<dyn Error>> + 'static,
) -> Self {
self.steps.push(SagaStep {
name: name.to_string(),
action: Box::new(action),
compensation: Box::new(compensation),
});
self
}
// 执行Saga
fn execute(&self, context: &mut T) -> Result<(), Box<dyn Error>> {
let mut executed_steps = Vec::new();
// 顺序执行所有步骤
for step in &self.steps {
println!("执行步骤: {}", step.name);
match (step.action)(context) {
Ok(_) => {
executed_steps.push(step);
}
Err(e) => {
println!("步骤 {} 失败: {}", step.name, e);
// 执行补偿操作
self.compensate(&executed_steps, context)?;
return Err(e);
}
}
}
Ok(())
}
// 执行补偿
fn compensate(
&self,
executed_steps: &[&SagaStep<T>],
context: &mut T,
) -> Result<(), Box<dyn Error>> {
println!("开始补偿操作...");
// 逆序执行补偿
for step in executed_steps.iter().rev() {
println!("补偿步骤: {}", step.name);
(step.compensation)(context)?;
}
Ok(())
}
}
---
02.订单处理Saga
a.电商订单场景
订单处理涉及库存扣减、支付、物流等多个服务,使用Saga模式协调这些操作。
b.订单Saga示例
---
use std::error::Error;
#[derive(Debug, Clone)]
struct OrderContext {
order_id: String,
user_id: String,
amount: f64,
inventory_reserved: bool,
payment_completed: bool,
shipping_created: bool,
}
impl OrderContext {
fn new(order_id: String, user_id: String, amount: f64) -> Self {
OrderContext {
order_id,
user_id,
amount,
inventory_reserved: false,
payment_completed: false,
shipping_created: false,
}
}
}
// 创建订单Saga
fn create_order_saga() -> Saga<OrderContext> {
Saga::new()
// 步骤1: 预留库存
.add_step(
"预留库存",
|ctx| {
println!(" 订单 {} 预留库存", ctx.order_id);
ctx.inventory_reserved = true;
Ok(())
},
|ctx| {
println!(" 订单 {} 释放库存", ctx.order_id);
ctx.inventory_reserved = false;
Ok(())
},
)
// 步骤2: 处理支付
.add_step(
"处理支付",
|ctx| {
println!(" 订单 {} 扣款 {}", ctx.order_id, ctx.amount);
ctx.payment_completed = true;
Ok(())
},
|ctx| {
println!(" 订单 {} 退款 {}", ctx.order_id, ctx.amount);
ctx.payment_completed = false;
Ok(())
},
)
// 步骤3: 创建物流
.add_step(
"创建物流",
|ctx| {
println!(" 订单 {} 创建物流单", ctx.order_id);
ctx.shipping_created = true;
Ok(())
},
|ctx| {
println!(" 订单 {} 取消物流单", ctx.order_id);
ctx.shipping_created = false;
Ok(())
},
)
}
// 使用示例
fn process_order() {
let saga = create_order_saga();
let mut context = OrderContext::new(
"ORD-001".to_string(),
"USER-123".to_string(),
99.99,
);
match saga.execute(&mut context) {
Ok(_) => println!("订单处理成功"),
Err(e) => println!("订单处理失败: {}", e),
}
}
---
03.支付失败场景
a.模拟支付失败
当支付步骤失败时,自动执行补偿操作回滚已完成的库存预留。
b.失败处理示例
---
use std::error::Error;
// 创建会失败的订单Saga
fn create_failing_order_saga() -> Saga<OrderContext> {
Saga::new()
.add_step(
"预留库存",
|ctx| {
println!(" 订单 {} 预留库存", ctx.order_id);
ctx.inventory_reserved = true;
Ok(())
},
|ctx| {
println!(" 订单 {} 释放库存", ctx.order_id);
ctx.inventory_reserved = false;
Ok(())
},
)
.add_step(
"处理支付",
|ctx| {
println!(" 订单 {} 尝试扣款 {}", ctx.order_id, ctx.amount);
// 模拟支付失败
Err(Box::new(SagaError("余额不足".to_string())))
},
|ctx| {
println!(" 订单 {} 退款 {}", ctx.order_id, ctx.amount);
ctx.payment_completed = false;
Ok(())
},
)
}
// 测试失败场景
fn test_payment_failure() {
let saga = create_failing_order_saga();
let mut context = OrderContext::new(
"ORD-002".to_string(),
"USER-456".to_string(),
199.99,
);
println!("开始处理订单 {}", context.order_id);
match saga.execute(&mut context) {
Ok(_) => println!("订单处理成功"),
Err(e) => {
println!("订单处理失败: {}", e);
println!("最终状态:");
println!(" 库存预留: {}", context.inventory_reserved);
println!(" 支付完成: {}", context.payment_completed);
}
}
}
---
04.异步Saga
a.异步执行步骤
在实际应用中,Saga步骤通常是异步的,需要使用async/await处理。
b.异步Saga示例
---
use tokio;
use std::error::Error;
// 异步Saga步骤
struct AsyncSagaStep<T> {
name: String,
action: Box<dyn Fn(&mut T) -> futures::future::BoxFuture<Result<(), Box<dyn Error + Send>>> + Send + Sync>,
compensation: Box<dyn Fn(&mut T) -> futures::future::BoxFuture<Result<(), Box<dyn Error + Send>>> + Send + Sync>,
}
// 异步Saga协调器
struct AsyncSaga<T> {
steps: Vec<AsyncSagaStep<T>>,
}
impl<T: Send> AsyncSaga<T> {
fn new() -> Self {
AsyncSaga { steps: Vec::new() }
}
fn add_step<F, C>(mut self, name: &str, action: F, compensation: C) -> Self
where
F: Fn(&mut T) -> futures::future::BoxFuture<Result<(), Box<dyn Error + Send>>> + Send + Sync + 'static,
C: Fn(&mut T) -> futures::future::BoxFuture<Result<(), Box<dyn Error + Send>>> + Send + Sync + 'static,
{
self.steps.push(AsyncSagaStep {
name: name.to_string(),
action: Box::new(action),
compensation: Box::new(compensation),
});
self
}
async fn execute(&self, context: &mut T) -> Result<(), Box<dyn Error + Send>> {
let mut executed_steps = Vec::new();
for step in &self.steps {
println!("执行步骤: {}", step.name);
match (step.action)(context).await {
Ok(_) => executed_steps.push(step),
Err(e) => {
println!("步骤 {} 失败: {}", step.name, e);
self.compensate(&executed_steps, context).await?;
return Err(e);
}
}
}
Ok(())
}
async fn compensate(
&self,
executed_steps: &[&AsyncSagaStep<T>],
context: &mut T,
) -> Result<(), Box<dyn Error + Send>> {
println!("开始补偿操作...");
for step in executed_steps.iter().rev() {
println!("补偿步骤: {}", step.name);
(step.compensation)(context).await?;
}
Ok(())
}
}
// 异步订单处理
async fn async_order_processing() {
use futures::FutureExt;
let saga = AsyncSaga::new()
.add_step(
"预留库存",
|ctx: &mut OrderContext| {
async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
println!(" 订单 {} 预留库存", ctx.order_id);
ctx.inventory_reserved = true;
Ok(())
}.boxed()
},
|ctx: &mut OrderContext| {
async move {
println!(" 订单 {} 释放库存", ctx.order_id);
ctx.inventory_reserved = false;
Ok(())
}.boxed()
},
)
.add_step(
"处理支付",
|ctx: &mut OrderContext| {
async move {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
println!(" 订单 {} 扣款 {}", ctx.order_id, ctx.amount);
ctx.payment_completed = true;
Ok(())
}.boxed()
},
|ctx: &mut OrderContext| {
async move {
println!(" 订单 {} 退款 {}", ctx.order_id, ctx.amount);
ctx.payment_completed = false;
Ok(())
}.boxed()
},
);
let mut context = OrderContext::new(
"ORD-003".to_string(),
"USER-789".to_string(),
299.99,
);
match saga.execute(&mut context).await {
Ok(_) => println!("异步订单处理成功"),
Err(e) => println!("异步订单处理失败: {}", e),
}
}
---
05.Saga状态持久化
a.状态存储
为了支持故障恢复,需要持久化Saga的执行状态。
b.持久化示例
---
use serde::{Serialize, Deserialize};
use std::fs;
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SagaState {
saga_id: String,
current_step: usize,
context: OrderContext,
completed_steps: Vec<String>,
}
impl SagaState {
fn new(saga_id: String, context: OrderContext) -> Self {
SagaState {
saga_id,
current_step: 0,
context,
completed_steps: Vec::new(),
}
}
// 保存状态
fn save(&self) -> Result<(), Box<dyn Error>> {
let json = serde_json::to_string_pretty(self)?;
fs::write(format!("saga_{}.json", self.saga_id), json)?;
println!("保存Saga状态: {}", self.saga_id);
Ok(())
}
// 加载状态
fn load(saga_id: &str) -> Result<Self, Box<dyn Error>> {
let json = fs::read_to_string(format!("saga_{}.json", saga_id))?;
let state = serde_json::from_str(&json)?;
println!("加载Saga状态: {}", saga_id);
Ok(state)
}
// 标记步骤完成
fn mark_step_completed(&mut self, step_name: String) {
self.completed_steps.push(step_name);
self.current_step += 1;
}
}
// 带持久化的Saga执行器
struct PersistentSaga<T> {
saga: Saga<T>,
state: SagaState,
}
impl PersistentSaga<OrderContext> {
fn execute(&mut self) -> Result<(), Box<dyn Error>> {
for (idx, step) in self.saga.steps.iter().enumerate() {
if idx < self.state.current_step {
println!("跳过已完成步骤: {}", step.name);
continue;
}
println!("执行步骤: {}", step.name);
match (step.action)(&mut self.state.context) {
Ok(_) => {
self.state.mark_step_completed(step.name.clone());
self.state.save()?;
}
Err(e) => {
println!("步骤失败,开始补偿");
self.compensate()?;
return Err(e);
}
}
}
Ok(())
}
fn compensate(&mut self) -> Result<(), Box<dyn Error>> {
for step_name in self.state.completed_steps.iter().rev() {
println!("补偿步骤: {}", step_name);
// 执行对应的补偿操作
}
Ok(())
}
}
---
06.最佳实践
a.幂等性保证
所有Saga步骤和补偿操作都应该是幂等的,确保重试不会产生副作用。
b.实践建议
---
// 1. 使用唯一ID确保幂等性
struct IdempotentOperation {
operation_id: String,
executed: bool,
}
impl IdempotentOperation {
fn execute(&mut self) -> Result<(), Box<dyn Error>> {
if self.executed {
println!("操作 {} 已执行,跳过", self.operation_id);
return Ok(());
}
// 执行实际操作
println!("执行操作 {}", self.operation_id);
self.executed = true;
Ok(())
}
}
// 2. 补偿操作也要幂等
fn idempotent_compensation(ctx: &mut OrderContext) -> Result<(), Box<dyn Error>> {
if !ctx.inventory_reserved {
println!("库存未预留,无需释放");
return Ok(());
}
println!("释放库存");
ctx.inventory_reserved = false;
Ok(())
}
// 3. 记录详细日志
fn log_saga_step(saga_id: &str, step_name: &str, status: &str) {
println!("[Saga:{}] 步骤:{} 状态:{}", saga_id, step_name, status);
}
// 4. 设置超时机制
use std::time::Duration;
async fn execute_with_timeout<F, T>(
operation: F,
timeout: Duration,
) -> Result<T, Box<dyn Error + Send>>
where
F: std::future::Future<Output = Result<T, Box<dyn Error + Send>>>,
{
tokio::time::timeout(timeout, operation)
.await
.map_err(|_| Box::new(SagaError("操作超时".to_string())) as Box<dyn Error + Send>)?
}
---
4.2 两阶段提交:2PC
01.2PC协议概念
a.两阶段提交原理
2PC通过协调者和参与者的两阶段交互保证分布式事务的原子性,第一阶段准备投票,第二阶段执行提交或回滚。
b.基础实现
---
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, PartialEq)]
enum Vote {
Commit,
Abort,
}
#[derive(Debug, Clone, PartialEq)]
enum Decision {
Commit,
Abort,
}
// 参与者trait
trait Participant: Send + Sync {
fn prepare(&self) -> Vote;
fn commit(&self);
fn abort(&self);
fn get_id(&self) -> String;
}
// 协调者
struct Coordinator {
participants: Vec<Arc<dyn Participant>>,
}
impl Coordinator {
fn new() -> Self {
Coordinator {
participants: Vec::new(),
}
}
fn add_participant(&mut self, participant: Arc<dyn Participant>) {
self.participants.push(participant);
}
// 执行两阶段提交
fn execute(&self) -> Decision {
println!("=== 阶段1: 准备阶段 ===");
// 向所有参与者发送prepare请求
let mut votes = Vec::new();
for participant in &self.participants {
let vote = participant.prepare();
println!("参与者 {} 投票: {:?}", participant.get_id(), vote);
votes.push(vote);
}
// 决策:所有参与者都投Commit才提交
let decision = if votes.iter().all(|v| *v == Vote::Commit) {
Decision::Commit
} else {
Decision::Abort
};
println!("\n=== 阶段2: 提交阶段 ===");
println!("协调者决策: {:?}", decision);
// 向所有参与者发送决策
match decision {
Decision::Commit => {
for participant in &self.participants {
participant.commit();
println!("参与者 {} 已提交", participant.get_id());
}
}
Decision::Abort => {
for participant in &self.participants {
participant.abort();
println!("参与者 {} 已回滚", participant.get_id());
}
}
}
decision
}
}
---
02.数据库参与者实现
a.模拟数据库节点
实现一个简单的数据库参与者,支持prepare、commit和abort操作。
b.参与者示例
---
use std::sync::{Arc, Mutex};
struct DatabaseParticipant {
id: String,
data: Arc<Mutex<HashMap<String, i32>>>,
prepared_changes: Arc<Mutex<Option<HashMap<String, i32>>>>,
should_fail: bool,
}
impl DatabaseParticipant {
fn new(id: String, should_fail: bool) -> Self {
DatabaseParticipant {
id,
data: Arc::new(Mutex::new(HashMap::new())),
prepared_changes: Arc::new(Mutex::new(None)),
should_fail,
}
}
// 设置初始数据
fn set(&self, key: String, value: i32) {
let mut data = self.data.lock().unwrap();
data.insert(key, value);
}
// 准备更新
fn prepare_update(&self, key: String, value: i32) {
let mut changes = self.prepared_changes.lock().unwrap();
let mut map = HashMap::new();
map.insert(key, value);
*changes = Some(map);
}
// 获取数据
fn get(&self, key: &str) -> Option<i32> {
let data = self.data.lock().unwrap();
data.get(key).copied()
}
}
impl Participant for DatabaseParticipant {
fn prepare(&self) -> Vote {
if self.should_fail {
println!(" {} 准备失败", self.id);
return Vote::Abort;
}
let changes = self.prepared_changes.lock().unwrap();
if changes.is_some() {
println!(" {} 准备成功", self.id);
Vote::Commit
} else {
println!(" {} 无变更", self.id);
Vote::Commit
}
}
fn commit(&self) {
let mut changes = self.prepared_changes.lock().unwrap();
if let Some(ref change_map) = *changes {
let mut data = self.data.lock().unwrap();
for (key, value) in change_map {
data.insert(key.clone(), *value);
}
}
*changes = None;
println!(" {} 提交完成", self.id);
}
fn abort(&self) {
let mut changes = self.prepared_changes.lock().unwrap();
*changes = None;
println!(" {} 回滚完成", self.id);
}
fn get_id(&self) -> String {
self.id.clone()
}
}
---
03.转账场景示例
a.跨数据库转账
使用2PC协调两个数据库节点的转账操作,确保原子性。
b.转账示例
---
fn transfer_with_2pc() {
// 创建两个数据库参与者
let db1 = Arc::new(DatabaseParticipant::new("DB1".to_string(), false));
let db2 = Arc::new(DatabaseParticipant::new("DB2".to_string(), false));
// 初始化账户余额
db1.set("account_A".to_string(), 1000);
db2.set("account_B".to_string(), 500);
println!("初始状态:");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
// 准备转账:A转200给B
db1.prepare_update("account_A".to_string(), 800);
db2.prepare_update("account_B".to_string(), 700);
// 创建协调者并执行2PC
let mut coordinator = Coordinator::new();
coordinator.add_participant(db1.clone());
coordinator.add_participant(db2.clone());
let decision = coordinator.execute();
println!("\n最终状态:");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
println!(" 决策: {:?}", decision);
}
---
04.失败场景处理
a.参与者失败
当某个参与者在prepare阶段投Abort票时,协调者会通知所有参与者回滚。
b.失败示例
---
fn transfer_with_failure() {
// 创建参与者,db2会失败
let db1 = Arc::new(DatabaseParticipant::new("DB1".to_string(), false));
let db2 = Arc::new(DatabaseParticipant::new("DB2".to_string(), true));
db1.set("account_A".to_string(), 1000);
db2.set("account_B".to_string(), 500);
println!("初始状态:");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
// 准备转账
db1.prepare_update("account_A".to_string(), 800);
db2.prepare_update("account_B".to_string(), 700);
// 执行2PC
let mut coordinator = Coordinator::new();
coordinator.add_participant(db1.clone());
coordinator.add_participant(db2.clone());
let decision = coordinator.execute();
println!("\n最终状态(应该回滚):");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
println!(" 决策: {:?}", decision);
}
---
05.超时处理
a.超时机制
2PC需要处理参与者响应超时的情况,避免无限等待。
b.超时实现
---
use std::time::{Duration, Instant};
use std::thread;
struct TimeoutCoordinator {
participants: Vec<Arc<dyn Participant>>,
timeout: Duration,
}
impl TimeoutCoordinator {
fn new(timeout: Duration) -> Self {
TimeoutCoordinator {
participants: Vec::new(),
timeout,
}
}
fn add_participant(&mut self, participant: Arc<dyn Participant>) {
self.participants.push(participant);
}
fn execute(&self) -> Decision {
println!("=== 阶段1: 准备阶段(超时: {:?})===", self.timeout);
let start = Instant::now();
let mut votes = Vec::new();
for participant in &self.participants {
// 检查超时
if start.elapsed() > self.timeout {
println!("准备阶段超时,中止事务");
return Decision::Abort;
}
let vote = participant.prepare();
println!("参与者 {} 投票: {:?}", participant.get_id(), vote);
votes.push(vote);
}
let decision = if votes.iter().all(|v| *v == Vote::Commit) {
Decision::Commit
} else {
Decision::Abort
};
println!("\n=== 阶段2: 提交阶段 ===");
println!("协调者决策: {:?}", decision);
match decision {
Decision::Commit => {
for participant in &self.participants {
participant.commit();
}
}
Decision::Abort => {
for participant in &self.participants {
participant.abort();
}
}
}
decision
}
}
// 慢速参与者
struct SlowParticipant {
id: String,
delay: Duration,
}
impl SlowParticipant {
fn new(id: String, delay: Duration) -> Self {
SlowParticipant { id, delay }
}
}
impl Participant for SlowParticipant {
fn prepare(&self) -> Vote {
println!(" {} 处理中...", self.id);
thread::sleep(self.delay);
Vote::Commit
}
fn commit(&self) {
println!(" {} 提交", self.id);
}
fn abort(&self) {
println!(" {} 回滚", self.id);
}
fn get_id(&self) -> String {
self.id.clone()
}
}
// 测试超时
fn test_timeout() {
let mut coordinator = TimeoutCoordinator::new(Duration::from_secs(2));
coordinator.add_participant(Arc::new(SlowParticipant::new(
"Fast".to_string(),
Duration::from_millis(100),
)));
coordinator.add_participant(Arc::new(SlowParticipant::new(
"Slow".to_string(),
Duration::from_secs(5), // 超过超时时间
)));
let decision = coordinator.execute();
println!("最终决策: {:?}", decision);
}
---
06.2PC的局限性
a.阻塞问题
2PC在prepare阶段会阻塞资源,如果协调者故障,参与者会一直等待。
b.单点故障
---
// 2PC的主要问题:
// 1. 同步阻塞
// 参与者在prepare后必须等待协调者的决策
// 这期间资源被锁定,影响系统吞吐量
// 2. 单点故障
// 协调者故障会导致参与者无限期等待
// 需要额外的故障恢复机制
// 3. 数据不一致
// 在commit阶段,如果部分参与者收到commit消息
// 而其他参与者未收到,会导致数据不一致
// 改进方案:三阶段提交(3PC)
// 3PC在prepare和commit之间增加preCommit阶段
// 减少阻塞时间,但增加了复杂度
#[derive(Debug, Clone, PartialEq)]
enum ThreePhaseState {
CanCommit,
PreCommit,
DoCommit,
DoAbort,
}
// 3PC协调者框架
struct ThreePhaseCoordinator {
participants: Vec<Arc<dyn Participant>>,
}
impl ThreePhaseCoordinator {
fn execute(&self) -> Decision {
// 阶段1: CanCommit
println!("=== 阶段1: CanCommit ===");
let can_commit = self.phase1_can_commit();
if !can_commit {
return Decision::Abort;
}
// 阶段2: PreCommit
println!("=== 阶段2: PreCommit ===");
let pre_commit = self.phase2_pre_commit();
if !pre_commit {
return Decision::Abort;
}
// 阶段3: DoCommit
println!("=== 阶段3: DoCommit ===");
self.phase3_do_commit();
Decision::Commit
}
fn phase1_can_commit(&self) -> bool {
// 询问参与者是否可以提交
true
}
fn phase2_pre_commit(&self) -> bool {
// 参与者执行事务但不提交
true
}
fn phase3_do_commit(&self) {
// 参与者提交事务
for participant in &self.participants {
participant.commit();
}
}
}
---
4.3 三阶段提交:3PC
01.3PC协议概念
a.改进2PC的阻塞问题
3PC在2PC基础上增加PreCommit阶段,将提交过程分为CanCommit、PreCommit和DoCommit三个阶段,减少阻塞时间并提供超时机制。
b.基础框架
---
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, PartialEq)]
enum Phase {
CanCommit,
PreCommit,
DoCommit,
DoAbort,
}
#[derive(Debug, Clone, PartialEq)]
enum Response {
Yes,
No,
Timeout,
}
// 3PC参与者trait
trait ThreePhaseParticipant: Send + Sync {
fn can_commit(&self) -> Response;
fn pre_commit(&self) -> Response;
fn do_commit(&self);
fn do_abort(&self);
fn get_id(&self) -> String;
}
// 3PC协调者
struct ThreePhaseCoordinator {
participants: Vec<Arc<dyn ThreePhaseParticipant>>,
timeout: Duration,
}
impl ThreePhaseCoordinator {
fn new(timeout: Duration) -> Self {
ThreePhaseCoordinator {
participants: Vec::new(),
timeout,
}
}
fn add_participant(&mut self, participant: Arc<dyn ThreePhaseParticipant>) {
self.participants.push(participant);
}
// 执行三阶段提交
fn execute(&self) -> bool {
// 阶段1: CanCommit
if !self.phase1_can_commit() {
println!("阶段1失败,中止事务");
self.abort_all();
return false;
}
// 阶段2: PreCommit
if !self.phase2_pre_commit() {
println!("阶段2失败,中止事务");
self.abort_all();
return false;
}
// 阶段3: DoCommit
self.phase3_do_commit();
println!("事务提交成功");
true
}
fn phase1_can_commit(&self) -> bool {
println!("=== 阶段1: CanCommit ===");
let start = Instant::now();
for participant in &self.participants {
if start.elapsed() > self.timeout {
println!("阶段1超时");
return false;
}
let response = participant.can_commit();
println!("参与者 {} 响应: {:?}", participant.get_id(), response);
if response != Response::Yes {
return false;
}
}
true
}
fn phase2_pre_commit(&self) -> bool {
println!("\n=== 阶段2: PreCommit ===");
let start = Instant::now();
for participant in &self.participants {
if start.elapsed() > self.timeout {
println!("阶段2超时");
return false;
}
let response = participant.pre_commit();
println!("参与者 {} 预提交: {:?}", participant.get_id(), response);
if response != Response::Yes {
return false;
}
}
true
}
fn phase3_do_commit(&self) {
println!("\n=== 阶段3: DoCommit ===");
for participant in &self.participants {
participant.do_commit();
println!("参与者 {} 已提交", participant.get_id());
}
}
fn abort_all(&self) {
println!("\n=== 中止所有参与者 ===");
for participant in &self.participants {
participant.do_abort();
println!("参与者 {} 已回滚", participant.get_id());
}
}
}
---
02.数据库参与者实现
a.实现3PC参与者
实现支持三阶段提交的数据库参与者,包含状态管理和超时处理。
b.参与者示例
---
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq)]
enum ParticipantState {
Idle,
CanCommit,
PreCommitted,
Committed,
Aborted,
}
struct Database3PC {
id: String,
data: Arc<Mutex<HashMap<String, i32>>>,
prepared_data: Arc<Mutex<Option<HashMap<String, i32>>>>,
state: Arc<Mutex<ParticipantState>>,
should_fail: bool,
}
impl Database3PC {
fn new(id: String, should_fail: bool) -> Self {
Database3PC {
id,
data: Arc::new(Mutex::new(HashMap::new())),
prepared_data: Arc::new(Mutex::new(None)),
state: Arc::new(Mutex::new(ParticipantState::Idle)),
should_fail,
}
}
fn set(&self, key: String, value: i32) {
let mut data = self.data.lock().unwrap();
data.insert(key, value);
}
fn prepare_update(&self, key: String, value: i32) {
let mut prepared = self.prepared_data.lock().unwrap();
let mut map = HashMap::new();
map.insert(key, value);
*prepared = Some(map);
}
fn get(&self, key: &str) -> Option<i32> {
let data = self.data.lock().unwrap();
data.get(key).copied()
}
}
impl ThreePhaseParticipant for Database3PC {
fn can_commit(&self) -> Response {
let mut state = self.state.lock().unwrap();
if self.should_fail {
*state = ParticipantState::Aborted;
return Response::No;
}
let prepared = self.prepared_data.lock().unwrap();
if prepared.is_some() {
*state = ParticipantState::CanCommit;
Response::Yes
} else {
Response::No
}
}
fn pre_commit(&self) -> Response {
let mut state = self.state.lock().unwrap();
if *state != ParticipantState::CanCommit {
return Response::No;
}
*state = ParticipantState::PreCommitted;
println!(" {} 进入PreCommit状态", self.id);
Response::Yes
}
fn do_commit(&self) {
let mut state = self.state.lock().unwrap();
let mut prepared = self.prepared_data.lock().unwrap();
if let Some(ref changes) = *prepared {
let mut data = self.data.lock().unwrap();
for (key, value) in changes {
data.insert(key.clone(), *value);
}
}
*prepared = None;
*state = ParticipantState::Committed;
}
fn do_abort(&self) {
let mut state = self.state.lock().unwrap();
let mut prepared = self.prepared_data.lock().unwrap();
*prepared = None;
*state = ParticipantState::Aborted;
}
fn get_id(&self) -> String {
self.id.clone()
}
}
---
03.转账场景示例
a.使用3PC进行转账
使用三阶段提交协调跨数据库转账,展示完整的三个阶段。
b.转账示例
---
fn transfer_with_3pc() {
let db1 = Arc::new(Database3PC::new("DB1".to_string(), false));
let db2 = Arc::new(Database3PC::new("DB2".to_string(), false));
// 初始化账户
db1.set("account_A".to_string(), 1000);
db2.set("account_B".to_string(), 500);
println!("初始状态:");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
// 准备转账:A转300给B
db1.prepare_update("account_A".to_string(), 700);
db2.prepare_update("account_B".to_string(), 800);
// 创建3PC协调者
let mut coordinator = ThreePhaseCoordinator::new(Duration::from_secs(5));
coordinator.add_participant(db1.clone());
coordinator.add_participant(db2.clone());
// 执行3PC
let success = coordinator.execute();
println!("\n最终状态:");
println!(" DB1 账户A: {}", db1.get("account_A").unwrap());
println!(" DB2 账户B: {}", db2.get("account_B").unwrap());
println!(" 事务结果: {}", if success { "成功" } else { "失败" });
}
---
04.超时恢复机制
a.参与者超时处理
3PC的关键改进是参与者可以在超时后自主决策,避免无限期阻塞。
b.超时恢复示例
---
struct RecoverableParticipant {
id: String,
state: Arc<Mutex<ParticipantState>>,
timeout: Duration,
last_message_time: Arc<Mutex<Instant>>,
}
impl RecoverableParticipant {
fn new(id: String, timeout: Duration) -> Self {
RecoverableParticipant {
id,
state: Arc::new(Mutex::new(ParticipantState::Idle)),
timeout,
last_message_time: Arc::new(Mutex::new(Instant::now())),
}
}
// 检查是否超时
fn check_timeout(&self) -> bool {
let last_time = self.last_message_time.lock().unwrap();
last_time.elapsed() > self.timeout
}
// 超时后的自主决策
fn timeout_decision(&self) {
let state = self.state.lock().unwrap();
match *state {
ParticipantState::CanCommit => {
// 在CanCommit状态超时,选择中止
println!("{} 在CanCommit状态超时,选择中止", self.id);
drop(state);
self.do_abort();
}
ParticipantState::PreCommitted => {
// 在PreCommit状态超时,选择提交
println!("{} 在PreCommit状态超时,选择提交", self.id);
drop(state);
self.do_commit();
}
_ => {
println!("{} 在 {:?} 状态超时", self.id, *state);
}
}
}
fn update_message_time(&self) {
let mut last_time = self.last_message_time.lock().unwrap();
*last_time = Instant::now();
}
}
impl ThreePhaseParticipant for RecoverableParticipant {
fn can_commit(&self) -> Response {
self.update_message_time();
let mut state = self.state.lock().unwrap();
*state = ParticipantState::CanCommit;
Response::Yes
}
fn pre_commit(&self) -> Response {
self.update_message_time();
let mut state = self.state.lock().unwrap();
*state = ParticipantState::PreCommitted;
Response::Yes
}
fn do_commit(&self) {
self.update_message_time();
let mut state = self.state.lock().unwrap();
*state = ParticipantState::Committed;
println!(" {} 提交完成", self.id);
}
fn do_abort(&self) {
self.update_message_time();
let mut state = self.state.lock().unwrap();
*state = ParticipantState::Aborted;
println!(" {} 回滚完成", self.id);
}
fn get_id(&self) -> String {
self.id.clone()
}
}
---
05.3PC与2PC对比
a.优势分析
3PC通过��加PreCommit阶段,允许参与者在超时后自主决策,减少阻塞时间。
b.对比示例
---
// 2PC vs 3PC 对比
// 2PC的问题场景:
// 1. 协调者在发送commit前崩溃
// 2. 参与者已经prepare,但不知道是commit还是abort
// 3. 参与者只能无限期等待,资源被锁定
// 3PC的改进:
// 1. 增加PreCommit阶段
// 2. 参与者在PreCommit后知道其他参与者都准备好了
// 3. 如果在PreCommit后超时,可以安全地提交
fn compare_2pc_3pc() {
println!("=== 2PC流程 ===");
println!("1. Prepare阶段");
println!(" - 协调者询问:能否提交?");
println!(" - 参与者响应:Yes/No");
println!("2. Commit阶段");
println!(" - 协调者决策:Commit/Abort");
println!(" - 参与者执行决策");
println!("\n问题:如果协调者在Commit阶段崩溃");
println!(" 参与者不知道该提交还是回滚");
println!("\n=== 3PC流程 ===");
println!("1. CanCommit阶段");
println!(" - 协调者询问:能否提交?");
println!(" - 参与者响应:Yes/No");
println!("2. PreCommit阶段");
println!(" - 协调者通知:准备提交");
println!(" - 参与者进入PreCommit状态");
println!("3. DoCommit阶段");
println!(" - 协调者通知:执行提交");
println!(" - 参与者提交事务");
println!("\n改进:参与者在PreCommit后知道");
println!(" 所有参与者都准备好了");
println!(" 超时后可以安全提交");
}
---
06.3PC的局限性
a.网络分区问题
3PC在网络分区情况下仍可能导致数据不一致,不能完全解决分布式一致性问题。
b.局限性说明
---
// 3PC的局限性
// 1. 网络分区问题
// 如果网络分区导致参与者分成两组:
// - 一组收到PreCommit,超时后提交
// - 另一组未收到PreCommit,超时后中止
// 结果:数据不一致
// 2. 性能开销
// 3PC比2PC多一个阶段,增加了延迟和消息开销
// 3. 复杂度增加
// 实现和维护更复杂,需要更多的状态管理
struct NetworkPartitionScenario {
group1: Vec<Arc<dyn ThreePhaseParticipant>>,
group2: Vec<Arc<dyn ThreePhaseParticipant>>,
}
impl NetworkPartitionScenario {
fn simulate_partition(&self) {
println!("=== 模拟网络分区 ===");
// 组1收到PreCommit
println!("\n组1(收到PreCommit):");
for p in &self.group1 {
p.pre_commit();
println!(" {} 进入PreCommit状态", p.get_id());
}
// 模拟超时
println!("\n等待超时...");
std::thread::sleep(Duration::from_secs(1));
// 组1超时后提交
println!("\n组1超时后决策:提交");
for p in &self.group1 {
p.do_commit();
}
// 组2未收到PreCommit
println!("\n组2(未收到PreCommit):");
println!(" 等待超时...");
std::thread::sleep(Duration::from_secs(1));
// 组2超时后中止
println!("\n组2超时后决策:中止");
for p in &self.group2 {
p.do_abort();
}
println!("\n结果:数据不一致!");
println!(" 组1已提交,组2已中止");
}
}
// 实际应用建议
fn practical_advice() {
println!("=== 实际应用建议 ===");
println!("1. 对于强一致性要求:");
println!(" - 使用Paxos或Raft等共识算法");
println!(" - 或使用分布式事务框架(如Seata)");
println!("\n2. 对于最终一致性:");
println!(" - 使用Saga模式");
println!(" - 或使用事件溯源");
println!("\n3. 2PC/3PC适用场景:");
println!(" - 网络稳定的内部系统");
println!(" - 事务时间短");
println!(" - 可以接受阻塞");
}
---
4.4 TCC模式
01.TCC模式概念
a.Try-Confirm-Cancel
TCC将事务分为Try、Confirm和Cancel三个阶段,Try阶段预留资源,Confirm阶段确认提交,Cancel阶段取消并释放资源。
b.基础框架
---
use std::error::Error;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// TCC操作trait
trait TccOperation: Send + Sync {
fn try_phase(&self) -> Result<(), Box<dyn Error>>;
fn confirm_phase(&self) -> Result<(), Box<dyn Error>>;
fn cancel_phase(&self) -> Result<(), Box<dyn Error>>;
fn get_id(&self) -> String;
}
// TCC协调器
struct TccCoordinator {
operations: Vec<Arc<dyn TccOperation>>,
}
impl TccCoordinator {
fn new() -> Self {
TccCoordinator {
operations: Vec::new(),
}
}
fn add_operation(&mut self, operation: Arc<dyn TccOperation>) {
self.operations.push(operation);
}
// 执行TCC事务
fn execute(&self) -> Result<(), Box<dyn Error>> {
println!("=== Try阶段 ===");
// Try阶段:预留资源
for op in &self.operations {
match op.try_phase() {
Ok(_) => println!(" {} Try成功", op.get_id()),
Err(e) => {
println!(" {} Try失败: {}", op.get_id(), e);
self.cancel_all();
return Err(e);
}
}
}
println!("\n=== Confirm阶段 ===");
// Confirm阶段:确认提交
for op in &self.operations {
match op.confirm_phase() {
Ok(_) => println!(" {} Confirm成功", op.get_id()),
Err(e) => {
println!(" {} Confirm失败: {}", op.get_id(), e);
// Confirm失败需要人工介入
return Err(e);
}
}
}
Ok(())
}
fn cancel_all(&self) {
println!("\n=== Cancel阶段 ===");
for op in &self.operations {
match op.cancel_phase() {
Ok(_) => println!(" {} Cancel成功", op.get_id()),
Err(e) => println!(" {} Cancel失败: {}", op.get_id(), e),
}
}
}
}
---
02.账户服务实现
a.TCC账户操作
实现支持TCC模式的账户服务,包含资源预留和确认机制。
b.账户服务示例
---
#[derive(Debug, Clone)]
struct Account {
balance: i64,
frozen: i64, // 冻结金额
}
impl Account {
fn new(balance: i64) -> Self {
Account {
balance,
frozen: 0,
}
}
fn available_balance(&self) -> i64 {
self.balance - self.frozen
}
}
struct AccountService {
id: String,
accounts: Arc<Mutex<HashMap<String, Account>>>,
}
impl AccountService {
fn new(id: String) -> Self {
AccountService {
id,
accounts: Arc::new(Mutex::new(HashMap::new())),
}
}
fn create_account(&self, account_id: String, balance: i64) {
let mut accounts = self.accounts.lock().unwrap();
accounts.insert(account_id, Account::new(balance));
}
fn get_balance(&self, account_id: &str) -> Option<i64> {
let accounts = self.accounts.lock().unwrap();
accounts.get(account_id).map(|acc| acc.balance)
}
}
// 扣款TCC操作
struct DebitTccOperation {
service: Arc<AccountService>,
account_id: String,
amount: i64,
}
impl DebitTccOperation {
fn new(service: Arc<AccountService>, account_id: String, amount: i64) -> Self {
DebitTccOperation {
service,
account_id,
amount,
}
}
}
impl TccOperation for DebitTccOperation {
fn try_phase(&self) -> Result<(), Box<dyn Error>> {
let mut accounts = self.service.accounts.lock().unwrap();
if let Some(account) = accounts.get_mut(&self.account_id) {
if account.available_balance() >= self.amount {
// 冻结金额
account.frozen += self.amount;
println!(" 账户 {} 冻结 {}", self.account_id, self.amount);
Ok(())
} else {
Err("余额不足".into())
}
} else {
Err("账户不存在".into())
}
}
fn confirm_phase(&self) -> Result<(), Box<dyn Error>> {
let mut accounts = self.service.accounts.lock().unwrap();
if let Some(account) = accounts.get_mut(&self.account_id) {
// 扣除余额,解冻
account.balance -= self.amount;
account.frozen -= self.amount;
println!(" 账户 {} 扣款 {}", self.account_id, self.amount);
Ok(())
} else {
Err("账户不存在".into())
}
}
fn cancel_phase(&self) -> Result<(), Box<dyn Error>> {
let mut accounts = self.service.accounts.lock().unwrap();
if let Some(account) = accounts.get_mut(&self.account_id) {
// 解冻金额
account.frozen -= self.amount;
println!(" 账户 {} 解冻 {}", self.account_id, self.amount);
Ok(())
} else {
Ok(()) // 账户不存在也算成功
}
}
fn get_id(&self) -> String {
format!("Debit-{}", self.account_id)
}
}
// 入账TCC操作
struct CreditTccOperation {
service: Arc<AccountService>,
account_id: String,
amount: i64,
}
impl CreditTccOperation {
fn new(service: Arc<AccountService>, account_id: String, amount: i64) -> Self {
CreditTccOperation {
service,
account_id,
amount,
}
}
}
impl TccOperation for CreditTccOperation {
fn try_phase(&self) -> Result<(), Box<dyn Error>> {
// 入账Try���段只检查账户存在性
let accounts = self.service.accounts.lock().unwrap();
if accounts.contains_key(&self.account_id) {
println!(" 账户 {} 准备入账 {}", self.account_id, self.amount);
Ok(())
} else {
Err("账户不存在".into())
}
}
fn confirm_phase(&self) -> Result<(), Box<dyn Error>> {
let mut accounts = self.service.accounts.lock().unwrap();
if let Some(account) = accounts.get_mut(&self.account_id) {
account.balance += self.amount;
println!(" 账户 {} 入账 {}", self.account_id, self.amount);
Ok(())
} else {
Err("账户不存在".into())
}
}
fn cancel_phase(&self) -> Result<(), Box<dyn Error>> {
// 入账取消不需要操作
println!(" 账户 {} 取消入账", self.account_id);
Ok(())
}
fn get_id(&self) -> String {
format!("Credit-{}", self.account_id)
}
}
---
03.转账场景示例
a.TCC转账实现
使用TCC模式实现跨账户转账,展示完整的Try-Confirm-Cancel流程。
b.转账示例
---
fn transfer_with_tcc() {
let service = Arc::new(AccountService::new("AccountService".to_string()));
// 创建账户
service.create_account("A".to_string(), 1000);
service.create_account("B".to_string(), 500);
println!("初始状态:");
println!(" 账户A: {}", service.get_balance("A").unwrap());
println!(" 账户B: {}", service.get_balance("B").unwrap());
// 创建TCC协调器
let mut coordinator = TccCoordinator::new();
// 添加扣款操作
coordinator.add_operation(Arc::new(DebitTccOperation::new(
service.clone(),
"A".to_string(),
300,
)));
// 添加入账操作
coordinator.add_operation(Arc::new(CreditTccOperation::new(
service.clone(),
"B".to_string(),
300,
)));
// 执行TCC事务
match coordinator.execute() {
Ok(_) => println!("\n转账成功"),
Err(e) => println!("\n转账失败: {}", e),
}
println!("\n最终状态:");
println!(" 账户A: {}", service.get_balance("A").unwrap());
println!(" 账户B: {}", service.get_balance("B").unwrap());
}
---
04.失败场景处理
a.Try阶段失败
当Try阶段失败时,自动执行Cancel操作回滚已完成的Try操作。
b.失败示例
---
fn transfer_with_insufficient_balance() {
let service = Arc::new(AccountService::new("AccountService".to_string()));
service.create_account("A".to_string(), 100); // 余额不足
service.create_account("B".to_string(), 500);
println!("初始状态:");
println!(" 账户A: {}", service.get_balance("A").unwrap());
println!(" 账户B: {}", service.get_balance("B").unwrap());
let mut coordinator = TccCoordinator::new();
// 尝试转账300(余额不足)
coordinator.add_operation(Arc::new(DebitTccOperation::new(
service.clone(),
"A".to_string(),
300,
)));
coordinator.add_operation(Arc::new(CreditTccOperation::new(
service.clone(),
"B".to_string(),
300,
)));
match coordinator.execute() {
Ok(_) => println!("\n转账成功"),
Err(e) => println!("\n转账失败: {}", e),
}
println!("\n最终状态(应该不变):");
println!(" 账户A: {}", service.get_balance("A").unwrap());
println!(" 账户B: {}", service.get_balance("B").unwrap());
}
---
05.幂等性保证
a.操作幂等性
TCC的每个阶段都需要保证幂等性,支持重试而不产生副作用。
b.幂等性实现
---
use std::collections::HashSet;
struct IdempotentTccOperation {
id: String,
executed_tries: Arc<Mutex<HashSet<String>>>,
executed_confirms: Arc<Mutex<HashSet<String>>>,
executed_cancels: Arc<Mutex<HashSet<String>>>,
}
impl IdempotentTccOperation {
fn new(id: String) -> Self {
IdempotentTccOperation {
id,
executed_tries: Arc::new(Mutex::new(HashSet::new())),
executed_confirms: Arc::new(Mutex::new(HashSet::new())),
executed_cancels: Arc::new(Mutex::new(HashSet::new())),
}
}
}
impl TccOperation for IdempotentTccOperation {
fn try_phase(&self) -> Result<(), Box<dyn Error>> {
let mut tries = self.executed_tries.lock().unwrap();
if tries.contains(&self.id) {
println!(" {} Try已执行,跳过", self.id);
return Ok(());
}
// 执行实际操作
println!(" {} 执行Try", self.id);
tries.insert(self.id.clone());
Ok(())
}
fn confirm_phase(&self) -> Result<(), Box<dyn Error>> {
let mut confirms = self.executed_confirms.lock().unwrap();
if confirms.contains(&self.id) {
println!(" {} Confirm已执行,跳过", self.id);
return Ok(());
}
println!(" {} 执行Confirm", self.id);
confirms.insert(self.id.clone());
Ok(())
}
fn cancel_phase(&self) -> Result<(), Box<dyn Error>> {
let mut cancels = self.executed_cancels.lock().unwrap();
if cancels.contains(&self.id) {
println!(" {} Cancel已执行,跳过", self.id);
return Ok(());
}
println!(" {} 执行Cancel", self.id);
cancels.insert(self.id.clone());
Ok(())
}
fn get_id(&self) -> String {
self.id.clone()
}
}
// 测试幂等性
fn test_idempotency() {
let op = Arc::new(IdempotentTccOperation::new("OP-001".to_string()));
println!("第一次Try:");
op.try_phase().unwrap();
println!("\n第二次Try(应该跳过):");
op.try_phase().unwrap();
println!("\n第一次Confirm:");
op.confirm_phase().unwrap();
println!("\n第二次Confirm(应该跳过):");
op.confirm_phase().unwrap();
}
---
06.TCC最佳实践
a.设计原则
TCC模式需要业务层面的支持,每个操作都要设计Try、Confirm和Cancel三个接口。
b.实践建议
---
// TCC最佳实践
// 1. 资源预留设计
// Try阶段要预留资源,不能直接修改业务数据
struct ResourceReservation {
resource_id: String,
reserved_amount: i64,
reservation_time: std::time::Instant,
}
// 2. 超时处理
// 预留的资源需要设置超时时间,避免长期占用
impl ResourceReservation {
fn is_expired(&self, timeout: std::time::Duration) -> bool {
self.reservation_time.elapsed() > timeout
}
fn cleanup_expired(reservations: &mut Vec<ResourceReservation>, timeout: std::time::Duration) {
reservations.retain(|r| !r.is_expired(timeout));
println!("清理过期预留资源");
}
}
// 3. 异常处理
// Confirm和Cancel都要保证最终成功,失败时需要重试
async fn retry_confirm<F>(operation: F, max_retries: u32) -> Result<(), Box<dyn Error>>
where
F: Fn() -> Result<(), Box<dyn Error>>,
{
for i in 0..max_retries {
match operation() {
Ok(_) => return Ok(()),
Err(e) => {
println!("Confirm失败,第{}次重试: {}", i + 1, e);
tokio::time::sleep(tokio::time::Duration::from_millis(100 * (i as u64 + 1))).await;
}
}
}
Err("Confirm重试失败".into())
}
// 4. 日志记录
// 记录每个阶段的执行状态,便于问题排查
struct TccLogger {
transaction_id: String,
}
impl TccLogger {
fn log_try(&self, operation_id: &str, success: bool) {
println!("[TCC:{}] Try {} - {}",
self.transaction_id, operation_id,
if success { "成功" } else { "失败" });
}
fn log_confirm(&self, operation_id: &str, success: bool) {
println!("[TCC:{}] Confirm {} - {}",
self.transaction_id, operation_id,
if success { "成功" } else { "失败" });
}
fn log_cancel(&self, operation_id: &str, success: bool) {
println!("[TCC:{}] Cancel {} - {}",
self.transaction_id, operation_id,
if success { "成功" } else { "失败" });
}
}
// 5. 状态机管理
#[derive(Debug, Clone, PartialEq)]
enum TccState {
Initial,
Trying,
Tried,
Confirming,
Confirmed,
Cancelling,
Cancelled,
}
struct TccStateMachine {
state: Arc<Mutex<TccState>>,
}
impl TccStateMachine {
fn transition(&self, new_state: TccState) -> Result<(), String> {
let mut state = self.state.lock().unwrap();
// 验证状态转换合法性
let valid = match (&*state, &new_state) {
(TccState::Initial, TccState::Trying) => true,
(TccState::Trying, TccState::Tried) => true,
(TccState::Tried, TccState::Confirming) => true,
(TccState::Tried, TccState::Cancelling) => true,
(TccState::Confirming, TccState::Confirmed) => true,
(TccState::Cancelling, TccState::Cancelled) => true,
_ => false,
};
if valid {
*state = new_state;
Ok(())
} else {
Err(format!("非法状态转换: {:?} -> {:?}", *state, new_state))
}
}
}
---
4.5 本地消息表
01.本地消息表概念
a.最终一致性方案
本地消息表通过在本地数据库中记录消息,与业务操作在同一事务中执行,然后异步发送消息到其他服务,保证最终一致性。
b.基础实现
---
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum MessageStatus {
Pending, // 待发送
Sent, // 已发送
Failed, // 发送失败
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LocalMessage {
id: String,
topic: String,
payload: String,
status: MessageStatus,
retry_count: u32,
created_at: u64,
updated_at: u64,
}
impl LocalMessage {
fn new(id: String, topic: String, payload: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
LocalMessage {
id,
topic,
payload,
status: MessageStatus::Pending,
retry_count: 0,
created_at: now,
updated_at: now,
}
}
}
// 本地消息表
struct LocalMessageTable {
messages: Arc<Mutex<HashMap<String, LocalMessage>>>,
}
impl LocalMessageTable {
fn new() -> Self {
LocalMessageTable {
messages: Arc::new(Mutex::new(HashMap::new())),
}
}
// 插入消息
fn insert(&self, message: LocalMessage) {
let mut messages = self.messages.lock().unwrap();
messages.insert(message.id.clone(), message);
println!("插入消息: {}", messages.len());
}
// 获取待发送消息
fn get_pending_messages(&self) -> Vec<LocalMessage> {
let messages = self.messages.lock().unwrap();
messages
.values()
.filter(|m| m.status == MessageStatus::Pending)
.cloned()
.collect()
}
// 更新消息状态
fn update_status(&self, id: &str, status: MessageStatus) {
let mut messages = self.messages.lock().unwrap();
if let Some(message) = messages.get_mut(id) {
message.status = status;
message.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
}
}
// 增加重试次数
fn increment_retry(&self, id: &str) {
let mut messages = self.messages.lock().unwrap();
if let Some(message) = messages.get_mut(id) {
message.retry_count += 1;
}
}
}
---
02.订单服务示例
a.订单创建与消息发送
在创建订单的同时,将消息插入本地消息表,保证原子性。
b.订单服务实现
---
use std::error::Error;
#[derive(Debug, Clone)]
struct Order {
id: String,
user_id: String,
amount: f64,
status: String,
}
struct OrderService {
orders: Arc<Mutex<HashMap<String, Order>>>,
message_table: Arc<LocalMessageTable>,
}
impl OrderService {
fn new(message_table: Arc<LocalMessageTable>) -> Self {
OrderService {
orders: Arc::new(Mutex::new(HashMap::new())),
message_table,
}
}
// 创建订单(本地事务)
fn create_order(&self, order: Order) -> Result<(), Box<dyn Error>> {
println!("=== 开始本地事务 ===");
// 1. 插入订单
{
let mut orders = self.orders.lock().unwrap();
orders.insert(order.id.clone(), order.clone());
println!("1. 插入订单: {}", order.id);
}
// 2. 插入消息到本地消息表
let message = LocalMessage::new(
format!("msg-{}", order.id),
"order.created".to_string(),
serde_json::to_string(&order)?,
);
self.message_table.insert(message);
println!("2. 插入消息到本地表");
println!("=== 本地事务提交 ===\n");
Ok(())
}
fn get_order(&self, order_id: &str) -> Option<Order> {
let orders = self.orders.lock().unwrap();
orders.get(order_id).cloned()
}
}
---
03.消息发送器
a.异步消息发送
定时扫描本地消息表,发送待发送的消息到消息队列。
b.发送器实现
---
use std::thread;
use std::time::Duration;
struct MessageSender {
message_table: Arc<LocalMessageTable>,
max_retries: u32,
}
impl MessageSender {
fn new(message_table: Arc<LocalMessageTable>, max_retries: u32) -> Self {
MessageSender {
message_table,
max_retries,
}
}
// 启动定时发送任务
fn start(&self) {
let message_table = self.message_table.clone();
let max_retries = self.max_retries;
thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(1));
let pending_messages = message_table.get_pending_messages();
if !pending_messages.is_empty() {
println!("\n=== 扫描到 {} 条待发送消息 ===", pending_messages.len());
}
for message in pending_messages {
if message.retry_count >= max_retries {
println!("消息 {} 超过最大重试次数", message.id);
message_table.update_status(&message.id, MessageStatus::Failed);
continue;
}
// 模拟发送消息
match Self::send_message(&message) {
Ok(_) => {
println!("消息 {} 发送成功", message.id);
message_table.update_status(&message.id, MessageStatus::Sent);
}
Err(e) => {
println!("消息 {} 发送失败: {}", message.id, e);
message_table.increment_retry(&message.id);
}
}
}
}
});
}
// 模拟发送消息到消息队列
fn send_message(message: &LocalMessage) -> Result<(), Box<dyn Error>> {
// 模拟网络延迟
thread::sleep(Duration::from_millis(100));
// 模拟10%的失败率
if rand::random::<f32>() < 0.1 {
return Err("网络错误".into());
}
println!(" -> 发送到主题: {}", message.topic);
Ok(())
}
}
---
04.完整示例
a.订单创建流程
展示从订单创建到消息发送的完整流程。
b.完整示例代码
---
fn local_message_table_example() {
// 创建本地消息表
let message_table = Arc::new(LocalMessageTable::new());
// 创建订单服务
let order_service = OrderService::new(message_table.clone());
// 启动消息发送器
let sender = MessageSender::new(message_table.clone(), 3);
sender.start();
// 创建多个订单
for i in 1..=3 {
let order = Order {
id: format!("ORD-{:03}", i),
user_id: format!("USER-{}", i),
amount: 100.0 * i as f64,
status: "CREATED".to_string(),
};
match order_service.create_order(order.clone()) {
Ok(_) => println!("订单 {} 创建成功\n", order.id),
Err(e) => println!("订单 {} 创建失败: {}\n", order.id, e),
}
thread::sleep(Duration::from_millis(500));
}
// 等待消息发送
println!("等待消息发送...");
thread::sleep(Duration::from_secs(5));
// 检查消息状态
println!("\n=== 最终消息状态 ===");
let messages = message_table.messages.lock().unwrap();
for (id, msg) in messages.iter() {
println!("消息 {}: {:?} (重试{}次)", id, msg.status, msg.retry_count);
}
}
---
05.消息消费者
a.下游服务处理
下游服务消费消息并处理业务逻辑,需要保证幂等性。
b.消费者实现
---
struct MessageConsumer {
processed_messages: Arc<Mutex<std::collections::HashSet<String>>>,
}
impl MessageConsumer {
fn new() -> Self {
MessageConsumer {
processed_messages: Arc::new(Mutex::new(std::collections::HashSet::new())),
}
}
// 处理消息(幂等)
fn handle_message(&self, message: &LocalMessage) -> Result<(), Box<dyn Error>> {
let mut processed = self.processed_messages.lock().unwrap();
// 检查是否已处理
if processed.contains(&message.id) {
println!("消息 {} 已处理,跳过", message.id);
return Ok(());
}
// 处理消息
println!("处理消息 {}", message.id);
match message.topic.as_str() {
"order.created" => {
let order: Order = serde_json::from_str(&message.payload)?;
self.handle_order_created(order)?;
}
_ => {
println!("未知消息类型: {}", message.topic);
}
}
// 标记为已处理
processed.insert(message.id.clone());
Ok(())
}
fn handle_order_created(&self, order: Order) -> Result<(), Box<dyn Error>> {
println!(" 处理订单创建事件:");
println!(" 订单ID: {}", order.id);
println!(" 用户ID: {}", order.user_id);
println!(" 金额: {}", order.amount);
// 执行业务逻辑
// 例如:扣减库存、发送通知等
Ok(())
}
}
---
06.最佳实践
a.设计要点
本地消息表需要考虑消息去重、重试策略、失败处理等问题。
b.实践建议
---
// 本地消息表最佳实践
// 1. 消息去重
// 使用唯一ID确保消息不重复
fn generate_message_id(order_id: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
order_id.hash(&mut hasher);
format!("msg-{:x}", hasher.finish())
}
// 2. 重试策略
// 使用指数退避策略
fn calculate_retry_delay(retry_count: u32) -> Duration {
let base_delay = 1000; // 1秒
let max_delay = 60000; // 60秒
let delay = base_delay * 2_u64.pow(retry_count);
Duration::from_millis(delay.min(max_delay))
}
// 3. 消息清理
// 定期清理已发送的旧消息
impl LocalMessageTable {
fn cleanup_old_messages(&self, days: u64) {
let mut messages = self.messages.lock().unwrap();
let cutoff = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() - (days * 24 * 3600);
messages.retain(|_, msg| {
msg.status != MessageStatus::Sent || msg.updated_at > cutoff
});
println!("清理旧消息,剩余: {}", messages.len());
}
}
// 4. 监控告警
// 监控失败消息数量
impl LocalMessageTable {
fn get_failed_count(&self) -> usize {
let messages = self.messages.lock().unwrap();
messages
.values()
.filter(|m| m.status == MessageStatus::Failed)
.count()
}
fn check_and_alert(&self, threshold: usize) {
let failed_count = self.get_failed_count();
if failed_count > threshold {
println!("⚠️ 警告:失败消息数量 {} 超过阈值 {}",
failed_count, threshold);
// 发送告警
}
}
}
// 5. 事务一致性
// 确保业务操作和消息插入在同一事务中
struct TransactionalService {
message_table: Arc<LocalMessageTable>,
}
impl TransactionalService {
fn execute_with_message<F>(&self, business_op: F, message: LocalMessage)
-> Result<(), Box<dyn Error>>
where
F: FnOnce() -> Result<(), Box<dyn Error>>,
{
// 开始事务
println!("BEGIN TRANSACTION");
// 执行业务操作
business_op()?;
// 插入消息
self.message_table.insert(message);
// 提交事务
println!("COMMIT TRANSACTION");
Ok(())
}
}
---
4.6 事务消息
01.事务消息概念
a.消息队列事务支持
事务消息是消息队列提供的特性,确保消息发送与本地事务的原子性,如RocketMQ的事务消息机制。
b.工作原理
---
// 事务消息流程:
// 1. 发送半消息(Half Message)到消息队列
// 2. 执行本地事务
// 3. 根据本地事务结果提交或回滚消息
// 4. 消息队列定期回查事务状态
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq)]
enum TransactionStatus {
Unknown, // 未知
Commit, // 提交
Rollback, // 回滚
}
#[derive(Debug, Clone)]
struct TransactionMessage {
id: String,
topic: String,
payload: String,
transaction_id: String,
status: TransactionStatus,
}
impl TransactionMessage {
fn new(id: String, topic: String, payload: String, transaction_id: String) -> Self {
TransactionMessage {
id,
topic,
payload,
transaction_id,
status: TransactionStatus::Unknown,
}
}
}
---
02.事务消息生产者
a.发送事务消息
生产者发送半消息,执行本地事务,然后提交或回滚消息。
b.生产者实现
---
use std::error::Error;
trait TransactionExecutor: Send + Sync {
fn execute_local_transaction(&self, msg: &TransactionMessage) -> TransactionStatus;
fn check_local_transaction(&self, msg: &TransactionMessage) -> TransactionStatus;
}
struct TransactionProducer {
half_messages: Arc<Mutex<HashMap<String, TransactionMessage>>>,
committed_messages: Arc<Mutex<Vec<TransactionMessage>>>,
}
impl TransactionProducer {
fn new() -> Self {
TransactionProducer {
half_messages: Arc::new(Mutex::new(HashMap::new())),
committed_messages: Arc::new(Mutex::new(Vec::new())),
}
}
// 发送事务消息
fn send_message_in_transaction<E: TransactionExecutor>(
&self,
message: TransactionMessage,
executor: &E,
) -> Result<(), Box<dyn Error>> {
println!("=== 发送事务消息 ===");
// 步骤1: 发送半消息
println!("1. 发送半消息: {}", message.id);
{
let mut half_messages = self.half_messages.lock().unwrap();
half_messages.insert(message.id.clone(), message.clone());
}
// 步骤2: 执行本地事务
println!("2. 执行本地事务");
let status = executor.execute_local_transaction(&message);
println!(" 本地事务结果: {:?}", status);
// 步骤3: 根据事务结果提交或回滚
match status {
TransactionStatus::Commit => {
println!("3. 提交消息");
self.commit_message(&message.id)?;
}
TransactionStatus::Rollback => {
println!("3. 回滚消息");
self.rollback_message(&message.id)?;
}
TransactionStatus::Unknown => {
println!("3. 事务状态未知,等待回查");
}
}
Ok(())
}
// 提交消息
fn commit_message(&self, message_id: &str) -> Result<(), Box<dyn Error>> {
let mut half_messages = self.half_messages.lock().unwrap();
if let Some(mut message) = half_messages.remove(message_id) {
message.status = TransactionStatus::Commit;
let mut committed = self.committed_messages.lock().unwrap();
committed.push(message);
println!(" 消息已提交到队列");
Ok(())
} else {
Err("消息不存在".into())
}
}
// 回滚消息
fn rollback_message(&self, message_id: &str) -> Result<(), Box<dyn Error>> {
let mut half_messages = self.half_messages.lock().unwrap();
if half_messages.remove(message_id).is_some() {
println!(" 消息已回滚");
Ok(())
} else {
Err("消息不存在".into())
}
}
// 事务回查
fn check_transaction<E: TransactionExecutor>(
&self,
message_id: &str,
executor: &E,
) -> Result<(), Box<dyn Error>> {
println!("\n=== 事务回查 ===");
let half_messages = self.half_messages.lock().unwrap();
if let Some(message) = half_messages.get(message_id) {
println!("回查消息: {}", message_id);
drop(half_messages);
let status = executor.check_local_transaction(message);
println!("回查结果: {:?}", status);
match status {
TransactionStatus::Commit => self.commit_message(message_id)?,
TransactionStatus::Rollback => self.rollback_message(message_id)?,
TransactionStatus::Unknown => {
println!("仍然未知,稍后再次回查");
}
}
}
Ok(())
}
// 获取已提交的消息
fn get_committed_messages(&self) -> Vec<TransactionMessage> {
let committed = self.committed_messages.lock().unwrap();
committed.clone()
}
}
---
03.订单服务示例
a.订单创建事务
使用事务消息确保订单创建和消息发送的原子性。
b.订单事务实现
---
#[derive(Debug, Clone)]
struct OrderData {
order_id: String,
user_id: String,
amount: f64,
}
struct OrderTransactionExecutor {
orders: Arc<Mutex<HashMap<String, OrderData>>>,
}
impl OrderTransactionExecutor {
fn new() -> Self {
OrderTransactionExecutor {
orders: Arc::new(Mutex::new(HashMap::new())),
}
}
fn get_order(&self, order_id: &str) -> Option<OrderData> {
let orders = self.orders.lock().unwrap();
orders.get(order_id).cloned()
}
}
impl TransactionExecutor for OrderTransactionExecutor {
fn execute_local_transaction(&self, msg: &TransactionMessage) -> TransactionStatus {
println!(" 执行订单创建事务");
// 解析订单数据
let order_data: OrderData = match serde_json::from_str(&msg.payload) {
Ok(data) => data,
Err(e) => {
println!(" 解析订单数据失败: {}", e);
return TransactionStatus::Rollback;
}
};
// 创建订单
let mut orders = self.orders.lock().unwrap();
orders.insert(order_data.order_id.clone(), order_data.clone());
println!(" 订单 {} 创建成功", order_data.order_id);
TransactionStatus::Commit
}
fn check_local_transaction(&self, msg: &TransactionMessage) -> TransactionStatus {
println!(" 检查订单事务状态");
// 解析订单数据
let order_data: OrderData = match serde_json::from_str(&msg.payload) {
Ok(data) => data,
Err(_) => return TransactionStatus::Rollback,
};
// 检查订单是否存在
let orders = self.orders.lock().unwrap();
if orders.contains_key(&order_data.order_id) {
println!(" 订单 {} 已存在,提交消息", order_data.order_id);
TransactionStatus::Commit
} else {
println!(" 订单 {} 不存在,回滚消息", order_data.order_id);
TransactionStatus::Rollback
}
}
}
// 使用示例
fn create_order_with_transaction_message() {
let producer = TransactionProducer::new();
let executor = OrderTransactionExecutor::new();
let order_data = OrderData {
order_id: "ORD-001".to_string(),
user_id: "USER-123".to_string(),
amount: 299.99,
};
let message = TransactionMessage::new(
"msg-001".to_string(),
"order.created".to_string(),
serde_json::to_string(&order_data).unwrap(),
"tx-001".to_string(),
);
match producer.send_message_in_transaction(message, &executor) {
Ok(_) => println!("\n事务消息发送成功"),
Err(e) => println!("\n事务消息发送失败: {}", e),
}
// 查看已提交的消息
let committed = producer.get_committed_messages();
println!("\n已提交消息数: {}", committed.len());
}
---
04.事务回查机制
a.定时回查
消息队列定期回查未确认的半消息,确保最终一致性。
b.回查实现
---
use std::thread;
use std::time::Duration;
struct TransactionChecker {
producer: Arc<TransactionProducer>,
check_interval: Duration,
max_check_times: u32,
}
impl TransactionChecker {
fn new(
producer: Arc<TransactionProducer>,
check_interval: Duration,
max_check_times: u32,
) -> Self {
TransactionChecker {
producer,
check_interval,
max_check_times,
}
}
// 启动回查任务
fn start<E: TransactionExecutor + 'static>(&self, executor: Arc<E>) {
let producer = self.producer.clone();
let interval = self.check_interval;
let max_times = self.max_check_times;
thread::spawn(move || {
let mut check_counts: HashMap<String, u32> = HashMap::new();
loop {
thread::sleep(interval);
let half_messages = producer.half_messages.lock().unwrap();
let message_ids: Vec<String> = half_messages.keys().cloned().collect();
drop(half_messages);
for message_id in message_ids {
let count = check_counts.entry(message_id.clone()).or_insert(0);
if *count >= max_times {
println!("消息 {} 超过最大回查次数,强制回滚", message_id);
let _ = producer.rollback_message(&message_id);
check_counts.remove(&message_id);
continue;
}
*count += 1;
let _ = producer.check_transaction(&message_id, executor.as_ref());
}
}
});
}
}
---
05.消息消费
a.消费已提交的消息
消费者只能看到已提交的事务消息,保证消息的可靠性。
b.消费者实现
---
struct TransactionMessageConsumer {
processed: Arc<Mutex<std::collections::HashSet<String>>>,
}
impl TransactionMessageConsumer {
fn new() -> Self {
TransactionMessageConsumer {
processed: Arc::new(Mutex::new(std::collections::HashSet::new())),
}
}
// 消费消息
fn consume(&self, producer: &TransactionProducer) {
let messages = producer.get_committed_messages();
println!("\n=== 消费事务消息 ===");
println!("可消费消息数: {}", messages.len());
for message in messages {
if self.is_processed(&message.id) {
println!("消息 {} 已处理,跳过", message.id);
continue;
}
println!("处理消息 {}", message.id);
self.process_message(&message);
self.mark_processed(&message.id);
}
}
fn is_processed(&self, message_id: &str) -> bool {
let processed = self.processed.lock().unwrap();
processed.contains(message_id)
}
fn mark_processed(&self, message_id: &str) {
let mut processed = self.processed.lock().unwrap();
processed.insert(message_id.to_string());
}
fn process_message(&self, message: &TransactionMessage) {
println!(" 主题: {}", message.topic);
println!(" 内容: {}", message.payload);
// 执行业务逻辑
}
}
---
06.最佳实践
a.设计要点
事务消息需要合理设计回查接口,确保幂等性和正确性。
b.实践建议
---
// 事务消息最佳实践
// 1. 回查接口设计
// 回查接口必须能够准确判断事务状态
impl TransactionExecutor for OrderTransactionExecutor {
fn check_local_transaction(&self, msg: &TransactionMessage) -> TransactionStatus {
// 通过查询数据库判断事务是否完成
// 不要依赖内存状态
let order_data: OrderData = match serde_json::from_str(&msg.payload) {
Ok(data) => data,
Err(_) => return TransactionStatus::Rollback,
};
// 查询数据库
let orders = self.orders.lock().unwrap();
if let Some(order) = orders.get(&order_data.order_id) {
// 订单存在且状态正确
TransactionStatus::Commit
} else {
// 订单不存在
TransactionStatus::Rollback
}
}
}
// 2. 超时设置
// 合理设置事务超时时间
struct TransactionConfig {
transaction_timeout: Duration,
check_interval: Duration,
max_check_times: u32,
}
impl Default for TransactionConfig {
fn default() -> Self {
TransactionConfig {
transaction_timeout: Duration::from_secs(30),
check_interval: Duration::from_secs(5),
max_check_times: 15,
}
}
}
// 3. 幂等性保证
// 本地事务执行必须幂等
impl OrderTransactionExecutor {
fn execute_local_transaction_idempotent(&self, msg: &TransactionMessage) -> TransactionStatus {
let order_data: OrderData = match serde_json::from_str(&msg.payload) {
Ok(data) => data,
Err(_) => return TransactionStatus::Rollback,
};
let mut orders = self.orders.lock().unwrap();
// 检查订单是否已存在
if orders.contains_key(&order_data.order_id) {
println!(" 订单已存在,幂等返回成功");
return TransactionStatus::Commit;
}
// 创建订单
orders.insert(order_data.order_id.clone(), order_data);
TransactionStatus::Commit
}
}
// 4. 监控告警
// 监控半消息数量和回查失败率
impl TransactionProducer {
fn get_half_message_count(&self) -> usize {
let half_messages = self.half_messages.lock().unwrap();
half_messages.len()
}
fn monitor(&self) {
let count = self.get_half_message_count();
if count > 100 {
println!("⚠️ 警告:半消息数量过多 {}", count);
// 发送告警
}
}
}
// 5. 日志记录
// 记录完整的事务流程
fn log_transaction_flow(
transaction_id: &str,
phase: &str,
status: &str,
) {
println!("[Transaction:{}] {} - {}", transaction_id, phase, status);
}
---
4.7 最大努力通知
01.最大努力通知概念
a.尽力而为的通知机制
最大努力通知是一种柔性事务方案,发送方尽最大努力通知接收方,但不保证一定成功,接收方需要提供查询接口作为补偿。
b.工作流程
---
// 最大努力通知流程:
// 1. 业务操作完成后发送通知
// 2. 通知失败时进行有限次数重试
// 3. 重试失败后记录日志,不再重试
// 4. 接收方提供查询接口主动查询结果
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, PartialEq)]
enum NotificationStatus {
Pending, // 待发送
Sent, // 已发送
Failed, // 失败
}
#[derive(Debug, Clone)]
struct Notification {
id: String,
target_url: String,
payload: String,
status: NotificationStatus,
retry_count: u32,
max_retries: u32,
created_at: Instant,
last_retry_at: Option<Instant>,
}
impl Notification {
fn new(id: String, target_url: String, payload: String, max_retries: u32) -> Self {
Notification {
id,
target_url,
payload,
status: NotificationStatus::Pending,
retry_count: 0,
max_retries,
created_at: Instant::now(),
last_retry_at: None,
}
}
fn can_retry(&self) -> bool {
self.retry_count < self.max_retries
}
}
---
02.通知发送器
a.异步通知发送
发送器负责发送通知并处理重试逻辑,使用指数退避策略。
b.发送器实现
---
use std::error::Error;
use std::thread;
struct NotificationSender {
notifications: Arc<Mutex<HashMap<String, Notification>>>,
}
impl NotificationSender {
fn new() -> Self {
NotificationSender {
notifications: Arc::new(Mutex::new(HashMap::new())),
}
}
// 添加通知
fn add_notification(&self, notification: Notification) {
let mut notifications = self.notifications.lock().unwrap();
notifications.insert(notification.id.clone(), notification);
println!("添加通知: {}", notifications.len());
}
// 发送通知
fn send(&self, notification_id: &str) -> Result<(), Box<dyn Error>> {
let mut notifications = self.notifications.lock().unwrap();
if let Some(notification) = notifications.get_mut(notification_id) {
println!("发送通知: {}", notification.id);
println!(" 目标: {}", notification.target_url);
println!(" 重试次数: {}", notification.retry_count);
// 模拟HTTP请求
match Self::http_post(¬ification.target_url, ¬ification.payload) {
Ok(_) => {
notification.status = NotificationStatus::Sent;
println!(" ✓ 发送成功");
Ok(())
}
Err(e) => {
notification.retry_count += 1;
notification.last_retry_at = Some(Instant::now());
if notification.can_retry() {
println!(" ✗ 发送失败: {},将重试", e);
} else {
notification.status = NotificationStatus::Failed;
println!(" ✗ 发送失败: {},已达最大重试次数", e);
}
Err(e)
}
}
} else {
Err("通知不存在".into())
}
}
// 模拟HTTP POST请求
fn http_post(url: &str, payload: &str) -> Result<(), Box<dyn Error>> {
thread::sleep(Duration::from_millis(100));
// 模拟30%的失败率
if rand::random::<f32>() < 0.3 {
return Err("网络错误".into());
}
Ok(())
}
// 获取待发送的通知
fn get_pending_notifications(&self) -> Vec<String> {
let notifications = self.notifications.lock().unwrap();
notifications
.values()
.filter(|n| {
n.status == NotificationStatus::Pending && n.can_retry()
})
.map(|n| n.id.clone())
.collect()
}
// 获取失败的通知
fn get_failed_notifications(&self) -> Vec<Notification> {
let notifications = self.notifications.lock().unwrap();
notifications
.values()
.filter(|n| n.status == NotificationStatus::Failed)
.cloned()
.collect()
}
}
---
03.重试调度器
a.定时重试机制
调度器定期扫描待发送的通知,使用指数退避策略进行重试。
b.调度器实现
---
struct RetryScheduler {
sender: Arc<NotificationSender>,
check_interval: Duration,
}
impl RetryScheduler {
fn new(sender: Arc<NotificationSender>, check_interval: Duration) -> Self {
RetryScheduler {
sender,
check_interval,
}
}
// 启动调度器
fn start(&self) {
let sender = self.sender.clone();
let interval = self.check_interval;
thread::spawn(move || {
loop {
thread::sleep(interval);
let pending = sender.get_pending_notifications();
if !pending.is_empty() {
println!("\n=== 扫描到 {} 条待发送通知 ===", pending.len());
}
for notification_id in pending {
// 计算退避延迟
if Self::should_retry(&sender, ¬ification_id) {
let _ = sender.send(¬ification_id);
}
}
}
});
}
// 判断是否应该重试
fn should_retry(sender: &NotificationSender, notification_id: &str) -> bool {
let notifications = sender.notifications.lock().unwrap();
if let Some(notification) = notifications.get(notification_id) {
if let Some(last_retry) = notification.last_retry_at {
// 指数退避:1s, 2s, 4s, 8s, ...
let backoff = Duration::from_secs(2_u64.pow(notification.retry_count));
last_retry.elapsed() >= backoff
} else {
true // 首次发送
}
} else {
false
}
}
}
---
04.支付通知示例
a.支付结果通知
支付完成后通知商户,商户需要提供查询接口作为补偿。
b.支付通知实现
---
#[derive(Debug, Clone)]
struct PaymentResult {
order_id: String,
amount: f64,
status: String,
timestamp: u64,
}
struct PaymentService {
sender: Arc<NotificationSender>,
payments: Arc<Mutex<HashMap<String, PaymentResult>>>,
}
impl PaymentService {
fn new(sender: Arc<NotificationSender>) -> Self {
PaymentService {
sender,
payments: Arc::new(Mutex::new(HashMap::new())),
}
}
// 处理支付
fn process_payment(&self, order_id: String, amount: f64) -> Result<(), Box<dyn Error>> {
println!("=== 处理支付 ===");
println!("订单: {}, 金额: {}", order_id, amount);
// 1. 执行支付
let payment = PaymentResult {
order_id: order_id.clone(),
amount,
status: "SUCCESS".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
{
let mut payments = self.payments.lock().unwrap();
payments.insert(order_id.clone(), payment.clone());
}
println!("支付成功");
// 2. 发送通知
let notification = Notification::new(
format!("notify-{}", order_id),
format!("http://merchant.com/callback/{}", order_id),
serde_json::to_string(&payment)?,
5, // 最多重试5次
);
self.sender.add_notification(notification.clone());
self.sender.send(¬ification.id)?;
Ok(())
}
// 查询接口(供商户主动查询)
fn query_payment(&self, order_id: &str) -> Option<PaymentResult> {
let payments = self.payments.lock().unwrap();
payments.get(order_id).cloned()
}
}
// 商户服务
struct MerchantService {
payment_service: Arc<PaymentService>,
received_notifications: Arc<Mutex<HashMap<String, PaymentResult>>>,
}
impl MerchantService {
fn new(payment_service: Arc<PaymentService>) -> Self {
MerchantService {
payment_service,
received_notifications: Arc::new(Mutex::new(HashMap::new())),
}
}
// 接收通知回调
fn handle_callback(&self, order_id: String, payment: PaymentResult) {
println!("\n商户收到支付通知:");
println!(" 订单: {}", order_id);
println!(" 金额: {}", payment.amount);
println!(" 状态: {}", payment.status);
let mut notifications = self.received_notifications.lock().unwrap();
notifications.insert(order_id, payment);
}
// 主动查询支付结果
fn query_payment_result(&self, order_id: &str) {
println!("\n商户主动查询支付结果:");
println!(" 订单: {}", order_id);
match self.payment_service.query_payment(order_id) {
Some(payment) => {
println!(" 查询成功:");
println!(" 金额: {}", payment.amount);
println!(" 状态: {}", payment.status);
// 保存结果
let mut notifications = self.received_notifications.lock().unwrap();
notifications.insert(order_id.to_string(), payment);
}
None => {
println!(" 查询失败: 支付记录不存在");
}
}
}
}
---
05.完整示例
a.支付通知流程
展示从支付到通知的完整流程,包括重试和主动查询。
b.完整示例代码
---
fn best_effort_notification_example() {
// 创建通知发送器
let sender = Arc::new(NotificationSender::new());
// 启动重试调度器
let scheduler = RetryScheduler::new(sender.clone(), Duration::from_secs(1));
scheduler.start();
// 创建支付服务
let payment_service = Arc::new(PaymentService::new(sender.clone()));
// 创建商户服务
let merchant_service = MerchantService::new(payment_service.clone());
// 处理多笔支付
for i in 1..=3 {
let order_id = format!("ORD-{:03}", i);
let amount = 100.0 * i as f64;
match payment_service.process_payment(order_id.clone(), amount) {
Ok(_) => println!("订单 {} 支付处理完成\n", order_id),
Err(e) => println!("订单 {} 支付处理失败: {}\n", order_id, e),
}
thread::sleep(Duration::from_millis(500));
}
// 等待通知发送
println!("等待通知发送...");
thread::sleep(Duration::from_secs(10));
// 检查失败的通知
let failed = sender.get_failed_notifications();
if !failed.is_empty() {
println!("\n=== 失败的通知 ===");
for notification in &failed {
println!("通知 {}: 重试{}次后失败", notification.id, notification.retry_count);
// 商户主动查询
let order_id = notification.id.replace("notify-", "");
merchant_service.query_payment_result(&order_id);
}
}
// 显示最终状态
println!("\n=== 最终状态 ===");
let notifications = sender.notifications.lock().unwrap();
for (id, notification) in notifications.iter() {
println!("通知 {}: {:?} (重试{}次)",
id, notification.status, notification.retry_count);
}
}
---
06.最佳实践
a.设计要点
最大努力通知需要合理设计重试策略和查询接口,��保最终一致性。
b.实践建议
---
// 最大努力通知最佳实践
// 1. 重试策略
// 使用指数退避,避免频繁重试
fn calculate_backoff_delay(retry_count: u32) -> Duration {
let base = 2_u64.pow(retry_count);
let max_delay = 3600; // 最大1小时
Duration::from_secs(base.min(max_delay))
}
// 2. 幂等性设计
// 通知接口必须幂等
impl MerchantService {
fn handle_callback_idempotent(&self, order_id: String, payment: PaymentResult) {
let mut notifications = self.received_notifications.lock().unwrap();
// 检查是否已处理
if notifications.contains_key(&order_id) {
println!("订单 {} 通知已处理,跳过", order_id);
return;
}
// 处理通知
println!("处理订单 {} 的支付通知", order_id);
notifications.insert(order_id, payment);
}
}
// 3. 查询接口设计
// 提供查询接口供接收方主动查询
impl PaymentService {
fn query_payment_with_auth(&self, order_id: &str, merchant_id: &str) -> Option<PaymentResult> {
// 验证商户权限
if !self.verify_merchant(merchant_id, order_id) {
println!("商户 {} 无权查询订单 {}", merchant_id, order_id);
return None;
}
let payments = self.payments.lock().unwrap();
payments.get(order_id).cloned()
}
fn verify_merchant(&self, merchant_id: &str, order_id: &str) -> bool {
// 验证商户是否有权限查询该订单
true
}
}
// 4. 通知记录
// 记录所有通知尝试
struct NotificationLog {
notification_id: String,
attempt_time: Instant,
success: bool,
error_message: Option<String>,
}
impl NotificationSender {
fn log_attempt(&self, notification_id: &str, success: bool, error: Option<String>) {
let log = NotificationLog {
notification_id: notification_id.to_string(),
attempt_time: Instant::now(),
success,
error_message: error,
};
println!("[Log] 通知 {} - {} - {:?}",
log.notification_id,
if log.success { "成功" } else { "失败" },
log.error_message);
}
}
// 5. 监控告警
// 监控失败率和重试次数
impl NotificationSender {
fn get_statistics(&self) -> (usize, usize, usize) {
let notifications = self.notifications.lock().unwrap();
let total = notifications.len();
let sent = notifications.values()
.filter(|n| n.status == NotificationStatus::Sent)
.count();
let failed = notifications.values()
.filter(|n| n.status == NotificationStatus::Failed)
.count();
(total, sent, failed)
}
fn monitor(&self) {
let (total, sent, failed) = self.get_statistics();
let failure_rate = if total > 0 {
(failed as f64 / total as f64) * 100.0
} else {
0.0
};
println!("通知统计:");
println!(" 总数: {}", total);
println!(" 成功: {}", sent);
println!(" 失败: {}", failed);
println!(" 失败率: {:.2}%", failure_rate);
if failure_rate > 20.0 {
println!("⚠️ 警告:通知失败率过高");
// 发送告警
}
}
}
// 6. 超时设置
// 设置合理的超时时间
struct NotificationConfig {
http_timeout: Duration,
max_retries: u32,
retry_interval: Duration,
}
impl Default for NotificationConfig {
fn default() -> Self {
NotificationConfig {
http_timeout: Duration::from_secs(10),
max_retries: 5,
retry_interval: Duration::from_secs(1),
}
}
}
---
5 实践与优化
5.1 事务边界设计
01.事务边界原则
a.最小化事务范围
事务应该尽可能小,只包含必要的操作,减少锁持有时间和资源占用。
b.边界划分示例
---
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
struct Account {
balance: i64,
}
struct AccountService {
accounts: Arc<Mutex<HashMap<String, Account>>>,
}
impl AccountService {
// 错误示例:事务边界过大
fn transfer_bad(&self, from: &str, to: &str, amount: i64) {
let mut accounts = self.accounts.lock().unwrap();
// 执行耗时操作(不应在事务内)
self.validate_user(from); // 可能调用外部服务
self.check_risk(amount); // 风控检查
// 实际转账
if let Some(from_acc) = accounts.get_mut(from) {
from_acc.balance -= amount;
}
if let Some(to_acc) = accounts.get_mut(to) {
to_acc.balance += amount;
}
}
// 正确示例:最小化事务边界
fn transfer_good(&self, from: &str, to: &str, amount: i64) {
// 事务外执行验证
self.validate_user(from);
self.check_risk(amount);
// 最小化事务范围
{
let mut accounts = self.accounts.lock().unwrap();
if let Some(from_acc) = accounts.get_mut(from) {
from_acc.balance -= amount;
}
if let Some(to_acc) = accounts.get_mut(to) {
to_acc.balance += amount;
}
}
}
fn validate_user(&self, _user: &str) {
// 模拟外部服务调用
std::thread::sleep(std::time::Duration::from_millis(100));
}
fn check_risk(&self, _amount: i64) {
// 风控检查
}
}
---
02.读写分离
a.区分读写事务
读操作和写操作使用不同的事务策略,读操作可以使用更低的隔离级别。
b.读写分离示例
---
use std::sync::RwLock;
struct DataStore {
data: Arc<RwLock<HashMap<String, String>>>,
}
impl DataStore {
fn new() -> Self {
DataStore {
data: Arc::new(RwLock::new(HashMap::new())),
}
}
// 读操作:使用读锁
fn read(&self, key: &str) -> Option<String> {
let data = self.data.read().unwrap();
data.get(key).cloned()
}
// 批量读:共享读锁
fn batch_read(&self, keys: &[&str]) -> Vec<Option<String>> {
let data = self.data.read().unwrap();
keys.iter()
.map(|key| data.get(*key).cloned())
.collect()
}
// 写操作:使用写锁
fn write(&self, key: String, value: String) {
let mut data = self.data.write().unwrap();
data.insert(key, value);
}
// 事务性更新
fn update_if_exists(&self, key: &str, value: String) -> bool {
let mut data = self.data.write().unwrap();
if data.contains_key(key) {
data.insert(key.to_string(), value);
true
} else {
false
}
}
}
---
03.事务嵌套处理
a.避免嵌套事务
嵌套事务会增加复杂度和死锁风险,应该重构为平铺结构。
b.重构示例
---
struct OrderService {
orders: Arc<Mutex<HashMap<String, Order>>>,
inventory: Arc<Mutex<HashMap<String, i32>>>,
}
#[derive(Clone)]
struct Order {
id: String,
items: Vec<String>,
}
impl OrderService {
// 错误:嵌套事务
fn create_order_bad(&self, order: Order) {
let mut orders = self.orders.lock().unwrap();
// 嵌套获取另一个锁
for item in &order.items {
let mut inventory = self.inventory.lock().unwrap();
if let Some(stock) = inventory.get_mut(item) {
*stock -= 1;
}
}
orders.insert(order.id.clone(), order);
}
// 正确:平铺事务
fn create_order_good(&self, order: Order) {
// 先锁库存
{
let mut inventory = self.inventory.lock().unwrap();
for item in &order.items {
if let Some(stock) = inventory.get_mut(item) {
*stock -= 1;
}
}
}
// 再锁订单
{
let mut orders = self.orders.lock().unwrap();
orders.insert(order.id.clone(), order);
}
}
}
---
04.长事务拆分
a.拆分策略
将长事务拆分为多个短事务,使用补偿机制保证一致性。
b.拆分示例
---
struct BatchProcessor {
data: Arc<Mutex<Vec<String>>>,
}
impl BatchProcessor {
// 错误:长事务处理大批量
fn process_batch_bad(&self, items: Vec<String>) {
let mut data = self.data.lock().unwrap();
for item in items {
// 处理每个项目
self.process_item(&item);
data.push(item);
}
}
// 正确:分批处理
fn process_batch_good(&self, items: Vec<String>) {
const BATCH_SIZE: usize = 100;
for chunk in items.chunks(BATCH_SIZE) {
// 每批使用独立事务
{
let mut data = self.data.lock().unwrap();
for item in chunk {
self.process_item(item);
data.push(item.clone());
}
}
// 批次间释放锁
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
fn process_item(&self, _item: &str) {
// 处理逻辑
}
}
---
05.事务超时设置
a.超时机制
为事务设置合理的超时时间,避免长时间占用资源。
b.超时实现
---
use std::time::{Duration, Instant};
struct TransactionContext {
start_time: Instant,
timeout: Duration,
}
impl TransactionContext {
fn new(timeout: Duration) -> Self {
TransactionContext {
start_time: Instant::now(),
timeout,
}
}
fn check_timeout(&self) -> Result<(), String> {
if self.start_time.elapsed() > self.timeout {
Err("事务超时".to_string())
} else {
Ok(())
}
}
fn execute_with_timeout<F, T>(&self, operation: F) -> Result<T, String>
where
F: FnOnce() -> T,
{
self.check_timeout()?;
Ok(operation())
}
}
// 使用示例
fn transfer_with_timeout(ctx: &TransactionContext) -> Result<(), String> {
ctx.check_timeout()?;
// 执行转账操作
println!("执行转账");
ctx.check_timeout()?;
// 更新余额
println!("更新余额");
Ok(())
}
---
06.最佳实践
a.设计原则
合理设计事务边界,平衡一致性和性能。
b.实践建议
---
// 1. 事务边界检查清单
struct TransactionBoundaryChecklist {
// 是否包含外部调用
has_external_calls: bool,
// 是否有长时间计算
has_long_computation: bool,
// 锁持有时间(毫秒)
lock_duration_ms: u64,
// 涉及的表/资源数量
resource_count: usize,
}
impl TransactionBoundaryChecklist {
fn validate(&self) -> Result<(), String> {
if self.has_external_calls {
return Err("事务内不应包含外部调用".to_string());
}
if self.has_long_computation {
return Err("事务内不应包含长时间计算".to_string());
}
if self.lock_duration_ms > 100 {
return Err("锁持有时间过长".to_string());
}
if self.resource_count > 3 {
return Err("涉及资源过多,考虑拆分".to_string());
}
Ok(())
}
}
// 2. 事务模板
fn execute_transaction<F, T>(operation: F) -> Result<T, String>
where
F: FnOnce() -> Result<T, String>,
{
let start = Instant::now();
// 执行事务
let result = operation()?;
// 记录执行时间
let duration = start.elapsed();
if duration > Duration::from_millis(100) {
println!("警告:事务执行时间过长 {:?}", duration);
}
Ok(result)
}
// 3. 事务监控
struct TransactionMetrics {
total_count: u64,
timeout_count: u64,
avg_duration_ms: f64,
}
impl TransactionMetrics {
fn record(&mut self, duration: Duration, timeout: bool) {
self.total_count += 1;
if timeout {
self.timeout_count += 1;
}
let duration_ms = duration.as_millis() as f64;
self.avg_duration_ms = (self.avg_duration_ms * (self.total_count - 1) as f64
+ duration_ms) / self.total_count as f64;
}
fn report(&self) {
println!("事务统计:");
println!(" 总数: {}", self.total_count);
println!(" 超时: {}", self.timeout_count);
println!(" 平均耗时: {:.2}ms", self.avg_duration_ms);
println!(" 超时率: {:.2}%",
(self.timeout_count as f64 / self.total_count as f64) * 100.0);
}
}
---
5.2 长事务处理
01.长事务问题识别
a.长事务的危害
长事务会长时间持有锁,导致其他事务等待,降低系统并发性能,增加死锁风险。
b.识别长事务
---
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct TransactionInfo {
id: String,
start_time: Instant,
operation: String,
locked_resources: Vec<String>,
}
struct TransactionMonitor {
active_transactions: Arc<Mutex<HashMap<String, TransactionInfo>>>,
long_transaction_threshold: Duration,
}
impl TransactionMonitor {
fn new(threshold: Duration) -> Self {
TransactionMonitor {
active_transactions: Arc::new(Mutex::new(HashMap::new())),
long_transaction_threshold: threshold,
}
}
// 开始事务
fn begin_transaction(&self, id: String, operation: String) {
let mut transactions = self.active_transactions.lock().unwrap();
let info = TransactionInfo {
id: id.clone(),
start_time: Instant::now(),
operation,
locked_resources: Vec::new(),
};
transactions.insert(id, info);
}
// 结束事务
fn end_transaction(&self, id: &str) {
let mut transactions = self.active_transactions.lock().unwrap();
if let Some(info) = transactions.remove(id) {
let duration = info.start_time.elapsed();
if duration > self.long_transaction_threshold {
println!("⚠️ 检测到长事务:");
println!(" ID: {}", info.id);
println!(" 操作: {}", info.operation);
println!(" 耗时: {:?}", duration);
println!(" 锁定资源: {:?}", info.locked_resources);
}
}
}
// 扫描长事务
fn scan_long_transactions(&self) -> Vec<TransactionInfo> {
let transactions = self.active_transactions.lock().unwrap();
transactions
.values()
.filter(|tx| tx.start_time.elapsed() > self.long_transaction_threshold)
.cloned()
.collect()
}
// 记录锁定资源
fn add_locked_resource(&self, tx_id: &str, resource: String) {
let mut transactions = self.active_transactions.lock().unwrap();
if let Some(tx) = transactions.get_mut(tx_id) {
tx.locked_resources.push(resource);
}
}
}
---
02.分批处理策略
a.大批量数据处理
将大批量操作拆分为多个小批次,每个批次使用独立事务,避免长时间锁定。
b.分批处理实现
---
use std::thread;
struct BatchTransactionProcessor {
monitor: Arc<TransactionMonitor>,
batch_size: usize,
batch_interval: Duration,
}
impl BatchTransactionProcessor {
fn new(monitor: Arc<TransactionMonitor>, batch_size: usize) -> Self {
BatchTransactionProcessor {
monitor,
batch_size,
batch_interval: Duration::from_millis(50),
}
}
// 批量更新用户积分
fn batch_update_points(&self, updates: Vec<(String, i32)>) -> Result<(), String> {
let total = updates.len();
let batches = updates.chunks(self.batch_size);
let batch_count = (total + self.batch_size - 1) / self.batch_size;
println!("=== 批量更新积分 ===");
println!("总记录数: {}", total);
println!("批次大小: {}", self.batch_size);
println!("批次数量: {}", batch_count);
for (idx, batch) in batches.enumerate() {
let tx_id = format!("batch-tx-{}", idx);
// 开始事务
self.monitor.begin_transaction(
tx_id.clone(),
format!("批量更新积分 批次{}/{}", idx + 1, batch_count),
);
// 处理当前批次
for (user_id, points) in batch {
self.update_user_points(user_id, *points);
self.monitor.add_locked_resource(&tx_id, user_id.clone());
}
// 结束事务
self.monitor.end_transaction(&tx_id);
println!("批次 {}/{} 完成,处理 {} 条记录",
idx + 1, batch_count, batch.len());
// 批次间休息,释放资源
if idx < batch_count - 1 {
thread::sleep(self.batch_interval);
}
}
println!("批量更新完成");
Ok(())
}
fn update_user_points(&self, user_id: &str, points: i32) {
// 模拟数据库更新
thread::sleep(Duration::from_millis(10));
}
// 批量删除过期数据
fn batch_delete_expired(&self, ids: Vec<String>) -> Result<usize, String> {
let mut deleted = 0;
for (idx, chunk) in ids.chunks(self.batch_size).enumerate() {
let tx_id = format!("delete-tx-{}", idx);
self.monitor.begin_transaction(
tx_id.clone(),
format!("批量删除 批次{}", idx + 1),
);
for id in chunk {
if self.delete_record(id) {
deleted += 1;
}
self.monitor.add_locked_resource(&tx_id, id.clone());
}
self.monitor.end_transaction(&tx_id);
thread::sleep(self.batch_interval);
}
Ok(deleted)
}
fn delete_record(&self, id: &str) -> bool {
thread::sleep(Duration::from_millis(5));
true
}
}
---
03.游标分页处理
a.游标遍历大数据集
使用游标分页遍历大数据集,避免一次性加载所有数据。
b.游标实现
---
#[derive(Debug, Clone)]
struct Record {
id: u64,
data: String,
version: u32,
}
struct CursorProcessor {
monitor: Arc<TransactionMonitor>,
page_size: usize,
}
impl CursorProcessor {
fn new(monitor: Arc<TransactionMonitor>, page_size: usize) -> Self {
CursorProcessor {
monitor,
page_size,
}
}
// 使用游标处理大数据集
fn process_with_cursor<F>(&self, mut processor: F) -> Result<usize, String>
where
F: FnMut(&Record) -> Result<(), String>,
{
let mut cursor = 0u64;
let mut total_processed = 0;
let mut page_num = 0;
println!("=== 游标分页处理 ===");
loop {
page_num += 1;
let tx_id = format!("cursor-tx-{}", page_num);
// 开始事务
self.monitor.begin_transaction(
tx_id.clone(),
format!("游标处理 页{}", page_num),
);
// 获取当前页数据
let records = self.fetch_page(cursor, self.page_size);
if records.is_empty() {
self.monitor.end_transaction(&tx_id);
break;
}
println!("处理第 {} 页,记录数: {}", page_num, records.len());
// 处理当前页
for record in &records {
processor(record)?;
self.monitor.add_locked_resource(&tx_id, format!("record-{}", record.id));
cursor = record.id;
}
total_processed += records.len();
// 结束事务
self.monitor.end_transaction(&tx_id);
// 如果返回的记录数少于页大小,说明已到末尾
if records.len() < self.page_size {
break;
}
// 页间休息
thread::sleep(Duration::from_millis(20));
}
println!("游标处理完成,总计: {} 条记录", total_processed);
Ok(total_processed)
}
// 模拟从数据库获取一页数据
fn fetch_page(&self, cursor: u64, limit: usize) -> Vec<Record> {
thread::sleep(Duration::from_millis(50));
// 模拟返回数据
(0..limit.min(10))
.map(|i| Record {
id: cursor + i as u64 + 1,
data: format!("data-{}", cursor + i as u64 + 1),
version: 1,
})
.collect()
}
}
---
04.异步化改造
a.异步事务处理
将长事务中的耗时操作异步化,减少事务持有锁的时间。
b.异步处理实现
---
use std::sync::mpsc::{channel, Sender, Receiver};
#[derive(Debug, Clone)]
struct AsyncTask {
id: String,
task_type: String,
payload: String,
}
struct AsyncTransactionProcessor {
monitor: Arc<TransactionMonitor>,
task_sender: Sender<AsyncTask>,
}
impl AsyncTransactionProcessor {
fn new(monitor: Arc<TransactionMonitor>) -> Self {
let (sender, receiver) = channel();
// 启动异步处理线程
Self::start_worker(receiver, monitor.clone());
AsyncTransactionProcessor {
monitor,
task_sender: sender,
}
}
// 处理订单(异步化改造)
fn process_order_async(&self, order_id: String, items: Vec<String>) -> Result<(), String> {
let tx_id = format!("order-tx-{}", order_id);
println!("=== 处理订单 {} ===", order_id);
// 开始事务
self.monitor.begin_transaction(
tx_id.clone(),
format!("处理订单 {}", order_id),
);
// 1. 快速完成核心数据库操作
self.create_order_record(&order_id);
self.lock_inventory(&items);
self.monitor.add_locked_resource(&tx_id, format!("order-{}", order_id));
// 结束事务(快速释放锁)
self.monitor.end_transaction(&tx_id);
println!("订单 {} 核心事务完成", order_id);
// 2. 异步执行耗时操作
self.task_sender.send(AsyncTask {
id: order_id.clone(),
task_type: "send_notification".to_string(),
payload: format!("订单 {} 已创建", order_id),
}).map_err(|e| e.to_string())?;
self.task_sender.send(AsyncTask {
id: order_id.clone(),
task_type: "update_analytics".to_string(),
payload: format!("items: {:?}", items),
}).map_err(|e| e.to_string())?;
Ok(())
}
fn create_order_record(&self, order_id: &str) {
thread::sleep(Duration::from_millis(20));
println!(" 创建订单记录: {}", order_id);
}
fn lock_inventory(&self, items: &[String]) {
thread::sleep(Duration::from_millis(30));
println!(" 锁定库存: {} 个商品", items.len());
}
// 启动异步工作线程
fn start_worker(receiver: Receiver<AsyncTask>, monitor: Arc<TransactionMonitor>) {
thread::spawn(move || {
println!("异步工作线程启动");
while let Ok(task) = receiver.recv() {
println!(" 执行异步任务: {} - {}", task.task_type, task.id);
match task.task_type.as_str() {
"send_notification" => {
// 发送通知(耗时操作)
thread::sleep(Duration::from_millis(200));
println!(" 通知已发送: {}", task.payload);
}
"update_analytics" => {
// 更新分析数据(耗时操作)
thread::sleep(Duration::from_millis(150));
println!(" 分析数据已更新");
}
_ => {}
}
}
});
}
}
---
05.超时中断机制
a.事务超时控制
为长事务设置超时时间,超时后自动中断并回滚。
b.超时控制实现
---
use std::sync::atomic::{AtomicBool, Ordering};
struct TimeoutTransaction {
monitor: Arc<TransactionMonitor>,
timeout: Duration,
}
impl TimeoutTransaction {
fn new(monitor: Arc<TransactionMonitor>, timeout: Duration) -> Self {
TimeoutTransaction {
monitor,
timeout,
}
}
// 执行带超时的事务
fn execute_with_timeout<F, T>(
&self,
tx_id: String,
operation: String,
work: F,
) -> Result<T, String>
where
F: FnOnce() -> Result<T, String> + Send + 'static,
T: Send + 'static,
{
println!("=== 执行超时事务 {} ===", tx_id);
self.monitor.begin_transaction(tx_id.clone(), operation);
let timeout_flag = Arc::new(AtomicBool::new(false));
let timeout_flag_clone = timeout_flag.clone();
let tx_id_clone = tx_id.clone();
let monitor_clone = self.monitor.clone();
// 启动超时监控线程
thread::spawn(move || {
thread::sleep(self.timeout);
if !timeout_flag_clone.load(Ordering::SeqCst) {
println!("⚠️ 事务 {} 超时", tx_id_clone);
timeout_flag_clone.store(true, Ordering::SeqCst);
monitor_clone.end_transaction(&tx_id_clone);
}
});
// 执行工作
let result = work();
// 检查是否超时
if timeout_flag.load(Ordering::SeqCst) {
return Err(format!("事务 {} 执行超时", tx_id));
}
timeout_flag.store(true, Ordering::SeqCst);
self.monitor.end_transaction(&tx_id);
result
}
// 可中断的批量处理
fn interruptible_batch_process(
&self,
items: Vec<String>,
) -> Result<usize, String> {
let tx_id = "interruptible-batch".to_string();
let processed = Arc::new(Mutex::new(0usize));
let processed_clone = processed.clone();
self.execute_with_timeout(
tx_id,
"可中断批量处理".to_string(),
move || {
for (idx, item) in items.iter().enumerate() {
// 模拟处理
thread::sleep(Duration::from_millis(100));
let mut count = processed_clone.lock().unwrap();
*count += 1;
println!(" 处理项目 {}/{}: {}", idx + 1, items.len(), item);
}
let count = *processed_clone.lock().unwrap();
Ok(count)
},
)?;
let final_count = *processed.lock().unwrap();
Ok(final_count)
}
}
---
06.最佳实践
a.长事务优化策略
综合运用多种技术手段优化长事务,提升系统性能。
b.实践建议
---
// 长事务处理最佳实践
struct LongTransactionOptimizer {
monitor: Arc<TransactionMonitor>,
batch_processor: BatchTransactionProcessor,
cursor_processor: CursorProcessor,
async_processor: AsyncTransactionProcessor,
timeout_processor: TimeoutTransaction,
}
impl LongTransactionOptimizer {
fn new() -> Self {
let monitor = Arc::new(TransactionMonitor::new(Duration::from_secs(5)));
LongTransactionOptimizer {
batch_processor: BatchTransactionProcessor::new(monitor.clone(), 100),
cursor_processor: CursorProcessor::new(monitor.clone(), 50),
async_processor: AsyncTransactionProcessor::new(monitor.clone()),
timeout_processor: TimeoutTransaction::new(monitor.clone(), Duration::from_secs(10)),
monitor,
}
}
// 综合示例:优化大数据迁移
fn optimize_data_migration(&self) -> Result<(), String> {
println!("=== 数据迁移优化示例 ===\n");
// 1. 使用游标分页读取源数据
println!("步骤1: 游标分页读取");
let mut records = Vec::new();
self.cursor_processor.process_with_cursor(|record| {
records.push(record.clone());
Ok(())
})?;
// 2. 分批写入目标表
println!("\n步骤2: 分批写入");
let updates: Vec<(String, i32)> = records
.iter()
.map(|r| (format!("user-{}", r.id), r.version as i32))
.collect();
self.batch_processor.batch_update_points(updates)?;
// 3. 异步更新索引
println!("\n步骤3: 异步更新索引");
for record in records.iter().take(5) {
self.async_processor.process_order_async(
format!("order-{}", record.id),
vec!["item1".to_string(), "item2".to_string()],
)?;
}
// 4. 监控长事务
println!("\n步骤4: 检查长事务");
let long_txs = self.monitor.scan_long_transactions();
if !long_txs.is_empty() {
println!("发现 {} 个长事务", long_txs.len());
for tx in long_txs {
println!(" - {} (耗时: {:?})", tx.id, tx.start_time.elapsed());
}
}
println!("\n数据迁移完成");
Ok(())
}
// 决策树:选择合适的优化策略
fn choose_strategy(&self, data_size: usize, operation_type: &str) -> String {
match (data_size, operation_type) {
(size, "read") if size > 10000 => {
"使用游标分页,避免一次性加载大量数据".to_string()
}
(size, "write") if size > 1000 => {
"使用分批处理,每批100-500条记录".to_string()
}
(_, "mixed") => {
"拆分读写操作,核心事务+异步处理".to_string()
}
(_, op) if op.contains("slow") => {
"设置超时机制,避免长时间阻塞".to_string()
}
_ => {
"正常事务处理即可".to_string()
}
}
}
// 性能指标收集
fn collect_metrics(&self) -> TransactionMetrics {
let long_txs = self.monitor.scan_long_transactions();
TransactionMetrics {
long_transaction_count: long_txs.len(),
avg_duration: if !long_txs.is_empty() {
let total: Duration = long_txs.iter()
.map(|tx| tx.start_time.elapsed())
.sum();
total / long_txs.len() as u32
} else {
Duration::from_secs(0)
},
max_locked_resources: long_txs.iter()
.map(|tx| tx.locked_resources.len())
.max()
.unwrap_or(0),
}
}
}
#[derive(Debug)]
struct TransactionMetrics {
long_transaction_count: usize,
avg_duration: Duration,
max_locked_resources: usize,
}
impl TransactionMetrics {
fn report(&self) {
println!("=== 事务性能指标 ===");
println!("长事务数量: {}", self.long_transaction_count);
println!("平均耗时: {:?}", self.avg_duration);
println!("最大锁定资源数: {}", self.max_locked_resources);
if self.long_transaction_count > 10 {
println!("⚠️ 警告: 长事务数量过多,建议优化");
}
}
}
---
5.3 死锁检测与处理
01.死锁原理
a.死锁四要素
互斥条件、持有并等待、不可剥夺、循环等待,四个条件同时满足时发生死锁。
b.死锁检测实现
---
use std::sync::{Arc, Mutex};
use std::collections::{HashMap, HashSet};
use std::time::Instant;
#[derive(Debug, Clone)]
struct LockRequest {
tx_id: String,
resource_id: String,
timestamp: Instant,
}
struct DeadlockDetector {
// 资源持有关系:资源ID -> 持有该资源的事务ID
resource_holders: Arc<Mutex<HashMap<String, String>>>,
// 等待关系:事务ID -> 等待的资源ID列表
wait_for: Arc<Mutex<HashMap<String, Vec<String>>>>,
// 事务持有的资源:事务ID -> 资源ID列表
tx_resources: Arc<Mutex<HashMap<String, Vec<String>>>>,
}
impl DeadlockDetector {
fn new() -> Self {
DeadlockDetector {
resource_holders: Arc::new(Mutex::new(HashMap::new())),
wait_for: Arc::new(Mutex::new(HashMap::new())),
tx_resources: Arc::new(Mutex::new(HashMap::new())),
}
}
// 事务获取资源
fn acquire_resource(&self, tx_id: &str, resource_id: &str) -> Result<(), String> {
let mut holders = self.resource_holders.lock().unwrap();
let mut tx_res = self.tx_resources.lock().unwrap();
// 检查资源是否被占用
if let Some(holder) = holders.get(resource_id) {
if holder != tx_id {
// 资源被占用,记录等待关系
drop(holders);
drop(tx_res);
self.add_wait_for(tx_id, resource_id)?;
return Err(format!("资源 {} 被事务 {} 占用", resource_id, holder));
}
}
// 获取资源
holders.insert(resource_id.to_string(), tx_id.to_string());
tx_res.entry(tx_id.to_string())
.or_insert_with(Vec::new)
.push(resource_id.to_string());
// 移除等待关系
drop(holders);
drop(tx_res);
self.remove_wait_for(tx_id, resource_id);
Ok(())
}
// 释放资源
fn release_resource(&self, tx_id: &str, resource_id: &str) {
let mut holders = self.resource_holders.lock().unwrap();
let mut tx_res = self.tx_resources.lock().unwrap();
holders.remove(resource_id);
if let Some(resources) = tx_res.get_mut(tx_id) {
resources.retain(|r| r != resource_id);
}
}
// 添加等待关系
fn add_wait_for(&self, tx_id: &str, resource_id: &str) -> Result<(), String> {
let mut wait_for = self.wait_for.lock().unwrap();
wait_for.entry(tx_id.to_string())
.or_insert_with(Vec::new)
.push(resource_id.to_string());
drop(wait_for);
// 检测死锁
if let Some(cycle) = self.detect_cycle(tx_id) {
return Err(format!("检测到死锁: {:?}", cycle));
}
Ok(())
}
// 移除等待关系
fn remove_wait_for(&self, tx_id: &str, resource_id: &str) {
let mut wait_for = self.wait_for.lock().unwrap();
if let Some(resources) = wait_for.get_mut(tx_id) {
resources.retain(|r| r != resource_id);
}
}
// 检测循环等待
fn detect_cycle(&self, start_tx: &str) -> Option<Vec<String>> {
let holders = self.resource_holders.lock().unwrap();
let wait_for = self.wait_for.lock().unwrap();
let mut visited = HashSet::new();
let mut path = Vec::new();
self.dfs(start_tx, &holders, &wait_for, &mut visited, &mut path)
}
fn dfs(
&self,
tx_id: &str,
holders: &HashMap<String, String>,
wait_for: &HashMap<String, Vec<String>>,
visited: &mut HashSet<String>,
path: &mut Vec<String>,
) -> Option<Vec<String>> {
if visited.contains(tx_id) {
// 找到环
if let Some(pos) = path.iter().position(|t| t == tx_id) {
return Some(path[pos..].to_vec());
}
return None;
}
visited.insert(tx_id.to_string());
path.push(tx_id.to_string());
// 查找当前事务等待的资源
if let Some(resources) = wait_for.get(tx_id) {
for resource in resources {
// 查找持有该资源的事务
if let Some(holder) = holders.get(resource) {
if let Some(cycle) = self.dfs(holder, holders, wait_for, visited, path) {
return Some(cycle);
}
}
}
}
path.pop();
None
}
// 打印等待图
fn print_wait_graph(&self) {
let holders = self.resource_holders.lock().unwrap();
let wait_for = self.wait_for.lock().unwrap();
println!("=== 等待图 ===");
println!("资源持有:");
for (resource, holder) in holders.iter() {
println!(" {} -> {}", resource, holder);
}
println!("等待关系:");
for (tx, resources) in wait_for.iter() {
println!(" {} 等待: {:?}", tx, resources);
}
}
}
---
02.死锁预防
a.资源排序
按固定顺序获取资源,避免循环等待。
b.资源排序实现
---
use std::cmp::Ordering;
struct OrderedLockManager {
detector: Arc<DeadlockDetector>,
}
impl OrderedLockManager {
fn new(detector: Arc<DeadlockDetector>) -> Self {
OrderedLockManager { detector }
}
// 按顺序获取多个资源
fn acquire_ordered(&self, tx_id: &str, mut resources: Vec<String>) -> Result<(), String> {
// 对资源ID排序
resources.sort();
println!("事务 {} 按顺序获取资源: {:?}", tx_id, resources);
for resource in &resources {
match self.detector.acquire_resource(tx_id, resource) {
Ok(_) => println!(" ✓ 获取资源: {}", resource),
Err(e) => {
println!(" ✗ 获取失败: {}", e);
// 释放已获取的资源
self.release_all(tx_id, resources.iter()
.take_while(|r| *r != resource)
.cloned()
.collect());
return Err(e);
}
}
}
Ok(())
}
// 释放所有资源
fn release_all(&self, tx_id: &str, resources: Vec<String>) {
for resource in resources {
self.detector.release_resource(tx_id, &resource);
println!(" 释放资源: {}", resource);
}
}
// 转账示例(正确顺序)
fn transfer_ordered(&self, from: &str, to: &str, amount: i64) -> Result<(), String> {
let tx_id = format!("transfer-{}-{}", from, to);
// 按字典序排序账户,避免死锁
let (first, second) = if from < to {
(from, to)
} else {
(to, from)
};
let resources = vec![
format!("account:{}", first),
format!("account:{}", second),
];
self.acquire_ordered(&tx_id, resources.clone())?;
// 执行转账
println!("执行转账: {} -> {} 金额: {}", from, to, amount);
std::thread::sleep(std::time::Duration::from_millis(100));
self.release_all(&tx_id, resources);
Ok(())
}
}
---
03.超时机制
a.等待超时
设置锁等待超时时间,超时后自动放弃并回滚。
b.超时实现
---
use std::time::Duration;
use std::thread;
struct TimeoutLockManager {
detector: Arc<DeadlockDetector>,
timeout: Duration,
}
impl TimeoutLockManager {
fn new(detector: Arc<DeadlockDetector>, timeout: Duration) -> Self {
TimeoutLockManager { detector, timeout }
}
// 带超时的资源获取
fn acquire_with_timeout(
&self,
tx_id: &str,
resource_id: &str,
) -> Result<(), String> {
let start = Instant::now();
let mut retry_count = 0;
loop {
match self.detector.acquire_resource(tx_id, resource_id) {
Ok(_) => {
println!("事务 {} 获取资源 {} (重试{}次)", tx_id, resource_id, retry_count);
return Ok(());
}
Err(e) => {
if start.elapsed() > self.timeout {
println!("⚠️ 事务 {} 获取资源 {} 超时", tx_id, resource_id);
return Err(format!("获取资源超时: {}", e));
}
retry_count += 1;
thread::sleep(Duration::from_millis(50));
}
}
}
}
// 批量获取资源(带超时)
fn acquire_batch_with_timeout(
&self,
tx_id: &str,
resources: Vec<String>,
) -> Result<(), String> {
let mut acquired = Vec::new();
for resource in &resources {
match self.acquire_with_timeout(tx_id, resource) {
Ok(_) => acquired.push(resource.clone()),
Err(e) => {
// 释放已获取的资源
for res in acquired {
self.detector.release_resource(tx_id, &res);
}
return Err(e);
}
}
}
Ok(())
}
}
---
04.死锁恢复
a.受害者选择
检测到死锁后,选择代价最小的事务回滚。
b.死锁恢复实现
---
#[derive(Debug, Clone)]
struct TransactionCost {
tx_id: String,
start_time: Instant,
operations_count: usize,
priority: i32,
}
struct DeadlockResolver {
detector: Arc<DeadlockDetector>,
tx_costs: Arc<Mutex<HashMap<String, TransactionCost>>>,
}
impl DeadlockResolver {
fn new(detector: Arc<DeadlockDetector>) -> Self {
DeadlockResolver {
detector,
tx_costs: Arc::new(Mutex::new(HashMap::new())),
}
}
// 注册事务
fn register_transaction(&self, tx_id: String, priority: i32) {
let mut costs = self.tx_costs.lock().unwrap();
costs.insert(tx_id.clone(), TransactionCost {
tx_id,
start_time: Instant::now(),
operations_count: 0,
priority,
});
}
// 增加操作计数
fn increment_operations(&self, tx_id: &str) {
let mut costs = self.tx_costs.lock().unwrap();
if let Some(cost) = costs.get_mut(tx_id) {
cost.operations_count += 1;
}
}
// 选择受害者事务
fn select_victim(&self, cycle: &[String]) -> Option<String> {
let costs = self.tx_costs.lock().unwrap();
let mut min_cost = usize::MAX;
let mut victim = None;
for tx_id in cycle {
if let Some(cost) = costs.get(tx_id) {
// 计算代价:优先级低、运行时间短、操作少的事务
let tx_cost = cost.operations_count * 100
+ cost.start_time.elapsed().as_millis() as usize
- (cost.priority as usize * 1000);
if tx_cost < min_cost {
min_cost = tx_cost;
victim = Some(tx_id.clone());
}
}
}
victim
}
// 回滚事务
fn rollback_transaction(&self, tx_id: &str) {
println!("⚠️ 回滚事务: {}", tx_id);
// 释放该事务持有的所有资源
let tx_res = self.detector.tx_resources.lock().unwrap();
if let Some(resources) = tx_res.get(tx_id) {
let resources_clone = resources.clone();
drop(tx_res);
for resource in resources_clone {
self.detector.release_resource(tx_id, &resource);
println!(" 释放资源: {}", resource);
}
}
// 清除等待关系
let mut wait_for = self.detector.wait_for.lock().unwrap();
wait_for.remove(tx_id);
// 清除成本信息
let mut costs = self.tx_costs.lock().unwrap();
costs.remove(tx_id);
}
// 处理死锁
fn handle_deadlock(&self, cycle: Vec<String>) -> Result<(), String> {
println!("=== 检测到死锁 ===");
println!("死锁环: {:?}", cycle);
if let Some(victim) = self.select_victim(&cycle) {
println!("选择受害者: {}", victim);
self.rollback_transaction(&victim);
Ok(())
} else {
Err("无法选择受害者".to_string())
}
}
}
---
05.等待图可视化
a.实时监控
可视化等待图,帮助理解死锁情况。
b.可视化实现
---
struct WaitGraphVisualizer {
detector: Arc<DeadlockDetector>,
}
impl WaitGraphVisualizer {
fn new(detector: Arc<DeadlockDetector>) -> Self {
WaitGraphVisualizer { detector }
}
// 生成DOT格式图
fn generate_dot(&self) -> String {
let holders = self.detector.resource_holders.lock().unwrap();
let wait_for = self.detector.wait_for.lock().unwrap();
let mut dot = String::from("digraph WaitGraph {\n");
dot.push_str(" rankdir=LR;\n");
dot.push_str(" node [shape=box];\n\n");
// 添加事务节点
let mut transactions = HashSet::new();
for holder in holders.values() {
transactions.insert(holder.clone());
}
for tx in wait_for.keys() {
transactions.insert(tx.clone());
}
for tx in &transactions {
dot.push_str(&format!(" \"{}\" [style=filled,fillcolor=lightblue];\n", tx));
}
// 添加资源节点
dot.push_str("\n node [shape=ellipse,fillcolor=lightgreen];\n");
for resource in holders.keys() {
dot.push_str(&format!(" \"{}\";\n", resource));
}
// 添加持有边
dot.push_str("\n // 持有关系\n");
for (resource, holder) in holders.iter() {
dot.push_str(&format!(" \"{}\" -> \"{}\" [color=green,label=\"持有\"];\n",
holder, resource));
}
// 添加等待边
dot.push_str("\n // 等待关系\n");
for (tx, resources) in wait_for.iter() {
for resource in resources {
if let Some(holder) = holders.get(resource) {
dot.push_str(&format!(" \"{}\" -> \"{}\" [color=red,label=\"等待\",style=dashed];\n",
tx, holder));
}
}
}
dot.push_str("}\n");
dot
}
// 打印ASCII图
fn print_ascii(&self) {
let holders = self.detector.resource_holders.lock().unwrap();
let wait_for = self.detector.wait_for.lock().unwrap();
println!("=== 等待图 (ASCII) ===");
for (tx, resources) in wait_for.iter() {
for resource in resources {
if let Some(holder) = holders.get(resource) {
println!("{} --[等待]--> {} --[持有]--> {}",
tx, resource, holder);
}
}
}
}
}
---
06.综合示例
a.完整死锁处理系统
集成检测、预防、恢复的完整系统。
b.综合实现
---
struct DeadlockManagementSystem {
detector: Arc<DeadlockDetector>,
resolver: DeadlockResolver,
ordered_manager: OrderedLockManager,
timeout_manager: TimeoutLockManager,
visualizer: WaitGraphVisualizer,
}
impl DeadlockManagementSystem {
fn new() -> Self {
let detector = Arc::new(DeadlockDetector::new());
DeadlockManagementSystem {
resolver: DeadlockResolver::new(detector.clone()),
ordered_manager: OrderedLockManager::new(detector.clone()),
timeout_manager: TimeoutLockManager::new(
detector.clone(),
Duration::from_secs(5),
),
visualizer: WaitGraphVisualizer::new(detector.clone()),
detector,
}
}
// 安全转账(综合策略)
fn safe_transfer(&self, from: &str, to: &str, amount: i64) -> Result<(), String> {
let tx_id = format!("tx-{}-{}", from, to);
// 1. 注册事务
self.resolver.register_transaction(tx_id.clone(), 1);
// 2. 使用资源排序策略
match self.ordered_manager.transfer_ordered(from, to, amount) {
Ok(_) => {
println!("✓ 转账成功");
Ok(())
}
Err(e) => {
// 3. 检测死锁
if let Some(cycle) = self.detector.detect_cycle(&tx_id) {
println!("检测到死锁,尝试恢复...");
self.visualizer.print_ascii();
self.resolver.handle_deadlock(cycle)?;
}
Err(e)
}
}
}
// 监控系统状态
fn monitor(&self) {
println!("\n=== 系统状态监控 ===");
self.detector.print_wait_graph();
self.visualizer.print_ascii();
}
// 压力测试
fn stress_test(&self) {
println!("=== 死锁压力测试 ===\n");
let accounts = vec!["A", "B", "C", "D"];
let mut handles = vec![];
for i in 0..10 {
let from = accounts[i % accounts.len()];
let to = accounts[(i + 1) % accounts.len()];
println!("启动转账 {}: {} -> {}", i, from, to);
// 模拟并发转账
thread::sleep(Duration::from_millis(100));
}
println!("\n压力测试完成");
self.monitor();
}
}
---
5.4 事务重试机制
01.重试策略
a.指数退避
失败后按指数增长的时间间隔重试,避免雪崩效应。
b.指数退避实现
---
use std::time::{Duration, Instant};
use std::thread;
struct RetryPolicy {
max_attempts: u32,
initial_delay: Duration,
max_delay: Duration,
multiplier: f64,
}
impl RetryPolicy {
fn exponential_backoff() -> Self {
RetryPolicy {
max_attempts: 5,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
multiplier: 2.0,
}
}
fn calculate_delay(&self, attempt: u32) -> Duration {
let delay_ms = (self.initial_delay.as_millis() as f64
* self.multiplier.powi(attempt as i32)) as u64;
Duration::from_millis(delay_ms.min(self.max_delay.as_millis() as u64))
}
}
struct RetryExecutor {
policy: RetryPolicy,
}
impl RetryExecutor {
fn new(policy: RetryPolicy) -> Self {
RetryExecutor { policy }
}
// 执行带重试的操作
fn execute_with_retry<F, T>(&self, operation: F) -> Result<T, String>
where
F: Fn() -> Result<T, String>,
{
let mut attempt = 0;
loop {
attempt += 1;
match operation() {
Ok(result) => {
if attempt > 1 {
println!("✓ 操作成功 (尝试 {} 次)", attempt);
}
return Ok(result);
}
Err(e) => {
if attempt >= self.policy.max_attempts {
println!("✗ 操作失败,已达最大重试次数 {}", attempt);
return Err(format!("重试失败: {}", e));
}
let delay = self.policy.calculate_delay(attempt - 1);
println!("⚠️ 尝试 {} 失败: {},{}ms 后重试",
attempt, e, delay.as_millis());
thread::sleep(delay);
}
}
}
}
// 转账重试示例
fn transfer_with_retry(
&self,
from: &str,
to: &str,
amount: i64,
) -> Result<(), String> {
println!("=== 转账重试: {} -> {} 金额: {} ===", from, to, amount);
self.execute_with_retry(|| {
// 模拟可能失败的转账操作
if rand::random::<f64>() < 0.6 {
Err("网络超时".to_string())
} else {
println!(" 执行转账操作");
Ok(())
}
})
}
}
mod rand {
use std::cell::Cell;
thread_local! {
static SEED: Cell<u64> = Cell::new(12345);
}
pub fn random<T: From<f64>>() -> T {
SEED.with(|s| {
let seed = s.get();
let next = seed.wrapping_mul(1103515245).wrapping_add(12345);
s.set(next);
T::from((next % 1000) as f64 / 1000.0)
})
}
}
---
02.乐观锁重试
a.版本号机制
使用版本号检测冲突,冲突时自动重试。
b.乐观锁实现
---
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct VersionedData {
value: i64,
version: u64,
}
struct OptimisticLockStore {
data: Arc<Mutex<HashMap<String, VersionedData>>>,
}
impl OptimisticLockStore {
fn new() -> Self {
OptimisticLockStore {
data: Arc::new(Mutex::new(HashMap::new())),
}
}
// 读取数据
fn read(&self, key: &str) -> Option<VersionedData> {
let data = self.data.lock().unwrap();
data.get(key).cloned()
}
// 乐观更新(CAS)
fn update(&self, key: &str, new_value: i64, expected_version: u64) -> Result<(), String> {
let mut data = self.data.lock().unwrap();
match data.get_mut(key) {
Some(entry) => {
if entry.version != expected_version {
return Err(format!("版本冲突: 期望 {}, 实际 {}",
expected_version, entry.version));
}
entry.value = new_value;
entry.version += 1;
Ok(())
}
None => {
data.insert(key.to_string(), VersionedData {
value: new_value,
version: 1,
});
Ok(())
}
}
}
}
struct OptimisticTransactionExecutor {
store: Arc<OptimisticLockStore>,
retry_policy: RetryPolicy,
}
impl OptimisticTransactionExecutor {
fn new(store: Arc<OptimisticLockStore>) -> Self {
OptimisticTransactionExecutor {
store,
retry_policy: RetryPolicy::exponential_backoff(),
}
}
// 带重试的乐观更新
fn update_with_retry<F>(
&self,
key: &str,
updater: F,
) -> Result<(), String>
where
F: Fn(i64) -> i64,
{
let mut attempt = 0;
loop {
attempt += 1;
// 1. 读取��前值和版本
let current = self.store.read(key)
.unwrap_or(VersionedData { value: 0, version: 0 });
// 2. 计算新值
let new_value = updater(current.value);
// 3. 尝试更新
match self.store.update(key, new_value, current.version) {
Ok(_) => {
println!("✓ 更新成功: {} = {} (尝试 {} 次)",
key, new_value, attempt);
return Ok(());
}
Err(e) => {
if attempt >= self.retry_policy.max_attempts {
return Err(format!("更新失败: {}", e));
}
let delay = self.retry_policy.calculate_delay(attempt - 1);
println!("⚠️ {},重试中...", e);
thread::sleep(delay);
}
}
}
}
// 转账示例(乐观锁)
fn transfer_optimistic(
&self,
from: &str,
to: &str,
amount: i64,
) -> Result<(), String> {
println!("=== 乐观锁转账: {} -> {} 金额: {} ===", from, to, amount);
// 扣款
self.update_with_retry(from, |balance| {
println!(" 扣款: {} 余额 {} -> {}", from, balance, balance - amount);
balance - amount
})?;
// 入账
self.update_with_retry(to, |balance| {
println!(" 入账: {} 余额 {} -> {}", to, balance, balance + amount);
balance + amount
})?;
Ok(())
}
}
---
03.幂等性保证
a.请求去重
使用唯一ID确保重试不会重复执行。
b.幂等性实现
---
use std::collections::HashSet;
struct IdempotentExecutor {
executed_ids: Arc<Mutex<HashSet<String>>>,
retry_executor: RetryExecutor,
}
impl IdempotentExecutor {
fn new() -> Self {
IdempotentExecutor {
executed_ids: Arc::new(Mutex::new(HashSet::new())),
retry_executor: RetryExecutor::new(RetryPolicy::exponential_backoff()),
}
}
// 幂等执行
fn execute_idempotent<F, T>(
&self,
request_id: String,
operation: F,
) -> Result<T, String>
where
F: Fn() -> Result<T, String>,
T: Clone,
{
// 检查是否已执行
{
let executed = self.executed_ids.lock().unwrap();
if executed.contains(&request_id) {
println!("⚠️ 请求 {} 已执行,跳过", request_id);
return Err("请求已执行".to_string());
}
}
// 执行操作(带重试)
let result = self.retry_executor.execute_with_retry(|| {
operation()
})?;
// 记录已执行
{
let mut executed = self.executed_ids.lock().unwrap();
executed.insert(request_id.clone());
println!("✓ 请求 {} 执行完成并记录", request_id);
}
Ok(result)
}
// 幂等转账
fn idempotent_transfer(
&self,
request_id: String,
from: &str,
to: &str,
amount: i64,
) -> Result<(), String> {
println!("=== 幂等转账 [{}] ===", request_id);
self.execute_idempotent(request_id, || {
println!(" 执行转账: {} -> {} 金额: {}", from, to, amount);
// 模拟转账操作
if rand::random::<f64>() < 0.7 {
Ok(())
} else {
Err("转账失败".to_string())
}
})
}
// 清理过期记录
fn cleanup_old_requests(&self, max_age: Duration) {
let mut executed = self.executed_ids.lock().unwrap();
let before_count = executed.len();
// 实际应用中应记录时间戳并清理过期记录
executed.clear();
println!("清理 {} 条过期请求记录", before_count);
}
}
---
04.补偿事务
a.Saga模式
长事务拆分为多个子事务,失败时执行补偿操作。
b.Saga实现
---
#[derive(Debug, Clone)]
struct SagaStep {
name: String,
forward: fn() -> Result<(), String>,
compensate: fn() -> Result<(), String>,
}
struct SagaExecutor {
steps: Vec<SagaStep>,
retry_policy: RetryPolicy,
}
impl SagaExecutor {
fn new() -> Self {
SagaExecutor {
steps: Vec::new(),
retry_policy: RetryPolicy::exponential_backoff(),
}
}
fn add_step(&mut self, step: SagaStep) {
self.steps.push(step);
}
// 执行Saga
fn execute(&self) -> Result<(), String> {
let mut completed_steps = Vec::new();
println!("=== 执行 Saga 事务 ===");
// 正向执行
for (idx, step) in self.steps.iter().enumerate() {
println!("步骤 {}: {}", idx + 1, step.name);
match self.execute_step_with_retry(step.forward) {
Ok(_) => {
println!(" ✓ 完成");
completed_steps.push(step);
}
Err(e) => {
println!(" ✗ 失败: {}", e);
println!("\n=== 执行补偿操作 ===");
self.compensate(completed_steps);
return Err(format!("Saga 失败于步骤 {}: {}", idx + 1, e));
}
}
}
println!("\n✓ Saga 事务完成");
Ok(())
}
// 执行单步(带重试)
fn execute_step_with_retry(
&self,
step_fn: fn() -> Result<(), String>,
) -> Result<(), String> {
let mut attempt = 0;
loop {
attempt += 1;
match step_fn() {
Ok(_) => return Ok(()),
Err(e) => {
if attempt >= self.retry_policy.max_attempts {
return Err(e);
}
let delay = self.retry_policy.calculate_delay(attempt - 1);
println!(" 重试 {} ({}ms 后)", attempt, delay.as_millis());
thread::sleep(delay);
}
}
}
}
// 补偿已完成的步骤
fn compensate(&self, completed_steps: Vec<&SagaStep>) {
for (idx, step) in completed_steps.iter().rev().enumerate() {
println!("补偿 {}: {}", idx + 1, step.name);
match (step.compensate)() {
Ok(_) => println!(" ✓ 补偿完成"),
Err(e) => println!(" ✗ 补偿失败: {}", e),
}
}
}
}
// 订单Saga示例
fn create_order_saga() -> SagaExecutor {
let mut saga = SagaExecutor::new();
saga.add_step(SagaStep {
name: "创建订单".to_string(),
forward: || {
println!(" 创建订单记录");
Ok(())
},
compensate: || {
println!(" 删除订单记录");
Ok(())
},
});
saga.add_step(SagaStep {
name: "扣减库存".to_string(),
forward: || {
println!(" 扣减商品库存");
if rand::random::<f64>() < 0.8 {
Ok(())
} else {
Err("库存不足".to_string())
}
},
compensate: || {
println!(" 恢复商品库存");
Ok(())
},
});
saga.add_step(SagaStep {
name: "扣款".to_string(),
forward: || {
println!(" 扣除账户余额");
Ok(())
},
compensate: || {
println!(" 退还账户余额");
Ok(())
},
});
saga
}
---
05.断路器模式
a.熔断保护
连续失败达到阈值后暂停调用,避免雪崩。
b.断路器实现
---
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
#[derive(Debug, PartialEq)]
enum CircuitState {
Closed, // 正常
Open, // 熔断
HalfOpen, // 半开
}
struct CircuitBreaker {
state: Arc<Mutex<CircuitState>>,
failure_count: AtomicU32,
success_count: AtomicU32,
last_failure_time: AtomicU64,
failure_threshold: u32,
timeout: Duration,
}
impl CircuitBreaker {
fn new(failure_threshold: u32, timeout: Duration) -> Self {
CircuitBreaker {
state: Arc::new(Mutex::new(CircuitState::Closed)),
failure_count: AtomicU32::new(0),
success_count: AtomicU32::new(0),
last_failure_time: AtomicU64::new(0),
failure_threshold,
timeout,
}
}
// 执行操作(带断路器保护)
fn execute<F, T>(&self, operation: F) -> Result<T, String>
where
F: Fn() -> Result<T, String>,
{
// 检查状态
self.check_state();
let state = self.state.lock().unwrap();
if *state == CircuitState::Open {
return Err("断路器打开,拒绝请求".to_string());
}
drop(state);
// 执行操作
match operation() {
Ok(result) => {
self.on_success();
Ok(result)
}
Err(e) => {
self.on_failure();
Err(e)
}
}
}
fn check_state(&self) {
let mut state = self.state.lock().unwrap();
if *state == CircuitState::Open {
let last_failure = self.last_failure_time.load(Ordering::SeqCst);
let elapsed = Instant::now().duration_since(
Instant::now() - Duration::from_millis(last_failure)
);
if elapsed > self.timeout {
println!("断路器进入半开状态");
*state = CircuitState::HalfOpen;
self.failure_count.store(0, Ordering::SeqCst);
}
}
}
fn on_success(&self) {
let mut state = self.state.lock().unwrap();
if *state == CircuitState::HalfOpen {
println!("断路器关闭");
*state = CircuitState::Closed;
self.failure_count.store(0, Ordering::SeqCst);
}
self.success_count.fetch_add(1, Ordering::SeqCst);
}
fn on_failure(&self) {
let failures = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
self.last_failure_time.store(
Instant::now().elapsed().as_millis() as u64,
Ordering::SeqCst
);
if failures >= self.failure_threshold {
let mut state = self.state.lock().unwrap();
println!("⚠️ 断路器打开 (失败 {} 次)", failures);
*state = CircuitState::Open;
}
}
fn get_stats(&self) -> (u32, u32) {
(
self.success_count.load(Ordering::SeqCst),
self.failure_count.load(Ordering::SeqCst),
)
}
}
---
06.综合示例
a.完整重试系统
集成多种重试策略的完整系统。
b.综合实现
---
struct ResilientTransactionSystem {
retry_executor: RetryExecutor,
optimistic_executor: OptimisticTransactionExecutor,
idempotent_executor: IdempotentExecutor,
circuit_breaker: Arc<CircuitBreaker>,
}
impl ResilientTransactionSystem {
fn new(store: Arc<OptimisticLockStore>) -> Self {
ResilientTransactionSystem {
retry_executor: RetryExecutor::new(RetryPolicy::exponential_backoff()),
optimistic_executor: OptimisticTransactionExecutor::new(store),
idempotent_executor: IdempotentExecutor::new(),
circuit_breaker: Arc::new(CircuitBreaker::new(3, Duration::from_secs(30))),
}
}
// 综合转账(所有策略)
fn resilient_transfer(
&self,
request_id: String,
from: &str,
to: &str,
amount: i64,
) -> Result<(), String> {
println!("=== 弹性转账系统 [{}] ===", request_id);
// 1. 断路器保护
self.circuit_breaker.execute(|| {
// 2. 幂等性保证
self.idempotent_executor.execute_idempotent(
request_id.clone(),
|| {
// 3. 乐观锁 + 重试
self.optimistic_executor.transfer_optimistic(from, to, amount)
},
)
})?;
let (success, failure) = self.circuit_breaker.get_stats();
println!("断路器统计: 成功 {}, 失败 {}", success, failure);
Ok(())
}
// 压力测试
fn stress_test(&self) {
println!("=== 弹性系统压力测试 ===\n");
for i in 0..20 {
let request_id = format!("req-{}", i);
let result = self.resilient_transfer(
request_id,
"Alice",
"Bob",
100,
);
match result {
Ok(_) => println!("✓ 转账 {} 成功\n", i),
Err(e) => println!("✗ 转账 {} 失败: {}\n", i, e),
}
thread::sleep(Duration::from_millis(200));
}
let (success, failure) = self.circuit_breaker.get_stats();
println!("=== 测试完成 ===");
println!("总成功: {}", success);
println!("总失败: {}", failure);
println!("成功率: {:.1}%",
success as f64 / (success + failure) as f64 * 100.0);
}
}
---
5.5 幂等性设计
01.幂等性原理
a.幂等性定义
多次执行相同操作产生相同结果,确保重试安全。
b.幂等性检测
---
use std::sync::{Arc, Mutex};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone)]
struct RequestRecord {
request_id: String,
timestamp: SystemTime,
result: String,
status: RequestStatus,
}
#[derive(Debug, Clone, PartialEq)]
enum RequestStatus {
Processing,
Completed,
Failed,
}
struct IdempotencyManager {
records: Arc<Mutex<HashMap<String, RequestRecord>>>,
ttl: Duration,
}
impl IdempotencyManager {
fn new(ttl: Duration) -> Self {
IdempotencyManager {
records: Arc::new(Mutex::new(HashMap::new())),
ttl,
}
}
// 检查请求是否已处理
fn check_request(&self, request_id: &str) -> Option<RequestRecord> {
let records = self.records.lock().unwrap();
records.get(request_id).cloned()
}
// 开始处理请求
fn start_processing(&self, request_id: String) -> Result<(), String> {
let mut records = self.records.lock().unwrap();
if let Some(record) = records.get(&request_id) {
match record.status {
RequestStatus::Processing => {
return Err("请求正在处理中".to_string());
}
RequestStatus::Completed => {
return Err(format!("请求已完成: {}", record.result));
}
RequestStatus::Failed => {
println!("请求之前失败,允许重试");
}
}
}
records.insert(request_id.clone(), RequestRecord {
request_id,
timestamp: SystemTime::now(),
result: String::new(),
status: RequestStatus::Processing,
});
Ok(())
}
// 完成请求
fn complete_request(&self, request_id: &str, result: String) {
let mut records = self.records.lock().unwrap();
if let Some(record) = records.get_mut(request_id) {
record.status = RequestStatus::Completed;
record.result = result;
record.timestamp = SystemTime::now();
}
}
// 标记请求失败
fn fail_request(&self, request_id: &str, error: String) {
let mut records = self.records.lock().unwrap();
if let Some(record) = records.get_mut(request_id) {
record.status = RequestStatus::Failed;
record.result = error;
}
}
// 清理过期记录
fn cleanup_expired(&self) -> usize {
let mut records = self.records.lock().unwrap();
let now = SystemTime::now();
let before_count = records.len();
records.retain(|_, record| {
if let Ok(elapsed) = now.duration_since(record.timestamp) {
elapsed < self.ttl
} else {
true
}
});
let removed = before_count - records.len();
if removed > 0 {
println!("清理 {} 条过期记录", removed);
}
removed
}
}
---
02.数据库层幂等
a.唯一约束
使用数据库唯一约束保证幂等性。
b.唯一约束实现
---
use std::collections::BTreeMap;
#[derive(Debug, Clone)]
struct Transaction {
tx_id: String,
from: String,
to: String,
amount: i64,
timestamp: SystemTime,
}
struct TransactionStore {
transactions: Arc<Mutex<BTreeMap<String, Transaction>>>,
account_balances: Arc<Mutex<HashMap<String, i64>>>,
}
impl TransactionStore {
fn new() -> Self {
let mut balances = HashMap::new();
balances.insert("Alice".to_string(), 1000);
balances.insert("Bob".to_string(), 1000);
TransactionStore {
transactions: Arc::new(Mutex::new(BTreeMap::new())),
account_balances: Arc::new(Mutex::new(balances)),
}
}
// 幂等转账(基于唯一事务ID)
fn idempotent_transfer(
&self,
tx_id: String,
from: String,
to: String,
amount: i64,
) -> Result<(), String> {
println!("=== 幂等转账 [{}] ===", tx_id);
// 检查事务是否已存在
{
let transactions = self.transactions.lock().unwrap();
if let Some(existing) = transactions.get(&tx_id) {
println!("✓ 事务已存在,返回成功");
println!(" 原始交易: {:?}", existing);
return Ok(());
}
}
// 执行转账
{
let mut balances = self.account_balances.lock().unwrap();
let from_balance = balances.get(&from).copied().unwrap_or(0);
if from_balance < amount {
return Err(format!("余额不足: {} < {}", from_balance, amount));
}
*balances.get_mut(&from).unwrap() -= amount;
*balances.entry(to.clone()).or_insert(0) += amount;
println!(" 转账成功: {} -> {} 金额: {}", from, to, amount);
}
// 记录事务
{
let mut transactions = self.transactions.lock().unwrap();
transactions.insert(tx_id.clone(), Transaction {
tx_id,
from,
to,
amount,
timestamp: SystemTime::now(),
});
}
Ok(())
}
// 查询账户余额
fn get_balance(&self, account: &str) -> i64 {
let balances = self.account_balances.lock().unwrap();
balances.get(account).copied().unwrap_or(0)
}
// 查询事务
fn get_transaction(&self, tx_id: &str) -> Option<Transaction> {
let transactions = self.transactions.lock().unwrap();
transactions.get(tx_id).cloned()
}
}
---
03.状态机幂等
a.状态转换
通过状态机确保操作幂等性。
b.状态机实现
---
#[derive(Debug, Clone, PartialEq)]
enum OrderState {
Created,
Paid,
Shipped,
Delivered,
Cancelled,
}
#[derive(Debug, Clone)]
struct Order {
order_id: String,
state: OrderState,
amount: i64,
history: Vec<(OrderState, SystemTime)>,
}
struct OrderStateMachine {
orders: Arc<Mutex<HashMap<String, Order>>>,
}
impl OrderStateMachine {
fn new() -> Self {
OrderStateMachine {
orders: Arc::new(Mutex::new(HashMap::new())),
}
}
// 创建订单(幂等)
fn create_order(&self, order_id: String, amount: i64) -> Result<(), String> {
let mut orders = self.orders.lock().unwrap();
if let Some(order) = orders.get(&order_id) {
println!("订单 {} 已存在,状态: {:?}", order_id, order.state);
return Ok(());
}
let mut order = Order {
order_id: order_id.clone(),
state: OrderState::Created,
amount,
history: Vec::new(),
};
order.history.push((OrderState::Created, SystemTime::now()));
orders.insert(order_id.clone(), order);
println!("✓ 创建订单: {}", order_id);
Ok(())
}
// 支付订单(幂等)
fn pay_order(&self, order_id: &str) -> Result<(), String> {
let mut orders = self.orders.lock().unwrap();
let order = orders.get_mut(order_id)
.ok_or_else(|| "订单不存在".to_string())?;
match order.state {
OrderState::Created => {
order.state = OrderState::Paid;
order.history.push((OrderState::Paid, SystemTime::now()));
println!("✓ 订单 {} 支付成功", order_id);
Ok(())
}
OrderState::Paid => {
println!("订单 {} 已支付,跳过", order_id);
Ok(())
}
_ => {
Err(format!("订单状态错误: {:?}", order.state))
}
}
}
// 发货(幂等)
fn ship_order(&self, order_id: &str) -> Result<(), String> {
let mut orders = self.orders.lock().unwrap();
let order = orders.get_mut(order_id)
.ok_or_else(|| "订单不存在".to_string())?;
match order.state {
OrderState::Paid => {
order.state = OrderState::Shipped;
order.history.push((OrderState::Shipped, SystemTime::now()));
println!("✓ 订单 {} 已发货", order_id);
Ok(())
}
OrderState::Shipped | OrderState::Delivered => {
println!("订单 {} 已发货,跳过", order_id);
Ok(())
}
_ => {
Err(format!("订单状态错误: {:?}", order.state))
}
}
}
// 查询订单
fn get_order(&self, order_id: &str) -> Option<Order> {
let orders = self.orders.lock().unwrap();
orders.get(order_id).cloned()
}
// 打印订单历史
fn print_history(&self, order_id: &str) {
let orders = self.orders.lock().unwrap();
if let Some(order) = orders.get(order_id) {
println!("订单 {} 状态历史:", order_id);
for (state, time) in &order.history {
println!(" {:?} at {:?}", state, time);
}
}
}
}
---
04.Token机制
a.一次性Token
使用一次性Token确保操作只执行一次。
b.Token实现
---
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
struct TokenManager {
used_tokens: Arc<Mutex<HashSet<String>>>,
token_ttl: Duration,
}
impl TokenManager {
fn new(ttl: Duration) -> Self {
TokenManager {
used_tokens: Arc::new(Mutex::new(HashSet::new())),
token_ttl: ttl,
}
}
// 生成Token
fn generate_token(&self, request_id: &str, timestamp: u64) -> String {
let mut hasher = DefaultHasher::new();
request_id.hash(&mut hasher);
timestamp.hash(&mut hasher);
format!("token-{:x}", hasher.finish())
}
// 验证并消费Token
fn consume_token(&self, token: String) -> Result<(), String> {
let mut used = self.used_tokens.lock().unwrap();
if used.contains(&token) {
return Err("Token已使用".to_string());
}
used.insert(token.clone());
println!("✓ Token {} 已消费", token);
Ok(())
}
// 清理过期Token
fn cleanup(&self) {
let mut used = self.used_tokens.lock().unwrap();
let before = used.len();
used.clear();
println!("清理 {} 个Token", before);
}
}
struct TokenBasedExecutor {
token_manager: Arc<TokenManager>,
store: Arc<TransactionStore>,
}
impl TokenBasedExecutor {
fn new(token_manager: Arc<TokenManager>, store: Arc<TransactionStore>) -> Self {
TokenBasedExecutor {
token_manager,
store,
}
}
// 使用Token执行转账
fn transfer_with_token(
&self,
token: String,
tx_id: String,
from: String,
to: String,
amount: i64,
) -> Result<(), String> {
println!("=== Token转账 [{}] ===", token);
// 验证并消费Token
self.token_manager.consume_token(token)?;
// 执行转账
self.store.idempotent_transfer(tx_id, from, to, amount)?;
Ok(())
}
}
---
05.版本号幂等
a.乐观锁版本
使用版本号确保更新幂等性。
b.版本号实现
---
#[derive(Debug, Clone)]
struct VersionedAccount {
account_id: String,
balance: i64,
version: u64,
last_updated: SystemTime,
}
struct VersionedAccountStore {
accounts: Arc<Mutex<HashMap<String, VersionedAccount>>>,
}
impl VersionedAccountStore {
fn new() -> Self {
let mut accounts = HashMap::new();
accounts.insert("Alice".to_string(), VersionedAccount {
account_id: "Alice".to_string(),
balance: 1000,
version: 1,
last_updated: SystemTime::now(),
});
accounts.insert("Bob".to_string(), VersionedAccount {
account_id: "Bob".to_string(),
balance: 1000,
version: 1,
last_updated: SystemTime::now(),
});
VersionedAccountStore {
accounts: Arc::new(Mutex::new(accounts)),
}
}
// 读取账户
fn get_account(&self, account_id: &str) -> Option<VersionedAccount> {
let accounts = self.accounts.lock().unwrap();
accounts.get(account_id).cloned()
}
// 幂等更新(基于版本号)
fn update_balance(
&self,
account_id: &str,
new_balance: i64,
expected_version: u64,
) -> Result<u64, String> {
let mut accounts = self.accounts.lock().unwrap();
let account = accounts.get_mut(account_id)
.ok_or_else(|| "账户不存在".to_string())?;
if account.version != expected_version {
return Err(format!(
"版本冲突: 期望 {}, 实际 {}",
expected_version, account.version
));
}
account.balance = new_balance;
account.version += 1;
account.last_updated = SystemTime::now();
println!("✓ 更新账户 {} 余额: {} (版本: {})",
account_id, new_balance, account.version);
Ok(account.version)
}
// 幂等转账(基于版本号)
fn versioned_transfer(
&self,
from: &str,
to: &str,
amount: i64,
) -> Result<(), String> {
println!("=== 版本号转账: {} -> {} ===", from, to);
// 读取当前版本
let from_account = self.get_account(from)
.ok_or_else(|| "源账户不存在".to_string())?;
let to_account = self.get_account(to)
.ok_or_else(|| "目标账户不存在".to_string())?;
// 计算新余额
let new_from_balance = from_account.balance - amount;
let new_to_balance = to_account.balance + amount;
if new_from_balance < 0 {
return Err("余额不足".to_string());
}
// 更新源账户
self.update_balance(from, new_from_balance, from_account.version)?;
// 更新目标账户
self.update_balance(to, new_to_balance, to_account.version)?;
println!("✓ 转账完成");
Ok(())
}
}
---
06.综合示例
a.完整幂等系统
集成多种幂等策略的完整系统。
b.综合实现
---
struct ComprehensiveIdempotencySystem {
idempotency_manager: Arc<IdempotencyManager>,
transaction_store: Arc<TransactionStore>,
order_state_machine: Arc<OrderStateMachine>,
token_manager: Arc<TokenManager>,
versioned_store: Arc<VersionedAccountStore>,
}
impl ComprehensiveIdempotencySystem {
fn new() -> Self {
ComprehensiveIdempotencySystem {
idempotency_manager: Arc::new(IdempotencyManager::new(Duration::from_secs(3600))),
transaction_store: Arc::new(TransactionStore::new()),
order_state_machine: Arc::new(OrderStateMachine::new()),
token_manager: Arc::new(TokenManager::new(Duration::from_secs(300))),
versioned_store: Arc::new(VersionedAccountStore::new()),
}
}
// 综合幂等转账
fn comprehensive_transfer(
&self,
request_id: String,
tx_id: String,
from: String,
to: String,
amount: i64,
) -> Result<(), String> {
println!("=== 综合幂等转账 [{}] ===", request_id);
// 1. 检查请求幂等性
if let Some(record) = self.idempotency_manager.check_request(&request_id) {
match record.status {
RequestStatus::Completed => {
println!("✓ 请求已完成: {}", record.result);
return Ok(());
}
RequestStatus::Processing => {
return Err("请求处理中".to_string());
}
RequestStatus::Failed => {
println!("请求之前失败,重试");
}
}
}
// 2. 开始处理
self.idempotency_manager.start_processing(request_id.clone())?;
// 3. 执行转账(数据库层幂等)
match self.transaction_store.idempotent_transfer(
tx_id.clone(),
from.clone(),
to.clone(),
amount,
) {
Ok(_) => {
self.idempotency_manager.complete_request(
&request_id,
format!("转账成功: {}", tx_id),
);
println!("✓ 综合转账完成");
Ok(())
}
Err(e) => {
self.idempotency_manager.fail_request(&request_id, e.clone());
Err(e)
}
}
}
// 测试幂等性
fn test_idempotency(&self) {
println!("=== 幂等性测试 ===\n");
let request_id = "req-001".to_string();
let tx_id = "tx-001".to_string();
// 第一次执行
println!("第一次执行:");
let result1 = self.comprehensive_transfer(
request_id.clone(),
tx_id.clone(),
"Alice".to_string(),
"Bob".to_string(),
100,
);
println!("结果: {:?}\n", result1);
// 第二次执行(应该幂等)
println!("第二次执行(重复请求):");
let result2 = self.comprehensive_transfer(
request_id.clone(),
tx_id.clone(),
"Alice".to_string(),
"Bob".to_string(),
100,
);
println!("结果: {:?}\n", result2);
// 检查余额
println!("最终余额:");
println!(" Alice: {}", self.transaction_store.get_balance("Alice"));
println!(" Bob: {}", self.transaction_store.get_balance("Bob"));
// 清理
self.idempotency_manager.cleanup_expired();
}
// 压力测试
fn stress_test_idempotency(&self) {
println!("\n=== 幂等性压力测试 ===\n");
let request_id = "stress-req-001".to_string();
// 模拟10次重复请求
for i in 0..10 {
println!("尝试 {}:", i + 1);
let result = self.comprehensive_transfer(
request_id.clone(),
format!("stress-tx-{}", i),
"Alice".to_string(),
"Bob".to_string(),
10,
);
match result {
Ok(_) => println!(" ✓ 成功"),
Err(e) => println!(" ✗ 失败: {}", e),
}
}
println!("\n最终余额:");
println!(" Alice: {}", self.transaction_store.get_balance("Alice"));
println!(" Bob: {}", self.transaction_store.get_balance("Bob"));
}
}
---
5.6 事务监控
01.性能指标采集
a.关键指标
采集事务执行时间、吞吐量、错误率等关键性能指标。
b.指标采集实现
---
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::collections::VecDeque;
#[derive(Debug, Clone)]
struct TransactionMetrics {
tx_id: String,
start_time: Instant,
end_time: Option<Instant>,
duration: Option<Duration>,
status: MetricStatus,
operation: String,
}
#[derive(Debug, Clone, PartialEq)]
enum MetricStatus {
Success,
Failed,
Timeout,
}
struct MetricsCollector {
total_transactions: AtomicU64,
successful_transactions: AtomicU64,
failed_transactions: AtomicU64,
timeout_transactions: AtomicU64,
total_duration: AtomicU64,
recent_metrics: Arc<Mutex<VecDeque<TransactionMetrics>>>,
max_recent: usize,
}
impl MetricsCollector {
fn new(max_recent: usize) -> Self {
MetricsCollector {
total_transactions: AtomicU64::new(0),
successful_transactions: AtomicU64::new(0),
failed_transactions: AtomicU64::new(0),
timeout_transactions: AtomicU64::new(0),
total_duration: AtomicU64::new(0),
recent_metrics: Arc::new(Mutex::new(VecDeque::with_capacity(max_recent))),
max_recent,
}
}
// 记录事务开始
fn record_start(&self, tx_id: String, operation: String) -> TransactionMetrics {
self.total_transactions.fetch_add(1, Ordering::SeqCst);
TransactionMetrics {
tx_id,
start_time: Instant::now(),
end_time: None,
duration: None,
status: MetricStatus::Success,
operation,
}
}
// 记录事务完成
fn record_complete(&self, mut metric: TransactionMetrics, status: MetricStatus) {
let end_time = Instant::now();
let duration = end_time.duration_since(metric.start_time);
metric.end_time = Some(end_time);
metric.duration = Some(duration);
metric.status = status.clone();
// 更新统计
match status {
MetricStatus::Success => {
self.successful_transactions.fetch_add(1, Ordering::SeqCst);
}
MetricStatus::Failed => {
self.failed_transactions.fetch_add(1, Ordering::SeqCst);
}
MetricStatus::Timeout => {
self.timeout_transactions.fetch_add(1, Ordering::SeqCst);
}
}
self.total_duration.fetch_add(duration.as_millis() as u64, Ordering::SeqCst);
// 保存最近的指标
let mut recent = self.recent_metrics.lock().unwrap();
if recent.len() >= self.max_recent {
recent.pop_front();
}
recent.push_back(metric);
}
// 获取统计摘要
fn get_summary(&self) -> MetricsSummary {
let total = self.total_transactions.load(Ordering::SeqCst);
let success = self.successful_transactions.load(Ordering::SeqCst);
let failed = self.failed_transactions.load(Ordering::SeqCst);
let timeout = self.timeout_transactions.load(Ordering::SeqCst);
let total_duration = self.total_duration.load(Ordering::SeqCst);
let avg_duration = if total > 0 {
Duration::from_millis(total_duration / total)
} else {
Duration::from_secs(0)
};
let success_rate = if total > 0 {
(success as f64 / total as f64) * 100.0
} else {
0.0
};
MetricsSummary {
total_transactions: total,
successful: success,
failed,
timeout,
avg_duration,
success_rate,
}
}
// 打印报告
fn print_report(&self) {
let summary = self.get_summary();
println!("=== 事务性能报告 ===");
println!("总事务数: {}", summary.total_transactions);
println!("成功: {} ({:.1}%)", summary.successful, summary.success_rate);
println!("失败: {}", summary.failed);
println!("超时: {}", summary.timeout);
println!("平均耗时: {:?}", summary.avg_duration);
// 最近事务
let recent = self.recent_metrics.lock().unwrap();
if !recent.is_empty() {
println!("\n最近 {} 笔事务:", recent.len().min(5));
for metric in recent.iter().rev().take(5) {
println!(" {} - {:?} - {:?}",
metric.tx_id,
metric.status,
metric.duration.unwrap_or(Duration::from_secs(0)));
}
}
}
}
#[derive(Debug)]
struct MetricsSummary {
total_transactions: u64,
successful: u64,
failed: u64,
timeout: u64,
avg_duration: Duration,
success_rate: f64,
}
---
02.实时监控
a.监控面板
实时展示事务执行状态和性能指标。
b.监控实现
---
use std::thread;
struct TransactionMonitor {
collector: Arc<MetricsCollector>,
active_transactions: Arc<Mutex<Vec<TransactionMetrics>>>,
alert_threshold: Duration,
}
impl TransactionMonitor {
fn new(collector: Arc<MetricsCollector>, alert_threshold: Duration) -> Self {
TransactionMonitor {
collector,
active_transactions: Arc::new(Mutex::new(Vec::new())),
alert_threshold,
}
}
// 开始监控事务
fn start_monitoring(&self, tx_id: String, operation: String) -> TransactionMetrics {
let metric = self.collector.record_start(tx_id.clone(), operation);
let mut active = self.active_transactions.lock().unwrap();
active.push(metric.clone());
println!("▶ 开始事务: {} - {}", metric.tx_id, metric.operation);
metric
}
// 完成监控
fn complete_monitoring(&self, metric: TransactionMetrics, status: MetricStatus) {
let duration = metric.start_time.elapsed();
// 检查是否超过阈值
if duration > self.alert_threshold {
println!("⚠️ 慢事务告警: {} 耗时 {:?}", metric.tx_id, duration);
}
self.collector.record_complete(metric.clone(), status.clone());
let mut active = self.active_transactions.lock().unwrap();
active.retain(|m| m.tx_id != metric.tx_id);
let status_symbol = match status {
MetricStatus::Success => "✓",
MetricStatus::Failed => "✗",
MetricStatus::Timeout => "⏱",
};
println!("{} 完成事务: {} - {:?}", status_symbol, metric.tx_id, duration);
}
// 获取活跃事务
fn get_active_transactions(&self) -> Vec<TransactionMetrics> {
let active = self.active_transactions.lock().unwrap();
active.clone()
}
// 实时监控循环
fn start_monitoring_loop(&self) {
let active_txs = self.active_transactions.clone();
let threshold = self.alert_threshold;
thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(5));
let active = active_txs.lock().unwrap();
if !active.is_empty() {
println!("\n=== 活跃事务监控 ===");
for tx in active.iter() {
let elapsed = tx.start_time.elapsed();
let status = if elapsed > threshold {
"⚠️ 慢"
} else {
"✓ 正常"
};
println!(" {} {} - {} - {:?}",
status, tx.tx_id, tx.operation, elapsed);
}
}
}
});
}
// 打印实时状态
fn print_status(&self) {
println!("\n=== 实时监控状态 ===");
let active = self.active_transactions.lock().unwrap();
println!("活跃事务数: {}", active.len());
let summary = self.collector.get_summary();
println!("总事务数: {}", summary.total_transactions);
println!("成功率: {:.1}%", summary.success_rate);
println!("平均耗时: {:?}", summary.avg_duration);
}
}
---
03.告警机制
a.阈值告警
当指标超过阈值时触发告警。
b.告警实���
---
#[derive(Debug, Clone)]
struct Alert {
alert_type: AlertType,
message: String,
timestamp: Instant,
severity: AlertSeverity,
}
#[derive(Debug, Clone, PartialEq)]
enum AlertType {
SlowTransaction,
HighFailureRate,
TooManyActive,
Timeout,
}
#[derive(Debug, Clone, PartialEq)]
enum AlertSeverity {
Info,
Warning,
Critical,
}
struct AlertManager {
alerts: Arc<Mutex<Vec<Alert>>>,
slow_tx_threshold: Duration,
failure_rate_threshold: f64,
max_active_threshold: usize,
}
impl AlertManager {
fn new() -> Self {
AlertManager {
alerts: Arc::new(Mutex::new(Vec::new())),
slow_tx_threshold: Duration::from_secs(5),
failure_rate_threshold: 10.0,
max_active_threshold: 100,
}
}
// 检查慢事务
fn check_slow_transaction(&self, tx_id: &str, duration: Duration) {
if duration > self.slow_tx_threshold {
self.trigger_alert(Alert {
alert_type: AlertType::SlowTransaction,
message: format!("事务 {} 执行缓慢: {:?}", tx_id, duration),
timestamp: Instant::now(),
severity: AlertSeverity::Warning,
});
}
}
// 检查失败率
fn check_failure_rate(&self, summary: &MetricsSummary) {
let failure_rate = 100.0 - summary.success_rate;
if failure_rate > self.failure_rate_threshold {
self.trigger_alert(Alert {
alert_type: AlertType::HighFailureRate,
message: format!("失败率过高: {:.1}%", failure_rate),
timestamp: Instant::now(),
severity: AlertSeverity::Critical,
});
}
}
// 检查活跃事务数
fn check_active_count(&self, count: usize) {
if count > self.max_active_threshold {
self.trigger_alert(Alert {
alert_type: AlertType::TooManyActive,
message: format!("活跃事务过多: {}", count),
timestamp: Instant::now(),
severity: AlertSeverity::Warning,
});
}
}
// 触发告警
fn trigger_alert(&self, alert: Alert) {
let severity_symbol = match alert.severity {
AlertSeverity::Info => "ℹ️",
AlertSeverity::Warning => "⚠️",
AlertSeverity::Critical => "🚨",
};
println!("{} 告警: {:?} - {}",
severity_symbol, alert.alert_type, alert.message);
let mut alerts = self.alerts.lock().unwrap();
alerts.push(alert);
}
// 获取最近告警
fn get_recent_alerts(&self, count: usize) -> Vec<Alert> {
let alerts = self.alerts.lock().unwrap();
alerts.iter().rev().take(count).cloned().collect()
}
// 清理旧告警
fn cleanup_old_alerts(&self, max_age: Duration) {
let mut alerts = self.alerts.lock().unwrap();
let now = Instant::now();
alerts.retain(|alert| {
now.duration_since(alert.timestamp) < max_age
});
}
}
---
04.日志记录
a.结构化日志
记录事务执行的详细日志,便于问题排查。
b.日志实现
---
use std::fs::{File, OpenOptions};
use std::io::Write;
#[derive(Debug, Clone)]
struct TransactionLog {
tx_id: String,
timestamp: Instant,
level: LogLevel,
operation: String,
message: String,
context: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq)]
enum LogLevel {
Debug,
Info,
Warn,
Error,
}
struct TransactionLogger {
logs: Arc<Mutex<Vec<TransactionLog>>>,
log_to_console: bool,
}
impl TransactionLogger {
fn new(log_to_console: bool) -> Self {
TransactionLogger {
logs: Arc::new(Mutex::new(Vec::new())),
log_to_console,
}
}
// 记录日志
fn log(&self, log: TransactionLog) {
if self.log_to_console {
let level_str = match log.level {
LogLevel::Debug => "DEBUG",
LogLevel::Info => "INFO ",
LogLevel::Warn => "WARN ",
LogLevel::Error => "ERROR",
};
println!("[{}] {} - {} - {}",
level_str, log.tx_id, log.operation, log.message);
if !log.context.is_empty() {
for (key, value) in &log.context {
println!(" {}: {}", key, value);
}
}
}
let mut logs = self.logs.lock().unwrap();
logs.push(log);
}
// 便捷方法
fn info(&self, tx_id: String, operation: String, message: String) {
self.log(TransactionLog {
tx_id,
timestamp: Instant::now(),
level: LogLevel::Info,
operation,
message,
context: Vec::new(),
});
}
fn warn(&self, tx_id: String, operation: String, message: String) {
self.log(TransactionLog {
tx_id,
timestamp: Instant::now(),
level: LogLevel::Warn,
operation,
message,
context: Vec::new(),
});
}
fn error(&self, tx_id: String, operation: String, message: String, context: Vec<(String, String)>) {
self.log(TransactionLog {
tx_id,
timestamp: Instant::now(),
level: LogLevel::Error,
operation,
message,
context,
});
}
// 查询日志
fn query_logs(&self, tx_id: Option<&str>, level: Option<LogLevel>) -> Vec<TransactionLog> {
let logs = self.logs.lock().unwrap();
logs.iter()
.filter(|log| {
let tx_match = tx_id.map_or(true, |id| log.tx_id == id);
let level_match = level.as_ref().map_or(true, |l| &log.level == l);
tx_match && level_match
})
.cloned()
.collect()
}
// 打印日志摘要
fn print_summary(&self) {
let logs = self.logs.lock().unwrap();
let mut counts = std::collections::HashMap::new();
for log in logs.iter() {
*counts.entry(format!("{:?}", log.level)).or_insert(0) += 1;
}
println!("=== 日志摘要 ===");
println!("总日志数: {}", logs.len());
for (level, count) in counts {
println!(" {}: {}", level, count);
}
}
}
---
05.性能分析
a.瓶颈识别
分析事务性能瓶颈,找出优化点。
b.分析实现
---
struct PerformanceAnalyzer {
collector: Arc<MetricsCollector>,
logger: Arc<TransactionLogger>,
}
impl PerformanceAnalyzer {
fn new(collector: Arc<MetricsCollector>, logger: Arc<TransactionLogger>) -> Self {
PerformanceAnalyzer {
collector,
logger,
}
}
// 分析慢事务
fn analyze_slow_transactions(&self, threshold: Duration) -> Vec<TransactionMetrics> {
let recent = self.collector.recent_metrics.lock().unwrap();
recent.iter()
.filter(|m| {
m.duration.map_or(false, |d| d > threshold)
})
.cloned()
.collect()
}
// 分析失败模式
fn analyze_failures(&self) -> std::collections::HashMap<String, usize> {
let logs = self.logger.query_logs(None, Some(LogLevel::Error));
let mut failure_patterns = std::collections::HashMap::new();
for log in logs {
*failure_patterns.entry(log.operation.clone()).or_insert(0) += 1;
}
failure_patterns
}
// 生成性能报告
fn generate_report(&self) {
println!("=== 性能分析报告 ===\n");
// 1. 整体统计
let summary = self.collector.get_summary();
println!("1. 整体统计:");
println!(" 总事务: {}", summary.total_transactions);
println!(" 成功率: {:.1}%", summary.success_rate);
println!(" 平均耗时: {:?}\n", summary.avg_duration);
// 2. 慢事务分析
let slow_txs = self.analyze_slow_transactions(Duration::from_secs(3));
println!("2. 慢事务分析 (>3s):");
println!(" 慢事务数: {}", slow_txs.len());
if !slow_txs.is_empty() {
println!(" 最慢的3笔:");
let mut sorted = slow_txs.clone();
sorted.sort_by(|a, b| b.duration.cmp(&a.duration));
for tx in sorted.iter().take(3) {
println!(" {} - {:?}", tx.tx_id, tx.duration.unwrap());
}
}
println!();
// 3. 失败分析
let failures = self.analyze_failures();
println!("3. 失败模式分析:");
if failures.is_empty() {
println!(" 无失败记录");
} else {
for (operation, count) in failures {
println!(" {}: {} 次", operation, count);
}
}
println!();
// 4. 建议
println!("4. 优化建议:");
if summary.success_rate < 95.0 {
println!(" - 成功率偏低,建议检查错误处理和重试机制");
}
if summary.avg_duration > Duration::from_secs(2) {
println!(" - 平均耗时较长,建议优化事务逻辑");
}
if slow_txs.len() > 10 {
println!(" - 慢事务较多,建议拆分长事务");
}
}
}
---
06.综合示例
a.完整监控系统
集成指标采集、监控、告警、日志的完整系统。
b.综合实现
---
struct ComprehensiveMonitoringSystem {
collector: Arc<MetricsCollector>,
monitor: Arc<TransactionMonitor>,
alert_manager: Arc<AlertManager>,
logger: Arc<TransactionLogger>,
analyzer: Arc<PerformanceAnalyzer>,
}
impl ComprehensiveMonitoringSystem {
fn new() -> Self {
let collector = Arc::new(MetricsCollector::new(1000));
let logger = Arc::new(TransactionLogger::new(true));
ComprehensiveMonitoringSystem {
monitor: Arc::new(TransactionMonitor::new(
collector.clone(),
Duration::from_secs(5),
)),
alert_manager: Arc::new(AlertManager::new()),
analyzer: Arc::new(PerformanceAnalyzer::new(
collector.clone(),
logger.clone(),
)),
collector,
logger,
}
}
// 执行被监控的事务
fn execute_monitored_transaction<F>(
&self,
tx_id: String,
operation: String,
work: F,
) -> Result<(), String>
where
F: FnOnce() -> Result<(), String>,
{
// 开始监控
let metric = self.monitor.start_monitoring(tx_id.clone(), operation.clone());
self.logger.info(tx_id.clone(), operation.clone(), "事务开始".to_string());
// 执行工作
let result = work();
// 完成监控
let status = match &result {
Ok(_) => {
self.logger.info(tx_id.clone(), operation.clone(), "事务成功".to_string());
MetricStatus::Success
}
Err(e) => {
self.logger.error(
tx_id.clone(),
operation.clone(),
format!("事务失败: {}", e),
vec![("error".to_string(), e.clone())],
);
MetricStatus::Failed
}
};
self.monitor.complete_monitoring(metric.clone(), status);
// 检查告警
if let Some(duration) = metric.start_time.elapsed().into() {
self.alert_manager.check_slow_transaction(&tx_id, duration);
}
let summary = self.collector.get_summary();
self.alert_manager.check_failure_rate(&summary);
let active_count = self.monitor.get_active_transactions().len();
self.alert_manager.check_active_count(active_count);
result
}
// 模拟事务负载
fn simulate_workload(&self) {
println!("=== 模拟事务负载 ===\n");
for i in 0..20 {
let tx_id = format!("tx-{:03}", i);
let operation = "transfer".to_string();
let result = self.execute_monitored_transaction(
tx_id.clone(),
operation,
|| {
// 模拟不同的执行时间
let delay = Duration::from_millis((i % 5) * 1000 + 500);
thread::sleep(delay);
// 模拟10%失败率
if i % 10 == 0 {
Err("模拟失败".to_string())
} else {
Ok(())
}
},
);
if i % 5 == 0 {
self.monitor.print_status();
}
}
println!("\n=== 负载测试完成 ===\n");
// 生成报告
self.collector.print_report();
println!();
self.logger.print_summary();
println!();
self.analyzer.generate_report();
// 显示告警
let alerts = self.alert_manager.get_recent_alerts(10);
if !alerts.is_empty() {
println!("\n=== 最近告警 ===");
for alert in alerts {
println!(" {:?} - {}", alert.severity, alert.message);
}
}
}
}
---