返回 筑基・数据元府藏真

数据异构与同步方案

博主
大约 28 分钟

数据异构与同步方案

一、问题引入:搜索与数据库不一致的困境

1.1 真实案例:电商搜索价格不一致事故

事故时间:2024年618大促期间
事故场景:商品详情页显示价格299元,但搜索结果页显示399元
影响范围:涉及商品500+,用户投诉2000+,订单退款150万
根本原因:MySQL到Elasticsearch的数据同步延迟和失败

事故经过:
┌─────────────────────────────────────────────────────────────┐
│  T1: 运营在后台修改商品A价格 399→299                        │
│  T2: MySQL更新成功,返回成功                                 │
│  T3: 异步同步ES时网络抖动,消息丢失                          │
│  T4: 用户搜索"手机" → ES返回旧价格399                        │
│  T5: 用户点击商品 → 详情页查MySQL显示299                     │
│  T6: 用户困惑,大量投诉"价格欺诈"                            │
│  T7: 问题发现时已过2小时,影响订单3000+                      │
└─────────────────────────────────────────────────────────────┘

损失评估:
- 直接经济损失:退差价150万元
- 品牌信誉损失:用户信任度下降
- 技术债务:紧急修复数据,通宵加班

1.2 数据异构场景分析

现代应用架构中的数据异构需求:
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│   ┌─────────────┐                                            │
│   │  MySQL      │  主数据库,事务性存储                       │
│   │  (OLTP)     │  - 用户、订单、商品核心数据                 │
│   └──────┬──────┘                                            │
│          │                                                   │
│          ▼                                                   │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    数据同步层                        │   │
│   │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌────────┐ │   │
│   │  │  Canal  │  │  MQ     │  │  定时   │  │ 双写   │ │   │
│   │  └────┬────┘  └────┬────┘  └────┬────┘  └───┬────┘ │   │
│   └───────┼────────────┼────────────┼────────────┼──────┘   │
│           │            │            │            │           │
│           ▼            ▼            ▼            ▼           │
│   ┌─────────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│   │Elasticsearch│ │  Redis   │ │ ClickHouse│ │  MongoDB │   │
│   │  (搜索)     │ │ (缓存)   │ │ (分析)   │ │ (文档)   │   │
│   └─────────────┘ └──────────┘ └──────────┘ └──────────┘   │
│                                                              │
│   异构原因:                                                 │
│   1. MySQL不适合复杂搜索(全文检索、聚合分析)               │
│   2. 缓存需要更快的访问速度                                  │
│   3. 大数据分析需要列式存储                                  │
│   4. 文档存储需要灵活的数据模型                              │
│                                                              │
└──────────────────────────────────────────────────────────────┘

1.3 数据一致性的挑战

挑战描述影响
同步延迟主库变更到从库生效的时间差用户看到旧数据
同步失败网络故障、服务宕机导致同步中断数据永久不一致
顺序错乱消息队列消费顺序与产生顺序不同数据覆盖错误
重复消费消息队列重试导致重复处理数据错误
数据冲突多源写入同一目标数据覆盖丢失

二、数据同步方案全景

2.1 四种主流方案对比

┌─────────────────────────────────────────────────────────────────────┐
│                        数据同步方案对比                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  方案1:双写(Dual Write)                                          │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  应用代码同时写入多个数据源                                   │   │
│  │  App → MySQL + Redis + ES                                   │   │
│  │                                                             │   │
│  │  优点:简单直观,实时性高                                    │   │
│  │  缺点:强耦合,一致性问题,部分失败难处理                    │   │
│  │  适用:简单场景,数据一致性要求不高                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  方案2:消息队列(MQ)                                              │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  业务只写主库,发送变更消息,消费者同步到异构库              │   │
│  │  App → MySQL → MQ → Consumer → ES/Redis                     │   │
│  │                                                             │   │
│  │  优点:解耦,可靠投递,可重试                                │   │
│  │  缺点:有延迟,需要处理顺序和幂等                            │   │
│  │  适用:主流方案,大多数业务场景                              │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  方案3:CDC(Canal/Debezium)                                       │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  监听数据库Binlog,异步同步到异构库                          │   │
│  │  MySQL → Binlog → Canal → Kafka → Consumer → ES             │   │
│  │                                                             │   │
│  │  优点:业务无侵入,数据变更必达                              │   │
│  │  缺点:需要维护CDC组件,有解析延迟                           │   │
│  │  适用:推荐方案,已有MySQL架构                               │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  方案4:定时同步(Batch Sync)                                      │
│  │  定时扫描变更,批量同步                                      │   │
│  │  Scheduler → Query MySQL → Batch Update ES                  │   │
│  │                                                             │   │
│  │  优点:简单,资源可控                                        │   │
│  │  缺点:延迟大,实时性差                                      │   │
│  │  适用:离线分析,对实时性要求不高的场景                      │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2 方案选择决策矩阵

