返回 筑基・数据元府藏真
分布式事务实战:从本地到分布式的一致性
博主
大约 29 分钟
分布式事务实战:从本地到分布式的一致性
问题引入:跨服务转账难题
去年我们上线了一个金融转账系统,用户可以在不同银行之间进行实时转账。系统架构采用了微服务设计:账户服务、支付服务、通知服务分别部署。上线第一周就出现了一个严重问题:
// 用户A从工商银行转账10000元到用户B的建设银行账户
@Service
public class TransferService {
@Autowired
private AccountService accountService; // 账户服务(工商银行数据库)
@Autowired
private PaymentService paymentService; // 支付服务(跨行清算)
@Autowired
private NotificationService notificationService; // 通知服务
public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
// 1. 扣减A的余额(工商银行)
accountService.decreaseBalance(fromUserId, amount); // 成功
// 2. 增加B的余额(建设银行)
paymentService.transferToOtherBank(toUserId, amount); // 网络超时,状态未知
// 3. 发送短信通知
notificationService.sendTransferNotification(toUserId, amount); // 失败
// 问题:
// - A的钱已经扣了,B是否收到不确定
// - 通知失败,转账是否回滚?
// - 如果回滚,A的扣款如何恢复?
}
}
实际故障:转账接口超时后,用户A看到扣款成功,但用户B迟迟未收到款项。客服查询发现:工商银行已扣款,但建设银行未入账,资金"丢失"在系统中。这次故障让我深刻理解了分布式事务的复杂性。
现象描述:分布式系统的数据不一致
案例1:订单创建与库存扣减不一致
场景:电商下单流程
// 问题代码:订单创建成功,但库存未扣减
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper; // 订单库
@Autowired
private InventoryService inventoryService; // 库存服务(远程调用)
@Transactional // 本地事务
public void createOrder(OrderRequest request) {
// 1. 创建订单(本地事务)
Order order = new Order();
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderMapper.insert(order); // 订单创建成功
// 2. 扣减库存(远程调用)
inventoryService.deduct(request.getProductId(), request.getQuantity()); // 网络异常,失败
// 结果:订单创建成功,库存未扣减 → 超卖!
}
}
影响:订单系统显示有货,实际库存已空,导致大量超卖订单。
案例2:分布式锁与事务的冲突
场景:秒杀活动
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private OrderService orderService;
public void seckill(Long userId, Long productId) {
String lockKey = "seckill:" + productId;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (locked) {
try {
// 检查库存
Integer stock = getStock(productId);
if (stock > 0) {
// 创建订单
orderService.createOrder(userId, productId); // 事务内
// 扣减库存
decreaseStock(productId); // 事务外
}
} finally {
redisTemplate.delete(lockKey);
}
}
}
}
问题:
- 分布式锁在事务外,事务提交前锁已释放
- 其他线程可能读到未提交的库存数据
- 事务回滚后,库存扣减无法恢复
案例3:消息发送与业务操作不一致
场景:订单完成后发送消息
@Service
public class OrderCompletionService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void completeOrder(Long orderId) {
// 1. 更新订单状态
orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
// 2. 发送完成消息
kafkaTemplate.send("order-completed", orderId.toString()); // 消息发送成功
// 3. 事务回滚(后续代码异常)
throw new RuntimeException("系统异常");
// 结果:订单状态回滚,但消息已发送
// 下游服务收到消息,处理了一个不存在的订单
}
}
影响:订单系统显示订单未完成,但物流系统已发货,财务系统已结算。
原因分析:分布式事务的理论基础
1. CAP理论深度解析
CAP理论:在分布式系统中,一致性(Consistency)、可用性(Availability)、
分区容错性(Partition Tolerance)三者不可兼得,最多同时满足两项。
┌─────────────────────────────────────────────────────────────┐
│ CAP三角关系 │
│ │
│ ┌─────────────┐ │
│ / 一致性(C) \ │
│ / Consistency \ │
│ / \ │
│ / \ │
│ ┌─────┴─────┐ ┌─────┴─────┐ │
│ │ 可用性(A) │ │ 分区容错性 │ │
│ │Availability│ │Partition │ │
│ │ │ │Tolerance │ │
│ \ / \ / │
│ \ / \ / │
│ \ / \ / │
│ \ / \ / │
│ \ / \ / │
│ v v │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ CP系统 │ │ AP系统 │ │
│ │ Zookeeper │ │ Cassandra │ │
│ │ etcd │ │ Eureka │ │
│ │ HBase │ │ DynamoDB │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
1.1 一致性(Consistency)
强一致性:所有节点在同一时间看到的数据完全一致
┌─────────┐ 写操作 ┌─────────┐
│ 主节点 │ ──────────→ │ 从节点1 │ 立即同步,读取一致
│ (A=10) │ │ (A=10) │
└─────────┘ └─────────┘
↓
┌─────────┐
│ 从节点2 │ 立即同步,读取一致
│ (A=10) │
└─────────┘
最终一致性:数据最终会一致,但存在不一致窗口
┌─────────┐ 写操作 ┌─────────┐
│ 主节点 │ ──────────→ │ 从节点1 │ 异步同步,可能读到旧值
│ (A=10) │ 异步复制 │ (A=5) │ (不一致窗口)
└─────────┘ └─────────┘
↓
┌─────────┐
│ 从节点2 │ 异步同步,可能读到旧值
│ (A=5) │
└─────────┘
1.2 可用性(Availability)
高可用性:每个请求都能在合理时间内获得响应
┌─────────┐ ┌─────────┐
│ 客户端 │ ──────→ │ 节点A │ 正常响应
│ │ │ (可用) │
│ │ ←────── └─────────┘
│ │
│ │ ──────→ ┌─────────┐
│ │ │ 节点B │ 节点A故障,节点B接管
│ │ ←────── │ (可用) │
│ │ └─────────┘
└─────────┘
1.3 分区容错性(Partition Tolerance)
分区容错:网络分区时系统仍能运行
网络分区前:
┌─────────┐ ┌─────────┐
│ 节点A │ ←─────→ │ 节点B │
│ (A=10) │ │ (A=10) │
└─────────┘ └─────────┘
网络分区后(A和B之间网络中断):
┌─────────┐ X ┌─────────┐
│ 节点A │ ←─X─→ │ 节点B │
│ (A=10) │ 网络 │ (A=10) │
└─────────┘ 中断 └─────────┘
CP选择:拒绝写入,保证一致性(等待网络恢复)
AP选择:允许写入,保证可用性(牺牲一致性)
2. BASE理论
BASE理论:Basically Available(基本可用)、Soft state(软状态)、
Eventually consistent(最终一致性)
是对CAP中AP方案的延伸,强调通过牺牲强一致性来获得可用性。
基本可用(Basically Available):
- 响应时间延长:正常0.5秒,故障时2秒
- 功能降级:只读不写,或只写核心功能
软状态(Soft State):
- 允许中间状态存在
- 数据副本之间可以暂时不一致
最终一致性(Eventually Consistent):
┌─────────┐ 写操作 ┌─────────┐
│ 主节点 │ ──────────→ │ 从节点 │
│ (A=10) │ 异步复制 │ (A=5) │ 不一致窗口
└─────────┘ └─────────┘
↓ 时间推移
┌─────────┐
│ 从节点 │
│ (A=10) │ 最终一致
└─────────┘
3. 分布式事务的挑战
分布式事务面临的核心挑战:
1. 网络不可靠
- 请求丢失:调用方未收到请求
- 响应丢失:服务方已处理,但调用方未收到响应
- 网络延迟:响应时间不确定
2. 节点故障
- 单点故障:某个服务节点宕机
- 脑裂:网络分区导致多个主节点
- 数据不一致:故障恢复后数据冲突
3. 数据一致性
- 强一致性:所有节点数据实时一致(难以实现)
- 最终一致性:允许短暂不一致,最终达到一致(主流方案)
4. 性能问题
- 两阶段提交:同步阻塞,性能低下
- 三阶段提交:减少阻塞,但复杂度增加
解决方案:分布式事务实现方案
1. 分布式事务方案对比
| 方案 | 一致性级别 | 性能 | 复杂度 | 适用场景 | 代表框架 |
|---|---|---|---|---|---|
| 2PC | 强一致 | 低 | 中 | 传统分布式事务 | XA协议 |
| 3PC | 强一致 | 中 | 高 | 高可用场景 | 较少使用 |
| TCC | 最终一致 | 高 | 高 | 高并发金融场景 | ByteTCC、TCC-Transaction |
| Saga | 最终一致 | 高 | 中 | 长事务业务 | Seata Saga、Axon |
| 本地消息表 | 最终一致 | 中 | 低 | 异步场景 | 自研方案 |
| 可靠消息 | 最终一致 | 高 | 中 | 主流方案 | RocketMQ事务消息 |
| 最大努力通知 | 最终一致 | 高 | 低 | 对一致性要求低 | 支付回调 |
2. 两阶段提交(2PC)
2.1 2PC原理
两阶段提交(Two-Phase Commit):
阶段一:准备阶段(Prepare Phase)
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 协调者 │ ──Prepare──→ │ 参与者A │ │ 参与者B │
│ │ │ 执行本地 │ │ 执行本地 │
│ │ ←─Yes/No──── │ 事务,锁 │ │ 事务,锁 │
│ │ │ 资源,不 │ │ 资源,不 │
│ │ ──Prepare──→ │ 提交 │ │ 提交 │
│ │ └──────────┘ │ 执行本地 │
│ │ ←─Yes/No───────────────────────────────│ 事务,锁 │
│ │ │ 资源 │
└──────────┘ └──────────┘
阶段二:提交阶段(Commit Phase)
如果所有参与者返回Yes:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 协调者 │ ──Commit───→ │ 参与者A │ │ 参与者B │
│ │ │ 提交本地 │ │ 提交本地 │
│ │ ←─Ack─────── │ 事务 │ │ 事务 │
│ │ └──────────┘ │ 提交本地 │
│ │ ──Commit───→ │ 事务 │
│ │ ←─Ack──────────────────────────────────└──────────┘
└──────────┘
如果有参与者返回No:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 协调者 │ ──Rollback─→ │ 参与者A │ │ 参与者B │
│ │ │ 回滚本地 │ │ 回滚本地 │
│ │ ←─Ack─────── │ 事务 │ │ 事务 │
│ │ └──────────┘ │ 回滚本地 │
│ │ ──Rollback─→ │ 事务 │
│ │ ←─Ack──────────────────────────────────└──────────┘
└──────────┘
2.2 2PC的问题
2PC的主要问题:
1. 同步阻塞
- 所有参与者在等待协调者指令时,资源被锁定
- 协调者宕机,参与者一直阻塞
2. 单点故障
- 协调者是单点,宕机后整个事务无法完成
- 参与者等待协调者恢复
3. 数据不一致
- 协调者在发送Commit指令后宕机,部分参与者收到,部分未收到
- 导致数据不一致
4. 性能问题
- 两次网络往返
- 资源锁定时间长
2.3 Java实现2PC(XA协议)
// XA数据源配置
@Configuration
public class XADataSourceConfig {
@Bean
public UserTransactionManager userTransactionManager() {
return new UserTransactionManager();
}
@Bean
public JtaTransactionManager jtaTransactionManager() {
return new JtaTransactionManager(
userTransactionManager(),
userTransactionManager()
);
}
// XA数据源1
@Bean
public XADataSource xaDataSource1() {
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://localhost:3306/db1");
xaDataSource.setUser("root");
xaDataSource.setPassword("password");
return xaDataSource;
}
// XA数据源2
@Bean
public XADataSource xaDataSource2() {
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://localhost:3306/db2");
xaDataSource.setUser("root");
xaDataSource.setPassword("password");
return xaDataSource;
}
}
// 使用XA事务
@Service
public class XaTransferService {
@Autowired
private JdbcTemplate jdbcTemplate1; // db1
@Autowired
private JdbcTemplate jdbcTemplate2; // db2
@Transactional(transactionManager = "jtaTransactionManager")
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// 扣减db1账户
jdbcTemplate1.update(
"UPDATE account SET balance = balance - ? WHERE id = ?",
amount, fromId
);
// 增加db2账户
jdbcTemplate2.update(
"UPDATE account SET balance = balance + ? WHERE id = ?",
amount, toId
);
// XA事务管理器自动处理2PC
}
}
3. TCC模式(Try-Confirm-Cancel)
3.1 TCC原理
TCC模式:将每个操作拆分为三个阶段
Try阶段:预留资源,执行业务检查
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 业务服务 │ ──Try──────→ │ 库存服务 │ │ 优惠券服务│
│ │ │ 冻结库存 │ │ 冻结优惠券│
│ │ │ (预留资源)│ │ (预留资源)│
└──────────┘ └──────────┘ └──────────┘
Confirm阶段:确认执行,使用预留资源
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 业务服务 │ ──Confirm──→ │ 库存服务 │ │ 优惠券服务│
│ │ │ 扣减库存 │ │ 使用优惠券│
│ │ │ (确认操作)│ │ (确认操作)│
└──────────┘ └──────────┘ └──────────┘
Cancel阶段:取消执行,释放预留资源
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 业务服务 │ ──Cancel───→ │ 库存服务 │ │ 优惠券服务│
│ │ │ 释放库存 │ │ 释放优惠券│
│ │ │ (回滚操作)│ │ (回滚操作)│
└──────────┘ └──────────┘ └──────────┘
3.2 TCC实现
// TCC接口定义
public interface InventoryTccAction {
@TwoPhaseBusinessAction(name = "inventoryDeductTcc",
useTCCFence = true)
boolean tryDeduct(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "quantity") Integer quantity);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
// TCC实现
@Component
public class InventoryTccActionImpl implements InventoryTccAction {
@Autowired
private InventoryMapper inventoryMapper;
@Autowired
private InventoryFreezeMapper freezeMapper;
@Override
@Transactional
public boolean tryDeduct(Long productId, Integer quantity) {
// 1. 检查库存
Inventory inventory = inventoryMapper.selectById(productId);
if (inventory == null || inventory.getStock() < quantity) {
throw new BusinessException("库存不足");
}
// 2. 检查是否已冻结(幂等性)
String xid = RootContext.getXID();
InventoryFreeze freeze = freezeMapper.selectByXid(xid);
if (freeze != null) {
return true; // 已处理过
}
// 3. 冻结库存
freezeMapper.insert(new InventoryFreeze(xid, productId, quantity, FreezeStatus.TRY));
// 4. 扣减可用库存
int affected = inventoryMapper.decreaseStock(productId, quantity);
if (affected == 0) {
throw new BusinessException("扣减库存失败");
}
return true;
}
@Override
@Transactional
public boolean confirm(BusinessActionContext context) {
String xid = context.getXid();
// 1. 查询冻结记录
InventoryFreeze freeze = freezeMapper.selectByXid(xid);
if (freeze == null) {
return true; // 已处理或无记录
}
// 2. 幂等性检查
if (freeze.getStatus() == FreezeStatus.CONFIRMED) {
return true;
}
// 3. 更新冻结状态为已确认
freezeMapper.updateStatus(xid, FreezeStatus.CONFIRMED);
// 4. 实际扣减库存(在try阶段已扣减,这里可以记录日志或更新其他状态)
// 实际业务中可能需要将冻结库存转为实际扣减
return true;
}
@Override
@Transactional
public boolean cancel(BusinessActionContext context) {
String xid = context.getXid();
// 1. 查询冻结记录
InventoryFreeze freeze = freezeMapper.selectByXid(xid);
if (freeze == null) {
return true; // 无冻结记录,空回滚
}
// 2. 幂等性检查
if (freeze.getStatus() == FreezeStatus.CANCELLED) {
return true;
}
// 3. 恢复库存
inventoryMapper.increaseStock(freeze.getProductId(), freeze.getQuantity());
// 4. 更新冻结状态为已取消
freezeMapper.updateStatus(xid, FreezeStatus.CANCELLED);
return true;
}
}
// 业务服务使用TCC
@Service
public class OrderTccService {
@Autowired
private InventoryTccAction inventoryTccAction;
@Autowired
private CouponTccAction couponTccAction;
@Autowired
private OrderMapper orderMapper;
@GlobalTransactional(name = "create-order-tcc", rollbackFor = Exception.class)
public Order createOrder(OrderRequest request) {
// 1. 创建订单(本地事务)
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(request.getUserId());
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
// 2. 调用TCC - 扣减库存(Try阶段)
inventoryTccAction.tryDeduct(request.getProductId(), request.getQuantity());
// 3. 调用TCC - 使用优惠券(Try阶段)
if (request.getCouponId() != null) {
couponTccAction.tryUse(request.getCouponId());
}
// 4. 如果所有Try成功,Seata自动调用Confirm
// 5. 如果有异常,Seata自动调用Cancel
return order;
}
}
4. Saga模式
4.1 Saga原理
Saga模式:将长事务拆分为多个本地事务,每个本地事务有对应的补偿操作
正向流程:
T1 → T2 → T3 → T4 → ... → Tn
补偿流程(T3失败):
T1 → T2 → T3(失败) → C2 → C1
示例:出行预订
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 订机票 │ → │ 订酒店 │ → │ 租车 │ → │ 完成 │
│ (T1) │ │ (T2) │ │ (T3) │ │ │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
如果租车失败:
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 订机票 │ → │ 订酒店 │ → │ 租车失败 │ → │ 取消酒店 │ → │ 取消机票 │
│ (T1) │ │ (T2) │ │ (T3) │ │ (C2) │ │ (C1) │
└─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘
4.2 Saga实现(Seata Saga)
// Saga状态机定义(JSON)
{
"startState": "BookFlight",
"states": {
"BookFlight": {
"type": "serviceTask",
"serviceName": "bookFlightService",
"methodName": "book",
"input": [
"$.[request]"
],
"output": "$.#root",
"next": "BookHotel"
},
"BookHotel": {
"type": "serviceTask",
"serviceName": "bookHotelService",
"methodName": "book",
"input": [
"$.[request]"
],
"output": "$.#root",
"next": "BookCar"
},
"BookCar": {
"type": "serviceTask",
"serviceName": "bookCarService",
"methodName": "book",
"input": [
"$.[request]"
],
"output": "$.#root",
"next": "Succeed"
},
"Succeed": {
"type": "succeed"
},
"CompensateBookHotel": {
"type": "serviceTask",
"serviceName": "bookHotelService",
"methodName": "cancel",
"compensateState": "CompensateBookFlight"
},
"CompensateBookFlight": {
"type": "serviceTask",
"serviceName": "bookFlightService",
"methodName": "cancel"
}
}
}
// Saga服务实现
@Service
public class BookFlightService {
@Autowired
private FlightBookingMapper bookingMapper;
// 正向操作
public BookingResult book(BookingRequest request) {
// 预订机票
FlightBooking booking = new FlightBooking();
booking.setOrderId(request.getOrderId());
booking.setFlightNo(request.getFlightNo());
booking.setStatus(BookingStatus.BOOKED);
bookingMapper.insert(booking);
return new BookingResult(booking.getId(), true);
}
// 补偿操作
public boolean cancel(BookingResult result) {
// 取消机票预订
bookingMapper.updateStatus(result.getBookingId(), BookingStatus.CANCELLED);
return true;
}
}
// 使用Saga
@Service
public class TravelBookingService {
@Autowired
private StateMachineEngine stateMachineEngine;
public void bookTravel(TravelRequest request) {
Map<String, Object> params = new HashMap<>();
params.put("request", request);
// 启动Saga状态机
StateMachineInstance inst = stateMachineEngine.start(
"travelBookingSaga", // 状态机名称
null, // 当前登录用户
params // 初始参数
);
if (!inst.getStatus().equals(ExecutionStatus.SU)) {
throw new BusinessException("预订失败: " + inst.getException().getMessage());
}
}
}
5. 本地消息表
5.1 本地消息表原理
本地消息表:将分布式事务拆分为本地事务 + 消息发送
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 业务表 │ │ 消息表 │ │ 消息服务 │
│ (订单表) │ │ (待发送消息) │ │ (MQ发送) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 1.本地事务 │ │
│ ┌───────────────┐ │ │
│ │ INSERT 订单 │ │ │
│ │ INSERT 消息 │ │ │
└─┤ (同库同事务) ├─┘ │
│ COMMIT │ │
└───────────────┘ │
│
2.定时扫描消息表 │
┌──────────────┐ │
│ 查询待发送消息 │ │
│ 发送到MQ │────────┘
│ 更新消息状态 │
└──────────────┘
│
3.消费消息 │
┌──────────────┐ │
│ 消费MQ消息 │←───────┘
│ 执行业务逻辑 │
│ 确认消费成功 │
└──────────────┘
5.2 本地消息表实现
// 消息表结构
CREATE TABLE transaction_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一标识',
message_type VARCHAR(32) NOT NULL COMMENT '消息类型',
message_body TEXT NOT NULL COMMENT '消息内容',
status TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0-待发送,1-已发送,2-消费成功,3-消费失败',
retry_count INT DEFAULT 0 COMMENT '重试次数',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_time (status, create_time)
) ENGINE=InnoDB COMMENT='事务消息表';
// 业务服务
@Service
public class OrderServiceWithMessage {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TransactionMessageMapper messageMapper;
@Transactional
public void createOrder(OrderRequest request) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderMapper.insert(order);
// 2. 创建消息(同一事务)
TransactionMessage message = new TransactionMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setMessageType("ORDER_CREATED");
message.setMessageBody(JsonUtils.toJson(new OrderCreatedEvent(order)));
message.setStatus(0); // 待发送
messageMapper.insert(message);
// 3. 事务提交后,订单和消息同时写入数据库
}
}
// 消息发送服务
@Component
public class MessageSender {
@Autowired
private TransactionMessageMapper messageMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 定时任务扫描待发送消息
@Scheduled(fixedRate = 5000)
public void scanAndSendMessages() {
// 查询待发送消息(5分钟前的,避免与业务事务冲突)
List<TransactionMessage> messages = messageMapper.selectPendingMessages(5);
for (TransactionMessage message : messages) {
try {
// 发送消息到MQ
kafkaTemplate.send(
getTopic(message.getMessageType()),
message.getMessageId(),
message.getMessageBody()
).get(3, TimeUnit.SECONDS); // 同步等待发送结果
// 更新消息状态为已发送
messageMapper.updateStatus(message.getId(), 1);
} catch (Exception e) {
log.error("消息发送失败: {}", message.getMessageId(), e);
// 增加重试次数
messageMapper.increaseRetryCount(message.getId());
// 超过最大重试次数,标记为死信
if (message.getRetryCount() >= 10) {
messageMapper.updateStatus(message.getId(), 4); // 死信状态
}
}
}
}
}
// 消息消费服务
@Component
public class OrderEventConsumer {
@KafkaListener(topics = "order-created", groupId = "inventory-service")
public void onOrderCreated(ConsumerRecord<String, String> record) {
String messageId = record.key();
String messageBody = record.value();
try {
OrderCreatedEvent event = JsonUtils.fromJson(messageBody, OrderCreatedEvent.class);
// 幂等性检查
if (processedMessageService.isProcessed(messageId)) {
log.info("消息已处理,跳过: {}", messageId);
return;
}
// 执行业务逻辑
inventoryService.deductStock(event.getProductId(), event.getQuantity());
// 标记消息已处理
processedMessageService.markAsProcessed(messageId);
} catch (Exception e) {
log.error("消息消费失败: {}", messageId, e);
// 抛出异常,触发重试
throw e;
}
}
}
6. 可靠消息(RocketMQ事务消息)
6.1 RocketMQ事务消息原理
RocketMQ事务消息实现:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 生产者 │ │ Broker │ │ 消费者 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ 1.发送半消息 │ │
│ ─────────────────────→ │ │
│ │ 存储半消息 │
│ 2.执行本地事务 │ │
│ ┌───────────────────┐ │ │
│ │ 执行业务逻辑 │ │ │
│ │ 提交本地事务 │ │ │
│ └───────────────────┘ │ │
│ │ │
│ 3.发送确认消息 │ │
│ ─────────────────────→ │ │
│ (Commit/Rollback) │ 确认或删除半消息 │
│ │ │
│ │ 4.回查(如果未收到确认) │
│ ←───────────────────── │ │
│ 检查本地事务状态 │ │
│ ─────────────────────→ │ │
│ │ │
│ │ 5.投递消息 │
│ │ ─────────────────────→ │
│ │ │ 消费消息
6.2 RocketMQ事务消息实现
// RocketMQ事务消息生产者
@Component
public class OrderTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderService orderService;
public void sendOrderCreatedMessage(OrderRequest request) {
// 创建消息
Message<OrderRequest> message = MessageBuilder
.withPayload(request)
.setHeader("KEYS", request.getOrderNo())
.build();
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-producer-group",
"order-created-topic",
message,
request // 本地事务参数
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
throw new BusinessException("订单创建失败");
}
}
}
// 事务监听器
@RocketMQTransactionListener(txProducerGroup = "order-producer-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private OrderMapper orderMapper;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
OrderRequest request = (OrderRequest) arg;
// 执行本地事务:创建订单
Order order = orderService.createOrder(request);
// 本地事务成功,提交消息
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败", e);
// 本地事务失败,回滚消息
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 回查本地事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// 从消息中解析订单号
String orderNo = new String(msg.getBody());
// 查询订单是否存在
Order order = orderMapper.selectByOrderNo(orderNo);
if (order != null) {
// 订单存在,本地事务成功
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
// 订单不存在,本地事务失败
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("回查本地事务状态失败", e);
// 回查失败,返回未知,等待下次回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
7. Seata框架实战
7.1 Seata架构
Seata架构:
┌─────────────────────────────────────────────────────────────┐
│ Seata Server │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TC Server │ │ 事务协调器 │ │ 全局事务管理│ │
│ │ (Transaction│ │ │ │ │ │
│ │ Coordinator)│ │ │ │ │ │
│ └──────┬──────┘ └─────────────┘ └─────────────┘ │
└─────────┼───────────────────────────────────────────────────┘
│
│ 注册/心跳/事务协调
│
┌─────┴─────┬─────────────┬─────────────┐
↓ ↓ ↓ ↓
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│ TM │ │ RM1 │ │ RM2 │ │ RM3 │
│业务应用 │ │库存服务│ │订单服务│ │账户服务│
└───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘
│ │ │ │
│ ┌─────┘ │ │
│ ↓ │ │
│ 1.注册分支事务 │ │
│ 2.执行业务 │ │
│ 3.报告状态 │ │
│ │ │ │
↓ ↓ ↓ ↓
全局事务协调(2PC协议)
7.2 Seata配置与使用
# application.yml
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: my_tx_group
registry:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP
config:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP
client:
rm:
async-commit-buffer-limit: 10000
report-retry-count: 5
table-meta-check-enable: false
report-success-enable: false
saga-branch-register-enable: false
tm:
commit-retry-count: 5
rollback-retry-count: 5
default-global-transaction-timeout: 60000
undo:
data-validation: true
log-serialization: jackson
log-table: undo_log
// 业务服务使用Seata
@Service
public class BusinessService {
@Autowired
private StorageService storageService;
@Autowired
private OrderService orderService;
@Autowired
private AccountService accountService;
/**
* 创建订单 - 使用Seata AT模式
*/
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void purchase(PurchaseRequest request) {
log.info("开始全局事务, xid: {}", RootContext.getXID());
// 1. 扣减库存
storageService.deduct(request.getProductId(), request.getCount());
// 2. 创建订单
Order order = orderService.create(request);
// 3. 扣减账户余额
accountService.debit(request.getUserId(), request.getAmount());
// 任意步骤失败,Seata自动触发全局回滚
// 各服务的RM会根据undo_log进行补偿
log.info("全局事务提交成功, xid: {}", RootContext.getXID());
}
}
// 库存服务(RM)
@Service
public class StorageService {
@Autowired
private StorageMapper storageMapper;
/**
* 扣减库存 - 自动注册为分支事务
*/
public void deduct(Long productId, Integer count) {
log.info("扣减库存, xid: {}, productId: {}, count: {}",
RootContext.getXID(), productId, count);
int affected = storageMapper.decrease(productId, count);
if (affected == 0) {
throw new BusinessException("库存不足");
}
// Seata会自动:
// 1. 解析SQL,生成前后镜像
// 2. 插入undo_log
// 3. 提交本地事务
// 4. 向TC报告分支事务状态
}
}
实战案例:金融转账系统
案例1:跨行转账(TCC模式)
@Service
public class CrossBankTransferService {
@Autowired
private AccountTccAction accountTccAction;
@Autowired
private PaymentTccAction paymentTccAction;
@Autowired
private NotificationTccAction notificationTccAction;
@GlobalTransactional(name = "cross-bank-transfer", rollbackFor = Exception.class)
public void transfer(TransferRequest request) {
// 1. 冻结转出账户资金(Try)
accountTccAction.tryFreeze(request.getFromAccountId(), request.getAmount());
// 2. 调用跨行支付接口,冻结转入资金(Try)
paymentTccAction.tryTransfer(
request.getFromBankCode(),
request.getToBankCode(),
request.getToAccountId(),
request.getAmount()
);
// 3. 创建转账记录(Try)
TransferRecord record = createTransferRecord(request);
// 如果全部Try成功,自动执行Confirm
// - 实际扣减转出账户资金
// - 确认跨行转账
// - 更新转账记录状态
// 如果有异常,自动执行Cancel
// - 释放转出账户冻结资金
// - 取消跨行转账
// - 标记转账记录失败
}
}
案例2:订单-库存-支付一致性(Saga模式)
// Saga状态机定义
{
"startState": "CreateOrder",
"states": {
"CreateOrder": {
"type": "serviceTask",
"serviceName": "orderSagaService",
"methodName": "create",
"compensateState": "CancelOrder",
"next": "DeductInventory"
},
"DeductInventory": {
"type": "serviceTask",
"serviceName": "inventorySagaService",
"methodName": "deduct",
"compensateState": "RestoreInventory",
"next": "ProcessPayment"
},
"ProcessPayment": {
"type": "serviceTask",
"serviceName": "paymentSagaService",
"methodName": "process",
"compensateState": "RefundPayment",
"next": "SendNotification"
},
"SendNotification": {
"type": "serviceTask",
"serviceName": "notificationSagaService",
"methodName": "send",
"next": "Succeed"
},
"CancelOrder": {
"type": "serviceTask",
"serviceName": "orderSagaService",
"methodName": "cancel"
},
"RestoreInventory": {
"type": "serviceTask",
"serviceName": "inventorySagaService",
"methodName": "restore"
},
"RefundPayment": {
"type": "serviceTask",
"serviceName": "paymentSagaService",
"methodName": "refund"
},
"Succeed": {
"type": "succeed"
}
}
}
案例3:消息最终一致性(本地消息表)
@Service
public class OrderWithMessageService {
@Transactional
public void createOrderWithMessage(OrderRequest request) {
// 1. 创建订单
Order order = orderMapper.insert(request);
// 2. 记录消息(同一事务)
TransactionMessage message = new TransactionMessage();
message.setMessageId(order.getOrderNo());
message.setMessageType("ORDER_PAYMENT");
message.setMessageBody(JsonUtils.toJson(new PaymentRequest(order)));
message.setStatus(0);
messageMapper.insert(message);
// 事务提交后,定时任务会扫描并发送消息
}
}
@Component
public class PaymentMessageConsumer {
@KafkaListener(topics = "order-payment")
public void onPaymentMessage(ConsumerRecord<String, String> record) {
PaymentRequest request = JsonUtils.fromJson(record.value(), PaymentRequest.class);
// 调用支付网关
PaymentResult result = paymentGateway.charge(request);
if (result.isSuccess()) {
// 更新订单状态
orderMapper.updateStatus(request.getOrderId(), OrderStatus.PAID);
} else {
// 记录支付失败,进入人工处理流程
orderMapper.updateStatus(request.getOrderId(), OrderStatus.PAYMENT_FAILED);
}
}
}
性能测试数据
1. 不同分布式事务方案性能对比
| 方案 | TPS | 平均延迟 | 吞吐量 | 适用并发 |
|---|---|---|---|---|
| 本地事务 | 15,000 | 5ms | 高 | 单机 |
| 2PC/XA | 800 | 150ms | 低 | < 100 |
| TCC | 5,000 | 25ms | 高 | < 1000 |
| Saga | 4,500 | 30ms | 高 | < 1000 |
| 本地消息表 | 6,000 | 20ms | 高 | < 2000 |
| RocketMQ事务消息 | 7,000 | 15ms | 高 | < 3000 |
测试环境:3个服务节点,MySQL 8.0,网络延迟<1ms
2. Seata AT模式性能测试
| 场景 | 无Seata | 有Seata | 性能损耗 |
|---|---|---|---|
| 单服务单表 | 12,000 TPS | 10,500 TPS | 12.5% |
| 单服务多表 | 8,000 TPS | 6,500 TPS | 18.7% |
| 多服务调用 | 5,000 TPS | 3,800 TPS | 24% |
| 复杂长事务 | 2,000 TPS | 1,400 TPS | 30% |
3. 事务恢复时间测试
| 故障场景 | 自动恢复时间 | 人工介入比例 |
|---|---|---|
| 网络抖动 | < 3s | 0% |
| 服务重启 | 5-10s | 0% |
| 数据库故障 | 30-60s | 5% |
| 消息丢失 | 依赖重试策略 | 10% |
| 补偿失败 | 需人工处理 | 100% |
经验总结
✅ 最佳实践
-
能不用就不用
- 通过业务设计避免分布式事务
- 使用单服务内的事务 + 异步通知
- 将大事务拆分为小事务
-
最终一致性优先
- AP模式更适合互联网场景
- 用户可接受短暂不一致
- 通过补偿保证最终一致
-
异步化解耦
- 使用消息队列解耦服务
- 本地消息表或可靠消息
- 降低服务间耦合度
-
幂等设计
- 所有接口必须支持幂等
- 防止网络重试导致重复处理
- 使用唯一键或状态机去重
-
监控告警
- 事务状态全程监控
- 异常事务及时告警
- 补偿失败人工介入
-
降级策略
- 分布式事务框架故障时降级
- 使用本地事务 + 人工补偿
- 保证核心业务流程可用
❌ 常见错误
-
过度追求强一致性
// 错误:所有场景都用2PC @GlobalTransactional public void allUse2PC() { // 大部分场景不需要强一致 } // 正确:根据场景选择 // 核心金融交易:TCC或2PC // 普通业务:本地消息表 // 日志记录:直接发送,无需事务 -
忽略幂等性
// 错误:不处理重复消息 @KafkaListener(topics = "order") public void onOrderMessage(Order order) { inventoryService.deduct(order.getProductId(), order.getQuantity()); // 如果消息重复,库存会被多次扣减 } // 正确:幂等性处理 @KafkaListener(topics = "order") public void onOrderMessage(Order order) { // 检查是否已处理 if (processedOrderService.isProcessed(order.getOrderNo())) { return; } inventoryService.deduct(order.getProductId(), order.getQuantity()); processedOrderService.markAsProcessed(order.getOrderNo()); } -
事务范围过大
// 错误:整个业务流程在一个事务中 @GlobalTransactional public void bigTransaction() { createOrder(); // 10ms deductInventory(); // 20ms processPayment(); // 100ms(调用第三方支付) sendNotification(); // 50ms(发送短信) updateStatistics(); // 30ms // 总耗时210ms,锁持有时间长 } // 正确:拆分事务 @GlobalTransactional public void smallTransaction() { createOrder(); // 10ms deductInventory(); // 20ms // 总耗时30ms } // 其他操作异步处理 -
不处理补偿失败
// 错误:补偿操作不处理异常 public void cancel() { restoreInventory(); // 如果失败怎么办? } // 正确:补偿失败进入人工处理 public void cancel() { try { restoreInventory(); } catch (Exception e) { // 记录补偿失败,进入人工处理队列 compensationFailureService.record(context); alertService.sendAlert("补偿失败,需人工处理"); } } -
缺少事务监控
// 错误:没有监控 @GlobalTransactional public void noMonitor() { // 事务执行中... } // 正确:完善的监控 @GlobalTransactional public void withMonitor() { String xid = RootContext.getXID(); metricsService.recordTransactionStart(xid); try { // 事务执行... metricsService.recordTransactionSuccess(xid); } catch (Exception e) { metricsService.recordTransactionFailure(xid, e); throw e; } }
决策树:如何选择分布式事务方案
┌─────────────────────────────────────┐
│ 业务场景分析 │
└─────────────────┬───────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 强一致性要求 │ │ 最终一致性 │ │ 简单异步场景 │
│ 金融/支付 │ │ 电商/订单 │ │ 日志/通知 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ TCC模式 │ │ Saga模式 │ │ 本地消息表 │
│ 或2PC/XA │ │ 或可靠消息 │ │ 或直接发送 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Seata TCC │ │ Seata Saga │ │ RocketMQ │
│ ByteTCC │ │ RocketMQ事务 │ │ 自研消息表 │
└───────────────┘ └───────────────┘ └───────────────┘
检查清单
分布式事务设计检查清单
- 是否必须通过分布式事务解决?
- 选择的方案是否符合业务一致性要求?
- 所有参与方是否都实现了幂等性?
- 补偿操作是否考虑了失败情况?
- 是否有完善的事务监控?
- 是否设置了合理的超时时间?
- 是否有降级方案?
- 是否测试了各种故障场景?
- 是否有补偿失败的人工处理流程?
- 是否记录了完整的事务日志?
分布式事务故障排查清单
- 网络是否正常?
- 各服务是否都注册了TC?
- 数据库undo_log表是否存在?
- 事务超时时间是否合理?
- 是否有锁冲突?
- 补偿操作是否执行成功?
- 消息是否被正确消费?
- 是否存在幂等性问题?
- 事务状态是否一致?
- 是否有死锁情况?
系列上一篇:事务隔离级别与并发问题深度剖析
系列下一篇:高并发场景下的数据库应对策略
知识点测试
读完文章了?来测试一下你对知识点的掌握程度吧!
评论区
使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。
如果评论系统无法加载,请确保:
- 您的网络可以访问 GitHub
- giscus GitHub App 已安装到仓库
- 仓库已启用 Discussions 功能