1 数据库事务
1.1 事务基础概念
01.ACID特性
a.原子性(Atomicity)
事务中的操作要么全部成功,要么全部失败
b.一致性(Consistency)
事务前后数据保持一致状态
c.隔离性(Isolation)
并发事务之间相互隔离
d.持久性(Durability)
事务提交后永久保存
02.事务状态
a.活动状态
事务正在执行
b.部分提交
最后一条语句执行完毕
c.提交状态
事务成功完成
d.失败状态
事务无法继续执行
e.中止状态
事务回滚完成
1.2 database/sql包
01.���本使用
a.导入驱动
---
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/dbname")
if err != nil {
panic(err)
}
defer db.Close()
}
---
02.连接池配置
a.设置参数
---
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
---
03.基本操作
a.查询
---
rows, err := db.Query("SELECT id, name FROM users WHERE age > ?", 18)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var id int
var name string
if err := rows.Scan(&id, &name); err != nil {
log.Fatal(err)
}
fmt.Println(id, name)
}
---
b.插入
---
result, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Alice", 25)
if err != nil {
log.Fatal(err)
}
id, _ := result.LastInsertId()
fmt.Println("插入ID:", id)
---
1.3 事务操作:Begin、Commit、Rollback
01.开启事务
a.Begin方法
---
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
---
02.提交事务
a.Commit方法
---
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
---
03.回滚事务
a.Rollback方法
---
if err := tx.Rollback(); err != nil {
log.Fatal(err)
}
---
04.完整示例
a.转账操作
---
func transfer(db *sql.DB, from, to int, amount float64) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 扣款
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from)
if err != nil {
return err
}
// 加款
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to)
if err != nil {
return err
}
return tx.Commit()
}
---
1.4 事务隔离级别
01.隔离级别
a.读未提交(Read Uncommitted)
最低级别,可能出现脏读
b.读已提交(Read Committed)
避免脏读,可能出现不可重复读
c.可重复读(Repeatable Read)
避免不可重复读,可能出现幻读
d.串行化(Serializable)
最高级别,完全隔离
02.设置隔离级别
a.MySQL示例
---
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
})
---
03.并发问题
a.脏读
读取未提交的数据
b.不可重复读
同一事务中多次读取结果不同
c.幻读
同一查询返回不同的行集
1.5 预处理语句
01.Prepare方法
a.基本使用
---
stmt, err := db.Prepare("INSERT INTO users(name, age) VALUES(?, ?)")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
for i := 0; i < 10; i++ {
_, err := stmt.Exec(fmt.Sprintf("user%d", i), 20+i)
if err != nil {
log.Fatal(err)
}
}
---
02.事务中使用
a.示例
---
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
stmt, err := tx.Prepare("INSERT INTO users(name, age) VALUES(?, ?)")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec("Alice", 25)
if err != nil {
log.Fatal(err)
}
tx.Commit()
---
03.优势
a.性能提升
SQL语句只编译一次
b.防止SQL注入
参数自动转义
2 分布式事务
2.1 分布式事务概念
01.定义
a.概念
跨多个数据库或服务的事务
需要保证全局一致性
b.挑战
网络延迟
部分失败
性能开销
02.CAP定理
a.一致性(Consistency)
所有节点看到相同数据
b.可用性(Availability)
系统持续可用
c.分区容错性(Partition tolerance)
网络分区时系统继续工作
03.BASE理论
a.基本可用(Basically Available)
b.软状态(Soft state)
c.最终一致性(Eventually consistent)
2.2 两阶段提交:2PC
01.2PC流程
a.准备阶段
协调者询问所有参与者是否可以提交
参与者执行事务但不提交
b.提交阶段
所有参与者都同意则提交
任一参与者拒绝则回滚
02.简单实现
a.协调者
---
type Coordinator struct {
participants []Participant
}
func (c *Coordinator) Execute() error {
// 阶段1:准备
for _, p := range c.participants {
if err := p.Prepare(); err != nil {
c.Rollback()
return err
}
}
// 阶段2:提交
for _, p := range c.participants {
if err := p.Commit(); err != nil {
return err
}
}
return nil
}
func (c *Coordinator) Rollback() {
for _, p := range c.participants {
p.Rollback()
}
}
---
03.缺点
a.同步阻塞
参与者等待协调者指令
b.单点故障
协调者故障导致阻塞
c.数据不一致
网络分区可能导致不一致
2.3 三阶段提交:3PC
01.3PC流程
a.CanCommit阶段
询问是否可以提交
b.PreCommit阶段
执行事务预提交
c.DoCommit阶段
正式提交
02.改进
a.超时机制
参与者超时自动提交或回滚
b.减少阻塞
增加预提交阶段
03.缺点
a.复杂度高
实现和维护成本高
b.性能开销
多一个阶段增加延迟
2.4 TCC事务模式
01.TCC概念
a.Try阶段
尝试执行,预留资源
b.Confirm阶段
确认执行,提交事务
c.Cancel阶段
取消执行,释放资源
02.实现示例
a.转账TCC
---
type Account struct {
ID int
Balance float64
Frozen float64
}
// Try: 冻结金额
func (a *Account) TryTransfer(amount float64) error {
if a.Balance < amount {
return errors.New("余额不足")
}
a.Balance -= amount
a.Frozen += amount
return nil
}
// Confirm: 确认转账
func (a *Account) ConfirmTransfer(amount float64) error {
a.Frozen -= amount
return nil
}
// Cancel: 取消转账
func (a *Account) CancelTransfer(amount float64) error {
a.Balance += amount
a.Frozen -= amount
return nil
}
---
03.优势
a.无锁设计
不阻塞其他操作
b.灵活性高
业务逻辑可定制
2.5 Saga事务模式
01.Saga概念
a.定义
将长事务拆分为多个本地事务
每个本地事务有对应的补偿操作
b.执行方式
顺序执行各个本地事务
失败时执行补偿操作
02.实现模式
a.编排模式
---
type Saga struct {
steps []Step
}
type Step struct {
Action func() error
Compensate func() error
}
func (s *Saga) Execute() error {
executed := []Step{}
for _, step := range s.steps {
if err := step.Action(); err != nil {
// 执行补偿
for i := len(executed) - 1; i >= 0; i-- {
executed[i].Compensate()
}
return err
}
executed = append(executed, step)
}
return nil
}
---
03.优势
a.高性能
无需锁定资源
b.高可用
支持长时间运行的事务
3 ORM框架事务
3.1 GORM事务
01.基本事务
a.手动事务
---
import "gorm.io/gorm"
func transfer(db *gorm.DB, from, to uint, amount float64) error {
return db.Transaction(func(tx *gorm.DB) error {
// 扣款
if err := tx.Model(&Account{}).Where("id = ?", from).
Update("balance", gorm.Expr("balance - ?", amount)).Error; err != nil {
return err
}
// 加款
if err := tx.Model(&Account{}).Where("id = ?", to).
Update("balance", gorm.Expr("balance + ?", amount)).Error; err != nil {
return err
}
return nil
})
}
---
02.手动控制
a.Begin/Commit/Rollback
---
tx := db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
}
}()
if err := tx.Create(&user).Error; err != nil {
tx.Rollback()
return err
}
if err := tx.Create(&profile).Error; err != nil {
tx.Rollback()
return err
}
return tx.Commit().Error
---
03.SavePoint
a.保存点
---
tx := db.Begin()
tx.Create(&user1)
tx.SavePoint("sp1")
tx.Create(&user2)
tx.RollbackTo("sp1")
tx.Commit()
---
3.2 XORM事务
01.基本事务
a.Session事务
---
import "xorm.io/xorm"
func transfer(engine *xorm.Engine, from, to int64, amount float64) error {
session := engine.NewSession()
defer session.Close()
if err := session.Begin(); err != nil {
return err
}
// 扣款
_, err := session.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from)
if err != nil {
session.Rollback()
return err
}
// 加款
_, err = session.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to)
if err != nil {
session.Rollback()
return err
}
return session.Commit()
}
---
02.事务闭包
a.Transaction方法
---
err := engine.Transaction(func(session *xorm.Session) error {
_, err := session.Insert(&user)
if err != nil {
return err
}
_, err = session.Insert(&profile)
return err
})
---
3.3 sqlx事务
01.基本事务
a.Beginx方法
---
import "github.com/jmoiron/sqlx"
func transfer(db *sqlx.DB, from, to int, amount float64) error {
tx, err := db.Beginx()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from)
if err != nil {
return err
}
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to)
if err != nil {
return err
}
return tx.Commit()
}
---
02.命名查询
a.NamedExec
---
tx, _ := db.Beginx()
defer tx.Rollback()
_, err := tx.NamedExec(`UPDATE accounts SET balance = balance - :amount WHERE id = :id`,
map[string]interface{}{"amount": 100, "id": 1})
tx.Commit()
---
3.4 ent事务
01.基本事务
a.Tx方法
---
import "entgo.io/ent"
func transfer(ctx context.Context, client *ent.Client, from, to int, amount int) error {
tx, err := client.Tx(ctx)
if err != nil {
return err
}
defer func() {
if v := recover(); v != nil {
tx.Rollback()
panic(v)
}
}()
// 扣款
_, err = tx.Account.UpdateOneID(from).
AddBalance(-amount).
Save(ctx)
if err != nil {
return rollback(tx, err)
}
// 加款
_, err = tx.Account.UpdateOneID(to).
AddBalance(amount).
Save(ctx)
if err != nil {
return rollback(tx, err)
}
return tx.Commit()
}
func rollback(tx *ent.Tx, err error) error {
if rerr := tx.Rollback(); rerr != nil {
err = fmt.Errorf("%w: %v", err, rerr)
}
return err
}
---
4 事务最佳实践
4.1 事务超时控制
01.Context超时
a.WithTimeout
---
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 执行操作
_, err = tx.ExecContext(ctx, "UPDATE ...")
if err != nil {
return err
}
return tx.Commit()
---
02.数据库超时
a.设置超时参数
---
db.SetConnMaxLifetime(time.Minute * 3)
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)
---
4.2 事务嵌套处理
01.避免嵌套
a.问题
数据库通常不支持真正的嵌套事务
b.解决方案
使用SavePoint
重构代码避免嵌套
02.SavePoint示例
a.GORM
---
tx := db.Begin()
tx.Create(&user)
tx.SavePoint("sp1")
if err := tx.Create(&order).Error; err != nil {
tx.RollbackTo("sp1")
}
tx.Commit()
---
4.3 错误处理与回滚
01.defer回滚
a.最佳实践
---
func doTransaction(db *sql.DB) (err error) {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
// 执行操作
_, err = tx.Exec("INSERT ...")
if err != nil {
return err
}
return nil
}
---
02.错误分类
a.可重试错误
死锁、超时
b.不可重试错误
约束违反、语法错误
4.4 性能优化
01.减少事务范围
a.最小化事务
---
// 不好:事务范围太大
tx.Begin()
data := fetchData()
processData(data)
tx.Exec("INSERT ...")
tx.Commit()
// 好:只在必要时使用事务
data := fetchData()
processData(data)
tx.Begin()
tx.Exec("INSERT ...")
tx.Commit()
---
02.批量操作
a.批量插入
---
tx, _ := db.Begin()
stmt, _ := tx.Prepare("INSERT INTO users(name) VALUES(?)")
for _, name := range names {
stmt.Exec(name)
}
stmt.Close()
tx.Commit()
---
03.读写分离
a.只读事务
---
tx, err := db.BeginTx(ctx, &sql.TxOptions{
ReadOnly: true,
})
---
4.5 常见陷阱
01.长事务
a.问题
锁定资源时间过长
影响并发性能
b.解决
拆分事务
使用乐观锁
02.忘记提交或回滚
a.问题
连接泄漏
资源未释放
b.解决
使用defer确保回滚
使用事务闭包
03.事务中的慢查询
a.问题
阻塞其他事务
b.解决
优化查询
在事务外执行查询
04.隔离级别选择
a.问题
级别过高影响性能
级别过低数据不一致
b.解决
根据业务需求选择
测试验证