方案实时性复杂度可靠性侵入性适用场景
双写⭐⭐⭐⭐⭐⭐⭐⭐⭐简单场景,快速上线
消息队列⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐主流方案,大多数业务
CDC订阅⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐已有MySQL,推荐方案
定时同步⭐⭐⭐⭐⭐⭐⭐离线分析,报表统计

2.3 方案选择决策树

                    ┌─────────────────┐
                    │  数据同步需求   │
                    └────────┬────────┘
                             │
                             ▼
              ┌──────────────────────────────┐
              │  是否已有成熟的MySQL架构?   │
              └─────────────┬────────────────┘
                            │
           ┌────────────────┼────────────────┐
           ▼是                               ▼否
    ┌───────────────┐                ┌───────────────┐
    │ 使用CDC订阅   │                │ 是否要求高实时│
    │ (Canal/Debezium)│              │ 性和强一致?  │
    └───────────────┘                └───────┬───────┘
                                             │
                            ┌────────────────┼────────────────┐
                            ▼是                               ▼否
                     ┌───────────────┐                ┌───────────────┐
                     │ 使用双写      │                │ 使用消息队列  │
                     │ + 事务消息    │                │ + 定时补偿    │
                     └───────────────┘                └───────────────┘

三、CDC方案详解(Canal实现)

3.1 Canal架构原理

┌─────────────────────────────────────────────────────────────────────┐
│                        Canal架构图                                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────────────┐   │
│  │   MySQL     │     │   Canal     │     │    Kafka/RocketMQ   │   │
│  │  (Master)   │────▶│   Server    │────▶│     (消息队列)      │   │
│  │  Binlog     │     │  (解析服务)  │     │                     │   │
│  └─────────────┘     └─────────────┘     └─────────────────────┘   │
│         │                   │                      │                │
│         │                   │                      │                │
│         ▼                   ▼                      ▼                │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Canal Client / Adapter                    │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐        │   │
│  │  │   ES    │  │  Redis  │  │  HBase  │  │ ClickHouse│       │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘        │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  工作流程:                                                         │
│  1. Canal Server伪装成MySQL Slave,向Master请求Binlog            │
│  2. Master发送Binlog事件给Canal Server                           │
│  3. Canal Server解析Binlog,转换为结构化数据                      │
│  4. 发送到消息队列或直接投递到目标存储                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

3.2 Canal配置与部署

# canal.properties - Canal Server配置
canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111

# 模式:tcp/kafka/rocketMQ/rabbitMQ
canal.serverMode = kafka

# Kafka配置
canal.mq.servers = kafka:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432

# instance配置
---
# instance.properties - 具体实例配置
canal.instance.mysql.slaveId = 1234

# MySQL连接信息
canal.instance.master.address = mysql:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

# 订阅配置
canal.instance.filter.regex = db1\\..*,db2\\.user,db2\\.order
canal.instance.filter.black.regex = db1\\.log_.*

# 表字段映射(用于ES同步)
canal.instance.tsdb.enable = true
canal.instance.tsdb.url = jdbc:mysql://mysql:3306/canal_tsdb

3.3 Java客户端实现

/**
 * Canal客户端 - 同步MySQL到Elasticsearch
 */
