1 事务
1.1 [1]事务
01.事务
事务是一个【逻辑单元】,
由一组操作组成,要求【所有操作要么全部成功,要么全部失败】,以保持数据一致性
1.2 [1]长事务
01.长事务
a.锁争用
长事务持有锁时间长,可能导致其他事务等待,影响系统并发性能
b.锁表阻塞
对表的长时间锁定会阻塞其他事务对该表的读写操作
c.性能下降
长事务可能导致数据库性能下降,因为系统需要维护更多的事务状态和锁
d.回滚开销
如果长事务需要回滚,可能会导致大量数据恢复操作,影响系统性能
e.脏读和幻读
长事务可能导致数据一致性问题,如脏读和幻读,影响查询结果的正确性
1.3 [1]本地事务
01.定义
一个事务对应一个数据库连接,凡是不符合上述1:1关系的,都不能使用本地事务
02.以下情景都无法使用本地事务
a.单节点中
存在多个数据库实例
例如在本地建立了两个数据库:订单数据库、支付数据库,下单操作=订单数据库+支付数据库
b.分布式系统中
部署在不同节点上的多个服务访问同一个数据库
c.总结
以上两个场景说明:本地事务的使用性十分有限,因此需要使用分布式事务
03.假设使用本地事务处理分布式数据库
下单操作=订单数据+支付数据
服务器A=下单操作、订单数据库
服务器B=支付数据库
1.4 [1]嵌套事务:MySQL不支持
01.嵌套事务
众多事务分类中的一种,它是一个层次结构框架
有一个顶层事务控制着各个层次的事务,顶层事务之下嵌套的事务被称为子事务,它控制每一个局部的变换
需要注意的是,MySQL数据库不支持嵌套事务
1.5 [2]分布式事务:6种方案
01.分布式事务
a.定义
分布式事务是指【跨多个数据库】或【微服务】的操作,要求【所有操作要么全部成功,要么全部失败】,以保持数据一致性
b.6种方案
XA方案:一种两阶段提交协议,用于确保分布式事务的原子性和一致性
TCC方案:一种补偿事务模式,通过三个步骤来实现分布式事务
本地消息表:通过本地消息表记录事务状态,确保消息的可靠传递
可靠消息最终一致性方案:通过消息中间件确保消息的可靠传递,实现最终一致性
最大努力通知方案:通过多次重试通知,确保消息的最终传递
SAGA:将长事务分解为一系列短事务,每个短事务都有对应的补偿事务
01.XA方案
a.定义
XA是一种两阶段提交协议,用于确保分布式事务的原子性和一致性
b.原理
a.第一阶段(准备阶段)
协调者要求所有参与者准备提交事务,并锁定需要的资源
b.第二阶段(提交阶段)
如果所有参与者都准备就绪,协调者通知所有参与者提交事务;否则,通知回滚
c.常用API
XA_START:开始一个XA事务
XA_END:结束一个XA事务
XA_PREPARE:准备提交
XA_COMMIT:提交事务
XA_ROLLBACK:回滚事务
d.使用步骤
1.开始XA事务
2.执行分布式操作
3.准备提交
4.提交或回滚
e.场景示例
import javax.transaction.xa.*;
public class XAExample {
public void executeXA() throws Exception {
XAConnection xaConnection = xaDataSource.getXAConnection();
XAResource xaResource = xaConnection.getXAResource();
Xid xid = new XidImpl(); // 自定义 Xid 实现
try {
xaResource.start(xid, XAResource.TMNOFLAGS);
// 执行数据库操作
xaResource.end(xid, XAResource.TMSUCCESS);
int prepare = xaResource.prepare(xid);
if (prepare == XAResource.XA_OK) {
xaResource.commit(xid, false);
}
} catch (Exception e) {
xaResource.rollback(xid);
} finally {
xaConnection.close();
}
}
}
02.TCC方案
a.定义
TCC(Try-Confirm-Cancel)是一种补偿事务模式,通过三个步骤来实现分布式事务
b.原理
a.Try
尝试执行所有业务操作,预留资源
b.Confirm
确认操作,真正执行业务
c.Cancel
取消操作,释放资源
c.使用步骤
1.执行Try操作
2.根据Try操作结果执行Confirm或Cancel
d.场景示例
public class TCCExample {
public void tryOperation() {
// 尝试执行操作,预留资源
// 例如,预留库存
}
public void confirmOperation() {
// 确认操作,执行业务
}
public void cancelOperation() {
// 取消操作,释放资源
}
public void executeTCC() {
try {
tryOperation();
confirmOperation();
} catch (Exception e) {
cancelOperation();
}
}
}
03.本地消息表
a.定义
通过本地消息表记录事务状态,确保消息的可靠传递
b.原理
在本地事务中记录操作和消息
定期扫描消息表,发送未发送的消息
c.使用步骤
1.在本地事务中记录消息
2.发送消息
3.确认消息发送成功后删除
d.场景示例
public class LocalMessageTableExample {
public void saveMessage(String message) {
// 在本地事务中保存消息到消息表
String sql = "INSERT INTO message_table (message, status) VALUES (?, 'PENDING')";
// 执行 SQL
}
public void sendMessage() {
// 从消息表中读取未发送的消息并发送
String sql = "SELECT * FROM message_table WHERE status = 'PENDING'";
// 执行 SQL,发送消息
// 更新状态为 'SENT'
}
public void process() {
saveMessage("Your message");
sendMessage();
}
}
04.可靠消息最终一致性方案
a.定义
通过消息中间件确保消息的可靠传递,实现最终一致性
b.原理
1.使用消息中间件传递消息
2.消息接收方处理消息并确认
c.使用步骤
1.发送消息到消息中间件
2.消息接收方处理并确认
d.场景示例
public class ReliableMessageExample {
public void sendMessage(String message) {
// 使用消息中间件发送消息
messageQueue.send(message);
}
public void processMessage(String message) {
// 处理接收到的消息
// 确认消息处理成功
}
public void execute() {
sendMessage("Your message");
// 监听消息队列,处理消息
}
}
05.最大努力通知方案
a.定义
通过多次重试通知,确保消息的最终传递
b.原理
发送方多次重试发送消息,直到接收方确认
c.使用步骤
1.发送消息
2.如果未确认,定期重试
d.场景示例
public class MaxEffortNotificationExample {
public void sendMessage(String message) {
boolean sent = false;
int attempts = 0;
while (!sent && attempts < 3) {
try {
// 发送消息
messageQueue.send(message);
sent = true; // 发送成功
} catch (Exception e) {
attempts++;
// 等待一段时间后重试
Thread.sleep(1000);
}
}
}
}
06.SAGA
a.定义
SAGA是一种长事务管理模式,通过一系列补偿事务实现一致性
b.原理
将长事务分解为一系列短事务,每个短事务都有对应的补偿事务
c.使用步骤
1.执行一系列短事务
2.如果某个事务失败,执行补偿事务
d.场景示例
public class SagaExample {
public void step1() {
// 执行步骤 1
}
public void step2() {
// 执行步骤 2
}
public void compensateStep1() {
// 补偿步骤 1
}
public void compensateStep2() {
// 补偿步骤 2
}
public void executeSaga() {
try {
step1();
step2();
} catch (Exception e) {
compensateStep2();
compensateStep1();
}
}
}
1.6 [2]分布式事务:3种实现方式
01.3种实现方式
a.二阶段提交(2PC)
准备阶段:协调者询问各参与者是否可以提交
提交阶段:如果所有参与者都同意,协调者发送提交请求;否则,发送回滚请求
b.补偿事务
在发生错误时,执行一系列补偿操作来撤销之前成功的操作
适用于微服务架构中,事务不一定是原子性的
c.TCC(Try-Confirm-Cancel)
Try:执行预留资源的操作
Confirm:确认所有操作都成功
Cancel:如果出现问题,撤销之前的操作
1.7 [2]分布式事务:TCC补偿机制
01.概念来源
TCC(Try-Confirm-Cancel)的概念最早是由Pat Helland于2007年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出
02.TCC事务机制的优点
a.解决了协调者单点
由主业务方发起并完成这个业务活动。业务活动管理器也变成多点,引入集群
b.同步阻塞
引入超时,超时后进行补偿,并且不会锁定整个资源,将资源转换为业务逻辑形式,粒度变小
c.数据一致性
有了补偿机制之后,由业务活动管理器控制一致性
03.TCC的解释
a.Try阶段
尝试执行,完成所有业务检查(一致性),预留必须业务资源(准隔离性)
b.Confirm阶段
确认执行真正执行业务,不作任何业务检查,只使用Try阶段预留的业务资源,Confirm操作满足幂等性。要求具备幂等设计,Confirm失败后需要进行重试
c.Cancel阶段
取消执行,释放Try阶段预留的业务资源。Cancel操作满足幂等性,Cancel阶段的异常和Confirm阶段异常处理方案基本上一致
04.举例说明
如果你用100元买了一瓶水:
a.Try阶段
你需要向你的钱包检查是否够100元并锁住这100元,水也是一样的
如果有一个失败,则进行cancel(释放这100元和这一瓶水),如果cancel失败不论什么失败都进行重试cancel,所以需要保持幂等
b.Confirm阶段
如果都成功,则进行confirm,确认这100元扣,和这一瓶水被卖,如果confirm失败无论什么失败则重试(会依靠活动日志进行重试)
05.TCC适用场景
强隔离性,严格一致性要求的活动业务
执行时间较短的业务
1.8 [2]分布式事务:2PC和3PC区别
01.2PC和3PC有什么区别?
a.回答
2PC只有【两阶段】,简单但可能阻塞
3PC增加【预提交阶段和超时机制】,解决阻塞问题,但更复杂
都是用来【保证分布式事务一致性】的协议
b.阶段数量
2PC:有两个阶段——准备阶段(Prepare)和提交阶段(Commit)
准备阶段:协调者询问所有参与者是否可以执行事务
提交阶段:如果所有参与者都同意,协调者通知提交,否则回滚
3PC:在 2PC 的基础上,增加了一个预提交阶段(PreCommit),使得参与者在提交之前有更多的缓冲时间来处理问题
c.容错能力
2PC:在网络中断或协调者崩溃时,可能导致参与者处于阻塞状态(不知道是提交还是回滚)
3PC:通过增加预提交阶段,减少了阻塞的可能性,使协议更加健壮,避免长时间等待
d.是否引入超时机制
2PC:没有内置超时机制,协调者崩溃可能导致事务卡住
3PC:引入了超时机制,参与者可以根据超时自动决定回滚或提交,减少僵局
1.9 [3]分布式事务、分布式锁
01.分布式事务、分布式锁
分布式锁:资源抢占问题
分布式事务:解决流程化提交问题
1.10 [3]事务、本地事务、长事务、嵌套事务、全局事务、分布式事务
01.事务
事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败,确保数据的一致性和完整性
02.本地事务
本地事务是在单个数据库系统中执行的事务,通常由数据库管理系统(如MySQL、PostgreSQL)管理
03.长事务
长事务是指执行时间较长的事务,可能会导致资源锁定时间过长,影响系统性能
04.嵌套事务
嵌套事务是在一个事务中包含另一个事务,子事务可以独立提交或回滚,但父事务回滚时,所有子事务也必须回滚
05.全局事务
全局事务是跨多个资源(如多个数据库)的事务,需要协调多个参与者以确保一致性
06.分布式事务
分布式事务是全局事务的一种,涉及多个独立系统。常用协议包括两阶段提交(2PC)和Saga模式,用于确保跨系统的数据一致性
1.11 [4]最终⼀致性:4种
00.汇总
TCC (Try-Confirm-Cancel) 模式
消息队列
最大努力通知
事件溯源
01.TCC (Try-Confirm-Cancel) 模式
a.概述
TCC 是一种两阶段提交的变种,通过三个操作来实现事务的一致性:
Try:尝试执行业务操作,预留资源
Confirm:确认执行业务操作,释放资源
Cancel:取消执行业务操作,回滚资源
b.示例代码
public interface TccService {
void tryOperation();
void confirmOperation();
void cancelOperation();
}
public class TccServiceImpl implements TccService {
@Override
public void tryOperation() {
// 尝试执行业务操作,预留资源
}
@Override
public void confirmOperation() {
// 确认执行业务操作,释放资源
}
@Override
public void cancelOperation() {
// 取消执行业务操作,回滚资源
}
}
02.消息队列
a.概述
使用消息队列可以实现异步处理,确保事务的最终一致性。通常的做法是将事务操作分解为多个步骤,并通过消息队列来协调这些步骤
b.示例代码
public class OrderService {
private final MessageQueue messageQueue;
public OrderService(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public void createOrder(Order order) {
// 创建订单
orderRepository.save(order);
// 发送消息到消息队列
messageQueue.send("order_created", order.getId());
}
}
public class StockService {
private final MessageQueue messageQueue;
public StockService(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public void handleOrderCreated(String orderId) {
// 根据订单ID减少库存
Order order = orderRepository.findById(orderId);
stockRepository.decreaseStock(order.getProductId(), order.getQuantity());
// 发送消息确认库存减少
messageQueue.send("stock_decreased", orderId);
}
}
03.最大努力通知
a.概述
最大努力通知是一种补偿机制,通过定期检查和重试来确保事务的最终一致性。通常用于无法保证强一致性的场景
b.示例代码
public class CompensationService {
private final RetryPolicy retryPolicy;
public CompensationService(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public void notifyCompensation(Compensation compensation) {
while (retryPolicy.shouldRetry()) {
try {
// 执行补偿操作
executeCompensation(compensation);
break;
} catch (Exception e) {
// 记录日志,等待重试
log.error("Compensation failed, retrying...", e);
Thread.sleep(retryPolicy.getInterval());
}
}
}
private void executeCompensation(Compensation compensation) {
// 执行具体的补偿操作
}
}
04.事件溯源
a.概述
事件溯源是一种记录系统状态变化的方法,通过记录所有状态变化的事件来实现最终一致性
当需要恢复或同步状态时,可以通过重放事件来达到一致状态
b.示例代码
public class EventStore {
private final List<Event> events = new ArrayList<>();
public void appendEvent(Event event) {
events.add(event);
}
public List<Event> getEvents() {
return events;
}
}
public class OrderAggregate {
private final EventStore eventStore;
public OrderAggregate(EventStore eventStore) {
this.eventStore = eventStore;
}
public void createOrder(Order order) {
// 创建订单事件
OrderCreatedEvent event = new OrderCreatedEvent(order);
eventStore.appendEvent(event);
// 应用事件
apply(event);
}
private void apply(OrderCreatedEvent event) {
// 处理订单创建事件
}
}
1.12 [4]最终⼀致性:本地消息表
01.概念
a.说明
本地消息表是最常用的一种实现分布式事务的模式,通过协调本地事务与外部系统的状态
确保系统在分布式环境下达到一致性。它主要用于确保在多个系统之间进行数据传输和状态更新时,保持最终一致性
b.说明
本地消息表是Base理论的应用,实现了数据的最终一致性,是分布式服务中最常用的数据一致性解决方案
实现简单,不过数据延迟性较高
c.说明
本地消息表的核心思路就是将分布式事务拆解为本地事务和发送MQ消息
跟RocketMQ事务消息类型,不过实现起来更为简单,当然缺点就是数据延迟性更高
d.说明
本地消息表通过在数据库中维护一张专门的消息表来管理与外部系统的通信和状态更新
消息表的写入是与业务操作同时在一个本地事务中完成的
在分布式服务中,虽然不能提供强一致性,但通过本地事务与消息表相结合,可以确保消息不会丢失,并最终实现事务的一致性
e.优点
实现简单,只使用一张消息表来维护消息的发送状态
容错性较高,如果消息发送失败,可以使用补偿任务重新发送
实现了分布式系统中数据的最终一致性
f.缺点
数据一致性延迟较高,由于依赖异步消息传递,不能立即保证数据一致性,只能实现最终一致性
02.实现流程
a.写入业务操作和消息记录
在同一个事务中,执行业务操作,并将消息写入消息表中,消息表记录至少包含消息ID、消息内容、目标系统和状态等
事务提交后,业务操作和消息记录都会被持久化到数据库
b.发送消息和更新消息状态
发送消息到MQ(消息队列)系统
发送成功后,更新消息表状态为已发送
c.补偿任务处理
消息记录被写入消息表后,补偿任务会定期扫描消息表,寻找尚未被处理的消息,重新发送消息到MQ(消息队列)系统
d.消费端幂等处理
消费者收到消息之后,需要确保操作具有幂等性,因为消息可能会被重复处理
e.注意
第一步和第二步写入业务数据和写入消息表,需要在同一个本地事务中
第三步、第四步、第五步由于涉及第三方MQ系统,所以无法放在一个本地事务中
03.代码实现
a.思路
1.设计消息表:在数据库中创建一张消息表,包含消息内容、状态、创建时间等字段
2.事务内记录消息:在业务操作的本地事务中,插入消息到消息表
3.定期发送消息:实现一个定时任务,扫描消息表,将未发送的消息发送到消息队列
4.更新消息状态:消息发送成功后,更新消息表中的状态
5.处理消费确认:消费方处理消息后,发送确认信息,生产者更新消息状态
b.设计消息表
CREATE TABLE local_message (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_content TEXT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
c.事务内记录消息
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public void executeBusinessLogic(Connection connection, String businessData, String messageContent) throws SQLException {
try {
connection.setAutoCommit(false);
// 执行业务操作
// ...
// 插入消息到本地消息表
String insertMessageSQL = "INSERT INTO local_message (message_content, status) VALUES (?, 'PENDING')";
try (PreparedStatement pstmt = connection.prepareStatement(insertMessageSQL)) {
pstmt.setString(1, messageContent);
pstmt.executeUpdate();
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
d.定期发送消息
import java.sql.*;
import java.util.Timer;
import java.util.TimerTask;
public class MessageSender {
private Connection connection;
public MessageSender(Connection connection) {
this.connection = connection;
}
public void start() {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
sendPendingMessages();
} catch (SQLException e) {
e.printStackTrace();
}
}
}, 0, 5000); // 每5秒执行一次
}
private void sendPendingMessages() throws SQLException {
String selectSQL = "SELECT id, message_content FROM local_message WHERE status = 'PENDING'";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(selectSQL)) {
while (rs.next()) {
long messageId = rs.getLong("id");
String messageContent = rs.getString("message_content");
// 发送消息到消息队列
boolean success = sendMessageToQueue(messageContent);
if (success) {
// 更新消息状态为已发送
String updateSQL = "UPDATE local_message SET status = 'SENT' WHERE id = ?";
try (PreparedStatement pstmt = connection.prepareStatement(updateSQL)) {
pstmt.setLong(1, messageId);
pstmt.executeUpdate();
}
}
}
}
}
private boolean sendMessageToQueue(String messageContent) {
// 实现消息发送逻辑
// 返回 true 表示发送成功
return true;
}
}
2 MySQL事务
2.1 [1]使用
01.查看事务
use sys;
select @@autocommit;
02.使用事务
# 开启事务
SET autocommit = 0;
BEGIN;
UPDATE test SET num = 10 WHERE id = '1';
UPDATE test SET num = 10 WHERE var = 1;
# 提交事务
COMMIT;
# 回滚事务
ROLLBACK;
2.2 [1]提交:2阶段
00.汇总
1.保证跨存储引擎(InnoDB)和 Server 层(binlog)的事务原子性
2.避免因崩溃导致 redo log 和 binlog 数据不一致
3.确保主从复制的数据一致性
01.为什么需要两阶段提交
a.介绍
a.说明
MySQL 的 redo log(由 InnoDB 管理)和 binlog(由 MySQL Server 管理)是两个独立的日志系统
但事务提交时需要同时保证两者的原子性:
b.redo log
记录事务对 InnoDB 存储引擎的数据页修改,用于崩溃恢复
c.binlog
记录所有数据库表结构变更和数据修改的逻辑操作,用于主从复制和数据恢复
b.如果事务提交时只写入一个日志而另一个日志失败,会导致数据不一致
a.若 redo log 提交成功但 binlog 失败
事务在崩溃恢复后会被保留,但从库无法通过 binlog 同步该事务
b.若 binlog 提交成功但 redo log 失败
事务在崩溃恢复后会被回滚,但从库可能已通过 binlog 执行该事务
c.两阶段提交的核心目标
确保 redo log 和 binlog 的数据一致性,使事务的提交要么同时成功,要么同时失败
02.两阶段提交的过程
a.说明
MySQL 通过两个阶段协调 redo log 和 binlog 的写入
b.阶段一:Prepare(准备阶段)
InnoDB 将事务的修改写入 redo log,并标记事务状态为 PREPARE
MySQL Server 将事务的逻辑操作写入 binlog(此时 binlog 尚未实际刷盘)
c.阶段二:Commit(提交阶段)
MySQL Server 调用 fsync() 将 binlog 强制刷盘(确保持久化)
InnoDB 将事务的 redo log 状态修改为 COMMIT,完成事务提交
03.崩溃恢复时的处理
a.说明
若在提交过程中发生崩溃(如阶段一完成后崩溃),MySQL 重启后会检查 redo log 和 binlog 的状态
b.如果 redo log 中存在 PREPARE 状态的事务
a.检查对应的 binlog 是否已完整写入
a.binlog 完整
提交事务(重放 redo log)
b.binlog 不完整
回滚事务(丢弃 redo log)
b.通过这一机制
1.所有提交的事务在 redo log 和 binlog 中是一致的
2.崩溃恢复后不会出现数据丢失或主从不一致
04.两阶段提交的代价
a.磁盘 I/O 次数增加
需要多次 fsync() 操作保证日志持久化
b.锁竞争
在协调阶段可能阻塞其他事务
c.复杂性
崩溃恢复逻辑更为复杂
2.3 [1]特征:ACID,全支持
00.汇总
A原子性(atomicity) 要么都成功,要么都失败 借钱zs -1000 ls +1000
C一致性(consistency) 事务执行前后,总量保持一致 人类的全部财富值
I隔离性(isolation) 各个事务并发执行时,彼此独立
D持久性(durability) 持久化操作
01.Atomic原子性
事务必须是一个原子的操作序列单元,事务中包含的各项操作在一次执行过程中
要么全部执行成功,要么全部不执行,任何一项失败,整个事务回滚,只有全部都执行成功,整个事务才算成功
02.Consistency一致性
事务的执行不能破坏数据库数据的完整性和一致性,事务在执行之前和之后,数据库都必须处于一致性状态
03.Isolation隔离性
在并发环境中,并发的事务是相互隔离的,一个事务的执行不能被其他事务干扰
即不同的事务并发操纵相同的数据时,每个事务都有各自完整的数据空间
即一个事务内部的操作及使用的数据对其他并发事务是隔离的,并发执行的各个事务之间不能相互干扰
04.Durability持久性
持久性也称永久性(permanence),指一个事务一旦提交,它对数据库中对应数据的状态变更就应该是永久性的
即使发生系统崩溃或机器宕机,只要数据库能够重新启动,那么一定能够将其恢复到事务成功结束时的状态
2.4 [1]事务:仅InnoDB存储支持
01.支持引擎
仅InnoDB存储
02.使用事务的步骤
a.启动事务
使用START TRANSACTION或BEGIN命令开始一个事务
b.执行SQL操作
在事务中执行一组SQL操作(如INSERT、UPDATE、DELETE等)
c.提交事务
使用COMMIT命令提交事务,将所有操作的结果保存到数据库中
d.回滚事务
如果在事务过程中发生错误或需要撤销操作,可以使用ROLLBACK命令回滚事务,将数据库恢复到事务开始前的状态
03.示例
a.操作
-- 假设有一个账户表 accounts,包含 id 和 balance 两个字段
-- 开始事务
START TRANSACTION;
-- 从账户1扣除100
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 向账户2增加100
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- 检查账户1的余额是否足够
SELECT balance FROM accounts WHERE id = 1;
-- 如果余额不足,回滚事务
-- 假设我们在应用程序中检查余额,并决定是否回滚
-- ROLLBACK;
-- 如果一切正常,提交事务
COMMIT;
b.注意事项
a.事务隔离级别
MySQL支持多种事务隔离级别(如READ UNCOMMITTED、READ COMMITTED、REPEATABLE READ、SERIALIZABLE)
可以通过SET TRANSACTION ISOLATION LEVEL命令设置。不同的隔离级别影响事务的并发性和一致性
b.自动提交模式
MySQL默认启用自动提交模式,即每个SQL语句都是一个单独的事务
可以通过SET AUTOCOMMIT = 0关闭自动提交模式,以便手动管理事务
c.错误处理
在事务中执行SQL操作时,应注意捕获和处理可能的错误,以便在必要时回滚事务
2.5 [1]MVCC:多版本并发控制
01.定义
MVCC(Multi-Version Concurrency Control,多版本并发控制)
一种用于管理数据库并发访问的机制,它允许多个事务同时读取数据库而不互相阻塞
MVCC通过为每条记录维护多个版本来实现这一点,每次修改记录时都会存储该记录被修改之前的版本
02.MVCC的工作机制
a.版本号
每个事务都有一个唯一的版本号,数据库中的每条记录也有一个版本号
事务在读取数据时,会选择与自己版本号相匹配的记录版本
b.旧版本保留
当记录被修改时,旧版本的数据不会立即删除,而是保留以供其他事务读取
c.提交和回滚
事务提交后,新的版本号成为当前版本;如果事务回滚,旧版本仍然有效
03.MVCC的实现机制
a.定义
MySQL中的MVCC(多版本并发控制)主要通过InnoDB存储引擎实现
InnoDB使用MVCC来支持事务的隔离性和一致性,特别是在实现可重复读(REPEATABLE READ)和读已提交(READ COMMITTED)隔离级别时
b.隐藏列
InnoDB在每行记录中添加了两个隐藏列,用于存储版本信息:
事务ID(Transaction ID):记录创建或最后一次修改该行的事务ID
回滚指针(Rollback Pointer):指向该行的上一个版本,用于实现回滚操作
c.快照读
在快照读中,事务读取的是数据的快照版本,而不是当前最新版本。InnoDB通过比较事务ID来确定事务可以看到哪个版本的数据
事务在开始时获取一个一致性视图(Consistent Read View),该视图决定了事务可以看到哪些版本的数据
d.当前读
当前读用于需要锁定数据的操作,如SELECT ... FOR UPDATE或UPDATE
当前读会读取最新版本的数据,并可能加锁以防止其他事务修改
e.版本链
当一行数据被更新时,InnoDB不会立即覆盖旧数据,而是创建一个新版本,并通过回滚指针将其链接到旧版本
这形成了一个版本链,事务可以沿着版本链找到符合其一致性视图的版本
f.事务隔离级别
读已提交(READ COMMITTED):每次读取数据时,事务都会获取最新的快照
可重复读(REPEATABLE READ):事务在开始时获取一个快照,并在整个事务过程中使用该快照进行读取
04.如果没有MVCC怎么办?
a.回答
如果没有MVCC,读写操作之间可能会发生冲突,导致事务的并发度降低,从而影响数据库性能
b.场景
假设有一个事务1正在执行,此时另一个事务2修改了记录A,但尚未提交
如果事务1此时需要读取记录A,由于事务2还未提交,
事务1无法读取最新的记录A,否则可能会导致脏读(即读取到未提交的数据)
如果事务2最终回滚,事务1读取到的数据将是不正确的
-----------------------------------------------------------------------------------------------------
在没有MVCC的情况下,事务1只能通过锁机制来阻塞等待事务2的提交,
这种实现方式称为LBCC(Lock-Based Concurrent Control,基于锁的并发控制)
这种方式会导致事务之间的等待和阻塞,降低并发性能
c.有了MVCC的好处
有了MVCC,情况就不同了。即使事务2修改了记录A并未提交,记录A被修改之前的版本仍然存在
此时,事务1可以读取记录A的旧版本数据,而不必等待事务2的提交
这样,读写操作之间不会互相阻塞,从而提高了事务的并发度和数据库的整体性能
2.6 [1]保存点
01.描述
打游戏:10关 1 ,2(savepoint), 3,4,5 (savepoint), 6,7,8 --> rollback
语法:savepoint 保存点名字
02.操作
insert into xx values(1,'zs'); --事务开始
insert into xx values(2,'ls');
savepoint initdate ; --保存点“类似存档”
insert into xx values(3,'ww');
rollback to savepoint initdate; --返回保存点
2.7 [1]生命周期
01.提交方式
MySQL:自动提交,自动将每一条DML语句直接commit
Oracle:手工提交
02.事务的过程
a.事务的开始标识
第一条DML
b.事务的中间过程
各种DML操作
c.事务的结束
a.提交
1.显示提交:commit
2.隐式提交(自动提交):正常退出exit(ctrl+c)、DCL(grant ....to..., revoke ..from )、DDL(create ... ,drop ....)
b.回滚
1.显示回滚:rollback
2.隐式回滚:异常退出(宕机、断电)
2.8 [2]传播行为:7种,默认REQUIRED
00.汇总
a.传播行为:7种
a.保证同一个事务中
REQUIRED 支持当前事务,如果不存在,就新建一个,【默认】级别
SUPPORTS 支持当前事务,如果不存在,就不使用事务
MANDATORY 支持当前事务,如果不存在,抛出异常
b.保证没有在同一个事务中
REQUIRES_NEW 如果有事务存在,挂起当前事务,创建一个新的事务
NOT_SUPPORTED 以非事务方式运行,如果有事务存在,挂起当前事务
NEVER 以非事务方式运行,如果有事务存在,抛出异常
NESTED 如果当前事务存在,则嵌套事务执行
b.经典3种事务
a.REQUIRED,【默认】级别
支持当前事务,如果不存在,就新建一个
b.REQUIRES_NEW
如果有事务存在,挂起当前事务,创建一个新的事务
c.NESTED
如果当前存在事务,则在【嵌套事务】中执行;否则,行为与REQUIRED相同
c.外部方法为REQUIRED
a.外部方法没有事务
a.执行异常
如果外部方法没有事务,并且执行过程中发生异常,可能需要对异常进行处理,通常会导致整个操作失败
b.成功执行
如果外部方法没有事务,并且执行成功,则操作正常完成
b.外部方法有事务
a.外部方法执行无异常
如果外部方法有事务,并且执行过程中没有异常,事务会正常提交
b.外部方法执行异常
a.内部方法执行异常被catch
如果外部方法有事务,并且内部方法执行过程中发生异常,但异常被捕获并处理,事务可能会继续,具体行为取决于异常处理逻辑
b.内部方法执行异常不被catch
如果外部方法有事务,并且内部方法执行过程中发生异常且未被捕获,事务通常会回滚,以保证数据一致性
d.REQUIRED
Propagation 方法A 方法B 预期结果 实际运行结果
REQUIRED 外部正常运行 内部方法正常运行 两张表正常插入 正常运行
外部正常、不捕获B抛出的异常 内部抛出异常 均失败 插入失败
外部正常、捕获B抛出的异常 内部抛出异常 均失败 插入失败
外部出现异常 内部正常/异常 均失败 插入失败
e.REQUIRED_NEW
外部方法A 内部方法B 理论分析 实际结果
-----------------------------------------------------------------------------------------------------
外部方法有事务且正常 抛出异常 内部方法在一个新的事务里面,发生异常回滚 两者都没有插入成功
没有捕获内部抛出的异常 外部方法感知到异常,也回滚
-----------------------------------------------------------------------------------------------------
外部方法有事务且正常 抛出异常 内部方法在一个新的事务里面,发生异常回滚 user成功、device失败
捕获抛出的异常 外部方法捕获异常、事务没有感知到异常
-----------------------------------------------------------------------------------------------------
外部方法有事务且异常 正常 外部事务发生异常、导致回滚 user失败、device正常
内部方法没有异常 内部方法正常没有感知到外部异常,因此插入正常
f.NESTED
外部方法 内部方法 分析 结果
外部正常、没有捕获 异常抛出异常 外部感知到异常、都回滚 都失败
外部正常、捕获内部抛异常 抛出异常 由于异常被捕获、事务管理没有感知到异常、外部成功内部失败 user成功、device失败
外部异常 内部正常 外部异常回滚、导致"嵌套"在内的子事务也回滚 都失败
01.REQUIRED
a.外部方法成功内部方法也成功
a.UserServiceImpl
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequiredNestedRequiredNoException(User newUser) {
log.info("预期结果:user 插入成功,device 插入成功");
userMapper.insertUser(newUser);
deviceService.insertDeviceByRequireNoException(
new Device(null, "RequiredNoException", "required",
"normal")
);
}
b.DeviceServiceImpl
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
@Override
public Integer insertDeviceByRequireNoException(Device device) {
log.info("propagation = Propagation.REQUIRED【正常运行不抛异常】");
return deviceMapper.insertDevice(device);
}
c.测试代码
@Test
public void test_Required_Nested_No_Exception() {
User user = new User();
user.setUserName("两者都能正常插入");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequiredNestedRequiredNoException(user);
}
b.外部方法成功,内部方法抛出异常
a.场景代码如下
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@Override
public void insertUserByRequiredNestedRequireThrowException(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
deviceService.insertDeviceByRequireThrowException(
new Device(null, "device抛出异常", "required",
"normal")
);
}
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public Integer insertDeviceByRequireThrowException(Device device) {
log.info("propagation = Propagation.REQUIRED【抛出异常】");
deviceMapper.insertDevice(device);
throw new RuntimeException("插入数据失败");
}
b.测试方法
@Test
public void test_Required_Nested_Throw_Require_Exception() {
User user = new User();
user.setUserName("Device抛出Required异常");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequiredNestedRequireThrowException(user);
}
c.运行结果
User表中没有新增的数据。插入Device的方法抛出异常、本身就会回滚。因此都插入失败
c.外部方法捕获内部抛出的异常
a.说明
这种情况下、可能会想到由于我本身捕获了异常、导致本身的事务失效
所以User会插入成功、Device插入失败,但实际上
由于这两个方法在同一个事务,因此要么同时成功、要么同时失败、所以运行结果是都失败!
b.代码
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@Override
public void insertUserByRequiredNestedCatchRequireException(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
try {
deviceService.insertDeviceByRequireCatchException(
new Device(null, "User捕获device[Runtime]异常", "required",
"normal")
);
} catch (RuntimeException e) {
log.error("插入数据失败", e);
}
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public Integer insertDeviceByRequireCatchException(Device device) {
deviceMapper.insertDevice(device);
throw new RuntimeException("插入数据失败");
}
c.运行结果
报错
d.外部方法异常
a.说明
同样的分析、由于这两个方法在同一个事务里面、不可能独善其身的!
b.代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequiredExceptionNestedRequire(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
deviceService.insertDeviceByNestedNoException(
new Device(null, "Device正常运行", "required",
"normal")
);
throw new RuntimeException("插入数据失败");
}
e.汇总
Propagation 方法A 方法B 预期结果 实际运行结果
REQUIRED 外部正常运行 内部方法正常运行 两张表正常插入 正常运行
外部正常、不捕获B抛出的异常 内部抛出异常 均失败 插入失败
外部正常、捕获B抛出的异常 内部抛出异常 均失败 插入失败
外部出现异常 内部正常/异常 均失败 插入失败
02.REQUIRED_NEW
a.外部方法有事务、内部方法抛出异常
a.说明
内部方法出现异常、本身就会回滚、外部方法感知到异常、也会回滚
b.测试方法
@Test
public void test_Require_Nested_Require_New_Throw_Exception() {
User user = new User();
user.setUserName("二者插入失败");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireNestedRequireNewThrowException(user);
}
c.场景代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireNestedRequireNewThrowException(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
deviceService.insertDeviceByRequireNewThrowException(
new Device(null, "都插入失败", "required_new",
"normal")
);
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public Integer insertDeviceByRequireNewThrowException(Device device) {
log.info("propagation = Propagation.REQUIRES_NEW【抛出异常】");
deviceMapper.insertDevice(device);
throw new RuntimeException("插入数据失败");
}
d.运行结果
结果符合分析预期
b.外部方法有事务、内部方法抛出异常被捕获
a.说明
两个方法不在同一个事务里面、外部方法捕获异常导致本身事务失效、因此运行结果为user插入成功、device插入失败
b.代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireNestedRequireNewCatchException(User newUser) {
log.info("预期结果:user 插入成功,device 插入失败");
userMapper.insertUser(newUser);
try {
deviceService.insertDeviceByRequireNewCatchException(
new Device(null, "UserService捕获抛出的异常", "required_new",
"normal")
);
} catch (RuntimeException e) {
log.error("插入数据失败", e);
}
}
c.测试代码
@Test
public void test_Require_Nested_Require_New_Catch_Exception() {
User user = new User();
user.setUserName("User插入成功,Device插入失败");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireNestedRequireNewCatchException(user);
}
c.外部方法有事务且异常、内部方法没有异常
a.说明
由于两个方法在不同的事务、且不是嵌套关系、外部失败、外部回滚、内部方法的事务没有感知
b.代码
@Test
public void test_Require_Exception_Nested_Require_New_No_Exception() {
User user = new User();
user.setUserName("user:runtime-ex,device insert ok");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireExceptionNestedNoException(user);
}
c.场景代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireExceptionNestedNoException(User newUser) {
log.info("预期结果:user 插入失败,device 插入成功");
userMapper.insertUser(newUser);
deviceService.insertDeviceByRequireNewNoException(
new Device(null, "NestedNoException", "required",
"normal")
);
throw new RuntimeException("插入数据失败");
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public Integer insertDeviceByRequireNewNoException(Device device) {
log.info("propagation = Propagation.REQUIRES_NEW【正常运行不抛异常】");
return deviceMapper.insertDevice(device);
}
d.汇总
外部方法A 内部方法B 理论分析 实际结果
-----------------------------------------------------------------------------------------------------
外部方法有事务且正常 抛出异常 内部方法在一个新的事务里面,发生异常回滚 两者都没有插入成功
没有捕获内部抛出的异常 外部方法感知到异常,也回滚
-----------------------------------------------------------------------------------------------------
外部方法有事务且正常 抛出异常 内部方法在一个新的事务里面,发生异常回滚 user成功、device失败
捕获抛出的异常 外部方法捕获异常、事务没有感知到异常
-----------------------------------------------------------------------------------------------------
外部方法有事务且异常 正常 外部事务发生异常、导致回滚 user失败、device正常
内部方法没有异常 内部方法正常没有感知到外部异常,因此插入正常
03.NESTED
a.外部正常、内部抛出异常
a.说明
内部方法抛出异常、本身就会回滚、外部方法感知到异常之后、也会回滚
b.代码
@Test
public void test_Require_Nested_Nested_Throw_Exception (){
User user = new User();
user.setUserName("二者均失败: 没有捕获异常");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireNestedNestedThrowException(user);
}
c.对应的方法代码如下
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireNestedNestedThrowException(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
deviceService.insertDeviceByNestedThrowException(
new Device(null, "Nested Throw Exception", "nested",
"normal")
);
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.NESTED)
public Integer insertDeviceByNestedThrowException(Device device) {
log.info("propagation = Propagation.NESTED【抛出异常】");
deviceMapper.insertDevice(device);
throw new RuntimeException("插入数据失败");
}
b.外部正常、内部抛出异常被捕获
a.说明
内部异常会导致本身回滚、外部捕获异常、事务管理没有感知到异常、因此User成功、Device插入失败
b.代码
@Test
public void test_Require_Nested_Nested_Catch_Exception (){
User user = new User();
user.setUserName("User插入成功,Nested插入失败");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireNestedNestedCatchException(user);
}
c.对应的场景代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireNestedNestedCatchException(User newUser) {
log.info("预期结果:user 插入成功,device 插入失败");
userMapper.insertUser(newUser);
try {
deviceService.insertDeviceByNestedCatchException(
new Device(null, "User Catch Exception", "nested",
"normal")
);
} catch (RuntimeException e) {
log.error("插入数据失败", e);
}
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.NESTED)
public Integer insertDeviceByNestedCatchException(Device device) {
log.info("propagation = Propagation.NESTED 捕获");
deviceMapper.insertDevice(device);
throw new RuntimeException("插入数据失败");
}
c.外部异常内部正常
a.说明
外部事务回滚、因此不管内部事务是那种运行状态、都会回滚
b.代码
@Test
public void test_Require_Exception_Nested_Nested_No_Exception (){
User user = new User();
user.setUserName("二者均失败: UserService出现异常");
user.setAge(18);
user.setCountNumber(1);
userService.insertUserByRequireExceptionNestedNestedNoException(user);
}
c.场景代码
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void insertUserByRequireExceptionNestedNestedNoException(User newUser) {
log.info("预期结果:user 插入失败,device 插入失败");
userMapper.insertUser(newUser);
deviceService.insertDeviceByNestedNoException(
new Device(null, "NestedNoException", "nested",
"normal")
);
throw new RuntimeException("插入数据失败");
}
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.NESTED)
public Integer insertDeviceByNestedNoException(Device device) {
log.info("propagation = Propagation.NESTED【正常运行不抛异常】");
return deviceMapper.insertDevice(device);
}
d.汇总
外部方法 内部方法 分析 结果
外部正常、没有捕获 异常抛出异常 外部感知到异常、都回滚 都失败
外部正常、捕获内部抛异常 抛出异常 由于异常被捕获、事务管理没有感知到异常、外部成功内部失败 user成功、device失败
外部异常 内部正常 外部异常回滚、导致"嵌套"在内的子事务也回滚 都失败
2.9 [2]隔离级别:4个,默认RR,可重复读
00.汇总
a.读未提交(READ UNCOMMITTED)
所有事务都可以看到其他事务未提交的修改,一般很少使用
b.读已提交(READ COMMITTED)
Oracle默认隔离级别,事务之间只能看到彼此已提交的变更修改
c.可重复读(REPEATABLE READ)
MySQL默认隔离级别,同一事务中的多次查询会看到相同的数据行;可以解决不可重复读,但可能出现幻读
d.可串行化(SERIALIZABLE)
最高的隔离级别,事务串行的执行,前一个事务执行完,后面的事务会执行
读取每条数据都会加锁,会导致大量的超时和锁争用问题;
e.图示
问题类型 读未提交 (RU) 读已提交 (RC) 可重复读 (RR) 可串行化 (SER)
脏读 允许 禁止 禁止 禁止
不可重复读 允许 允许 禁止 禁止
幻读 允许 允许 允许(但MySQL InnoDB解决了幻读) 禁止
01.读未提交
a.定义
如果一个事务正在处理某一数据,并对其进行了更新,但同时尚未完成事务,因此事务没有提交
与此同时,允许另一个事务也能够访问该数据
例如A将变量n从0累加到10才提交事务,此时B可能读到n变量从0到10之间的所有中间值
b.说明
脏读:允许。一个事务可以读取另一个事务尚未提交的更改
不可重复读:允许。一个事务在多次读取同一数据时,可能会看到不同的值,因为其他事务可能在中间进行了更新
幻读:允许。一个事务在多次执行相同的查询时,可能会看到不同的结果集,因为其他事务可能在中间插入了新的行
02.读已提交
a.定义
只允许读到已经提交的数据。即事务A在将n从0累加到10的过程中,B无法看到n的中间值,之中只能看到10
同时有事务C进行从10到20的累加,此时B在同一个事务内再次读时,读到的是20
b.说明
脏读:禁止。一个事务只能读取其他事务已经提交的更改
不可重复读:允许。一个事务在多次读取同一数据时,可能会看到不同的值,因为其他事务可能在中间进行了更新
幻读:允许。一个事务在多次执行相同的查询时,可能会看到不同的结果集,因为其他事务可能在中间插入了新的行
03.可重复读
a.定义
保证在事务处理过程中,多次读取同一个数据时,其值都和事务开始时刻时是一致的
幻读即同样的事务操作,在前后两个时间段内执行对同一个数据项的读取,可能出现不一致的结果
保证B在同一个事务内,多次读取n的值,读到的都是初始值0。幻读,就是不同事务,读到的n的数据可能是0,可能10,可能是20
b.说明
脏读:禁止。一个事务只能读取其他事务已经提交的更改
不可重复读:禁止。一个事务在其生命周期内多次读取同一数据时,总是看到相同的值
幻读:在标准SQL中允许,但在MySQL的InnoDB中,通过Next-Key Locking机制,幻读问题在可重复读隔离级别下也得到了有效解决
04.可串行化
a.定义
最严格的事务,要求所有事务被串行执行,不能并发执行
b.说明
脏读:禁止。一个事务只能读取其他事务已经提交的更改
不可重复读:禁止。一个事务在其生命周期内多次读取同一数据时,总是看到相同的值
幻读:禁止。通过完全串行化的方式执行事务,确保事务之间没有交叉的读写操作,从而避免幻读
2.10 [2]幻读:InnoDB的PR隔离级别
01.在MySQL的InnoDB存储引擎中,幻读问题可以通过以下方式解决
a.隔离级别
MySQL的InnoDB引擎在默认的 REPEATABLE READ 隔离级别下,通过使用锁机制解决了幻读问题
b.Next-Key Locking
InnoDB使用了一种称为Next-Key Locking的算法。这种算法结合了行锁和间隙锁(Gap Lock),不仅锁定了查询涉及的行,还锁定了这些行之间的间隙
通过锁定间隙,InnoDB防止其他事务在当前事务执行期间插入新的行,从而避免了幻读
c.间隙锁(Gap Lock)
间隙锁是Next-Key Locking的一部分,它锁定了索引记录之间的间隙
例如,如果一个查询锁定了ID为5的行,间隙锁会锁定ID为4和ID为6之间的间隙,防止其他事务在这个间隙中插入新的行
d.可重复读(REPEATABLE READ)
在 REPEATABLE READ 隔离级别下,InnoDB通过Next-Key Locking确保同一事务中的多次读取操作返回相同的结果集,即使其他事务插入了新的行
2.11 [2]幻读、不可重复读
01.区别1
幻读:在同一事务中,相同条件下,两次查询出来的 记录数 不一样
不可重复读:在同一事务中,相同条件下,两次查询出来的 数据 不一样
02.区别2
幻读:对于“多条数据”的查询操作,数据量数 20条 -> 18条/22条
不可重复读:对于“同一条”数据的查询操作 a -> b
03.区别3
幻读:insert/delete
不可重复读:update
2.12 [2]脏读、幻读、不可重复读
00.汇总
a.背景
在多个事务【并发操作】时,数据库中会脏读,幻读,不可重复读
b.图示
问题类型 读未提交 (RU) 读已提交 (RC) 可重复读 (RR) 可串行化 (SER)
脏读 允许 禁止 禁止 禁止
不可重复读 允许 允许 禁止 禁止
幻读 允许 允许 允许(但MySQL InnoDB解决了幻读) 禁止
01.脏读
a.定义
一个事务读取了另一个事务尚未提交的更改。这可能导致读取到不一致的数据,因为未提交的更改可能会被回滚
b.说明
事务A读到了事务B还未提交的数据:
事务A读取的数据,事务B对该数据进行修改还未提交数据之前,事务A再次读取数据会读到事务B已经修改后的数据
如果此时事务B进行回滚或再次修改该数据然后提交,事务A读到的数据就是脏数据,这个情况被称为脏读(Dirty Read)
c.说明
当一个事务正在访问数据,并对此数据进行了修改(1->2),但是这种修改【还没有提交到数据库(commit)】
此时,另一个事务也在访问这个数据。本质:某个事务(客户端)读取到的数据是过时的
d.示例
a.事务A
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 未提交
b.事务B
SELECT balance FROM accounts WHERE id = 1;
-- 可能读取到未提交的更新
02.不可重复读
a.定义
一个事务在多次读取同一数据时,看到不同的值。这是因为在两次读取之间,其他事务可能对数据进行了更新或删除
b.说明
事务A进行范围查询时,事务B中新增了满足该范围条件的记录,当事务A再次按该条件进行范围查询
会查到在事务B中提交的新的满足条件的记录(幻行 Phantom Row)
在一个事务的两次查询中数据笔数不一致,例如有一个事务查询了几列(Row)数据
而另一个事务却在此时插入了新的几列数据,先前的事务在接下来的查询中,就会发现有几列数据是它先前所没有的
c.说明
在一个事务内(客户端)内,多次读取同一批数据,但结果不同
d.示例
a.事务A
START TRANSACTION;
SELECT balance FROM accounts WHERE id = 1;
-- 读取到的balance为1000
b.事务B
START TRANSACTION;
UPDATE accounts SET balance = balance + 500 WHERE id = 1;
COMMIT;
c.事务A
SELECT balance FROM accounts WHERE id = 1;
-- 读取到的balance为1500,发生不可重复读
03.幻读
a.定义
一个事务在多次执行相同的查询时,看到不同的结果集。这是因为在两次查询之间,其他事务可能插入了新的行
b.说明
事务A在读取某些数据后,再次读取该数据,发现读出的该数据已经在事务B中发生了变更或删除
在一个事务的两次查询之中数据不一致,这可能是两次查询过程中间插入了一个事务更新的原有的数据
c.说明
在一个事务内(客户端)内,多次读取同一个数据,但结果不同
本质:就是事务A拿到了被其他事务B修改并提交后的数据
d.示例
a.事务A
START TRANSACTION;
SELECT * FROM orders WHERE amount > 100;
-- 返回两行
b.事务B
START TRANSACTION;
INSERT INTO orders (id, amount) VALUES (3, 150);
COMMIT;
c.事务A
SELECT * FROM orders WHERE amount > 100;
-- 返回三行,发生幻读
2.13 [3]事务控制:若无,5种情形
01.如果不对事务进行并发控制,我们看看数据库并发操作是会有那些异常情形
a.一类丢失更新
两个事物读同一数据,一个修改字段1,一个修改字段2,后提交的恢复了先提交修改的字段
b.二类丢失更新
两个事物读同一数据,都修改同一字段,后提交的覆盖了先提交的修改
c.脏读
读到了未提交的值,万一该事物回滚,则产生脏读
d.不可重复读
两个查询之间,被另外一个事务修改了数据的内容,产生内容的不一致
e.幻读
两个查询之间,被另外一个事务插入或删除了记录,产生结果集的不一致
2.14 [3]支持XA事务:参与2PC分布式事务
00.总结
a.回答
MySQL本身【并不直接实现两阶段提交协议(2PC)或三阶段提交协议(3PC)】,通过【支持XA事务】来【参与分布式事务管理】
b.XA事务
XA事务是基于两阶段提交协议(2PC)的标准,用于在分布式系统中实现全局事务的一致性
XA START 启动
XA END 结束
XA PREPARE 准备
XA COMMIT 提交
XA ROLLBACK 回滚
01.MySQL与两阶段提交(2PC)
a.XA事务支持:
MySQL支持XA事务,这是一种用于分布式事务的协议,遵循两阶段提交(2PC)模型
XA事务允许MySQL作为一个资源管理器参与到分布式事务中,与其他数据库或系统一起确保全局事务的一致性
b.两阶段提交的过程:
a.准备阶段(Prepare Phase):
事务管理器(TM)向所有参与的数据库(如MySQL)发送准备请求
每个数据库执行本地事务操作,并将状态记录到日志中,然后返回准备好的状态给事务管理器
b.提交阶段(Commit Phase):
如果所有参与者都返回准备好的状态,事务管理器向所有参与者发送提交请求,所有数据库提交本地事务
如果任何参与者返回失败状态,事务管理器向所有参与者发送回滚请求,所有数据库回滚本地事务
02.假设我们有两个数据库实例db1和db2,我们希望在这两个数据库中执行一个分布式事务
a.在db1中执行XA事务
-- 连接到db1
-- 开始XA事务
XA START 'xid-123';
-- 执行本地事务操作
INSERT INTO accounts (id, balance) VALUES (1, 1000);
-- 结束XA事务
XA END 'xid-123';
-- 准备提交
XA PREPARE 'xid-123';
b.在db2中执行XA事务
-- 连接到db2
-- 开始XA事务
XA START 'xid-123';
-- 执行本地事务操作
INSERT INTO accounts (id, balance) VALUES (2, 2000);
-- 结束XA事务
XA END 'xid-123';
-- 准备提交
XA PREPARE 'xid-123';
c.提交事务
-- 在db1中提交
XA COMMIT 'xid-123';
-- 在db2中提交
XA COMMIT 'xid-123';
d.回滚事务
-- 在db1中回滚
XA ROLLBACK 'xid-123';
-- 在db2中回滚
XA ROLLBACK 'xid-123';
3 Spring事务
3.1 [1]两种
01.编程式
编码方式
02.声明式
方式1:配置XML或注解
方式2:Spring的AOP注解
3.2 [1]概念:4个
01.代理对象
Spring的事务管理是基于代理机制实现的。默认情况下,Spring使用AOP代理来拦截对带有@Transactional注解的方法的调用
02.目标对象
当在同一个类中调用另一个方法时,调用是通过this引用进行的,而不是通过代理对象进行的。因此,事务拦截器无法拦截这种内部调用,导致事务失效
03.事务包裹
当一个方法调用另一个方法时,如果调用是通过代理对象进行的,Spring会在外层方法和内层方法之间正确地管理事务边界
04.事务嵌套
但在同一个类中直接调用方法时,内层方法的事务注解不会被代理拦截,因此不会创建新的事务或加入现有事务
3.3 [2]失效场景:15个
00.汇总
1.你的service类没有被Spring管理 没有使用 @Service 等注解,Spring 无法为业务类创建代理,事务逻辑不会被拦截
2.没有在Spring配置文件中启用事务管理器 在 Spring 配置中未注册事务管理器,事务代理无法生成,导致事务失效(Spring Boot 除外)
3.事务方法被final、static关键字修饰 这类方法无法被代理重写,因而事务逻辑无法注入
4.同一个类中,方法内部调用 内部调用不会经过代理对象,事务注解无法生效
5.方法的访问权限不是public 非 public 方法不会被 AOP 代理拦截,事务配置被忽略
6.数据库的存储引擎不支持事务 例如 MySQL 的 MyISAM 不支持事务,必须使用支持事务的引擎(如 InnoDB)
7.配置错误的 @Transactional 注解 如只读属性(readOnly=true)用于写操作、错误的传播机制(Propagation.NOT_SUPPORTED)或 rollbackFor 配置与实际异常不匹配等
8.事务超时时间设置过短 超时设置过低导致事务未完成就被终止
9.使用了错误的事务传播机制 子类重写方法时覆盖或不同传播行为,可能导致事务失效
10.rollbackFor属性配置错误 rollbackFor属性配置错误
11.事务注解被覆盖导致事务失效 子类重写方法时覆盖或不同传播行为,可能导致事务失效
12.嵌套事务的坑 内层事务异常未隔离或被捕获导致外层事务也被回滚或失效
13.事务多线程调用 事务上下文基于线程绑定,多线程调用时无法共享同一事务,从而失效
14.异常被捕获并处理了,没有重新抛出 业务代码捕获异常但未抛出,事务管理器无法感知异常,从而不执行回滚
15.手动抛了别的异常 Spring 默认只对 RuntimeException 和 Error 回滚,若抛出的是 Checked Exception,需要通过 rollbackFor 显式指定才能回滚。
01.你的service类没有被Spring管理
a.示例
//@Service (注释了@Service)
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public void addTianLuo(TianLuo tianluo) {
//保存tianluo实体数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo流水数据库记录
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
b.事务不生效的原因
上面例子中, @Service注解注释之后,spring事务(@Transactional)没有生效
因为Spring事务是由AOP机制实现的,也就是说从Spring IOC容器获取bean时,Spring会为目标类创建代理,来支持事务的
但是@Service被注释后,你的service类都不是spring管理的,那怎么创建代理类来支持事务呢
c.解决方案
加上@Service注解
02.没有在Spring配置文件中启用事务管理器
a.示例
@Configuration
public class AppConfig {
// 没有配置事务管理器
}
@Service
public class MyService {
@Transactional
public void doSomething() {
// ...
}
}
b.事务不生效的原因
没有在AppConfig中配置事务管理器,因此Spring无法创建事务代理对象,导致事务不生效
即使在MyService中添加了@Transactional注解,该方法也不会被Spring管理的事务代理拦截
c.解决方案
@Configuration
// 为了解决这个问题,应该在AppConfig中配置一个事务管器
public class AppConfig {
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource());
}
}
@Service
// 如果是Spring Boot项目,它默认会自动配置事务管理器并开启事务支持
public class MyService {
@Transactional
public void doSomething() {
// ...
}
}
03.事务方法被final、static关键字修饰
a.示例
@Service
public class TianLuoServiceImpl {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public final void addTianLuo(TianLuo tianluo) {
//保存tianluo实体数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo流水数据库记录
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
b.事务不生效的原因
如果一个方法被声明为final或者static,则该方法不能被子类重写
也就是说无法在该方法上进行动态代理,这会导致Spring无法生成事务代理对象来管理事务
c.详细说明
子类拦截重写所有方法,且所有方法执行的都是被重写的方法(切面)
父类即目标类中执行方法会走被重写的子类方法(切面);子类代理是重写所有方法
只在方法第一次调用时调用子类方法(切面),后续都只会调用父类即目标类中定义的方法
不走子类中的重写方法(切面)。因为子类拦截中每次调用目标类方法实际上都是调用切面方法
而子类代理只有第一次调用是调用切面方法,所以子类拦截在类自调用的场景下更合适
d.事务失效的原因
a.代理机制
a.介绍
Spring的事务管理通常依赖于代理(Proxy)模式
默认情况下,Spring使用JDK动态代理或CGLIB代理来创建事务代理对象
b.JDK动态代理
只能代理接口中的方法,因此如果你的类没有实现接口,Spring会使用CGLIB代理
c.CGLIB代理
可以代理类中的方法,但不能代理final方法,因为CGLIB是通过创建子类来实现代理的,而final方法不能被子类重写
b.final类和方法
如果一个类被声明为final,或者一个方法被声明为final,CGLIB无法对其进行代理
这意味着如果你的事务方法是final,Spring将无法为其创建事务代理,从而导致事务失效
c.正确的使用方式
a.避免使用final
如果你需要Spring管理事务,尽量避免将类或方法声明为final
b.接口优先
如果可能,使用接口来定义服务层,这样Spring可以使用JDK动态代理
c.配置代理方式
确保Spring配置为使用适当的代理方式(JDK动态代理或CGLIB),根据你的类结构选择合适的代理
04.同一个类中,方法内部调用
a.示例
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
public void addTianLuo(TianLuo tianluo){
// 调用内部的事务方法
this.executeAddTianLuo(tianluo);
}
@Transactional
public void executeAddTianLuo(TianLuo tianluo) {
tianLuoMapper.save(tianluo);
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
b.事务不生效的原因
事务是通过Spring AOP代理来实现的,而在同一个类中,一个方法调用另一个方法时,调用方法直接调用目标方法的代码
而不是通过代理类进行调用。即以上代码,调用目标executeAddTianLuo方法不是通过代理类进行的,因此事务不生效
c.解决1:可以新建多一个类,让这两个方法分开,分别在不同的类中
a.说明
有时候你也可以在该Service 类中注入自己,或者通过AopContext.currentProxy()获取代理对象
b.代码
@Service
public class TianLuoExecuteServiceImpl implements TianLuoExecuteService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public void executeAddTianLuo(TianLuo tianluo) {
tianLuoMapper.save(tianluo);
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
@Service
public class TianLuoAddServiceImpl implements TianLuoAddService {
@Autowired
private TianLuoExecuteService tianLuoExecuteService;
public void addTianLuo(User user){
tianLuoExecuteService.executeAddTianLuo(user);
}
}
d.解决2:事务传播属性,不生效
a.说明
虽然事务传播属性可以定义事务的行为,但它不能解决由于内部调用未经过代理对象的问题
要解决这个问题,仍然需要通过代理对象进行方法调用
例如使用AopContext.currentProxy()或将方法提取到另一个类中,通过Spring容器注入来确保调用经过代理对象
b.代码
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Transactional(propagation = Propagation.REQUIRED)
public void processOrder() {
// 处理订单逻辑
System.out.println("Processing order...");
// 调用更新库存的方法
updateInventory();
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void updateInventory() {
// 更新库存逻辑
System.out.println("Updating inventory...");
}
}
e.解决3:AopContext.currentProxy()
a.说明
在Spring中,事务管理是通过AOP代理实现的
当一个方法调用另一个方法时,如果调用是通过this引用进行的(即直接调用)
那么这个调用不会经过代理对象,导致事务拦截器无法拦截到该调用
因此,事务可能不会按照预期生效
-------------------------------------------------------------------------------------------------
使用 AopContext.currentProxy() 可以解决这个问题
因为它获取当前代理对象,并通过代理对象进行方法调用,从而确保事务拦截器能够正确拦截到该调用
b.代码
import org.springframework.aop.framework.AopContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class MyService {
@Transactional
public void outerMethod() {
// 使用代理对象调用内部方法
((MyService) AopContext.currentProxy()).innerMethod();
}
@Transactional
public void innerMethod() {
// 事务逻辑
}
}
05.方法的访问权限不是public
a.示例
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
private void addTianLuo(TianLuo tianluo) {
tianLuoMapper.save(tianluo);
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
b.事务不生效的原因
spring事务方法addTianLuo的访问权限不是public,所以事务就不生效啦,因为Spring事务是由AOP机制实现的
AOP机制的本质就是动态代理,而代理的事务方法不是public的话,computeTransactionAttribute()就会返回null
也就是这时事务属性不存在了
c.解决方案
addTianLuo事务方法的访问权限修改为public
06.数据库的存储引擎不支持事务
a.示例
Spring事务的底层,还是依赖于数据库本身的事务支持
在MySQL中,MyISAM存储引擎是不支持事务的,InnoDB引擎才支持事务
因此开发阶段设计表的时候,确认你的选择的存储引擎是支持事务的
b.事务不生效的原因
在MySQL中,MyISAM存储引擎是不支持事务的,InnoDB引擎才支持事务
c.解决方案
选择InnoDB引擎
07.配置错误的 @Transactional 注解
a.示例
@Transactional(readOnly = true)
public void updateUser(User user) {
userDao.updateUser(user);
}
b.事务不生效的原因
虽然使用了@Transactional注解,但是注解中的readOnly=true属性指示这是一个只读事务,因此在更新User实体时会抛出异常
c.解决方案
将readOnly属性设置为false,或者移除了@Transactional注解中的readOnly属性
08.事务超时时间设置过短
a.示例
@Transactional(timeout = 1)
public void doSomething() {
//...
}
b.事务不生效的原因
在上面的例子中,timeout属性被设置为1秒,这意味着如果事务在1秒内无法完成,则报事务超时了
09.使用了错误的事务传播机制
a.示例
@Service
public class TianLuoServiceImpl {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void doInsertTianluo(TianLuo tianluo) throws Exception {
tianLuoMapper.save(tianluo);
tianLuoFlowMapper.saveFlow(buildFlowByTianLuo(tianluo));
}
}
b.事务不生效的原因
Propagation.NOT_SUPPORTED传播特性不支持事务
c.解决方案
选择正确的事务传播机制
10.rollbackFor属性配置错误
a.示例
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional(rollbackFor = Error.class)
public void addTianLuo(TianLuo tianluo) {
//保存tianluo数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo流水数据库记录
tianLuoFlowMapper.saveFlow(tianluo);
//模拟异常抛出
throw new Exception();
}
}
b.事务不生效的原因
其实rollbackFor属性指定的异常必须是Throwable或者其子类
默认情况下,RuntimeException和Error两种异常都是会自动回滚的
但是因为以上的代码例子,指定了rollbackFor = Error.class,但是抛出的异常又是Exception
而Exception和Error没有任何什么继承关系,因此事务就不生效
c.解决方案
rollbackFor属性指定的异常与抛出的异常匹配
11.事务注解被覆盖导致事务失效
a.示例
public interface MyRepository {
@Transactional
void save(String data);
}
public class MyRepositoryImpl implements MyRepository {
@Override
public void save(String data) {
// 数据库操作
}
}
public class MyService {
@Autowired
private MyRepository myRepository;
@Transactional
public void doSomething(String data) {
myRepository.save(data);
}
}
public class MyTianluoService extends MyService {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void doSomething(String data) {
super.doSomething(data);
}
}
b.事务不生效的原因
MyTianluoService是MyService的子类,并且覆盖了doSomething()方法
在该方法中,使用了不同的传播行为(REQUIRES_NEW)来覆盖父类的@Transactional注解
在这种情况下,当调用MyTianluoService的doSomething()方法时,由于子类方法中的注解覆盖了父类的注解
Spring框架将不会在父类的方法中启动事务。因此,当MyRepository的save()方法被调用时,事务将不会被启动
也不会回滚。这将导致数据不一致的问题,因为在MyRepository的save()方法中进行的数据库操作将不会回滚
12.嵌套事务的坑
a.示例
@Service
public class TianLuoServiceInOutService {
@Autowired
private TianLuoFlowService tianLuoFlowService;
@Autowired
private TianLuoMapper tianLuoMapper;
@Transactional
public void addTianLuo(TianLuo tianluo) throws Exception {
tianLuoMapper.save(tianluo);
tianLuoFlowService.saveFlow(tianluo);
}
}
@Service
public class TianLuoFlowService {
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional(propagation = Propagation.NESTED)
public void saveFlow(TianLuo tianLuo) {
tianLuoFlowMapper.save(tianLuo);
throw new RuntimeException();
}
}
b.事务不生效的原因
如果saveFlow出现运行时异常,会继续往上抛,到外层addTianLuo的方法,导致tianLuoMapper.save也会回滚啦
c.如果不想因为被内部嵌套的事务影响,可以用try-catch包住,如下
@Transactional
public void addTianLuo(TianLuo tianluo) throws Exception {
tianLuoMapper.save(tianluo);
try {
tianLuoFlowService.saveFlow(tianluo);
} catch (Exception e) {
log.error("save tian luo flow fail,message:{}",e.getMessage());
}
}
13.事务多线程调用
a.示例
@Service
public class TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowService tianLuoFlowService;
@Transactional
public void addTianLuo(TianLuo tianluo) {
//保存tianluo数据库记录
tianLuoMapper.save(tianluo);
//多线程调用
new Thread(() -> {
tianLuoFlowService.saveFlow(tianluo);
}).start();
}
}
@Service
public class TianLuoFlowService {
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public void save(TianLuo tianLuo) {
tianLuoFlowMapper.saveFlow(tianLuo);
}
}
b.事务不生效的原因
这是因为Spring事务是基于线程绑定的,每个线程都有自己的事务上下文,而多线程环境下可能会存在多个线程共享
同一个事务上下文的情况,导致事务不生效。Spring事务管理器通过使用线程本地变量(ThreadLocal)来实现线程安全
-----------------------------------------------------------------------------------------------------
在Spring事务管理器中,通过TransactionSynchronizationManager类来管理事务上下文
TransactionSynchronizationManager内部维护了一个ThreadLocal对象,用来存储当前线程的事务上下文
在事务开始时,TransactionSynchronizationManager会将事务上下文绑定到当前线程的ThreadLocal对象中
当事务结束时,TransactionSynchronizationManager会将事务上下文从ThreadLocal对象中移除
14.异常被捕获并处理了,没有重新抛出
a.示例
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public void addTianLuo(TianLuo tianluo) {
try {
//保存tianluo数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo flow数据库记录
tianLuoFlowMapper.saveFlow(tianluo);
} catch (Exception e) {
log.error("add TianLuo error,id:{},message:{}", tianluo.getId(),e.getMessage());
}
}
}
b.事务不生效的原因
事务中的异常已经被业务代码捕获并处理,而没有被正确地传播回事务管理器,事务将无法回滚
c.解决方案
在spring事务方法中,当我们使用了try-catch,如果catch住异常,记录完异常日志什么的,一定要重新把异常抛出来
-----------------------------------------------------------------------------------------------------
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional(rollbackFor = Exception.class)
public void addTianLuo(TianLuo tianluo) {
try {
//保存tianluo数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo flow数据库记录
tianLuoFlowMapper.saveFlow(tianluo);
} catch (Exception e) {
log.error("add TianLuo error,id:{},message:{}", tianluo.getId(),e.getMessage());
throw e;
}
}
}
15.手动抛了别的异常
a.示例
@Service
public class TianLuoServiceImpl implements TianLuoService {
@Autowired
private TianLuoMapper tianLuoMapper;
@Autowired
private TianLuoFlowMapper tianLuoFlowMapper;
@Transactional
public void addTianLuo(TianLuo tianluo) throws Exception {
//保存tianluo数据库记录
tianLuoMapper.save(tianluo);
//保存tianluo流水数据库记录
tianLuoFlowMapper.saveFlow(tianluo);
throw new Exception();
}
}
b.事务不生效的原因
手动抛了Exception异常,但是是不会回滚的
因为Spring默认只处理RuntimeException和Error,对于普通的Exception不会回滚,除非,用rollbackFor属性指定配置
c.解决方案
解决方案:添加属性配置@Transactional(rollbackFor = Exception.class)
注解为事务范围的方法中,事务的回滚仅仅对于unchecked的异常有效。对于checked异常无效。也就是说事务回滚仅仅发生在,出现RuntimeException或Error的时候
通俗一点就是:代码中出现的空指针等异常,会被回滚。而文件读写、网络超时问题等,spring就没法回滚了
3.4 [2]事务不生效:7个
00.汇总
1.访问权限问题
2.方法用final修饰
3.方法内部调用
4.未被spring管理
5.多线程调用
6.表不支持事务
7.未开启事务
01.访问权限问题
a.问题描述
Java的访问权限主要有四种:private、default、protected、public,它们的权限从左到右,依次变大。
如果在开发过程中,把某些事务方法定义了错误的访问权限,就会导致事务功能出问题。
b.示例代码
@Service
public class UserService {
@Transactional
private void add(UserModel userModel) {
saveData(userModel);
updateData(userModel);
}
}
-----------------------------------------------------------------------------------------------------
我们可以看到add方法的访问权限被定义成了private,这样会导致事务失效,spring要求被代理方法必须是public的。
c.源码分析
在AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute方法中有个判断,如果目标方法不是public,则TransactionAttribute返回null,即不支持事务。
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
d.结论
如果自定义的事务方法(即目标方法),它的访问权限不是public,而是private、default或protected的话,spring则不会提供事务功能。
02.方法用final修饰
a.问题描述
有时候,某个方法不想被子类重写,这时可以将该方法定义成final的。普通方法这样定义是没问题的,但如果将事务方法定义成final,会导致事务失效。
b.示例代码
@Service
public class UserService {
@Transactional
public final void add(UserModel userModel){
saveData(userModel);
updateData(userModel);
}
}
c.原因
如果某个方法用final修饰了,那么在它的代理类中,就无法重写该方法,而添加事务功能。
注意:如果某个方法是static的,同样无法通过动态代理,变成事务方法。
03.方法内部调用
a.问题描述
有时候我们需要在某个Service类的某个方法中,调用另外一个事务方法。
b.示例代码
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
public void add(UserModel userModel) {
userMapper.insertUser(userModel);
updateStatus(userModel);
}
@Transactional
public void updateStatus(UserModel userModel) {
doSameThing();
}
}
-----------------------------------------------------------------------------------------------------
在事务方法add中,直接调用事务方法updateStatus。updateStatus方法拥有事务的能力是因为spring aop生成代理了对象,但是这种方法直接调用了this对象的方法,所以updateStatus方法不会生成事务。
c.解决方案
i.新加一个Service方法
@Servcie
public class ServiceA {
@Autowired
prvate ServiceB serviceB;
public void save(User user) {
queryData1();
queryData2();
serviceB.doSave(user);
}
}
@Servcie
public class ServiceB {
@Transactional(rollbackFor=Exception.class)
public void doSave(User user) {
addData1();
updateData2();
}
}
b.在该Service类中注入自己
@Servcie
public class ServiceA {
@Autowired
prvate ServiceA serviceA;
public void save(User user) {
queryData1();
queryData2();
serviceA.doSave(user);
}
@Transactional(rollbackFor=Exception.class)
public void doSave(User user) {
addData1();
updateData2();
}
}
这种做法不会出现循环依赖问题。
c.通过AopContent类
在该Service类中使用AopContext.currentProxy()获取代理对象
@Servcie
public class ServiceA {
public void save(User user) {
queryData1();
queryData2();
((ServiceA)AopContext.currentProxy()).doSave(user);
}
@Transactional(rollbackFor=Exception.class)
public void doSave(User user) {
addData1();
updateData2();
}
}
04.未被spring管理
a.问题描述
使用spring事务的前提是:对象要被spring管理,需要创建bean实例。
b.示例代码
//@Service
public class UserService {
@Transactional
public void add(UserModel userModel) {
saveData(userModel);
updateData(userModel);
}
}
-----------------------------------------------------------------------------------------------------
UserService类没有加@Service注解,那么该类不会交给spring管理,所以它的add方法也不会生成事务。
05.多线程调用
a.问题描述
在实际项目开发中,多线程的使用场景还是挺多的。如果spring事务用在多线程场景中,会有问题。
b.示例代码
@Slf4j
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RoleService roleService;
@Transactional
public void add(UserModel userModel) throws Exception {
userMapper.insertUser(userModel);
new Thread(() -> {
roleService.doOtherThing();
}).start();
}
}
@Service
public class RoleService {
@Transactional
public void doOtherThing() {
System.out.println("保存role表数据");
}
}
-----------------------------------------------------------------------------------------------------
事务方法add中,调用了事务方法doOtherThing,但是事务方法doOtherThing是在另外一个线程中调用的。
这样会导致两个方法不在同一个线程中,获取到的数据库连接不一样,从而是两个不同的事务。
c.原因
spring的事务是通过数据库连接来实现的。当前线程中保存了一个map,key是数据源,value是数据库连接。
-----------------------------------------------------------------------------------------------------
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
-----------------------------------------------------------------------------------------------------
同一个事务是指同一个数据库连接,只有拥有同一个数据库连接才能同时提交和回滚。
06.表不支持事务
a.问题描述
在mysql5之前,默认的数据库引擎是myisam,它不支持事务。
b.示例代码
CREATE TABLE `category` (
`id` bigint NOT NULL AUTO_INCREMENT,
`one_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
`two_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
`three_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
`four_category` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
-----------------------------------------------------------------------------------------------------
myisam不支持事务、行锁和外键。
c.结论
在实际业务场景中,myisam使用的并不多。在mysql5以后,myisam已经逐渐退出了历史的舞台,取而代之的是innodb。
07.未开启事务
a.问题描述
有时候,事务没有生效的根本原因是没有开启事务。
b.解决方案
a.springboot项目
springboot通过DataSourceTransactionManagerAutoConfiguration类,已经默默的帮你开启了事务。
只需要配置spring.datasource相关参数即可。
b.传统的spring项目
在applicationContext.xml文件中,手动配置事务相关参数。
-------------------------------------------------------------------------------------------------
<!-- 配置事务管理器 -->
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" id="transactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>
<tx:advice id="advice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" propagation="REQUIRED"/>
</tx:attributes>
</tx:advice>
<!-- 用切点把事务切进去 -->
<aop:config>
<aop:pointcut expression="execution(* com.susan.*.*(..))" id="pointcut"/>
<aop:advisor advice-ref="advice" pointcut-ref="pointcut"/>
</aop:config>
---
如果在pointcut标签中的切入点匹配规则,配错了的话,有些类的事务也不会生效。
3.5 [2]事务不回滚:5个
00.汇总
1.错误的传播特性
2.自己吞了异常
3.手动抛了别的异常
4.自定义了回滚异常
5.嵌套事务回滚多了
01.错误的传播特性
a.传播特性介绍
@Transactional注解可以指定propagation参数来设置事务的传播特性。Spring支持7种传播特性:
a.REQUIRED
如果当前上下文中存在事务,那么加入该事务,如果不存在事务,创建一个事务,这是默认的传播属性值。
b.SUPPORTS
如果当前上下文存在事务,则支持事务加入事务,如果不存在事务,则使用非事务的方式执行。
c.MANDATORY
如果当前上下文中存在事务,否则抛出异常。
d.REQUIRES_NEW
每次都会新建一个事务,并且同时将上下文中的事务挂起,执行当前新建事务完成以后,上下文事务恢复再执行。
e.NOT_SUPPORTED
如果当前上下文中存在事务,则挂起当前事务,然后新的方法在没有事务的环境中执行。
f.NEVER
如果当前上下文中存在事务,则抛出异常,否则在无事务环境上执行代码。
g.NESTED
如果当前上下文中存在事务,则嵌套事务执行,如果不存在事务,则新建事务。
b.错误示例
如果传播特性设置错误,例如设置为Propagation.NEVER,则不支持事务,如果有事务则会抛异常。
---
@Service
public class UserService {
@Transactional(propagation = Propagation.NEVER)
public void add(UserModel userModel) {
saveData(userModel);
updateData(userModel);
}
}
02.自己吞了异常
a.问题描述
事务不会回滚的常见问题是开发者在代码中手动try...catch了异常。
b.错误示例
@Slf4j
@Service
public class UserService {
@Transactional
public void add(UserModel userModel) {
try {
saveData(userModel);
updateData(userModel);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
c.解决方案
如果想要Spring事务能够正常回滚,必须抛出它能够处理的异常。如果没有抛异常,则Spring认为程序是正常的。
03.手动抛了别的异常
a.问题描述
即使开发者没有手动捕获异常,但如果抛的异常不正确,Spring事务也不会回滚。
b.错误示例
@Slf4j
@Service
public class UserService {
@Transactional
public void add(UserModel userModel) throws Exception {
try {
saveData(userModel);
updateData(userModel);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new Exception(e);
}
}
}
c.解决方案
Spring事务默认情况下只会回滚RuntimeException(运行时异常)和Error(错误),对于普通的Exception(非运行时异常),它不会回滚。
04.自定义了回滚异常
a.问题描述
在使用@Transactional注解声明事务时,有时我们想自定义回滚的异常,可以通过设置rollbackFor参数来完成。
b.错误示例
@Slf4j
@Service
public class UserService {
@Transactional(rollbackFor = BusinessException.class)
public void add(UserModel userModel) throws Exception {
saveData(userModel);
updateData(userModel);
}
}
c.解决方案
如果程序抛出的异常不属于指定的BusinessException,事务不会回滚。建议将rollbackFor参数设置成Exception或Throwable。
05.嵌套事务回滚多了
a.问题描述
使用嵌套事务时,原本希望只回滚内部事务,但实际情况是整个事务都回滚了。
b.错误示例
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RoleService roleService;
@Transactional
public void add(UserModel userModel) throws Exception {
userMapper.insertUser(userModel);
roleService.doOtherThing();
}
}
@Service
public class RoleService {
@Transactional(propagation = Propagation.NESTED)
public void doOtherThing() {
System.out.println("保存role表数据");
}
}
c.解决方案
将内部嵌套事务放在try/catch中,并且不继续往上抛异常。
@Slf4j
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RoleService roleService;
@Transactional
public void add(UserModel userModel) throws Exception {
userMapper.insertUser(userModel);
try {
roleService.doOtherThing();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
3.6 [3]使用@Transactional
01.全部属性
a.配置
属性 类型 描述
value String 可选的限定描述符,指定使用的事务管理器
propagation enum: Propagation 可选的事务传播行为设置
isolation enum: Isolation 可选的事务隔离级别设置
readOnly boolean 读写或只读事务,默认读写
timeout int (in seconds granularity) 事务超时时间设置
rollbackFor Class对象数组,必须继承自Throwable 导致事务回滚的异常类数组
rollbackForClassName 类名数组,必须继承自Throwable 导致事务回滚的异常类名字数组
noRollbackFor Class对象数组,必须继承自Throwable 不会导致事务回滚的异常类数组
noRollbackForClassName 类名数组,必须继承自Throwable 不会导致事务回滚的
b.示例
@Transactional(
propagation = Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED,
rollbackFor = {SQLException.class},
timeout = 30
)
public void updateUserProfile(User user) {
// 业务逻辑
}
02.注意事项
a.普通的Exception不会回滚
解决方案:添加属性配置@Transactional(rollbackFor = Exception.class)
Spring默认只处理RuntimeException和Error,对于普通的Exception不会回滚,除非,用rollbackFor属性指定配置
b.文件读写、网络超时问题,spring就没法回滚了
注解为事务范围的方法中,事务的回滚仅仅对于unchecked的异常有效,对于checked异常无效
也就是说事务回滚仅仅发生在,出现RuntimeException或Error的时候
通俗一点就是:代码中出现的空指针等异常,回滚;文件读写、网络超时问题等,无法回滚
03.默认行为和局限性
a.默认行为
a.异常捕获
只对 RuntimeException 和 Error 触发回滚
b.代码实践
@Service
public class DefaultBehaviorService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void defaultRollback() throws Exception {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Charlie", 35);
// 抛出非 RuntimeException,不会回滚
throw new Exception("Not a RuntimeException");
}
@Transactional(rollbackFor = Exception.class)
public void explicitRollback() throws Exception {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "David", 40);
// 显式指定 rollbackFor,回滚生效
throw new Exception("Explicit rollback");
}
}
c.结果:
defaultRollback: 事务不回滚,数据插入成功
explicitRollback: 事务回滚,数据插入被撤销
b.局限性及代码实践
a.方法可见性
a.说明
仅对 public 方法生效
b.代码
@Service
public class VisibilityService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
private void privateMethod() {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Eve", 45);
throw new RuntimeException("Test rollback");
}
public void callPrivateMethod() {
privateMethod(); // 直接调用,事务不生效
}
}
c.结果
privateMethod 的 @Transactional 被忽略,数据插入成功
b.自调用
a.说明
同一个类内方法调用,事务失效
b.代码
@Service
public class SelfInvocationService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void methodA() {
methodB(); // 自调用,事务不生效
}
@Transactional
public void methodB() {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Frank", 50);
throw new RuntimeException("Test rollback");
}
}
c.结果
methodB 的 @Transactional 不生效,数据插入成功
d.解决办法:使用自注入
@Service
public class SelfInvocationFixService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private SelfInvocationFixService self;
public void methodA() {
self.methodB(); // 通过代理调用,事务生效
}
@Transactional
public void methodB() {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Grace", 55);
throw new RuntimeException("Test rollback");
}
}
结果: 事务回滚,数据插入被撤销
c.代理对象
a.说明
非代理对象调用,事务失效
b.代码
public class ProxyIssueExample {
public static void main(String[] args) {
DeclarativeTransactionService service = new DeclarativeTransactionService();
service.updateDataWithTransaction(); // 直接 new,事务不生效
}
}
c.结果
事务不生效,需通过 Spring 容器获取代理对象
04.TransactionManager 和 TransactionAspectSupport 结合触发回滚
a.事务管理器
a.说明
编程式事务直接使用 TransactionManager
b.代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
@Service
public class TransactionManagerService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private PlatformTransactionManager transactionManager;
public void manualTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(def);
try {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Helen", 60);
throw new RuntimeException("Test rollback");
} catch (Exception e) {
transactionManager.rollback(status); // 手动回滚
throw e;
}
// 如果没有异常,手动提交
// transactionManager.commit(status);
}
}
c.结果
事务回滚,数据插入被撤销
b.TransactionAspectSupport
a.说明
@Transactional
b.代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
@Service
public class TransactionAspectService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void forceRollback() {
jdbcTemplate.update("INSERT INTO user (name, age) VALUES (?, ?)", "Ivy", 65);
// 手动触发回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
c.结果
事务回滚,数据插入被撤销
d.说明
@Transactional 通过 TransactionInterceptor(继承自 TransactionAspectSupport)
调用 TransactionManager 管理事务,异常或手动标记回滚时触发 rollback()
c.SpringBoot配置
a.说明
自动配置
b.代码
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
@Configuration
public class TransactionConfig {
@Bean
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
c.结果
SpringBoot默认自动配置 TransactionManager,无需手动定义,除非需要自定义
3.7 [3]禁用@Transactional
01.回答
长事务、嵌套调用、复杂度增加
02.3种常用原因
a.长事务
在方法上增加@Transaction声明式事务
如果一个方法中存在较多耗时的操作,很容易引发长事务的问题
而长事务会带来锁的竞争,影响性能,同时也会导致数据库的连接池被消耗尽,影响到程序的正常运行
b.嵌套调用
如果方法存在嵌套调用,而被嵌套调用的方法也声明了@Transactional
那么这个时候就会引起事务混乱,造成程序运行结果异常
c.复杂度增加
@Transactional声明式事务,是將事物控制逻辑放在注解里面
如果项目中的复杂度增加,事物的控制可能会变得更加复索,导致代码的可读性和维护性下降
02.推荐使用编程式事务
a.第1种
通过Spring框架的TransactionTemplate
b.第2种
手动管理JDBC事务
3.8 [4]声明式事务:@Transactional
01.Spring事务:声明式事务
a.方式1
配置XML或注解
b.方式2
Spring的AOP注解
02.使用步骤
a.配置事务管理器
在Spring配置文件中定义一个事务管理器(如DataSourceTransactionManager)
b.启用事务管理
方式1:使用<tx:annotation-driven>在XML配置中启用注解驱动的事务管理
方式2:在Java配置类中使用@EnableTransactionManagement注解
c.标注事务方法
使用@Transactional注解标注需要事务管理的方法或类
03.使用Spring声明式事务
a.XML配置方式
<!-- 配置数据源 -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/mydb"/>
<property name="username" value="root"/>
<property name="password" value="password"/>
</bean>
<!-- 配置事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 启用注解驱动的事务管理 -->
<tx:annotation-driven transaction-manager="transactionManager"/>
b.Java配置方式
@Configuration
@EnableTransactionManagement
public class AppConfig {
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/mydb");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
c.使用@Transactional注解
@Service
public class MyService {
@Transactional
public void performTransaction() {
// 业务逻辑代码
}
}
3.9 [4]编程式事务:TransactionTemplate
00.汇总
a.替代
@Transactional 替代为 TransactionTemplate、PlatformTransactionManager
b.对比
特性 PlatformTransactionManager TransactionTemplate
使用方式 低级别编程式事务控制,手动管理事务生命周期 高级别封装,简化事务控制,自动管理事务生命周期
代码复杂度 需要手动管理事务的开始、提交、回滚 简化了事务控制,只关注业务逻辑部分
灵活性 灵活,可以做更复杂的事务控制 适用于常见的事务操作,简化了代码但减少了灵活性
使用场景 需要精确控制事务的场景(例如复杂的事务管理) 简单的事务管理场景,减少了样板代码
代码示例 需要手动编写事务开启、提交、回滚的代码 通过 execute 方法简化事务操作代码
使用建议 Spring的底层事务管理接口 对PlatformTransactionManager的封装
01.@Transactional
@Service
public class TransactionalService {
@Transactional(rollbackFor = Exception.class)
public Boolean service() {
// 1.查询表
queryDTable1();
// 2.请求外部服务
outerServiceA();
outerServiceB();
outerServiceC();
// 3.更新表
updateTable1();
updateTable2();
return true;
}
}
02.TransactionTemplate
@Service
public class TransactionalService {
@Resource
private TransactionTemplate transactionTemplate;
public Boolean service() {
// 1.查询表
queryDTable1();
// 2.请求外部服务
outerServiceA();
outerServiceB();
outerServiceC();
transactionTemplate.execute((transactionStatus) -> {
try {
// 3.更新表
updateTable1();
updateTable2();
} catch (Exception e) {
transactionTemplate.setRollbackOnly();
log.error("更新失败!");
}
return transactionStatus;
});
return true;
}
}
03.PlatformTransactionManager
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
@Service
public class MyService {
@Autowired
private PlatformTransactionManager transactionManager;
public void executeBatchOperationsManually() {
// 创建事务定义,设置事务传播行为等
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 默认为 REQUIRED
TransactionStatus status = transactionManager.getTransaction(def); // 开始事务
try {
// 执行第一批操作
boolean vFlag11 = handleBatchOperation(sysUserUpdateData, "upsertSysUserBatch");
boolean vFlag12 = handleBatchOperation(sysThirdAccountUpdateData, "upsertSysThirdAccountBatch");
// 执行第二批操作
boolean vFlag21 = handleBatchOperation(sysUserDeleteData, "bulkDeleteSysUser");
boolean vFlag22 = handleBatchOperation(sysThirdAccountDeleteData, "bulkDeleteSysThirdAccount");
// 执行第三批操作
boolean vFlag31 = handleBatchOperation(sysUserInsertData, "upsertSysUserBatch");
boolean vFlag32 = handleBatchOperation(sysThirdAccountInsertData, "upsertSysThirdAccountBatch");
// 模拟异常
if (true) {
throw new RuntimeException("模拟事务失败");
}
// 根据返回值进行业务处理
if (!vFlag11 || !vFlag12 || !vFlag21 || !vFlag22 || !vFlag31 || !vFlag32) {
throw new RuntimeException("批量操作失败,事务回滚");
}
// 如果所有操作成功,提交事务
transactionManager.commit(status);
} catch (Exception e) {
// 如果发生异常,回滚事务
transactionManager.rollback(status);
throw new RuntimeException("事务回滚,操作失败", e);
}
}
}
3.10 [5]长事务:6条
00.汇总
1.少用@Transactional注解
2.将查询(select)方法放到事务外
3.事务中避免远程调用
4.事务中避免一次性处理太多数据
5.非事务执行
6.异步处理
01.少用@Transactional注解
a.问题描述
在实际项目开发中,我们在业务方法加上@Transactional注解开启事务功能,这是非常普遍的做法,它被称为声明式事务。
b.示例代码
@Transactional(rollbackFor=Exception.class)
public void save(User user) {
doSameThing...
}
c.原因
@Transactional注解是通过spring的aop起作用的,但是如果使用不当,事务功能可能会失效。
@Transactional注解一般加在某个业务方法上,会导致整个业务方法都在同一个事务中,粒度太粗,不好控制事务范围,是出现大事务问题的最常见的原因。
d.解决方案
使用编程式事务,在spring项目中使用TransactionTemplate类的对象,手动执行事务。
-----------------------------------------------------------------------------------------------------
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
transactionTemplate.execute((status) -> {
doSameThing...
return Boolean.TRUE;
})
}
-----------------------------------------------------------------------------------------------------
使用TransactionTemplate的编程式事务功能自己灵活控制事务的范围,是避免大事务问题的首选办法。
02.将查询(select)方法放到事务外
a.问题描述
如果出现大事务,可以将查询(select)方法放到事务外,因为一般情况下这类方法是不需要事务的。
b.示例代码
@Transactional(rollbackFor=Exception.class)
public void save(User user) {
queryData1();
queryData2();
addData1();
updateData2();
}
c.解决方案
将queryData1和queryData2两个查询方法放在事务外执行,将真正需要事务执行的代码才放到事务中。
-----------------------------------------------------------------------------------------------------
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
queryData1();
queryData2();
transactionTemplate.execute((status) -> {
addData1();
updateData2();
return Boolean.TRUE;
})
}
03.事务中避免远程调用
a.问题描述
我们在接口中调用其他系统的接口是不能避免的,由于网络不稳定,这种远程调的响应时间可能比较长,如果远程调用的代码放在某个事物中,这个事物就可能是大事务。
b.示例代码
@Transactional(rollbackFor=Exception.class)
public void save(User user) {
callRemoteApi();
addData1();
}
c.解决方案
远程调用的代码可能耗时较长,切记一定要放在事务之外。
-----------------------------------------------------------------------------------------------------
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
callRemoteApi();
transactionTemplate.execute((status) -> {
addData1();
return Boolean.TRUE;
})
}
-----------------------------------------------------------------------------------------------------
远程调用的代码不放在事务中如何保证数据一致性呢?这就需要建立:重试+补偿机制,达到数据最终一致性。
04.事务中避免一次性处理太多数据
a.问题描述
如果一个事务中需要处理的数据太多,也会造成大事务问题。比如为了操作方便,你可能会一次批量更新1000条数据,这样会导致大量数据锁等待,特别在高并发的系统中问题尤为明显。
b.解决方案
分页处理,1000条数据,分50页,一次只处理20条数据,这样可以大大减少大事务的出现。
05.非事务执行
a.问题描述
在使用事务之前,我们都应该思考一下,是不是所有的数据库操作都需要在事务中执行?
b.示例代码
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
transactionTemplate.execute((status) -> {
addData();
addLog();
updateCount();
return Boolean.TRUE;
})
}
c.解决方案
其实addLog增加操作日志方法 和 updateCount更新统计数量方法,是可以不在事务中执行的
因为操作日志和统计数量这种业务允许少量数据不一致的情况。
-----------------------------------------------------------------------------------------------------
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
transactionTemplate.execute((status) -> {
addData();
return Boolean.TRUE;
})
addLog();
updateCount();
}
06.异步处理
a.问题描述
是不是事务中的所有方法都需要同步执行?我们都知道,方法同步执行需要等待方法返回,如果一个事务中同步执行的方法太多了,势必会造成等待时间过长,出现大事务问题。
b.示例代码
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
transactionTemplate.execute((status) -> {
order();
delivery();
return Boolean.TRUE;
})
}
c.解决方案
发货功能其实可以走mq异步处理逻辑。
@Autowired
private TransactionTemplate transactionTemplate;
...
public void save(final User user) {
transactionTemplate.execute((status) -> {
order();
return Boolean.TRUE;
})
sendMq();
}
3.11 [5]长事务:LazyConnectionDataSourceProxy
01.定义
LazyConnectionDataSourceProxy是Spring框架中的一个代理数据源,它延迟获取数据库连接,直到真正需要时才获取
这种延迟获取机制可以有效减少数据库连接的占用时间,特别是在事务开始后但未立即执行数据库操作的情况下
02.原理
延迟连接:在事务开始时,LazyConnectionDataSourceProxy不会立即获取数据库连接,而是在第一次需要数据库操作时才获取连接
连接释放:在事务结束时,连接会被释放回连接池,从而减少连接的占用时间
事务管理:与Spring的事务管理器结合使用,确保事务的正确性和连接的有效管理
03.使用步骤
配置数据源:在Spring配置中定义LazyConnectionDataSourceProxy,并将其作为应用程序的数据源
配置事务管理器:使用Spring的事务管理器,确保事务的正确性
使用Spring的事务注解:在需要事务管理的方法上使用@Transactional注解
04.代码示例
a.配置类:配置数据源LazyConnectionDataSourceProxy和事务管理器
-----------------------------------------------------------------------------------------------------
import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
@Configuration
@EnableTransactionManagement
public class DataSourceConfig {
@Bean
public DataSource actualDataSource() {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/mydb");
dataSource.setUsername("user");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public LazyConnectionDataSourceProxy dataSource(DataSource actualDataSource) {
return new LazyConnectionDataSourceProxy(actualDataSource);
}
@Bean
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
b.使用Spring的事务注解:在需要事务管理的方法上使用@Transactional注解
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class MyService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void performDatabaseOperations() {
// 数据库操作1
jdbcTemplate.update("INSERT INTO my_table (name) VALUES (?)", "John Doe");
// 其他业务逻辑
// 数据库操作2
jdbcTemplate.update("UPDATE my_table SET name = ? WHERE id = ?", "Jane Doe", 1);
}
}
c.说明
@Configuration:标识一个配置类,Spring会在启动时自动扫描并加载这些配置
@EnableTransactionManagement:启用Spring的事务管理功能
@Bean:定义一个Bean,Spring会自动管理其生命周期
LazyConnectionDataSourceProxy:延迟获取数据库连接,优化资源使用
@Transactional:标识一个方法为事务性方法,Spring会自动管理事务的开始和结束
3.12 [5]异步事务:CompletableFuture
01.定义
异步事务是指在异步操作中确保事务的原子性、一致性、隔离性和持久性(ACID)
在异步环境中,事务的管理变得更加复杂,因为操作可能在不同的线程中执行
02.原理
事务管理:在异步操作中,事务管理器需要确保所有相关操作要么全部成功,要么全部失败
异常处理:在异步操作中,异常处理变得更加重要,需要捕获并处理所有可能的异常,以确保事务的一致性
组合操作:使用 CompletableFuture 的组合方法(如 thenCompose 和 thenCombine)来管理多个异步操作
03.常用API
supplyAsync(Supplier<U> supplier): 异步执行一个任务并返回结果
thenApply(Function<? super T,? extends U> fn): 对结果进行转换
thenCompose(Function<? super T,? extends CompletionStage<U>> fn): 链接另一个异步任务
exceptionally(Function<Throwable,? extends T> fn): 异常处理
handle(BiFunction<? super T, Throwable, ? extends U> fn): 处理结果或异常
04.使用步骤
定义异步任务:使用 CompletableFuture.supplyAsync() 定义异步任务
组合任务:使用 thenCompose 或 thenCombine 组合多个异步任务
处理结果和异常:使用 handle 或 exceptionally 处理任务结果和异常
确保事务一致性:在每个步骤中确保事务的一致性,必要时进行回滚
05.代码示例
a.说明
假设我们有一个银行转账的场景,需要确保转账操作的事务性
b.代码
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AsyncTransactionExample {
public static void main(String[] args) {
CompletableFuture<Void> transaction = transferMoney(100);
try {
transaction.get(); // 等待事务完成
System.out.println("Transaction completed successfully.");
} catch (InterruptedException | ExecutionException e) {
System.out.println("Transaction failed: " + e.getMessage());
}
}
public static CompletableFuture<Void> transferMoney(int amount) {
return CompletableFuture.supplyAsync(() -> {
// Step 1: 扣减账户 A 的金额
boolean debitSuccess = debitAccount("AccountA", amount);
if (!debitSuccess) {
throw new RuntimeException("Failed to debit Account A");
}
return amount;
}).thenCompose(debitedAmount -> CompletableFuture.supplyAsync(() -> {
// Step 2: 增加账户 B 的金额
boolean creditSuccess = creditAccount("AccountB", debitedAmount);
if (!creditSuccess) {
throw new RuntimeException("Failed to credit Account B");
}
return null;
})).exceptionally(ex -> {
// 异常处理和事务回滚
System.out.println("Exception occurred: " + ex.getMessage());
rollbackTransaction();
return null;
});
}
private static boolean debitAccount(String account, int amount) {
// 模拟扣减账户金额
System.out.println("Debiting " + amount + " from " + account);
return true; // 假设成功
}
private static boolean creditAccount(String account, int amount) {
// 模拟增加账户金额
System.out.println("Crediting " + amount + " to " + account);
return true; // 假设成功
}
private static void rollbackTransaction() {
// 模拟事务回滚
System.out.println("Rolling back transaction");
}
}
3.13 [5]普通事务:嵌套ReentrantLock锁会失效
00.汇总
a.说明
处理并发和超卖问题时,理解并合理运用锁机制和事务管理至关重要
通过将锁操作置于事务边界内,可以有效防止数据不一致,确保系统的稳定性和可靠性
在实际应用中,根据业务特性和性能要求选择最合适的解决方案是关键
b.解决
乐观锁:通过版本号或时间戳检查数据是否已被其他事务修改,适用于读多写少的场景
悲观锁:预先锁定数据直至事务完成,适合写操作频繁或数据竞争激烈的场景
分布式锁:如Redisson,确保分布式系统中数据的一致性,适用于跨节点的数据同步
代码级锁:利用synchronized或ReentrantLock等机制,控制线程间的访问顺序,防止并发冲突
c.事务套锁失效问题
问题:在锁外面加入了事务@Transactional,导致锁失效
原因:事务边界问题,锁的获取和释放不在事务边界之内,可能导致数据不一致
d.事务边界的重要性
关键点:确保锁的获取和释放严格位于事务边界内,避免数据在事务未完成前被其他线程修改
实践:使用try-finally结构包裹锁的获取和释放,确保即使发生异常,锁也能正确释放,维护数据完整性
01.什么是超卖?
a.定义
超卖是指系统允许多个用户购买或预订超过实际可用数量的资源。例如,库存数量为1的商品被卖给了2个人
b.示例
在空调租赁场景中,100个人同时充值使用时长,导致最终使用时长小于预期的100小时
02.超卖如何解决?
a.解决方案
乐观锁:通过版本号字段来实现,更新时检查版本号是否匹配
悲观锁:在数据读取时锁定数据,使用SELECT ... FOR UPDATE
分布式锁:使用如Redisson的分布式锁
代码锁:使用ReentrantLock或synchronized关键字
b.示例代码
a.说明
使用ReentrantLock解决超卖问题
b.代码
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DeviceService {
private Lock lock = new ReentrantLock();
public void updateDeviceByLock(Long deviceId) {
lock.lock();
try {
Device device = this.getById(deviceId);
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(Device::getId, deviceId);
updateWrapper.set(Device::getUseTimes, device.getUseTimes() + 1);
this.update(updateWrapper);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
03.事务套锁失效问题
a.问题
在锁外面加入了事务@Transactional,导致锁失效
b.原因
事务边界问题
锁的获取和释放不在事务边界之内,可能导致数据不一致
04.解决锁失效问题
a.解决方案
缩小事务边界,确保锁的获取和释放在事务边界之内
b.示例代码
a.说明
缩小事务边界以确保锁正常工作
b.代码
public class DeviceService {
private Lock lock = new ReentrantLock();
public void updateDeviceByReduce(Long deviceId) {
lock.lock();
try {
this.updateDevice(deviceId);
} finally {
lock.unlock();
}
}
@Transactional(rollbackFor = Exception.class)
public void updateDevice(Long deviceId) {
Device device = this.getById(deviceId);
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(Device::getId, deviceId);
updateWrapper.set(Device::getUseTimes, device.getUseTimes() + 1);
this.update(updateWrapper);
}
}
3.14 [5]普通事务:嵌套发送MQ消息和RPC调用
00.汇总
a.事务中嵌套发送 MQ 消息和 RPC 调用,会导致
事务回滚导致上下游数据不一致
增加事务执行时间,加大锁竞争,导致吞吐量下降(长事务)
下游无法反查到未提交的数据
b.解决方案
本地消息表+定时任务,由定时任务来发送 MQ 消息。实现简单,可靠,效果好
事务消息,依赖 RocketMQ 实现
监听 binlog,实现成本较高
c.总结
事务内应该只包含可靠的、可回滚数据。即,不要在事务中嵌套发送 MQ 消息和 RPC 调用
01.问题描述
在一个事务内,向 MySQL 写入数据,接下来发送 MQ 或 RPC 调用
在大部分情况下,这样写好像没什么问题,但如果此时我们下游执行反查操作,会发现找不到数据
02.存在问题
a.破坏事务原子性语义
数据库事务只能保证数据库操作的原子性(如 MySQL 的 InnoDB 事务),但无法控制外部系统的行为(如 MQ 或 RPC 服务)
事务成功提交,但 MQ 消息发送失败
事务提交失败,但 MQ 消息发送成功
我们所期望的事务原子性,就是操作要么全部执行成功,要么全部失败。以上两种情况,都将导致上下游数据不一致
b.长事务
MQ 和 RPC 通常是网络 I/O 操作,耗时可能会高于本地数据库操作
同时,网络环境是不稳定的,随时可能会出现延迟、不可用、丢包等等。这些因素将延长事务的执行时间,导致:
数据库锁竞争加剧,可能引发死锁
高并发场景下,RPC 耗时长,将增加 DB 连接池占用时间,降低系统吞吐量
c.下游无法反查到数据
用户支付后,需要创建订单,并发送 RPC 请求给权益中心,来加积分
事务提交前,权益中心需要反查数据,但因为事务隔离级别为读已提交以上,此时无法查询到还未提交事务的订单数据
那么 RPC 返回失败结果,导致本地事务无法提交。这就出现了个死循环——上游等待下游执行成功后才能提交事务
下游等待上游提交事务后才能返回执行成功
03.解决方案
a.保证事务提交和消息发送的时序问题
a.让消费者等一会
依旧是在事务中嵌套发送消息,不过消费者接收到消息时,主动 sleep 一定时间,再进行消费
或者发送延迟消息,保证消费者晚点再消费
目的是通过等待一定时间,保证消费者的消费行为发生在提交事务之后执行
缺点:延迟时间不好把控
b.在事务提交后再发消息
a.说明
在事务提交后,再发送 MQ 消息和 RPC 请求,保证事务提交
在发送 MQ 消息和 RPC 请求之前执行,避免它们在事务中嵌套
b.代码
public void craeteOrder(Order order) {
saveOrderByTransaction(order); // 通过事务写入订单
sendMQ(order); // 或者是发送 rpc,在事务之外执行
}
@Transaction // 只对 SQL 加事务
public void saveOrderByTransaction(order) {
saveOrder(order);
}
c.说明
需要判断事务是否成功提交,只有成功了,才发送消息
可以加个 if-else 解决,不过 Spring 给我们一个更优雅的解决方案:使用@TransactionalEventListener监听事务状态
d.代码
// OrderService
@Transactional
public void createOrder(Order order) {
// 假设订单创建成功后,发布事件
OrderCreatedEvent event = new OrderCreatedEvent(order);
eventPublisher.publishEvent(event);
saveOrder(order);
}
// OrderCreatedEventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
sendMQ(event);
}
b.本地事务和 MQ 消息的原子性
a.分布式事务
分布式事务可以解决本地事务和 MQ 消息的原子性问题
但会带来可靠性、性能、使用成本等问题,给系统带来额外的复杂性
b.事务消息
在 RocketMQ 中,支持事务消息,可以保证本地事务和 MQ 消息的原子性
执行逻辑如下:
生产者将消息发送至 Apache RocketMQ 服务端
Apache RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息
生产者开始执行本地事务逻辑
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者
二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者
c.本地消息表+定时任务
a.说明
在本地事务中,除了写入业务数据外,还要将要发送的 MQ 消息写入到 MySQL 的「消息表」中
而发送消息不再由业务代码决定,而是由后台定时任务来轮询「消息表」,定时发送消息
b.代码
// OrderService
@Transactional
public void createOrder(Order order) {
saveOrder(order);
saveOrderMesaage(order); // 写入本地消息表
// 不需要在代码写发送 MQ 消息的逻辑
}
// MessageSendTask
@Scheduled(fixedRate = 1000) // 每隔 1 秒执行一次
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
List<Message> messages = findMessages();
sendMQInBatches(messages);
// 接下来还要更新消息表中消息的发送状态
}
d.监听 binlog
a.说明
可以通过 canal 监听上游数据库的 binlog 日志,解析日志后发送到 MQ 中,由下游自行决定如何消费
b.优势
上游提交后才通知到相关系统,下游反查可以查到数据
可以保证本地事务和 MQ 消息的最终一致性。只有事务提交了,才有 binlog,才能发送 MQ 消息,给下游消费
解耦。业务只管正常写入数据就行,具体的发送 MQ 消息的操作不需要关心
c.缺点
实现复杂,需要额外维护监听 binlog 的第三方组件
c.避免反查
a.说明
消息中包含了消费者所需要的字段,即通过冗余字段,避免反查操作
那么对于下游,就不需要关心上游事务什么时候提交
b.缺点
使生产者的逻辑更复杂
增大的消息的体积,对网络带宽和 MQ 带来额外负担
MQ 的引入是为了解耦,即生产者不需要关心消费者是如何去使用数据的。如果生产者需要根据各类消费者定制消息,那么就会将生产者和消费者耦合在一起
在实际业务中,很多场景反查操作是不可避免的
4 Redis事务
4.1 命令
01.命令
MULTI:开始事务
EXEC:提交事务,执行排队的所有命令
DISCARD:放弃事务,清空队列中的命令
WATCH:监视一个或多个键,若这些键在事务执行前被修改,事务将被中止
02.示例
MULTI
SET key1 value1
SET key2 value2
EXEC
4.2 特点
01.单阶段提交
Redis事务通过MULTI和EXEC命令实现。事务中的所有命令在EXEC时一次性执行
在MULTI命令之后,所有的命令会被放入一个队列中,直到EXEC命令被调用,所有命令才会被顺序执行
02.原子性
Redis事务保证在EXEC命令执行时,事务中的所有命令要么全部执行,要么全部不执行
如果在EXEC之前,事务队列中的命令出现错误,整个事务会被放弃
03.没有回滚机制
Redis事务不支持回滚。如果事务中的某个命令执行失败,其他命令仍然会继续执行
这与传统的2PC或3PC不同,后者通常提供回滚机制以确保一致性
4.3 仅AC
00.四个特征ACID
A原子性(atomicity):强调事务的不可分割
C一致性(consistency):事务的执行的前后数据的完整性保持一致
I隔离性(isolation):一个事务执行的过程中,不应该受到其他事务的干扰
D持久性(durability):事务一旦结束,数据就持久到数据库
01.仅AC
a.原子性(Atomicity)
Redis事务通过MULTI和EXEC命令实现原子性。在事务中,所有命令要么全部执行,要么全部不执行
如果在EXEC命令执行之前,事务队列中的命令出现错误,整个事务会被放弃,不会执行任何命令。这确保了事务的原子性
b.一致性(Consistency)
Redis保证在事务执行过程中,数据的状态在事务开始前和结束后是一致的
事务中的命令按照顺序执行,确保数据的一致性
02.未实现ID
a.隔离性(Isolation)
Redis不提供严格的隔离性。在事务执行期间,其他客户端仍然可以访问和修改事务中涉及的数据
这可能导致“幻读”或“不可重复读”的问题,因此Redis的隔离性是有限的
b.持久性(Durability)
Redis的持久性依赖于其持久化机制(RDB快照和AOF日志)
在事务完成后,数据会写入内存,但是否立即写入磁盘取决于配置和持久化策略
如果在数据写入磁盘之前发生故障,可能会导致数据丢失,因此Redis的持久性也是有限的
4.4 单阶段,非2pc
01.总结
不属于两阶段提交(2PC)或三阶段提交(3PC)
采用了一种简单的单阶段提交机制,主要通过MULTI和EXEC命令实现命令的原子性和顺序执行
4.5 不支持回滚
01.说明
事务的执行【不支持回滚】,如果中间命令出错后续的命令还是会继续执行,且不会回滚之前的执行
02.不支持回滚的原因
a.简单性和性能
Redis的设计目标是简单和高性能。支持回滚会增加系统的复杂性和开销
通过不支持回滚,Redis可以更高效地执行事务,减少事务管理的开销
b.乐观锁机制
Redis提供了WATCH命令来实现乐观锁。通过监视一个或多个键,Redis可以在事务执行前检测到数据的变化
如果在事务执行前监视的键发生变化,EXEC命令将返回空响应,表示事务未执行
c.应用场景
Redis通常用于缓存、会话管理等场景,这些场景对事务的严格一致性要求较低
在需要严格事务管理的场景中,开发者可以通过应用逻辑来实现补偿机制
4.6 WATCH乐观锁
01.监视键的变化
WATCH命令允许你在执行事务之前监视一个或多个键的变化
如果在事务执行之前,任何被监视的键发生了变化,事务将不会执行
02.乐观锁的实现过程
a.监视键
在开始事务之前,使用WATCH命令监视你将要更新的键
例如:WATCH key1 key2
b.执行事务
使用MULTI命令开始事务
执行一系列的命令,这些命令会被放入事务队列中
c.提交事务
使用EXEC命令提交事务
如果在EXEC之前,任何被监视的键发生了变化,事务将被自动取消,EXEC返回nil,表示事务未执行
d.重新尝试
如果事务被取消,可以重新获取最新的数据并重试事务
03.示例
WATCH mykey
MULTI
SET mykey "new_value"
EXEC
5 Seata事务
5.1 是2PC
00.总结
Seata通过两阶段提交协议(2PC)来管理分布式事务,确保在分布式环境下的数据一致性
第一阶段(预提交):确保所有参与者都准备好提交事务
第二阶段(提交或回滚):根据第一阶段的结果,决定是提交事务还是回滚事务
01.事务预提交(第一阶段)
a.过程
在事务预提交阶段,事务协调器(Transaction Coordinator, TC)向所有参与事务的资源管理器(Resource Manager, RM)发送预提交请求
各个RM接收到预提交请求后,执行本地事务操作,并将操作结果(成功或失败)反馈给TC
如果所有RM都同意预提交(即本地事务操作成功),则TC记录预提交成功的状态,准备进入第二阶段
如果有任何一个RM不同意预提交(即本地事务操作失败),则TC记录预提交失败的状态,并准备回滚所有已执行的操作
b.目的
确保所有参与者都准备好提交事务,避免在提交过程中出现不一致的状态
02.事务提交(第二阶段)
a.过程:
a.提交异步化
如果第一阶段的预提交成功,TC会向所有RM发送提交请求
各个RM接收到提交请求后,正式提交本地事务操作
提交操作可以是异步的,以提高性能
b.回滚反向补偿
如果第一阶段的预提交失败,或者在提交阶段TC宕机,TC会向所有RM发送回滚请求
各个RM接收到回滚请求后,执行反向补偿操作,撤销本地事务操作
b.目的
确保所有参与者要么全部提交事务,要么全部回滚事务,保持数据的一致性
5.2 模式:3种
00.4种模式
AT模式 通过代理数据源的方式,自动管理分支事务的提交和回滚 【常用】
TCC模式 一种显式的事务补偿机制,分为三个阶段:Try、Confirm、Cancel 【常用】
Saga模式 一种长事务解决方案,通过一系列有序的子事务来完成全局事务
XA模式 一种两阶段提交协议,通过数据库的XA接口实现分布式事务,确保全局事务的原子性
01.AT模式(Automatic Transaction)
a.特点
AT模式是Seata的核心模式,适用于关系型数据库
通过代理数据源的方式,自动管理分支事务的提交和回滚
在业务代码中不需要显式地管理事务,Seata会自动生成和管理SQL的回滚日志
b.适用场景
适用于需要简化分布式事务管理的场景,尤其是使用关系型数据库的应用
02.TCC模式(Try-Confirm-Cancel)
a.特点
TCC模式是一种显式的事务补偿机制,分为三个阶段:Try、Confirm、Cancel
开发者需要实现这三个阶段的逻辑,Try阶段预留资源,Confirm阶段确认操作,Cancel阶段回滚操作
提供了更高的灵活性和性能,因为业务逻辑可以自定义
b.适用场景
适用于需要精细控制事务逻辑的场景,尤其是涉及复杂业务逻辑的应用
03.SAGA模式
a.特点
SAGA模式是一种长事务解决方案,通过一系列有序的子事务来完成全局事务
每个子事务都有一个对应的补偿操作,用于在失败时回滚
适合长时间运行的事务,支持异步执行
b.适用场景
适用于需要处理长时间运行事务的场景,如跨多个系统的复杂业务流程
04.XA模式
a.特点
XA模式基于XA协议,是一种两阶段提交协议
通过数据库的XA接口实现分布式事务,确保全局事务的原子性
需要数据库和驱动程序支持XA协议
b.适用场景
适用于需要严格事务一致性的场景,尤其是使用支持XA协议的数据库的应用
5.3 模式:AT,甩手掌柜
01.定义
SeataAT模式的核心是对业务无侵入,是一种改进后的两阶段 2PC 提交
02.设计思路
a.一阶段
业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
b.二阶段
提交异步化,非常快速地完成
回滚通过一阶段的回滚日志进行反向补偿
03.总结
AT 模式就是“甩手掌柜”,你只管写业务逻辑,回滚的事儿它全包了
但“自动对账”这种事儿,遇到大流量场景,可能会让你怀疑人生
5.4 模式:TCC,手工记账
01.定义
TCC 是个强管控的解决方案。假设你要给一百个人发奖金,每个人分三步:确定奖金池、发放奖金、确认到账
TCC 模式就要求每一步都明确标记:钱从哪里扣?到账了没有?出了问题咋办?
02.设计思路
a.Try阶段
先预留资源,比如锁定账户余额,告诉大家“钱已经备好了”
b.Confirm阶段
完成操作,比如真的把钱转给员工
c.Cancel阶段
如果过程中失败了,就把预留的资源退回来
03.总结
TCC 模式适合那些要求严格的场景,就像盖房子之前先把地基打稳。但如果操作多,稍不注意就会“忙中出错”
优点:控制精细,能适应更复杂的业务逻辑
缺点:代码侵入性高,每步操作都得你写,累得像手工记账的会计
5.5 模式:TCC,空回滚问题、悬挂事务
01.空回滚
a.说明
TCC中的Try过程中,有的参与者成功了,有的参与者失败了,这时候就需要所有参与者都执行Cancel
这时候,对于那些没有Try成功的参与者来说,本次回滚就是一次空回滚
需要在业务中做好对空回滚的识别和处理,否则就会出现异常报错的情况
甚至可能导致Cancel一直失败,最终导致整个分布式事务失败
b.解决
当一个参与者接到一次Cancel请求的时候,先去distribute_transaction表中根据tx_id查询是否有try的记录
如果没有,则进行一次空回滚即可。并在distribute_transaction中创建一条记录,状态标记为cancel
02.悬挂事务
a.说明
TCC的实现方式存在悬挂事务的问题,在调用TCC服务的一阶段Try操作时,可能会出现因网络拥堵而导致的超时
此时事务协调器会触发二阶段回滚,调用TCC服务的Cancel操作
在此之后,拥堵在网络上的一阶段Try数据包被TCC服务收到,出现了二阶段Cancel请求比一阶段Try请求先执行的情况
b.场景
举一个比较常见的具体场景:一次分布式事务,先发生了Try,但是因为有的节点失败,又发生了Cancel
而下游的某个节点因为网络延迟导致先接到了Cancel,在空回滚完成后,又接到了Try的请求,然后执行了
这就会导致这个节点的Try占用的资源无法释放,也没人会再来处理了,就会导致了事务悬挂
c.问题
这两个问题处理不好,都可能会导致一个分布式事务没办法保证最终一致性
有一个办法,可以一次性的解决以上两个问题,那就是——引入分布式事务记录表
-----------------------------------------------------------------------------------------------------
有了这张表,每一个参与者,都可以在本地事务执行的过程中,同时记录一次分布式事务的操作记录
这张表中有两个关键的字段,一个是tx_id用于保存本次处理的事务ID
还有一个就是state,用于记录本次事务的执行状态。至于其他的字段
比如一些业务数据,执行时间、业务场景啥的,就自己想记录上就记录啥
-----------------------------------------------------------------------------------------------------
CREATE TABLE `distribute_transaction`(
`tx_id` varchar(128) NOT NULL COMMENT '事务id',
`state` int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
PRIMARY KEY (`tx_id`) U)
-----------------------------------------------------------------------------------------------------
有了这张表以后,我们在做try、cancel和confirm操作之后,都需要在本地事务中创建或者修改这条记录
d.解决
当一个参与者接到一次Try请求的时候,先去distribute_transaction表中根据tx_id查询是否有记录
如果当前存在,并且记录的状态是cancel,则拒绝本次try请求,但是需要注意的是,上面的请求过程,需要做好并发控制
有了这张表,我们还可以基于他做幂等控制,每次try-cancel-confirm请求来的时候,都可以到这张表中查一下,然后做幂等控制
5.6 实现原理/3个角色
00.总结
事务协调者(TC):负责全局事务的管理和协调,决定【事务的最终结果】
事务管理者(TM):负责事务的发起和结束,定义全局【事务的边界】
资源管理器(RM):负责分支事务的资源管理,执行【具体的资源操作】
01.3个角色
a.事务协调者(TC)
主要负责管理全局的分支事务的状态,用于全局性事务的提交和回滚
它会对所有的分支事务进行注册,然后根据各个分支事务的状态来决定整体事务是否提交以及回滚
b.事务管理者(TM)
主要用于开发、提交以及回滚事务
它会根据业务逻辑来决定是否开启一个新事务,并且在适当的情况下进行事务的提交以及回滚操作
d.资源管理器(RM)
这个主要用于分支事务上的资源管理,其向 TC 注册分支事务,上报分支事务的状态
然后接受 TC 的命令来传达给事务管理者(TM)是否要提交或者回滚事务
01.事务协调者(Transaction Coordinator, TC)
a.职责
管理全局事务的生命周期,包括全局事务的开始、提交和回滚
维护全局事务的状态,并协调各个分支事务的状态
决定全局事务的最终结果(提交或回滚),并通知各个分支事务执行相应的操作
b.工作流程
当一个全局事务开始时,TC会记录该事务的信息
各个分支事务在执行过程中会向TC注册,并报告其状态
根据所有分支事务的状态,TC决定全局事务的提交或回滚,并通知各个分支事务执行相应的操作
02.事务管理者(Transaction Manager, TM)
a.职责
负责定义全局事务的边界,决定事务的开始和结束
在业务逻辑中,TM负责发起全局事务的开始请求,并在事务结束时请求提交或回滚
b.工作流程
在业务代码中,TM通过API调用来开始一个全局事务
根据业务逻辑的执行结果,TM决定是提交还是回滚事务
TM将提交或回滚的请求发送给TC,由TC协调执行
03.资源管理器(Resource Manager, RM)
a.职责
管理分支事务的资源,负责具体的资源操作(如数据库操作)
向TC注册分支事务,并报告分支事务的执行状态
接受TC的命令,执行分支事务的提交或回滚
b.工作流程
在分支事务执行过程中,RM负责具体的资源操作,并将分支事务的状态上报给TC
根据TC的指令,RM执行分支事务的提交或回滚操作
5.7 执行流程
00.总结
1.事务发起
2.创建全局事务记录
3.分支事务注册
4.业务逻辑执行
5.预提交阶段
6.确认提交阶段
7.全局事务完成
01.事务发起
事务发起方(Transaction Starter)向Seata的事务协调器(TC)发送全局事务的开始请求
并生成全局事务ID(Global Transaction ID)
02.创建全局事务记录
事务协调器(TC)接收到全局事务请求后,为该事务创建全局事务记录,并生成分支事务ID(Branch Transaction ID)
03.分支事务注册
事务发起方将全局事务ID和分支事务ID发送给各个资源管理器(RM)
资源管理器将分支事务ID注册到本地事务管理器,并将执行结果反馈给事务协调器(TC)
04.业务逻辑执行
各个资源管理器在分布式事务的上下文中执行本地事务,包括数据库操作和相关业务逻辑
05.预提交阶段
事务发起方通过事务协调器(TC)向事务管理器(TM)发起预提交请求
事务管理器收到资源管理器的反馈后,将本地执行结果反馈给事务协调器(TC)
06.确认提交阶段
事务管理器根据事务协调器的反馈判断结果。
如果所有参与者的本地事务都执行成功,事务协调器发送提交请求给事务管理器,资源管理器执行最终的提交操作
如果有任何参与者的本地事务执行失败,事务协调器将结果交给事务管理器,资源管理器根据事务管理器的结果进行事务回滚
07.全局事务完成
事务协调器收到事务管理器的提交或回滚结果后,更新全局事务的状态,并通知事务发起方全局事务的最终结果
5.8 日志回滚
00.回答
基于【日志】实现
01.实现步骤
a.分支事务的回滚日志
在执行本地事务操作时,Seata的资源管理器(RM)会记录回滚日志
这些日志包含了恢复数据所需的信息,如修改前的数据快照
回滚日志通常存储在数据库中,以便在需要时能够快速访问和恢复
b.预提交阶段的准备
在事务的预提交阶段,事务协调器(TC)会收集所有分支事务的执行结果
如果任何一个分支事务返回失败,或者在确认提交阶段之前事务协调器检测到问题,事务协调器会决定回滚整个全局事务
c.回滚请求的发送
事务协调器向所有参与的资源管理器发送回滚请求
回滚请求包含了需要撤销的分支事务的标识信息
d.执行回滚操作
资源管理器接收到回滚请求后,使用之前记录的回滚日志来撤销本地事务操作
具体来说,资源管理器会根据回滚日志中的信息,将数据恢复到事务开始之前的状态
e.回滚结果的反馈
资源管理器在完成回滚操作后,将回滚结果反馈给事务协调器
事务协调器根据所有分支事务的回滚结果,更新全局事务的状态,并通知事务发起方
02.关键点
a.数据一致性
通过回滚日志,Seata能够确保在事务失败时,所有参与的数据库状态都能恢复到一致的状态
b.自动化处理
Seata的回滚机制是自动化的,开发者不需要手动编写回滚逻辑,这简化了分布式事务的管理
c.可靠性
即使在网络故障或系统崩溃的情况下,Seata也能通过回滚日志确保数据的一致性
6 Kafka事务
6.1 原理:幂等性
01.介绍
Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入
即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败
02.说明
Kafka 事务分为生产者事务和消费者事务,但它们并不是强绑定的关系
消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务
6.2 生产者:开启事务,支持幂等
01.工作原理
a.启动生产者,分配协调器
在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务
事务协调器的分配涉及到一个特殊的主题 **__transaction_state**,该主题默认有 50 个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者
分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了
b.发送消息
生产者分配到 PID 后,要先告诉事务协调器要把消息发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息
当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应
c.确认事务
当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state 中
当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的
如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果
如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据
事物不仅可以保证多个数据整体成功失败,还可以保证数据丢失后恢复
02.创建一个 Producer,指定一个事务 ID
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置事务ID,必须
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
03.使用事务发送消息
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
//发送10条消息往kafka,假如中间有异常,所有消息都会发送失败
try {
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("topic-test", "a message" + i));
}
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 终止事务
producer.abortTransaction();
} finally {
producer.close();
}
04.完整代码
package com.hainiu.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerWithTransaction {
public static void main(String[] args) {
Properties pro = new Properties();
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaciton_test");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
producer.initTransactions();
producer.beginTransaction();
try{
for(int i=0;i<5;i++){
producer.send(record);
}
producer.commitTransaction();
}catch (Exception e){
producer.abortTransaction();
}finally {
producer.close();
}
}
}
6.3 消费者:不直接开启
00.汇总
消费者本身不直接开启事务,而是通过使用事务性生产者来实现事务操作
01.工作原理
a.事务性生产者(Transactional Producer)
生产者在发送消息时开启事务,将多个发送操作(可以跨多个主题和分区)封装在一个事务中
事务提交时,确保所有消息要么全部写入成功,要么全部失败回滚
b.消费者(Consumer)
消费者读取消息,并进行处理。
消费者使用事务性生产者将处理结果发送到下游主题,并将消费的偏移量提交到事务中
通过将偏移量提交作为事务的一部分,确保处理和偏移量的提交是原子性的,从而避免消息重复处理或丢失
c.隔离级别(Isolation Level)
消费者可以配置 isolation.level 来控制读取事务消息的行为
read_uncommitted:读取所有消息,包括未提交的事务消息
read_committed:只读取已提交的事务消息,默认值
02.操作
a.配置事务性生产者
设置 transactional.id
启用幂等性(enable.idempotence=true)
b.初始化事务
调用 initTransactions() 方法初始化事务
c.消费、处理和生产
开始事务 (beginTransaction)
消费消息,处理后生产新消息
发送处理结果到目标主题
在事务中提交消费者的偏移量 (sendOffsetsToTransaction)
提交事务 (commitTransaction)
d.异常处理
如果在事务过程中发生异常,需回滚事务 (abortTransaction) 以确保原子性
03.代码实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class TransactionalConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String inputTopic = "input-topic";
String outputTopic = "output-topic";
String groupId = "transactional-consumer-group";
String transactionalId = "transactional-producer-1";
// 配置消费者
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置隔离级别为读已提交,避免读取未提交的事务消息
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(inputTopic));
// 配置生产者
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 启用事务
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.count() > 0) {
try {
// 开始事务
producer.beginTransaction();
// 处理并发送消息
for (ConsumerRecord<String, String> record : records) {
String processedValue = process(record.value());
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outputTopic, record.key(), processedValue);
producer.send(producerRecord);
}
// 准备偏移量提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
// 将偏移量作为事务的一部分提交
producer.sendOffsetsToTransaction(offsets, groupId);
// 提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 无法恢复的异常,关闭生产者
producer.close();
break;
} catch (KafkaException e) {
// 其他可恢复的异常,回滚事务
producer.abortTransaction();
}
}
}
} finally {
consumer.close();
producer.close();
}
}
// 简单的处理函数,将消息值转换为大写
private static String process(String value) {
return value.toUpperCase();
}
}
6.4 示例:生产者+消费者
01.生产者
public class Demo09Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置事务 ID
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id-1");
properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置 ACK 为 all
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 初始化事务
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "value-" + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test9-topic", key, value);
producer.send(record);
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 出现异常时,撤销事务
producer.abortTransaction();
e.printStackTrace();
} finally {
producer.close();
}
}
}
02.消费者
public class Demo09Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test9-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
}
7 RabbitMQ事务
7.1 生产者:开启事务
00.总结
事务:提供了消息的原子性保证,但会显著降低性能
01.RabbitMQ事务
a.定义
事务:RabbitMQ的事务机制允许生产者在一个事务中发送多条消息,并在事务提交时确保所有消息都被成功处理
b.原理
事务开始:通过调用channel.txSelect()方法开始一个事务
事务提交:通过调用channel.txCommit()方法提交事务
事务回滚:通过调用channel.txRollback()方法回滚事务
c.使用步骤
开始事务:调用channel.txSelect()
发送消息:在事务中发送多条消息
提交或回滚事务:根据需要调用channel.txCommit()或channel.txRollback()
d.代码示例
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.txSelect(); // 开始事务
try {
String message1 = "Message 1";
String message2 = "Message 2";
channel.basicPublish("", "queue_name", null, message1.getBytes());
channel.basicPublish("", "queue_name", null, message2.getBytes());
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback(); // 回滚事务
e.printStackTrace();
}
}
7.2 消费者:发布确认
00.总结
发布确认:提供了一种更高效的方式来确保消息的可靠传递,是RabbitMQ推荐的做法
01.发布确认(Publisher Confirms)
a.定义
相比事务,发布确认是一种更轻量级的机制,用于确保消息被成功传递到RabbitMQ。发布确认不会像事务那样锁定通道,因此性能更好
b.使用步骤
启用发布确认:调用channel.confirmSelect()
发送消息:发送消息后,等待确认
处理确认:通过channel.waitForConfirmsOrDie()等待确认
c.代码示例
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect(); // 启用发布确认
String message = "Hello World!";
channel.basicPublish("", "queue_name", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("Message confirmed");
} else {
System.out.println("Message not confirmed");
}
}