返回 筑基・数据元府藏真

分布式事务实战:从本地到分布式的一致性

博主
大约 29 分钟

分布式事务实战:从本地到分布式的一致性

问题引入:跨服务转账难题

去年我们上线了一个金融转账系统,用户可以在不同银行之间进行实时转账。系统架构采用了微服务设计:账户服务、支付服务、通知服务分别部署。上线第一周就出现了一个严重问题:

// 用户A从工商银行转账10000元到用户B的建设银行账户
@Service
public class TransferService {
    
    @Autowired
    private AccountService accountService;  // 账户服务(工商银行数据库)
    
    @Autowired
    private PaymentService paymentService;  // 支付服务(跨行清算)
    
    @Autowired
    private NotificationService notificationService;  // 通知服务
    
    public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
        // 1. 扣减A的余额(工商银行)
        accountService.decreaseBalance(fromUserId, amount);  // 成功
        
        // 2. 增加B的余额(建设银行)
        paymentService.transferToOtherBank(toUserId, amount);  // 网络超时,状态未知
        
        // 3. 发送短信通知
        notificationService.sendTransferNotification(toUserId, amount);  // 失败
        
        // 问题:
        // - A的钱已经扣了,B是否收到不确定
        // - 通知失败,转账是否回滚?
        // - 如果回滚,A的扣款如何恢复?
    }
}

实际故障:转账接口超时后,用户A看到扣款成功,但用户B迟迟未收到款项。客服查询发现:工商银行已扣款,但建设银行未入账,资金"丢失"在系统中。这次故障让我深刻理解了分布式事务的复杂性。

现象描述:分布式系统的数据不一致

案例1:订单创建与库存扣减不一致

场景:电商下单流程

// 问题代码:订单创建成功,但库存未扣减
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;  // 订单库
    
    @Autowired
    private InventoryService inventoryService;  // 库存服务(远程调用)
    
    @Transactional  // 本地事务
    public void createOrder(OrderRequest request) {
        // 1. 创建订单(本地事务)
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);  // 订单创建成功
        
        // 2. 扣减库存(远程调用)
        inventoryService.deduct(request.getProductId(), request.getQuantity());  // 网络异常,失败
        
        // 结果:订单创建成功,库存未扣减 → 超卖!
    }
}

影响:订单系统显示有货,实际库存已空,导致大量超卖订单。

案例2:分布式锁与事务的冲突

场景:秒杀活动

@Service
public class SeckillService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private OrderService orderService;
    
    public void seckill(Long userId, Long productId) {
        String lockKey = "seckill:" + productId;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
        
        if (locked) {
            try {
                // 检查库存
                Integer stock = getStock(productId);
                if (stock > 0) {
                    // 创建订单
                    orderService.createOrder(userId, productId);  // 事务内
                    // 扣减库存
                    decreaseStock(productId);  // 事务外
                }
            } finally {
                redisTemplate.delete(lockKey);
            }
        }
    }
}

问题

  1. 分布式锁在事务外,事务提交前锁已释放
  2. 其他线程可能读到未提交的库存数据
  3. 事务回滚后,库存扣减无法恢复

案例3:消息发送与业务操作不一致

场景:订单完成后发送消息

@Service
public class OrderCompletionService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional
    public void completeOrder(Long orderId) {
        // 1. 更新订单状态
        orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
        
        // 2. 发送完成消息
        kafkaTemplate.send("order-completed", orderId.toString());  // 消息发送成功
        
        // 3. 事务回滚(后续代码异常)
        throw new RuntimeException("系统异常");
        
        // 结果:订单状态回滚,但消息已发送
        // 下游服务收到消息,处理了一个不存在的订单
    }
}

影响:订单系统显示订单未完成,但物流系统已发货,财务系统已结算。

原因分析:分布式事务的理论基础

1. CAP理论深度解析

CAP理论:在分布式系统中,一致性(Consistency)、可用性(Availability)、
分区容错性(Partition Tolerance)三者不可兼得,最多同时满足两项。

┌─────────────────────────────────────────────────────────────┐
│                      CAP三角关系                            │
│                                                             │
│                    ┌─────────────┐                          │
│                   /   一致性(C)   \                         │
│                  /    Consistency \                        │
│                 /                   \                       │
│                /                     \                      │
│         ┌─────┴─────┐           ┌─────┴─────┐              │
│         │  可用性(A) │           │ 分区容错性 │              │
│         │Availability│           │Partition   │              │
│         │           │           │Tolerance   │              │
│          \         /             \         /               │
│           \       /               \       /                │
│            \     /                 \     /                 │
│             \   /                   \   /                  │
│              \ /                     \ /                   │
│               v                       v                    │
│        ┌─────────────┐         ┌─────────────┐             │
│        │   CP系统    │         │   AP系统    │             │
│        │ Zookeeper   │         │  Cassandra  │             │
│        │   etcd      │         │   Eureka    │             │
│        │   HBase     │         │  DynamoDB   │             │
│        └─────────────┘         └─────────────┘             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.1 一致性(Consistency)

强一致性:所有节点在同一时间看到的数据完全一致
┌─────────┐    写操作    ┌─────────┐
│  主节点  │ ──────────→ │  从节点1 │  立即同步,读取一致
│  (A=10)  │             │  (A=10)  │
└─────────┘             └─────────┘
                              ↓
                         ┌─────────┐
                         │  从节点2 │  立即同步,读取一致
                         │  (A=10)  │
                         └─────────┘