@Component
@Slf4j
public class CanalSyncService {
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    @Autowired
    private ProductRepository productRepository;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 监听商品表变更
     */
    @CanalListener(destination = "example", schema = "mall", table = "product")
    public void onProductChange(CanalMessage message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        
        for (CanalEntry.Entry entry : entries) {
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    processProductChange(rowChange.getEventType(), rowData);
                }
            } catch (InvalidProtocolBufferException e) {
                log.error("Parse canal message failed", e);
            }
        }
    }
    
    /**
     * 处理商品变更
     */
    private void processProductChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        switch (eventType) {
            case INSERT:
                handleProductInsert(rowData.getAfterColumnsList());
                break;
            case UPDATE:
                handleProductUpdate(rowData.getAfterColumnsList());
                break;
            case DELETE:
                handleProductDelete(rowData.getBeforeColumnsList());
                break;
            default:
                log.warn("Unknown event type: {}", eventType);
        }
    }
    
    /**
     * 处理新增
     */
    private void handleProductInsert(List<CanalEntry.Column> columns) {
        ProductDoc doc = buildProductDoc(columns);
        
        try {
            // 同步到ES
            elasticsearchTemplate.save(doc);
            log.info("Product inserted to ES, id: {}", doc.getId());
            
            // 删除缓存(如果存在)
            invalidateProductCache(doc.getId());
            
        } catch (Exception e) {
            log.error("Failed to sync product insert to ES, id: {}", doc.getId(), e);
            // 记录到补偿队列
            recordToCompensateQueue("INSERT", doc.getId());
        }
    }
    
    /**
     * 处理更新
     */
    private void handleProductUpdate(List<CanalEntry.Column> columns) {
        ProductDoc doc = buildProductDoc(columns);
        
        try {
            // 同步到ES
            elasticsearchTemplate.save(doc);
            log.info("Product updated in ES, id: {}", doc.getId());
            
            // 删除缓存
            invalidateProductCache(doc.getId());
            
            // 发送变更通知(用于其他业务处理)
            publishChangeEvent(doc);
            
        } catch (Exception e) {
            log.error("Failed to sync product update to ES, id: {}", doc.getId(), e);
            recordToCompensateQueue("UPDATE", doc.getId());
        }
    }
    
    /**
     * 处理删除
     */
    private void handleProductDelete(List<CanalEntry.Column> columns) {
        Long productId = getColumnValue(columns, "id");
        
        try {
            // 从ES删除
            elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
            log.info("Product deleted from ES, id: {}", productId);
            
            // 删除缓存
            invalidateProductCache(productId);
            
        } catch (Exception e) {
            log.error("Failed to sync product delete to ES, id: {}", productId, e);
            recordToCompensateQueue("DELETE", productId);
        }
    }
    
    /**
     * 构建ES文档
     */
    private ProductDoc buildProductDoc(List<CanalEntry.Column> columns) {
        ProductDoc doc = new ProductDoc();
        
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case "id":
                    doc.setId(Long.valueOf(column.getValue()));
                    break;
                case "name":
                    doc.setName(column.getValue());
                    break;
                case "price":
                    doc.setPrice(new BigDecimal(column.getValue()));
                    break;
                case "category_id":
                    doc.setCategoryId(Long.valueOf(column.getValue()));
                    break;
                case "status":
                    doc.setStatus(Integer.valueOf(column.getValue()));
                    break;
                // ... 其他字段
            }
        }
        
        // 补充关联数据(如分类名称)
        enrichProductDoc(doc);
        
        return doc;
    }
    
    /**
     * 补充关联数据
     */
    private void enrichProductDoc(ProductDoc doc) {
        // 查询分类名称
        String categoryName = getCategoryName(doc.getCategoryId());
        doc.setCategoryName(categoryName);
        
        // 查询库存
        Integer stock = getProductStock(doc.getId());
        doc.setStock(stock);
    }
    
    /**
     * 获取列值
     */
    private Long getColumnValue(List<CanalEntry.Column> columns, String columnName) {
        return columns.stream()
            .filter(c -> columnName.equals(c.getName()))
            .findFirst()
            .map(c -> Long.valueOf(c.getValue()))
            .orElse(null);
    }
    
    /**
     * 删除商品缓存
     */
    private void invalidateProductCache(Long productId) {
        String cacheKey = "product:" + productId;
        redisTemplate.delete(cacheKey);
        
        // 删除列表缓存
        redisTemplate.delete("product:list:*");
    }
    
    /**
     * 发布变更事件
     */
    private void publishChangeEvent(ProductDoc doc) {
        // 发送消息到MQ,通知其他服务
    }
    
    /**
     * 记录到补偿队列
     */
    private void recordToCompensateQueue(String operation, Long productId) {
        // 记录到Redis或MQ,后续补偿处理
    }
}

3.4 批量同步优化

/**
 * 批量同步处理器 - 提升ES写入性能
 */
@Component
@Slf4j
public class BatchSyncProcessor {
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    // 批量缓冲区
    private final List<ProductDoc> buffer = new ArrayList<>();
    private final int BATCH_SIZE = 100;
    private final long FLUSH_INTERVAL = 1000; // 1秒
    
    /**
     * 添加文档到缓冲区
     */
    public void addToBuffer(ProductDoc doc) {
        synchronized (buffer) {
            buffer.add(doc);
            
            if (buffer.size() >= BATCH_SIZE) {
                flushBuffer();
            }
        }
    }
    
    /**
     * 刷新缓冲区到ES
     */
    @Scheduled(fixedRate = FLUSH_INTERVAL)
    public void flushBuffer() {
        List<ProductDoc> batch;
        
        synchronized (buffer) {
            if (buffer.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(buffer);
            buffer.clear();
        }
        
        try {
            // 批量保存
            elasticsearchTemplate.save(batch);
            log.info("Batch synced {} products to ES", batch.size());
        } catch (Exception e) {
            log.error("Batch sync failed, will retry individually", e);
            // 失败时逐个重试
            for (ProductDoc doc : batch) {
                try {
                    elasticsearchTemplate.save(doc);
                } catch (Exception ex) {
                    log.error("Individual sync failed for product: {}", doc.getId(), ex);
                    recordToCompensateQueue(doc);
                }
            }
        }
    }
}

四、消息队列同步方案

4.1 基于RocketMQ的数据同步

/**
 * 消息队列数据同步方案
 * 适用于:业务主动发送变更消息,消费者同步到异构存储
 */
@Service
@Slf4j
public class MQDataSyncService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private TransactionMQProducer transactionProducer;
    
    /**
     * 发送数据变更消息 - 普通消息
     * 适用于:对一致性要求不高的场景
     */
    public void sendChangeMessage(String table, Long id, String operation) {
        DataChangeMessage message = new DataChangeMessage();
        message.setTable(table);
        message.setId(id);
        message.setOperation(operation);
        message.setTimestamp(System.currentTimeMillis());
        message.setTraceId(MDC.get("traceId"));
        
        rocketMQTemplate.asyncSend("data-sync-topic", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Change message sent successfully, table: {}, id: {}", table, id);
            }
            
            @Override
            public void onException(Throwable e) {
                log.error("Failed to send change message, table: {}, id: {}", table, id, e);
                // 记录到本地日志,后续补偿
                recordFailedMessage(message);
            }
        });
    }
    
    /**
     * 发送事务消息 - 保证本地事务和消息发送一致
     * 适用于:强一致性要求的场景
     */
    public void sendTransactionMessage(String table, Long id, String operation, 
                                       Runnable localTransaction) {
        DataChangeMessage message = new DataChangeMessage();
        message.setTable(table);
        message.setId(id);
        message.setOperation(operation);
        message.setTimestamp(System.currentTimeMillis());
        
        TransactionSendResult result = transactionProducer.sendMessageInTransaction(
            new Message("data-sync-topic", JSON.toJSONBytes(message)),
            localTransaction
        );
        
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new RuntimeException("Transaction message send failed");
        }
    }
}

/**
 * 事务消息监听器
 */
@Component
@Slf4j
public class DataChangeTransactionListener implements TransactionListener {
    
    @Autowired
    private DataChangeLogMapper changeLogMapper;
    
    /**
     * 执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地业务逻辑
            Runnable localTransaction = (Runnable) arg;
            localTransaction.run();
            
            // 记录变更日志
            recordChangeLog(msg);
            
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.error("Local transaction failed", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    /**
     * 回查本地事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 根据变更日志判断事务状态
        String transactionId = msg.getTransactionId();
        DataChangeLog log = changeLogMapper.selectByTransactionId(transactionId);
        
        if (log != null && log.getStatus() == 1) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

/**
 * 数据同步消费者
 */
@Component
@Slf4j
public class DataSyncConsumer implements RocketMQListener<DataChangeMessage> {
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    @Override
    @RocketMQMessageListener(
        topic = "data-sync-topic",
        consumerGroup = "data-sync-consumer",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING
    )
    public void onMessage(DataChangeMessage message) {
        log.info("Received change message: {}", message);
        
        try {
            switch (message.getTable()) {
                case "product":
                    syncProduct(message);
                    break;
                case "order":
                    syncOrder(message);
                    break;
                case "user":
                    syncUser(message);
                    break;
                default:
                    log.warn("Unknown table: {}", message.getTable());
            }
        } catch (Exception e) {
            log.error("Sync failed for message: {}", message, e);
            throw e; // 抛出异常触发重试
        }
    }
    
    /**
     * 同步商品数据
     */
    private void syncProduct(DataChangeMessage message) {
        Long productId = message.getId();
        
        switch (message.getOperation()) {
            case "INSERT":
            case "UPDATE":
                Product product = productMapper.selectById(productId);
                if (product != null) {
                    ProductDoc doc = convertToDoc(product);
                    elasticsearchTemplate.save(doc);
                }
                break;
            case "DELETE":
                elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
                break;
        }
    }
    
    /**
     * 同步订单数据到Redis
     */
    private void syncOrder(DataChangeMessage message) {
        // 实现订单数据同步逻辑
    }
    
    /**
     * 同步用户数据
     */
    private void syncUser(DataChangeMessage message) {
        // 实现用户数据同步逻辑
    }
    
    private ProductDoc convertToDoc(Product product) {
        ProductDoc doc = new ProductDoc();
        BeanUtils.copyProperties(product, doc);
        return doc;
    }
}

4.2 消息顺序保证

/**
 * 保证消息顺序消费
 * 适用于:对顺序敏感的场景(如库存扣减)
 */
@Component
@Slf4j
public class OrderedDataSyncConsumer implements RocketMQListener<DataChangeMessage> {
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    /**
     * 顺序消费配置
     * - 同一productId的消息发送到同一个队列
     * - 消费者单线程消费每个队列
     */
    @Override
    @RocketMQMessageListener(
        topic = "data-sync-ordered-topic",
        consumerGroup = "data-sync-ordered-consumer",
        consumeMode = ConsumeMode.ORDERLY,  // 顺序消费
        messageModel = MessageModel.CLUSTERING
    )
    public void onMessage(DataChangeMessage message) {
        log.info("Ordered consume message: {}", message);
        
        // 幂等检查
        if (isProcessed(message)) {
            log.info("Message already processed, skip: {}", message.getId());
            return;
        }
        
        try {
            processMessage(message);
            markAsProcessed(message);
        } catch (Exception e) {
            log.error("Process message failed: {}", message, e);
            throw e;
        }
    }
    