最终一致性:数据最终会一致,但存在不一致窗口
┌─────────┐    写操作    ┌─────────┐
│  主节点  │ ──────────→ │  从节点1 │  异步同步,可能读到旧值
│  (A=10)  │   异步复制   │  (A=5)   │  (不一致窗口)
└─────────┘             └─────────┘
                              ↓
                         ┌─────────┐
                         │  从节点2 │  异步同步,可能读到旧值
                         │  (A=5)   │
                         └─────────┘

1.2 可用性(Availability)

高可用性:每个请求都能在合理时间内获得响应
┌─────────┐         ┌─────────┐
│  客户端  │ ──────→ │  节点A  │  正常响应
│         │         │ (可用)   │
│         │ ←────── └─────────┘
│         │
│         │ ──────→ ┌─────────┐
│         │         │  节点B  │  节点A故障,节点B接管
│         │ ←────── │ (可用)   │
│         │         └─────────┘
└─────────┘

1.3 分区容错性(Partition Tolerance)

分区容错:网络分区时系统仍能运行

网络分区前:
┌─────────┐         ┌─────────┐
│  节点A  │ ←─────→ │  节点B  │
│  (A=10)  │         │  (A=10)  │
└─────────┘         └─────────┘

网络分区后(A和B之间网络中断):
┌─────────┐    X    ┌─────────┐
│  节点A  │ ←─X─→ │  节点B  │
│  (A=10)  │  网络  │  (A=10)  │
└─────────┘  中断  └─────────┘

CP选择:拒绝写入,保证一致性(等待网络恢复)
AP选择:允许写入,保证可用性(牺牲一致性)

2. BASE理论

BASE理论:Basically Available(基本可用)、Soft state(软状态)、
Eventually consistent(最终一致性)

是对CAP中AP方案的延伸,强调通过牺牲强一致性来获得可用性。

基本可用(Basically Available):
- 响应时间延长:正常0.5秒,故障时2秒
- 功能降级:只读不写,或只写核心功能

软状态(Soft State):
- 允许中间状态存在
- 数据副本之间可以暂时不一致

最终一致性(Eventually Consistent):
┌─────────┐    写操作    ┌─────────┐
│  主节点  │ ──────────→ │  从节点  │
│  (A=10)  │   异步复制   │  (A=5)   │  不一致窗口
└─────────┘             └─────────┘
                              ↓ 时间推移
                         ┌─────────┐
                         │  从节点  │
                         │  (A=10)  │  最终一致
                         └─────────┘

3. 分布式事务的挑战

分布式事务面临的核心挑战:

1. 网络不可靠
   - 请求丢失:调用方未收到请求
   - 响应丢失:服务方已处理,但调用方未收到响应
   - 网络延迟:响应时间不确定

2. 节点故障
   - 单点故障:某个服务节点宕机
   - 脑裂:网络分区导致多个主节点
   - 数据不一致:故障恢复后数据冲突

3. 数据一致性
   - 强一致性:所有节点数据实时一致(难以实现)
   - 最终一致性:允许短暂不一致,最终达到一致(主流方案)

4. 性能问题
   - 两阶段提交:同步阻塞,性能低下
   - 三阶段提交:减少阻塞,但复杂度增加

解决方案:分布式事务实现方案

1. 分布式事务方案对比

方案一致性级别性能复杂度适用场景代表框架
2PC强一致传统分布式事务XA协议
3PC强一致高可用场景较少使用
TCC最终一致高并发金融场景ByteTCC、TCC-Transaction
Saga最终一致长事务业务Seata Saga、Axon
本地消息表最终一致异步场景自研方案
可靠消息最终一致主流方案RocketMQ事务消息
最大努力通知最终一致对一致性要求低支付回调

2. 两阶段提交(2PC)

2.1 2PC原理

两阶段提交(Two-Phase Commit):

阶段一:准备阶段(Prepare Phase)
┌──────────┐              ┌──────────┐              ┌──────────┐
│  协调者   │ ──Prepare──→ │ 参与者A  │              │ 参与者B  │
│          │              │  执行本地  │              │  执行本地  │
│          │ ←─Yes/No──── │  事务,锁  │              │  事务,锁  │
│          │              │  资源,不  │              │  资源,不  │
│          │ ──Prepare──→ │  提交      │              │  提交      │
│          │              └──────────┘              │  执行本地  │
│          │ ←─Yes/No───────────────────────────────│  事务,锁  │
│          │                                        │  资源      │
└──────────┘                                        └──────────┘

阶段二:提交阶段(Commit Phase)
如果所有参与者返回Yes:
┌──────────┐              ┌──────────┐              ┌──────────┐
│  协调者   │ ──Commit───→ │ 参与者A  │              │ 参与者B  │
│          │              │  提交本地  │              │  提交本地  │
│          │ ←─Ack─────── │  事务      │              │  事务      │
│          │              └──────────┘              │  提交本地  │
│          │ ──Commit───→                          │  事务      │
│          │ ←─Ack──────────────────────────────────└──────────┘
└──────────┘

如果有参与者返回No:
┌──────────┐              ┌──────────┐              ┌──────────┐
│  协调者   │ ──Rollback─→ │ 参与者A  │              │ 参与者B  │
│          │              │  回滚本地  │              │  回滚本地  │
│          │ ←─Ack─────── │  事务      │              │  事务      │
│          │              └──────────┘              │  回滚本地  │
│          │ ──Rollback─→                          │  事务      │
│          │ ←─Ack──────────────────────────────────└──────────┘
└──────────┘