    /**
     * 消息发送时选择队列(按业务ID哈希)
     */
    public static class OrderIdHashQueueSelector implements MessageQueueSelector {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long orderId = (Long) arg;
            int index = (int) (orderId % mqs.size());
            return mqs.get(index);
        }
    }
}

五、数据一致性保障机制

5.1 幂等性设计

/**
 * 幂等性保障 - 防止重复消费导致的数据错误
 */
@Component
@Slf4j
public class IdempotentProcessor {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private ProcessedMessageRepository messageRepository;
    
    private static final String IDEMPOTENT_KEY_PREFIX = "idempotent:";
    private static final long IDEMPOTENT_TTL = 24 * 60 * 60; // 24小时
    
    /**
     * 幂等执行
     * 
     * @param messageId 消息唯一标识
     * @param processor 业务处理器
     */
    public void execute(String messageId, Runnable processor) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        
        // 1. 检查是否已处理(Redis快速判断)
        Boolean processed = redisTemplate.hasKey(key);
        if (Boolean.TRUE.equals(processed)) {
            log.info("Message already processed: {}", messageId);
            return;
        }
        
        // 2. 尝试设置处理中标记(原子操作)
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(key, "PROCESSING", IDEMPOTENT_TTL, TimeUnit.SECONDS);
        
        if (!Boolean.TRUE.equals(locked)) {
            log.info("Message is being processed by another thread: {}", messageId);
            return;
        }
        
        try {
            // 3. 执行业务逻辑
            processor.run();
            
            // 4. 标记为已处理
            redisTemplate.opsForValue().set(key, "PROCESSED", IDEMPOTENT_TTL, TimeUnit.SECONDS);
            
            // 5. 持久化到数据库(长期存储)
            saveProcessedMessage(messageId);
            
        } catch (Exception e) {
            // 处理失败,删除标记允许重试
            redisTemplate.delete(key);
            throw e;
        }
    }
    
    /**
     * 基于数据库的唯一索引实现幂等
     */
    @Transactional
    public void executeWithDB(String messageId, Runnable processor) {
        try {
            // 尝试插入记录
            ProcessedMessage record = new ProcessedMessage();
            record.setMessageId(messageId);
            record.setCreateTime(LocalDateTime.now());
            messageRepository.save(record);
            
            // 插入成功,执行业务
            processor.run();
            
        } catch (DuplicateKeyException e) {
            // 已存在,说明已处理过
            log.info("Message already processed: {}", messageId);
        }
    }
}

5.2 数据一致性校验

/**
 * 数据一致性校验服务
 */
@Component
@Slf4j
public class DataConsistencyChecker {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 全量校验 - 定时任务执行
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
    public void fullConsistencyCheck() {
        log.info("Starting full consistency check");
        
        // 1. 获取MySQL中的所有商品ID
        List<Long> mysqlIds = productMapper.selectAllIds();
        
        // 2. 分批校验
        List<List<Long>> batches = Lists.partition(mysqlIds, 100);
        int inconsistentCount = 0;
        
        for (List<Long> batch : batches) {
            for (Long id : batch) {
                if (!checkConsistency(id)) {
                    inconsistentCount++;
                    // 记录不一致
                    recordInconsistency(id);
                    // 触发修复
                    repairData(id);
                }
            }
        }
        
        log.info("Consistency check completed, found {} inconsistent records", inconsistentCount);
        
        // 发送告警
        if (inconsistentCount > 0) {
            sendAlert(inconsistentCount);
        }
    }
    
    /**
     * 单条数据一致性校验
     */
    public boolean checkConsistency(Long productId) {
        try {
            // 1. 查询MySQL
            Product mysqlProduct = productMapper.selectById(productId);
            
            // 2. 查询ES
            ProductDoc esProduct = elasticsearchTemplate
                .get(productId.toString(), ProductDoc.class);
            
            // 3. 对比关键字段
            if (mysqlProduct == null && esProduct == null) {
                return true; // 都不存在,一致
            }
            
            if (mysqlProduct == null || esProduct == null) {
                return false; // 一个存在一个不存在,不一致
            }
            
            // 对比字段
            return Objects.equals(mysqlProduct.getName(), esProduct.getName()) &&
                   Objects.equals(mysqlProduct.getPrice(), esProduct.getPrice()) &&
                   Objects.equals(mysqlProduct.getStatus(), esProduct.getStatus());
                   
        } catch (Exception e) {
            log.error("Check consistency failed for product: {}", productId, e);
            return false;
        }
    }
    
    /**
     * 修复不一致数据
     */
    public void repairData(Long productId) {
        log.info("Repairing data for product: {}", productId);
        
        try {
            // 以MySQL为准,重新同步到ES
            Product product = productMapper.selectById(productId);
            
            if (product != null) {
                ProductDoc doc = convertToDoc(product);
                elasticsearchTemplate.save(doc);
            } else {
                // MySQL中不存在,从ES删除
                elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
            }
            
            log.info("Data repaired for product: {}", productId);
            
        } catch (Exception e) {
            log.error("Repair data failed for product: {}", productId, e);
        }
    }
    
    /**
     * 增量校验 - 基于时间窗口
     */
    public void incrementalCheck(LocalDateTime startTime, LocalDateTime endTime) {
        List<Long> changedIds = productMapper.selectChangedIds(startTime, endTime);
        
        for (Long id : changedIds) {
            if (!checkConsistency(id)) {
                repairData(id);
            }
        }
    }
}

5.3 同步延迟监控

/**
 * 同步延迟监控
 */
@Component
@Slf4j
public class SyncLatencyMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 记录同步延迟
     */
    public void recordLatency(String table, Long id, long sourceTimestamp) {
        long latency = System.currentTimeMillis() - sourceTimestamp;
        
        // 记录到Prometheus
        Gauge.builder("data.sync.latency")
            .tag("table", table)
            .register(meterRegistry)
            .set(latency);
        
        // 记录到Redis(用于实时查询)
        String key = "sync:latency:" + table;
        redisTemplate.opsForHash().put(key, String.valueOf(id), String.valueOf(latency));
        
        // 告警判断
        if (latency > 5000) { // 5秒
            log.warn("High sync latency detected, table: {}, id: {}, latency: {}ms", 
                table, id, latency);
            sendLatencyAlert(table, latency);
        }
    }
    
    /**
     * 获取实时同步延迟
     */
    public Map<String, Double> getRealtimeLatency() {
        Map<String, Double> result = new HashMap<>();
        
        // 查询各表的平均延迟
        String[] tables = {"product", "order", "user"};
        
        for (String table : tables) {
            String key = "sync:latency:" + table;
            Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
            
            if (!entries.isEmpty()) {
                double avgLatency = entries.values().stream()
                    .mapToLong(v -> Long.parseLong(v.toString()))
                    .average()
                    .orElse(0);
                
                result.put(table, avgLatency);
            }
        }
        
        return result;
    }
    
    /**
     * 同步健康检查
     */
    public HealthCheckResult checkSyncHealth() {
        HealthCheckResult result = new HealthCheckResult();
        
        Map<String, Double> latencyMap = getRealtimeLatency();
        
        boolean healthy = latencyMap.values().stream()
            .allMatch(l -> l < 10000); // 所有表延迟<10秒
        
        result.setHealthy(healthy);
        result.setLatencyMap(latencyMap);
        
        return result;
    }
}

六、双写方案与事务消息

6.1 双写方案实现

/**
 * 双写方案 - 同时写入MySQL和ES
 * 注意:双写存在一致性问题,需要配合事务消息或补偿机制
 */
@Service
@Slf4j
public class DualWriteService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 简单双写 - 不推荐,存在一致性问题
     */
    public void saveProductSimple(Product product) {
        // 1. 写MySQL
        productMapper.insert(product);
        
        // 2. 写ES
        try {
            ProductDoc doc = convertToDoc(product);
            elasticsearchTemplate.save(doc);
        } catch (Exception e) {
            // ES写入失败,MySQL已提交,数据不一致!
            log.error("ES write failed, data inconsistent!", e);
            // 需要补偿机制
        }
    }
    
    /**
     * 改进版双写 - 使用本地事务表 + 异步补偿
     */
    @Transactional
    public void saveProductWithCompensation(Product product) {
        // 1. 写MySQL
        productMapper.insert(product);
        
        // 2. 写入同步任务表(本地事务)
        SyncTask task = new SyncTask();
        task.setTableName("product");
        task.setRecordId(product.getId());
        task.setOperation("INSERT");
        task.setStatus(0); // 待处理
        task.setCreateTime(LocalDateTime.now());
        syncTaskMapper.insert(task);
    }
    
    /**
     * 定时任务处理同步任务
     */
    @Scheduled(fixedRate = 5000) // 每5秒
    public void processSyncTasks() {
        // 查询待处理任务
        List<SyncTask> tasks = syncTaskMapper.selectPendingTasks(100);
        
        for (SyncTask task : tasks) {
            try {
                switch (task.getTableName()) {
                    case "product":
                        syncProductToES(task.getRecordId(), task.getOperation());
                        break;
                }
                
                // 标记为成功
                task.setStatus(1);
                task.setUpdateTime(LocalDateTime.now());
                syncTaskMapper.updateById(task);
                
            } catch (Exception e) {
                log.error("Sync task failed: {}", task, e);
                
                // 重试次数+1
                task.setRetryCount(task.getRetryCount() + 1);
                
                if (task.getRetryCount() >= 3) {
                    task.setStatus(2); // 失败
                    // 发送告警
                    sendSyncFailureAlert(task);
                }
                
                syncTaskMapper.updateById(task);
            }
        }
    }
}