2.2 2PC的问题

2PC的主要问题:

1. 同步阻塞
   - 所有参与者在等待协调者指令时,资源被锁定
   - 协调者宕机,参与者一直阻塞

2. 单点故障
   - 协调者是单点,宕机后整个事务无法完成
   - 参与者等待协调者恢复

3. 数据不一致
   - 协调者在发送Commit指令后宕机,部分参与者收到,部分未收到
   - 导致数据不一致

4. 性能问题
   - 两次网络往返
   - 资源锁定时间长

2.3 Java实现2PC(XA协议)

// XA数据源配置
@Configuration
public class XADataSourceConfig {
    
    @Bean
    public UserTransactionManager userTransactionManager() {
        return new UserTransactionManager();
    }
    
    @Bean
    public JtaTransactionManager jtaTransactionManager() {
        return new JtaTransactionManager(
            userTransactionManager(), 
            userTransactionManager()
        );
    }
    
    // XA数据源1
    @Bean
    public XADataSource xaDataSource1() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl("jdbc:mysql://localhost:3306/db1");
        xaDataSource.setUser("root");
        xaDataSource.setPassword("password");
        return xaDataSource;
    }
    
    // XA数据源2
    @Bean
    public XADataSource xaDataSource2() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl("jdbc:mysql://localhost:3306/db2");
        xaDataSource.setUser("root");
        xaDataSource.setPassword("password");
        return xaDataSource;
    }
}

// 使用XA事务
@Service
public class XaTransferService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate1;  // db1
    
    @Autowired
    private JdbcTemplate jdbcTemplate2;  // db2
    
    @Transactional(transactionManager = "jtaTransactionManager")
    public void transfer(Long fromId, Long toId, BigDecimal amount) {
        // 扣减db1账户
        jdbcTemplate1.update(
            "UPDATE account SET balance = balance - ? WHERE id = ?",
            amount, fromId
        );
        
        // 增加db2账户
        jdbcTemplate2.update(
            "UPDATE account SET balance = balance + ? WHERE id = ?",
            amount, toId
        );
        
        // XA事务管理器自动处理2PC
    }
}

3. TCC模式(Try-Confirm-Cancel)

3.1 TCC原理

TCC模式:将每个操作拆分为三个阶段

Try阶段:预留资源,执行业务检查
┌──────────┐              ┌──────────┐              ┌──────────┐
│  业务服务 │ ──Try──────→ │ 库存服务  │              │ 优惠券服务│
│          │              │ 冻结库存  │              │ 冻结优惠券│
│          │              │ (预留资源)│              │ (预留资源)│
└──────────┘              └──────────┘              └──────────┘

Confirm阶段:确认执行,使用预留资源
┌──────────┐              ┌──────────┐              ┌──────────┐
│  业务服务 │ ──Confirm──→ │ 库存服务  │              │ 优惠券服务│
│          │              │ 扣减库存  │              │ 使用优惠券│
│          │              │ (确认操作)│              │ (确认操作)│
└──────────┘              └──────────┘              └──────────┘

Cancel阶段:取消执行,释放预留资源
┌──────────┐              ┌──────────┐              ┌──────────┐
│  业务服务 │ ──Cancel───→ │ 库存服务  │              │ 优惠券服务│
│          │              │ 释放库存  │              │ 释放优惠券│
│          │              │ (回滚操作)│              │ (回滚操作)│
└──────────┘              └──────────┘              └──────────┘

3.2 TCC实现

// TCC接口定义
public interface InventoryTccAction {
    
    @TwoPhaseBusinessAction(name = "inventoryDeductTcc", 
                           useTCCFence = true)
    boolean tryDeduct(@BusinessActionContextParameter(paramName = "productId") Long productId,
                     @BusinessActionContextParameter(paramName = "quantity") Integer quantity);
    
    boolean confirm(BusinessActionContext context);
    
    boolean cancel(BusinessActionContext context);
}

// TCC实现
@Component
public class InventoryTccActionImpl implements InventoryTccAction {
    
    @Autowired
    private InventoryMapper inventoryMapper;
    
    @Autowired
    private InventoryFreezeMapper freezeMapper;
    
    @Override
    @Transactional
    public boolean tryDeduct(Long productId, Integer quantity) {
        // 1. 检查库存
        Inventory inventory = inventoryMapper.selectById(productId);
        if (inventory == null || inventory.getStock() < quantity) {
            throw new BusinessException("库存不足");
        }
        
        // 2. 检查是否已冻结(幂等性)
        String xid = RootContext.getXID();
        InventoryFreeze freeze = freezeMapper.selectByXid(xid);
        if (freeze != null) {
            return true; // 已处理过
        }
        
        // 3. 冻结库存
        freezeMapper.insert(new InventoryFreeze(xid, productId, quantity, FreezeStatus.TRY));
        
        // 4. 扣减可用库存
        int affected = inventoryMapper.decreaseStock(productId, quantity);
        if (affected == 0) {
            throw new BusinessException("扣减库存失败");
        }
        
        return true;
    }
    
    @Override
    @Transactional
    public boolean confirm(BusinessActionContext context) {
        String xid = context.getXid();
        
        // 1. 查询冻结记录
        InventoryFreeze freeze = freezeMapper.selectByXid(xid);
        if (freeze == null) {
            return true; // 已处理或无记录
        }
        
        // 2. 幂等性检查
        if (freeze.getStatus() == FreezeStatus.CONFIRMED) {
            return true;
        }
        
        // 3. 更新冻结状态为已确认
        freezeMapper.updateStatus(xid, FreezeStatus.CONFIRMED);
        
        // 4. 实际扣减库存(在try阶段已扣减,这里可以记录日志或更新其他状态)
        // 实际业务中可能需要将冻结库存转为实际扣减
        
        return true;
    }
    
    @Override
    @Transactional
    public boolean cancel(BusinessActionContext context) {
        String xid = context.getXid();
        
        // 1. 查询冻结记录
        InventoryFreeze freeze = freezeMapper.selectByXid(xid);
        if (freeze == null) {
            return true; // 无冻结记录,空回滚
        }
        
        // 2. 幂等性检查
        if (freeze.getStatus() == FreezeStatus.CANCELLED) {
            return true;
        }
        
        // 3. 恢复库存
        inventoryMapper.increaseStock(freeze.getProductId(), freeze.getQuantity());
        
        // 4. 更新冻结状态为已取消
        freezeMapper.updateStatus(xid, FreezeStatus.CANCELLED);
        
        return true;
    }
}

// 业务服务使用TCC
@Service
public class OrderTccService {
    
    @Autowired
    private InventoryTccAction inventoryTccAction;
    
    @Autowired
    private CouponTccAction couponTccAction;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @GlobalTransactional(name = "create-order-tcc", rollbackFor = Exception.class)
    public Order createOrder(OrderRequest request) {
        // 1. 创建订单(本地事务)
        Order order = new Order();
        order.setOrderNo(generateOrderNo());
        order.setUserId(request.getUserId());
        order.setStatus(OrderStatus.PENDING);
        orderMapper.insert(order);
        
        // 2. 调用TCC - 扣减库存(Try阶段)
        inventoryTccAction.tryDeduct(request.getProductId(), request.getQuantity());
        
        // 3. 调用TCC - 使用优惠券(Try阶段)
        if (request.getCouponId() != null) {
            couponTccAction.tryUse(request.getCouponId());
        }
        
        // 4. 如果所有Try成功,Seata自动调用Confirm
        // 5. 如果有异常,Seata自动调用Cancel
        
        return order;
    }
}

4. Saga模式

4.1 Saga原理

Saga模式:将长事务拆分为多个本地事务,每个本地事务有对应的补偿操作

正向流程:
T1 → T2 → T3 → T4 → ... → Tn

补偿流程(T3失败):
T1 → T2 → T3(失败) → C2 → C1

示例:出行预订
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│ 订机票   │ → │ 订酒店   │ → │ 租车    │ → │ 完成    │
│  (T1)   │    │  (T2)   │    │  (T3)   │    │         │
└─────────┘    └─────────┘    └─────────┘    └─────────┘

如果租车失败:
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│ 订机票   │ → │ 订酒店   │ → │ 租车失败 │ → │ 取消酒店 │ → │ 取消机票 │
│  (T1)   │    │  (T2)   │    │  (T3)   │    │  (C2)   │    │  (C1)   │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └─────────┘

4.2 Saga实现(Seata Saga)

// Saga状态机定义(JSON)
{
    "startState": "BookFlight",
    "states": {
        "BookFlight": {
            "type": "serviceTask",
            "serviceName": "bookFlightService",
            "methodName": "book",
            "input": [
                "$.[request]"
            ],
            "output": "$.#root",
            "next": "BookHotel"
        },
        "BookHotel": {
            "type": "serviceTask",
            "serviceName": "bookHotelService",
            "methodName": "book",
            "input": [
                "$.[request]"
            ],
            "output": "$.#root",
            "next": "BookCar"
        },
        "BookCar": {
            "type": "serviceTask",
            "serviceName": "bookCarService",
            "methodName": "book",
            "input": [
                "$.[request]"
            ],
            "output": "$.#root",
            "next": "Succeed"
        },
        "Succeed": {
            "type": "succeed"
        },
        "CompensateBookHotel": {
            "type": "serviceTask",
            "serviceName": "bookHotelService",
            "methodName": "cancel",
            "compensateState": "CompensateBookFlight"
        },
        "CompensateBookFlight": {
            "type": "serviceTask",
            "serviceName": "bookFlightService",
            "methodName": "cancel"
        }
    }
}

// Saga服务实现
@Service
public class BookFlightService {
    
    @Autowired
    private FlightBookingMapper bookingMapper;
    
    // 正向操作
    public BookingResult book(BookingRequest request) {
        // 预订机票
        FlightBooking booking = new FlightBooking();
        booking.setOrderId(request.getOrderId());
        booking.setFlightNo(request.getFlightNo());
        booking.setStatus(BookingStatus.BOOKED);
        bookingMapper.insert(booking);
        
        return new BookingResult(booking.getId(), true);
    }
    
    // 补偿操作
    public boolean cancel(BookingResult result) {
        // 取消机票预订
        bookingMapper.updateStatus(result.getBookingId(), BookingStatus.CANCELLED);
        return true;
    }
}

// 使用Saga
@Service
public class TravelBookingService {
    
    @Autowired
    private StateMachineEngine stateMachineEngine;
    