6.2 事务消息实现最终一致性

/**
 * 基于事务消息的可靠数据同步
 * 保证本地事务和消息发送的原子性
 */
@Service
@Slf4j
public class TransactionalMessageService {
    
    @Autowired
    private TransactionMQProducer producer;
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    
    /**
     * 保存商品 - 使用事务消息保证一致性
     */
    public void saveProductWithTransaction(Product product) {
        // 生成事务ID
        String transactionId = UUID.randomUUID().toString();
        
        // 构建消息
        DataChangeMessage message = new DataChangeMessage();
        message.setTransactionId(transactionId);
        message.setTable("product");
        message.setId(product.getId());
        message.setOperation("INSERT");
        
        Message mqMessage = new Message(
            "data-sync-topic",
            JSON.toJSONBytes(message)
        );
        mqMessage.setTransactionId(transactionId);
        
        // 发送事务消息
        TransactionSendResult result = producer.sendMessageInTransaction(
            mqMessage,
            new LocalTransactionExecutor() {
                @Override
                public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                    try {
                        // 执行本地事务
                        productMapper.insert(product);
                        
                        // 记录事务日志
                        TransactionLog log = new TransactionLog();
                        log.setTransactionId(transactionId);
                        log.setStatus(1); // 已提交
                        transactionLogMapper.insert(log);
                        
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } catch (Exception e) {
                        log.error("Local transaction failed", e);
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
            }
        );
        
        if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
            throw new RuntimeException("Transaction failed");
        }
    }
}

/**
 * 事务监听器
 */
@Component
@Slf4j
public class DataSyncTransactionListener implements TransactionListener {
    
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 本地事务已在sendMessageInTransaction中执行
        return LocalTransactionState.UNKNOW;
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getTransactionId();
        
        // 查询事务日志
        TransactionLog log = transactionLogMapper.selectByTransactionId(transactionId);
        
        if (log == null) {
            // 未找到记录,可能是本地事务还未执行完
            return LocalTransactionState.UNKNOW;
        }
        
        if (log.getStatus() == 1) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

七、性能优化与最佳实践

7.1 批量处理优化

/**
 * 批量同步优化策略
 */
@Component
@Slf4j
public class BatchSyncOptimizer {
    
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    
    /**
     * 批量导入 - 适用于全量同步
     */
    public void bulkImport(List<Product> products) {
        List<ProductDoc> docs = products.stream()
            .map(this::convertToDoc)
            .collect(Collectors.toList());
        
        // 分批处理,每批1000条
        List<List<ProductDoc>> batches = Lists.partition(docs, 1000);
        
        for (List<ProductDoc> batch : batches) {
            try {
                elasticsearchTemplate.save(batch);
                log.info("Bulk imported {} products", batch.size());
                
                // 短暂休眠,避免ES压力过大
                Thread.sleep(100);
            } catch (Exception e) {
                log.error("Bulk import failed, batch size: {}", batch.size(), e);
                // 降级为单条处理
                for (ProductDoc doc : batch) {
                    try {
                        elasticsearchTemplate.save(doc);
                    } catch (Exception ex) {
                        log.error("Single import failed for product: {}", doc.getId(), ex);
                    }
                }
            }
        }
    }
    
    /**
     * 并发批量处理
     */
    public void concurrentBulkImport(List<Product> products, int concurrency) {
        List<ProductDoc> docs = products.stream()
            .map(this::convertToDoc)
            .collect(Collectors.toList());
        
        List<List<ProductDoc>> batches = Lists.partition(docs, 1000);
        
        // 使用线程池并发处理
        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        
        List<CompletableFuture<Void>> futures = batches.stream()
            .map(batch -> CompletableFuture.runAsync(() -> {
                try {
                    elasticsearchTemplate.save(batch);
                } catch (Exception e) {
                    log.error("Batch import failed", e);
                }
            }, executor))
            .collect(Collectors.toList());
        
        // 等待所有批次完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        
        executor.shutdown();
    }
}

7.2 失败重试与补偿

/**
 * 失败重试与补偿机制
 */
@Component
@Slf4j
public class SyncRetryService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    private static final String RETRY_KEY_PREFIX = "sync:retry:";
    private static final int MAX_RETRY_TIMES = 3;
    