    public void bookTravel(TravelRequest request) {
        Map<String, Object> params = new HashMap<>();
        params.put("request", request);
        
        // 启动Saga状态机
        StateMachineInstance inst = stateMachineEngine.start(
            "travelBookingSaga",  // 状态机名称
            null,                  // 当前登录用户
            params                 // 初始参数
        );
        
        if (!inst.getStatus().equals(ExecutionStatus.SU)) {
            throw new BusinessException("预订失败: " + inst.getException().getMessage());
        }
    }
}

5. 本地消息表

5.1 本地消息表原理

本地消息表:将分布式事务拆分为本地事务 + 消息发送

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   业务表     │     │   消息表     │     │   消息服务   │
│  (订单表)   │     │ (待发送消息) │     │  (MQ发送)   │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │  1.本地事务        │                   │
       │ ┌───────────────┐ │                   │
       │ │ INSERT 订单    │ │                   │
       │ │ INSERT 消息    │ │                   │
       └─┤ (同库同事务)   ├─┘                   │
         │ COMMIT        │                     │
         └───────────────┘                     │
                                               │
                      2.定时扫描消息表          │
                      ┌──────────────┐        │
                      │ 查询待发送消息 │        │
                      │ 发送到MQ      │────────┘
                      │ 更新消息状态  │
                      └──────────────┘
                                               │
                      3.消费消息               │
                      ┌──────────────┐        │
                      │ 消费MQ消息   │←───────┘
                      │ 执行业务逻辑 │
                      │ 确认消费成功 │
                      └──────────────┘

5.2 本地消息表实现

// 消息表结构
CREATE TABLE transaction_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一标识',
    message_type VARCHAR(32) NOT NULL COMMENT '消息类型',
    message_body TEXT NOT NULL COMMENT '消息内容',
    status TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0-待发送,1-已发送,2-消费成功,3-消费失败',
    retry_count INT DEFAULT 0 COMMENT '重试次数',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_time (status, create_time)
) ENGINE=InnoDB COMMENT='事务消息表';

// 业务服务
@Service
public class OrderServiceWithMessage {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private TransactionMessageMapper messageMapper;
    
    @Transactional
    public void createOrder(OrderRequest request) {
        // 1. 创建订单
        Order order = new Order();
        order.setOrderNo(generateOrderNo());
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);
        
        // 2. 创建消息(同一事务)
        TransactionMessage message = new TransactionMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("ORDER_CREATED");
        message.setMessageBody(JsonUtils.toJson(new OrderCreatedEvent(order)));
        message.setStatus(0); // 待发送
        messageMapper.insert(message);
        
        // 3. 事务提交后,订单和消息同时写入数据库
    }
}

// 消息发送服务
@Component
public class MessageSender {
    
    @Autowired
    private TransactionMessageMapper messageMapper;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // 定时任务扫描待发送消息
    @Scheduled(fixedRate = 5000)
    public void scanAndSendMessages() {
        // 查询待发送消息(5分钟前的,避免与业务事务冲突)
        List<TransactionMessage> messages = messageMapper.selectPendingMessages(5);
        
        for (TransactionMessage message : messages) {
            try {
                // 发送消息到MQ
                kafkaTemplate.send(
                    getTopic(message.getMessageType()),
                    message.getMessageId(),
                    message.getMessageBody()
                ).get(3, TimeUnit.SECONDS); // 同步等待发送结果
                
                // 更新消息状态为已发送
                messageMapper.updateStatus(message.getId(), 1);
                
            } catch (Exception e) {
                log.error("消息发送失败: {}", message.getMessageId(), e);
                // 增加重试次数
                messageMapper.increaseRetryCount(message.getId());
                
                // 超过最大重试次数,标记为死信
                if (message.getRetryCount() >= 10) {
                    messageMapper.updateStatus(message.getId(), 4); // 死信状态
                }
            }
        }
    }
}

// 消息消费服务
@Component
public class OrderEventConsumer {
    
    @KafkaListener(topics = "order-created", groupId = "inventory-service")
    public void onOrderCreated(ConsumerRecord<String, String> record) {
        String messageId = record.key();
        String messageBody = record.value();
        
        try {
            OrderCreatedEvent event = JsonUtils.fromJson(messageBody, OrderCreatedEvent.class);
            
            // 幂等性检查
            if (processedMessageService.isProcessed(messageId)) {
                log.info("消息已处理,跳过: {}", messageId);
                return;
            }
            
            // 执行业务逻辑
            inventoryService.deductStock(event.getProductId(), event.getQuantity());
            
            // 标记消息已处理
            processedMessageService.markAsProcessed(messageId);
            
        } catch (Exception e) {
            log.error("消息消费失败: {}", messageId, e);
            // 抛出异常,触发重试
            throw e;
        }
    }
}

6. 可靠消息(RocketMQ事务消息)

6.1 RocketMQ事务消息原理

RocketMQ事务消息实现:

┌──────────┐              ┌──────────┐              ┌──────────┐
│  生产者   │              │  Broker  │              │  消费者   │
└────┬─────┘              └────┬─────┘              └────┬─────┘
     │                         │                         │
     │  1.发送半消息            │                         │
     │ ─────────────────────→  │                         │
     │                         │  存储半消息              │
     │  2.执行本地事务          │                         │
     │ ┌───────────────────┐   │                         │
     │ │ 执行业务逻辑       │   │                         │
     │ │ 提交本地事务       │   │                         │
     │ └───────────────────┘   │                         │
     │                         │                         │
     │  3.发送确认消息          │                         │
     │ ─────────────────────→  │                         │
     │  (Commit/Rollback)     │  确认或删除半消息         │
     │                         │                         │
     │                         │  4.回查(如果未收到确认)  │
     │ ←─────────────────────  │                         │
     │  检查本地事务状态        │                         │
     │ ─────────────────────→  │                         │
     │                         │                         │
     │                         │  5.投递消息              │
     │                         │ ─────────────────────→  │
     │                         │                         │  消费消息

6.2 RocketMQ事务消息实现

// RocketMQ事务消息生产者
@Component
public class OrderTransactionProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private OrderService orderService;
    
    public void sendOrderCreatedMessage(OrderRequest request) {
        // 创建消息
        Message<OrderRequest> message = MessageBuilder
            .withPayload(request)
            .setHeader("KEYS", request.getOrderNo())
            .build();
        
        // 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-producer-group",
            "order-created-topic",
            message,
            request  // 本地事务参数
        );
        
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new BusinessException("订单创建失败");
        }
    }
}

// 事务监听器
@RocketMQTransactionListener(txProducerGroup = "order-producer-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            OrderRequest request = (OrderRequest) arg;
            
            // 执行本地事务:创建订单
            Order order = orderService.createOrder(request);
            
            // 本地事务成功,提交消息
            return RocketMQLocalTransactionState.COMMIT_MESSAGE;
            
        } catch (Exception e) {
            log.error("本地事务执行失败", e);
            // 本地事务失败,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    /**
     * 回查本地事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            // 从消息中解析订单号
            String orderNo = new String(msg.getBody());
            
            // 查询订单是否存在
            Order order = orderMapper.selectByOrderNo(orderNo);
            
            if (order != null) {
                // 订单存在,本地事务成功
                return RocketMQLocalTransactionState.COMMIT_MESSAGE;
            } else {
                // 订单不存在,本地事务失败
                return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
            }
            
        } catch (Exception e) {
            log.error("回查本地事务状态失败", e);
            // 回查失败,返回未知,等待下次回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

7. Seata框架实战

7.1 Seata架构

Seata架构:
┌─────────────────────────────────────────────────────────────┐
│                        Seata Server                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  TC Server  │  │  事务协调器  │  │  全局事务管理│         │
│  │ (Transaction│  │             │  │             │         │
│  │  Coordinator)│  │             │  │             │         │
│  └──────┬──────┘  └─────────────┘  └─────────────┘         │
└─────────┼───────────────────────────────────────────────────┘
          │
          │  注册/心跳/事务协调
          │
    ┌─────┴─────┬─────────────┬─────────────┐
    ↓           ↓             ↓             ↓
┌───────┐  ┌───────┐    ┌───────┐    ┌───────┐
│  TM   │  │  RM1  │    │  RM2  │    │  RM3  │
│业务应用 │  │库存服务│    │订单服务│    │账户服务│
└───┬───┘  └───┬───┘    └───┬───┘    └───┬───┘
    │          │            │            │
    │    ┌─────┘            │            │
    │    ↓                  │            │
    │  1.注册分支事务        │            │
    │  2.执行业务           │            │
    │  3.报告状态           │            │
    │    │                  │            │
    ↓    ↓                  ↓            ↓
  全局事务协调(2PC协议)

7.2 Seata配置与使用

# application.yml
seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_tx_group
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  client:
    rm:
      async-commit-buffer-limit: 10000
      report-retry-count: 5
      table-meta-check-enable: false
      report-success-enable: false
      saga-branch-register-enable: false
    tm:
      commit-retry-count: 5
      rollback-retry-count: 5
      default-global-transaction-timeout: 60000
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
// 业务服务使用Seata
@Service
public class BusinessService {
    
    @Autowired
    private StorageService storageService;
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private AccountService accountService;
    
    /**
     * 创建订单 - 使用Seata AT模式
     */
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void purchase(PurchaseRequest request) {
        log.info("开始全局事务, xid: {}", RootContext.getXID());
        
        // 1. 扣减库存
        storageService.deduct(request.getProductId(), request.getCount());
        
        // 2. 创建订单
        Order order = orderService.create(request);
        
        // 3. 扣减账户余额
        accountService.debit(request.getUserId(), request.getAmount());
        
        // 任意步骤失败,Seata自动触发全局回滚
        // 各服务的RM会根据undo_log进行补偿
        
        log.info("全局事务提交成功, xid: {}", RootContext.getXID());
    }
}

// 库存服务(RM)
@Service
public class StorageService {
    
    @Autowired
    private StorageMapper storageMapper;
    
    /**
     * 扣减库存 - 自动注册为分支事务
     */
    public void deduct(Long productId, Integer count) {
        log.info("扣减库存, xid: {}, productId: {}, count: {}", 
                 RootContext.getXID(), productId, count);
        
        int affected = storageMapper.decrease(productId, count);
        if (affected == 0) {
            throw new BusinessException("库存不足");
        }
        
        // Seata会自动:
        // 1. 解析SQL,生成前后镜像
        // 2. 插入undo_log
        // 3. 提交本地事务
        // 4. 向TC报告分支事务状态
    }
}

实战案例:金融转账系统

案例1:跨行转账(TCC模式)

@Service
public class CrossBankTransferService {
    
    @Autowired
    private AccountTccAction accountTccAction;
    
    @Autowired
    private PaymentTccAction paymentTccAction;
    
    @Autowired
    private NotificationTccAction notificationTccAction;
    
    @GlobalTransactional(name = "cross-bank-transfer", rollbackFor = Exception.class)
    public void transfer(TransferRequest request) {
        // 1. 冻结转出账户资金(Try)
        accountTccAction.tryFreeze(request.getFromAccountId(), request.getAmount());
        
        // 2. 调用跨行支付接口,冻结转入资金(Try)
        paymentTccAction.tryTransfer(
            request.getFromBankCode(),
            request.getToBankCode(),
            request.getToAccountId(),
            request.getAmount()
        );
        
        // 3. 创建转账记录(Try)
        TransferRecord record = createTransferRecord(request);
        
        // 如果全部Try成功,自动执行Confirm
        // - 实际扣减转出账户资金
        // - 确认跨行转账
        // - 更新转账记录状态
        
        // 如果有异常,自动执行Cancel
        // - 释放转出账户冻结资金
        // - 取消跨行转账
        // - 标记转账记录失败
    }
}

案例2:订单-库存-支付一致性(Saga模式)

// Saga状态机定义
{
    "startState": "CreateOrder",
    "states": {
        "CreateOrder": {
            "type": "serviceTask",
            "serviceName": "orderSagaService",
            "methodName": "create",
            "compensateState": "CancelOrder",
            "next": "DeductInventory"
        },
        "DeductInventory": {
            "type": "serviceTask",
            "serviceName": "inventorySagaService",
            "methodName": "deduct",
            "compensateState": "RestoreInventory",
            "next": "ProcessPayment"
        },
        "ProcessPayment": {
            "type": "serviceTask",
            "serviceName": "paymentSagaService",
            "methodName": "process",
            "compensateState": "RefundPayment",
            "next": "SendNotification"
        },
        "SendNotification": {
            "type": "serviceTask",
            "serviceName": "notificationSagaService",
            "methodName": "send",
            "next": "Succeed"
        },
        "CancelOrder": {
            "type": "serviceTask",
            "serviceName": "orderSagaService",
            "methodName": "cancel"
        },
        "RestoreInventory": {
            "type": "serviceTask",
            "serviceName": "inventorySagaService",
            "methodName": "restore"
        },
        "RefundPayment": {
            "type": "serviceTask",
            "serviceName": "paymentSagaService",
            "methodName": "refund"
        },
        "Succeed": {
            "type": "succeed"
        }
    }
}

案例3:消息最终一致性(本地消息表)

@Service
public class OrderWithMessageService {
    
    @Transactional
    public void createOrderWithMessage(OrderRequest request) {
        // 1. 创建订单
        Order order = orderMapper.insert(request);
        
        // 2. 记录消息(同一事务)
        TransactionMessage message = new TransactionMessage();
        message.setMessageId(order.getOrderNo());
        message.setMessageType("ORDER_PAYMENT");
        message.setMessageBody(JsonUtils.toJson(new PaymentRequest(order)));
        message.setStatus(0);
        messageMapper.insert(message);
        
        // 事务提交后,定时任务会扫描并发送消息
    }
}

@Component
public class PaymentMessageConsumer {
    
    @KafkaListener(topics = "order-payment")
    public void onPaymentMessage(ConsumerRecord<String, String> record) {
        PaymentRequest request = JsonUtils.fromJson(record.value(), PaymentRequest.class);
        
        // 调用支付网关
        PaymentResult result = paymentGateway.charge(request);
        
        if (result.isSuccess()) {
            // 更新订单状态
            orderMapper.updateStatus(request.getOrderId(), OrderStatus.PAID);
        } else {
            // 记录支付失败,进入人工处理流程
            orderMapper.updateStatus(request.getOrderId(), OrderStatus.PAYMENT_FAILED);
        }
    }
}

性能测试数据

1. 不同分布式事务方案性能对比

方案TPS平均延迟吞吐量适用并发
本地事务15,0005ms单机
2PC/XA800150ms< 100
TCC5,00025ms< 1000
Saga4,50030ms< 1000
本地消息表6,00020ms< 2000
RocketMQ事务消息7,00015ms< 3000

测试环境:3个服务节点,MySQL 8.0,网络延迟<1ms

2. Seata AT模式性能测试

场景无Seata有Seata性能损耗
单服务单表12,000 TPS10,500 TPS12.5%
单服务多表8,000 TPS6,500 TPS18.7%
多服务调用5,000 TPS3,800 TPS24%
复杂长事务2,000 TPS1,400 TPS30%

3. 事务恢复时间测试

故障场景自动恢复时间人工介入比例
网络抖动< 3s0%
服务重启5-10s0%
数据库故障30-60s5%
消息丢失依赖重试策略10%
补偿失败需人工处理100%

经验总结

✅ 最佳实践

  1. 能不用就不用

    • 通过业务设计避免分布式事务
    • 使用单服务内的事务 + 异步通知
    • 将大事务拆分为小事务
  2. 最终一致性优先

    • AP模式更适合互联网场景
    • 用户可接受短暂不一致
    • 通过补偿保证最终一致
  3. 异步化解耦

    • 使用消息队列解耦服务
    • 本地消息表或可靠消息
    • 降低服务间耦合度
  4. 幂等设计

    • 所有接口必须支持幂等
    • 防止网络重试导致重复处理
    • 使用唯一键或状态机去重
  5. 监控告警

    • 事务状态全程监控
    • 异常事务及时告警
    • 补偿失败人工介入
  6. 降级策略

    • 分布式事务框架故障时降级
    • 使用本地事务 + 人工补偿
    • 保证核心业务流程可用

❌ 常见错误

  1. 过度追求强一致性

    // 错误:所有场景都用2PC
    @GlobalTransactional
    public void allUse2PC() {
        // 大部分场景不需要强一致
    }
    
    // 正确:根据场景选择
    // 核心金融交易:TCC或2PC
    // 普通业务:本地消息表
    // 日志记录:直接发送,无需事务
    
  2. 忽略幂等性

    // 错误:不处理重复消息
    @KafkaListener(topics = "order")
    public void onOrderMessage(Order order) {
        inventoryService.deduct(order.getProductId(), order.getQuantity());
        // 如果消息重复,库存会被多次扣减
    }
    
    // 正确:幂等性处理
    @KafkaListener(topics = "order")
    public void onOrderMessage(Order order) {
        // 检查是否已处理
        if (processedOrderService.isProcessed(order.getOrderNo())) {
            return;
        }
        inventoryService.deduct(order.getProductId(), order.getQuantity());
        processedOrderService.markAsProcessed(order.getOrderNo());
    }
    
  3. 事务范围过大

    // 错误:整个业务流程在一个事务中
    @GlobalTransactional
    public void bigTransaction() {
        createOrder();      // 10ms
        deductInventory();  // 20ms
        processPayment();   // 100ms(调用第三方支付)
        sendNotification(); // 50ms(发送短信)
        updateStatistics(); // 30ms
        // 总耗时210ms,锁持有时间长
    }
    
    // 正确:拆分事务
    @GlobalTransactional
    public void smallTransaction() {
        createOrder();      // 10ms
        deductInventory();  // 20ms
        // 总耗时30ms
    }
    // 其他操作异步处理
    
  4. 不处理补偿失败

    // 错误:补偿操作不处理异常
    public void cancel() {
        restoreInventory(); // 如果失败怎么办?
    }
    
    // 正确:补偿失败进入人工处理
    public void cancel() {
        try {
            restoreInventory();
        } catch (Exception e) {
            // 记录补偿失败,进入人工处理队列
            compensationFailureService.record(context);
            alertService.sendAlert("补偿失败,需人工处理");
        }
    }
    
  5. 缺少事务监控

    // 错误:没有监控
    @GlobalTransactional
    public void noMonitor() {
        // 事务执行中...
    }
    
    // 正确:完善的监控
    @GlobalTransactional
    public void withMonitor() {
        String xid = RootContext.getXID();
        metricsService.recordTransactionStart(xid);
        try {
            // 事务执行...
            metricsService.recordTransactionSuccess(xid);
        } catch (Exception e) {
            metricsService.recordTransactionFailure(xid, e);
            throw e;
        }
    }
    

决策树:如何选择分布式事务方案

                    ┌─────────────────────────────────────┐
                    │         业务场景分析                 │
                    └─────────────────┬───────────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            ↓                         ↓                         ↓
    ┌───────────────┐        ┌───────────────┐        ┌───────────────┐
    │  强一致性要求  │        │  最终一致性   │        │  简单异步场景 │
    │  金融/支付    │        │  电商/订单    │        │  日志/通知    │
    └───────┬───────┘        └───────┬───────┘        └───────┬───────┘
            │                        │                        │
            ↓                        ↓                        ↓
    ┌───────────────┐        ┌───────────────┐        ┌───────────────┐
    │   TCC模式     │        │  Saga模式     │        │ 本地消息表    │
    │   或2PC/XA    │        │  或可靠消息   │        │ 或直接发送    │
    └───────┬───────┘        └───────┬───────┘        └───────┬───────┘
            │                        │                        │
            ↓                        ↓                        ↓
    ┌───────────────┐        ┌───────────────┐        ┌───────────────┐
    │ Seata TCC     │        │ Seata Saga    │        │ RocketMQ      │
    │ ByteTCC       │        │ RocketMQ事务  │        │ 自研消息表    │
    └───────────────┘        └───────────────┘        └───────────────┘

检查清单

分布式事务设计检查清单

  • 是否必须通过分布式事务解决?
  • 选择的方案是否符合业务一致性要求?
  • 所有参与方是否都实现了幂等性?
  • 补偿操作是否考虑了失败情况?
  • 是否有完善的事务监控?
  • 是否设置了合理的超时时间?
  • 是否有降级方案?
  • 是否测试了各种故障场景?
  • 是否有补偿失败的人工处理流程?
  • 是否记录了完整的事务日志?

分布式事务故障排查清单

  • 网络是否正常?
  • 各服务是否都注册了TC?
  • 数据库undo_log表是否存在?
  • 事务超时时间是否合理?
  • 是否有锁冲突?
  • 补偿操作是否执行成功?
  • 消息是否被正确消费?
  • 是否存在幂等性问题?
  • 事务状态是否一致?
  • 是否有死锁情况?

系列上一篇事务隔离级别与并发问题深度剖析

系列下一篇高并发场景下的数据库应对策略

知识点测试

读完文章了?来测试一下你对知识点的掌握程度吧!

评论区

使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。

如果评论系统无法加载,请确保:

  • 您的网络可以访问 GitHub
  • giscus GitHub App 已安装到仓库
  • 仓库已启用 Discussions 功能