    /**
     * 记录失败消息,进入重试队列
     */
    public void recordFailedMessage(DataChangeMessage message, Exception error) {
        String key = RETRY_KEY_PREFIX + message.getId();
        
        // 获取当前重试次数
        String retryCountStr = redisTemplate.opsForValue().get(key);
        int retryCount = retryCountStr != null ? Integer.parseInt(retryCountStr) : 0;
        
        if (retryCount >= MAX_RETRY_TIMES) {
            // 超过最大重试次数,进入死信队列
            sendToDeadLetterQueue(message, error);
            return;
        }
        
        // 增加重试次数
        retryCount++;
        redisTemplate.opsForValue().set(key, String.valueOf(retryCount), 1, TimeUnit.HOURS);
        
        // 计算延迟时间(指数退避)
        long delayTime = (long) (Math.pow(2, retryCount) * 1000); // 2s, 4s, 8s
        
        // 发送延迟消息
        Message<DataChangeMessage> delayMessage = MessageBuilder
            .withPayload(message)
            .setHeader("RETRY_COUNT", retryCount)
            .build();
        
        rocketMQTemplate.syncSendDelayTimeSeconds(
            "data-sync-retry-topic",
            delayMessage,
            (int) (delayTime / 1000)
        );
    }
    
    /**
     * 死信队列处理
     */
    @RocketMQMessageListener(topic = "data-sync-dlq", consumerGroup = "dlq-consumer")
    public class DeadLetterConsumer implements RocketMQListener<DataChangeMessage> {
        
        @Override
        public void onMessage(DataChangeMessage message) {
            log.error("Message entered dead letter queue: {}", message);
            
            // 记录到数据库,人工介入处理
            saveToFailedLog(message);
            
            // 发送告警
            sendAlert(message);
        }
    }
}

八、经验总结与最佳实践

8.1 方案选择建议

┌─────────────────────────────────────────────────────────────────────┐
│                      数据同步方案选择指南                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  【CDC订阅方案 - 推荐】                                             │
│  适用场景:                                                         │
│  - 已有成熟的MySQL架构                                              │
│  - 业务代码不希望侵入同步逻辑                                       │
│  - 需要实时同步(秒级延迟)                                         │
│                                                                     │
│  实施要点:                                                         │
│  1. 部署Canal Server集群,保证高可用                                │
│  2. 配置合理的Binlog过滤规则,减少不必要的数据传输                  │
│  3. 实现幂等消费,防止重复处理                                      │
│  4. 建立数据一致性校验机制                                          │
│  5. 监控同步延迟,设置告警阈值                                      │
│                                                                     │
│  【消息队列方案】                                                   │
│  适用场景:                                                         │
│  - 新业务系统,可以统一规划数据变更消息                             │
│  - 需要灵活控制同步时机和逻辑                                       │
│  - 多系统间的数据分发                                               │
│                                                                     │
│  实施要点:                                                         │
│  1. 设计统一的消息格式,包含完整的变更信息                          │
│  2. 使用事务消息保证本地事务和消息发送一致                          │
│  3. 配置合理的重试策略和死信队列                                    │
│  4. 保证消息顺序消费(对顺序敏感的场景)                            │
│                                                                     │
│  【双写方案 - 谨慎使用】                                            │
│  适用场景:                                                         │
│  - 简单场景,数据一致性要求不高                                     │
│  - 快速原型验证                                                     │
│                                                                     │
│  实施要点:                                                         │
│  1. 必须配合事务消息或本地事务表                                    │
│  2. 实现完善的补偿机制                                              │
│  3. 定期进行数据一致性校验                                          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

8.2 最佳实践清单

实践项说明优先级
幂等设计所有消费者必须实现幂等,防止重复消费🔴 必须
顺序保证对顺序敏感的数据,保证消费顺序🔴 必须
延迟监控监控同步延迟,及时发现同步异常🔴 必须
一致性校验定期进行数据一致性校验和修复🟠 强烈建议
失败重试实现指数退避的重试机制🟠 强烈建议
死信处理建立死信队列,人工介入处理🟠 强烈建议
批量处理使用批量操作提升同步性能🟡 建议
灰度发布同步逻辑变更先灰度验证🟡 建议

8.3 常见错误与解决方案

错误后果解决方案
无幂等保护重复消费导致数据错误实现幂等检查,使用唯一键或Redis标记
忽略顺序更新顺序错乱,最终数据错误按业务ID分区,保证单分区顺序消费
同步无监控同步失败无法及时发现监控同步延迟和失败率,设置告警
无补偿机制同步失败后数据永久不一致建立定时补偿任务,定期校验修复
全量字段同步网络开销大,同步慢只同步变更字段,减少数据传输
忽略异常异常被吞掉,数据丢失完善异常处理,记录失败日志

系列上一篇数据库缓存架构:Redis与数据库一致性

系列下一篇JSON与文档存储的高级应用

知识点测试

读完文章了?来测试一下你对知识点的掌握程度吧!

评论区

使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。

如果评论系统无法加载,请确保:

  • 您的网络可以访问 GitHub
  • giscus GitHub App 已安装到仓库
  • 仓库已启用 Discussions 功能