返回 筑基・数据元府藏真
数据异构与同步方案
博主
大约 28 分钟
数据异构与同步方案
一、问题引入:搜索与数据库不一致的困境
1.1 真实案例:电商搜索价格不一致事故
事故时间:2024年618大促期间
事故场景:商品详情页显示价格299元,但搜索结果页显示399元
影响范围:涉及商品500+,用户投诉2000+,订单退款150万
根本原因:MySQL到Elasticsearch的数据同步延迟和失败
事故经过:
┌─────────────────────────────────────────────────────────────┐
│ T1: 运营在后台修改商品A价格 399→299 │
│ T2: MySQL更新成功,返回成功 │
│ T3: 异步同步ES时网络抖动,消息丢失 │
│ T4: 用户搜索"手机" → ES返回旧价格399 │
│ T5: 用户点击商品 → 详情页查MySQL显示299 │
│ T6: 用户困惑,大量投诉"价格欺诈" │
│ T7: 问题发现时已过2小时,影响订单3000+ │
└─────────────────────────────────────────────────────────────┘
损失评估:
- 直接经济损失:退差价150万元
- 品牌信誉损失:用户信任度下降
- 技术债务:紧急修复数据,通宵加班
1.2 数据异构场景分析
现代应用架构中的数据异构需求:
┌──────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ │
│ │ MySQL │ 主数据库,事务性存储 │
│ │ (OLTP) │ - 用户、订单、商品核心数据 │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据同步层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌────────┐ │ │
│ │ │ Canal │ │ MQ │ │ 定时 │ │ 双写 │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └───┬────┘ │ │
│ └───────┼────────────┼────────────┼────────────┼──────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Elasticsearch│ │ Redis │ │ ClickHouse│ │ MongoDB │ │
│ │ (搜索) │ │ (缓存) │ │ (分析) │ │ (文档) │ │
│ └─────────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 异构原因: │
│ 1. MySQL不适合复杂搜索(全文检索、聚合分析) │
│ 2. 缓存需要更快的访问速度 │
│ 3. 大数据分析需要列式存储 │
│ 4. 文档存储需要灵活的数据模型 │
│ │
└──────────────────────────────────────────────────────────────┘
1.3 数据一致性的挑战
| 挑战 | 描述 | 影响 |
|---|---|---|
| 同步延迟 | 主库变更到从库生效的时间差 | 用户看到旧数据 |
| 同步失败 | 网络故障、服务宕机导致同步中断 | 数据永久不一致 |
| 顺序错乱 | 消息队列消费顺序与产生顺序不同 | 数据覆盖错误 |
| 重复消费 | 消息队列重试导致重复处理 | 数据错误 |
| 数据冲突 | 多源写入同一目标 | 数据覆盖丢失 |
二、数据同步方案全景
2.1 四种主流方案对比
┌─────────────────────────────────────────────────────────────────────┐
│ 数据同步方案对比 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 方案1:双写(Dual Write) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用代码同时写入多个数据源 │ │
│ │ App → MySQL + Redis + ES │ │
│ │ │ │
│ │ 优点:简单直观,实时性高 │ │
│ │ 缺点:强耦合,一致性问题,部分失败难处理 │ │
│ │ 适用:简单场景,数据一致性要求不高 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 方案2:消息队列(MQ) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 业务只写主库,发送变更消息,消费者同步到异构库 │ │
│ │ App → MySQL → MQ → Consumer → ES/Redis │ │
│ │ │ │
│ │ 优点:解耦,可靠投递,可重试 │ │
│ │ 缺点:有延迟,需要处理顺序和幂等 │ │
│ │ 适用:主流方案,大多数业务场景 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 方案3:CDC(Canal/Debezium) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 监听数据库Binlog,异步同步到异构库 │ │
│ │ MySQL → Binlog → Canal → Kafka → Consumer → ES │ │
│ │ │ │
│ │ 优点:业务无侵入,数据变更必达 │ │
│ │ 缺点:需要维护CDC组件,有解析延迟 │ │
│ │ 适用:推荐方案,已有MySQL架构 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 方案4:定时同步(Batch Sync) │
│ │ 定时扫描变更,批量同步 │ │
│ │ Scheduler → Query MySQL → Batch Update ES │ │
│ │ │ │
│ │ 优点:简单,资源可控 │ │
│ │ 缺点:延迟大,实时性差 │ │
│ │ 适用:离线分析,对实时性要求不高的场景 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2.2 方案选择决策矩阵
| 方案 | 实时性 | 复杂度 | 可靠性 | 侵入性 | 适用场景 |
|---|---|---|---|---|---|
| 双写 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | 高 | 简单场景,快速上线 |
| 消息队列 | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 中 | 主流方案,大多数业务 |
| CDC订阅 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 无 | 已有MySQL,推荐方案 |
| 定时同步 | ⭐⭐ | ⭐⭐ | ⭐⭐⭐ | 无 | 离线分析,报表统计 |
2.3 方案选择决策树
┌─────────────────┐
│ 数据同步需求 │
└────────┬────────┘
│
▼
┌──────────────────────────────┐
│ 是否已有成熟的MySQL架构? │
└─────────────┬────────────────┘
│
┌────────────────┼────────────────┐
▼是 ▼否
┌───────────────┐ ┌───────────────┐
│ 使用CDC订阅 │ │ 是否要求高实时│
│ (Canal/Debezium)│ │ 性和强一致? │
└───────────────┘ └───────┬───────┘
│
┌────────────────┼────────────────┐
▼是 ▼否
┌───────────────┐ ┌───────────────┐
│ 使用双写 │ │ 使用消息队列 │
│ + 事务消息 │ │ + 定时补偿 │
└───────────────┘ └───────────────┘
三、CDC方案详解(Canal实现)
3.1 Canal架构原理
┌─────────────────────────────────────────────────────────────────────┐
│ Canal架构图 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ MySQL │ │ Canal │ │ Kafka/RocketMQ │ │
│ │ (Master) │────▶│ Server │────▶│ (消息队列) │ │
│ │ Binlog │ │ (解析服务) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Canal Client / Adapter │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ ES │ │ Redis │ │ HBase │ │ ClickHouse│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 工作流程: │
│ 1. Canal Server伪装成MySQL Slave,向Master请求Binlog │
│ 2. Master发送Binlog事件给Canal Server │
│ 3. Canal Server解析Binlog,转换为结构化数据 │
│ 4. 发送到消息队列或直接投递到目标存储 │
│ │
└─────────────────────────────────────────────────────────────────────┘
3.2 Canal配置与部署
# canal.properties - Canal Server配置
canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111
# 模式:tcp/kafka/rocketMQ/rabbitMQ
canal.serverMode = kafka
# Kafka配置
canal.mq.servers = kafka:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# instance配置
---
# instance.properties - 具体实例配置
canal.instance.mysql.slaveId = 1234
# MySQL连接信息
canal.instance.master.address = mysql:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 订阅配置
canal.instance.filter.regex = db1\\..*,db2\\.user,db2\\.order
canal.instance.filter.black.regex = db1\\.log_.*
# 表字段映射(用于ES同步)
canal.instance.tsdb.enable = true
canal.instance.tsdb.url = jdbc:mysql://mysql:3306/canal_tsdb
3.3 Java客户端实现
/**
* Canal客户端 - 同步MySQL到Elasticsearch
*/
@Component
@Slf4j
public class CanalSyncService {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired
private ProductRepository productRepository;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 监听商品表变更
*/
@CanalListener(destination = "example", schema = "mall", table = "product")
public void onProductChange(CanalMessage message) {
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
processProductChange(rowChange.getEventType(), rowData);
}
} catch (InvalidProtocolBufferException e) {
log.error("Parse canal message failed", e);
}
}
}
/**
* 处理商品变更
*/
private void processProductChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
switch (eventType) {
case INSERT:
handleProductInsert(rowData.getAfterColumnsList());
break;
case UPDATE:
handleProductUpdate(rowData.getAfterColumnsList());
break;
case DELETE:
handleProductDelete(rowData.getBeforeColumnsList());
break;
default:
log.warn("Unknown event type: {}", eventType);
}
}
/**
* 处理新增
*/
private void handleProductInsert(List<CanalEntry.Column> columns) {
ProductDoc doc = buildProductDoc(columns);
try {
// 同步到ES
elasticsearchTemplate.save(doc);
log.info("Product inserted to ES, id: {}", doc.getId());
// 删除缓存(如果存在)
invalidateProductCache(doc.getId());
} catch (Exception e) {
log.error("Failed to sync product insert to ES, id: {}", doc.getId(), e);
// 记录到补偿队列
recordToCompensateQueue("INSERT", doc.getId());
}
}
/**
* 处理更新
*/
private void handleProductUpdate(List<CanalEntry.Column> columns) {
ProductDoc doc = buildProductDoc(columns);
try {
// 同步到ES
elasticsearchTemplate.save(doc);
log.info("Product updated in ES, id: {}", doc.getId());
// 删除缓存
invalidateProductCache(doc.getId());
// 发送变更通知(用于其他业务处理)
publishChangeEvent(doc);
} catch (Exception e) {
log.error("Failed to sync product update to ES, id: {}", doc.getId(), e);
recordToCompensateQueue("UPDATE", doc.getId());
}
}
/**
* 处理删除
*/
private void handleProductDelete(List<CanalEntry.Column> columns) {
Long productId = getColumnValue(columns, "id");
try {
// 从ES删除
elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
log.info("Product deleted from ES, id: {}", productId);
// 删除缓存
invalidateProductCache(productId);
} catch (Exception e) {
log.error("Failed to sync product delete to ES, id: {}", productId, e);
recordToCompensateQueue("DELETE", productId);
}
}
/**
* 构建ES文档
*/
private ProductDoc buildProductDoc(List<CanalEntry.Column> columns) {
ProductDoc doc = new ProductDoc();
for (CanalEntry.Column column : columns) {
switch (column.getName()) {
case "id":
doc.setId(Long.valueOf(column.getValue()));
break;
case "name":
doc.setName(column.getValue());
break;
case "price":
doc.setPrice(new BigDecimal(column.getValue()));
break;
case "category_id":
doc.setCategoryId(Long.valueOf(column.getValue()));
break;
case "status":
doc.setStatus(Integer.valueOf(column.getValue()));
break;
// ... 其他字段
}
}
// 补充关联数据(如分类名称)
enrichProductDoc(doc);
return doc;
}
/**
* 补充关联数据
*/
private void enrichProductDoc(ProductDoc doc) {
// 查询分类名称
String categoryName = getCategoryName(doc.getCategoryId());
doc.setCategoryName(categoryName);
// 查询库存
Integer stock = getProductStock(doc.getId());
doc.setStock(stock);
}
/**
* 获取列值
*/
private Long getColumnValue(List<CanalEntry.Column> columns, String columnName) {
return columns.stream()
.filter(c -> columnName.equals(c.getName()))
.findFirst()
.map(c -> Long.valueOf(c.getValue()))
.orElse(null);
}
/**
* 删除商品缓存
*/
private void invalidateProductCache(Long productId) {
String cacheKey = "product:" + productId;
redisTemplate.delete(cacheKey);
// 删除列表缓存
redisTemplate.delete("product:list:*");
}
/**
* 发布变更事件
*/
private void publishChangeEvent(ProductDoc doc) {
// 发送消息到MQ,通知其他服务
}
/**
* 记录到补偿队列
*/
private void recordToCompensateQueue(String operation, Long productId) {
// 记录到Redis或MQ,后续补偿处理
}
}
3.4 批量同步优化
/**
* 批量同步处理器 - 提升ES写入性能
*/
@Component
@Slf4j
public class BatchSyncProcessor {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
// 批量缓冲区
private final List<ProductDoc> buffer = new ArrayList<>();
private final int BATCH_SIZE = 100;
private final long FLUSH_INTERVAL = 1000; // 1秒
/**
* 添加文档到缓冲区
*/
public void addToBuffer(ProductDoc doc) {
synchronized (buffer) {
buffer.add(doc);
if (buffer.size() >= BATCH_SIZE) {
flushBuffer();
}
}
}
/**
* 刷新缓冲区到ES
*/
@Scheduled(fixedRate = FLUSH_INTERVAL)
public void flushBuffer() {
List<ProductDoc> batch;
synchronized (buffer) {
if (buffer.isEmpty()) {
return;
}
batch = new ArrayList<>(buffer);
buffer.clear();
}
try {
// 批量保存
elasticsearchTemplate.save(batch);
log.info("Batch synced {} products to ES", batch.size());
} catch (Exception e) {
log.error("Batch sync failed, will retry individually", e);
// 失败时逐个重试
for (ProductDoc doc : batch) {
try {
elasticsearchTemplate.save(doc);
} catch (Exception ex) {
log.error("Individual sync failed for product: {}", doc.getId(), ex);
recordToCompensateQueue(doc);
}
}
}
}
}
四、消息队列同步方案
4.1 基于RocketMQ的数据同步
/**
* 消息队列数据同步方案
* 适用于:业务主动发送变更消息,消费者同步到异构存储
*/
@Service
@Slf4j
public class MQDataSyncService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private TransactionMQProducer transactionProducer;
/**
* 发送数据变更消息 - 普通消息
* 适用于:对一致性要求不高的场景
*/
public void sendChangeMessage(String table, Long id, String operation) {
DataChangeMessage message = new DataChangeMessage();
message.setTable(table);
message.setId(id);
message.setOperation(operation);
message.setTimestamp(System.currentTimeMillis());
message.setTraceId(MDC.get("traceId"));
rocketMQTemplate.asyncSend("data-sync-topic", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Change message sent successfully, table: {}, id: {}", table, id);
}
@Override
public void onException(Throwable e) {
log.error("Failed to send change message, table: {}, id: {}", table, id, e);
// 记录到本地日志,后续补偿
recordFailedMessage(message);
}
});
}
/**
* 发送事务消息 - 保证本地事务和消息发送一致
* 适用于:强一致性要求的场景
*/
public void sendTransactionMessage(String table, Long id, String operation,
Runnable localTransaction) {
DataChangeMessage message = new DataChangeMessage();
message.setTable(table);
message.setId(id);
message.setOperation(operation);
message.setTimestamp(System.currentTimeMillis());
TransactionSendResult result = transactionProducer.sendMessageInTransaction(
new Message("data-sync-topic", JSON.toJSONBytes(message)),
localTransaction
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
throw new RuntimeException("Transaction message send failed");
}
}
}
/**
* 事务消息监听器
*/
@Component
@Slf4j
public class DataChangeTransactionListener implements TransactionListener {
@Autowired
private DataChangeLogMapper changeLogMapper;
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地业务逻辑
Runnable localTransaction = (Runnable) arg;
localTransaction.run();
// 记录变更日志
recordChangeLog(msg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("Local transaction failed", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 回查本地事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据变更日志判断事务状态
String transactionId = msg.getTransactionId();
DataChangeLog log = changeLogMapper.selectByTransactionId(transactionId);
if (log != null && log.getStatus() == 1) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 数据同步消费者
*/
@Component
@Slf4j
public class DataSyncConsumer implements RocketMQListener<DataChangeMessage> {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired
private ProductMapper productMapper;
@Override
@RocketMQMessageListener(
topic = "data-sync-topic",
consumerGroup = "data-sync-consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING
)
public void onMessage(DataChangeMessage message) {
log.info("Received change message: {}", message);
try {
switch (message.getTable()) {
case "product":
syncProduct(message);
break;
case "order":
syncOrder(message);
break;
case "user":
syncUser(message);
break;
default:
log.warn("Unknown table: {}", message.getTable());
}
} catch (Exception e) {
log.error("Sync failed for message: {}", message, e);
throw e; // 抛出异常触发重试
}
}
/**
* 同步商品数据
*/
private void syncProduct(DataChangeMessage message) {
Long productId = message.getId();
switch (message.getOperation()) {
case "INSERT":
case "UPDATE":
Product product = productMapper.selectById(productId);
if (product != null) {
ProductDoc doc = convertToDoc(product);
elasticsearchTemplate.save(doc);
}
break;
case "DELETE":
elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
break;
}
}
/**
* 同步订单数据到Redis
*/
private void syncOrder(DataChangeMessage message) {
// 实现订单数据同步逻辑
}
/**
* 同步用户数据
*/
private void syncUser(DataChangeMessage message) {
// 实现用户数据同步逻辑
}
private ProductDoc convertToDoc(Product product) {
ProductDoc doc = new ProductDoc();
BeanUtils.copyProperties(product, doc);
return doc;
}
}
4.2 消息顺序保证
/**
* 保证消息顺序消费
* 适用于:对顺序敏感的场景(如库存扣减)
*/
@Component
@Slf4j
public class OrderedDataSyncConsumer implements RocketMQListener<DataChangeMessage> {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
/**
* 顺序消费配置
* - 同一productId的消息发送到同一个队列
* - 消费者单线程消费每个队列
*/
@Override
@RocketMQMessageListener(
topic = "data-sync-ordered-topic",
consumerGroup = "data-sync-ordered-consumer",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费
messageModel = MessageModel.CLUSTERING
)
public void onMessage(DataChangeMessage message) {
log.info("Ordered consume message: {}", message);
// 幂等检查
if (isProcessed(message)) {
log.info("Message already processed, skip: {}", message.getId());
return;
}
try {
processMessage(message);
markAsProcessed(message);
} catch (Exception e) {
log.error("Process message failed: {}", message, e);
throw e;
}
}
/**
* 消息发送时选择队列(按业务ID哈希)
*/
public static class OrderIdHashQueueSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}
}
五、数据一致性保障机制
5.1 幂等性设计
/**
* 幂等性保障 - 防止重复消费导致的数据错误
*/
@Component
@Slf4j
public class IdempotentProcessor {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProcessedMessageRepository messageRepository;
private static final String IDEMPOTENT_KEY_PREFIX = "idempotent:";
private static final long IDEMPOTENT_TTL = 24 * 60 * 60; // 24小时
/**
* 幂等执行
*
* @param messageId 消息唯一标识
* @param processor 业务处理器
*/
public void execute(String messageId, Runnable processor) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
// 1. 检查是否已处理(Redis快速判断)
Boolean processed = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(processed)) {
log.info("Message already processed: {}", messageId);
return;
}
// 2. 尝试设置处理中标记(原子操作)
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(key, "PROCESSING", IDEMPOTENT_TTL, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(locked)) {
log.info("Message is being processed by another thread: {}", messageId);
return;
}
try {
// 3. 执行业务逻辑
processor.run();
// 4. 标记为已处理
redisTemplate.opsForValue().set(key, "PROCESSED", IDEMPOTENT_TTL, TimeUnit.SECONDS);
// 5. 持久化到数据库(长期存储)
saveProcessedMessage(messageId);
} catch (Exception e) {
// 处理失败,删除标记允许重试
redisTemplate.delete(key);
throw e;
}
}
/**
* 基于数据库的唯一索引实现幂等
*/
@Transactional
public void executeWithDB(String messageId, Runnable processor) {
try {
// 尝试插入记录
ProcessedMessage record = new ProcessedMessage();
record.setMessageId(messageId);
record.setCreateTime(LocalDateTime.now());
messageRepository.save(record);
// 插入成功,执行业务
processor.run();
} catch (DuplicateKeyException e) {
// 已存在,说明已处理过
log.info("Message already processed: {}", messageId);
}
}
}
5.2 数据一致性校验
/**
* 数据一致性校验服务
*/
@Component
@Slf4j
public class DataConsistencyChecker {
@Autowired
private ProductMapper productMapper;
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 全量校验 - 定时任务执行
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
public void fullConsistencyCheck() {
log.info("Starting full consistency check");
// 1. 获取MySQL中的所有商品ID
List<Long> mysqlIds = productMapper.selectAllIds();
// 2. 分批校验
List<List<Long>> batches = Lists.partition(mysqlIds, 100);
int inconsistentCount = 0;
for (List<Long> batch : batches) {
for (Long id : batch) {
if (!checkConsistency(id)) {
inconsistentCount++;
// 记录不一致
recordInconsistency(id);
// 触发修复
repairData(id);
}
}
}
log.info("Consistency check completed, found {} inconsistent records", inconsistentCount);
// 发送告警
if (inconsistentCount > 0) {
sendAlert(inconsistentCount);
}
}
/**
* 单条数据一致性校验
*/
public boolean checkConsistency(Long productId) {
try {
// 1. 查询MySQL
Product mysqlProduct = productMapper.selectById(productId);
// 2. 查询ES
ProductDoc esProduct = elasticsearchTemplate
.get(productId.toString(), ProductDoc.class);
// 3. 对比关键字段
if (mysqlProduct == null && esProduct == null) {
return true; // 都不存在,一致
}
if (mysqlProduct == null || esProduct == null) {
return false; // 一个存在一个不存在,不一致
}
// 对比字段
return Objects.equals(mysqlProduct.getName(), esProduct.getName()) &&
Objects.equals(mysqlProduct.getPrice(), esProduct.getPrice()) &&
Objects.equals(mysqlProduct.getStatus(), esProduct.getStatus());
} catch (Exception e) {
log.error("Check consistency failed for product: {}", productId, e);
return false;
}
}
/**
* 修复不一致数据
*/
public void repairData(Long productId) {
log.info("Repairing data for product: {}", productId);
try {
// 以MySQL为准,重新同步到ES
Product product = productMapper.selectById(productId);
if (product != null) {
ProductDoc doc = convertToDoc(product);
elasticsearchTemplate.save(doc);
} else {
// MySQL中不存在,从ES删除
elasticsearchTemplate.delete(productId.toString(), ProductDoc.class);
}
log.info("Data repaired for product: {}", productId);
} catch (Exception e) {
log.error("Repair data failed for product: {}", productId, e);
}
}
/**
* 增量校验 - 基于时间窗口
*/
public void incrementalCheck(LocalDateTime startTime, LocalDateTime endTime) {
List<Long> changedIds = productMapper.selectChangedIds(startTime, endTime);
for (Long id : changedIds) {
if (!checkConsistency(id)) {
repairData(id);
}
}
}
}
5.3 同步延迟监控
/**
* 同步延迟监控
*/
@Component
@Slf4j
public class SyncLatencyMonitor {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 记录同步延迟
*/
public void recordLatency(String table, Long id, long sourceTimestamp) {
long latency = System.currentTimeMillis() - sourceTimestamp;
// 记录到Prometheus
Gauge.builder("data.sync.latency")
.tag("table", table)
.register(meterRegistry)
.set(latency);
// 记录到Redis(用于实时查询)
String key = "sync:latency:" + table;
redisTemplate.opsForHash().put(key, String.valueOf(id), String.valueOf(latency));
// 告警判断
if (latency > 5000) { // 5秒
log.warn("High sync latency detected, table: {}, id: {}, latency: {}ms",
table, id, latency);
sendLatencyAlert(table, latency);
}
}
/**
* 获取实时同步延迟
*/
public Map<String, Double> getRealtimeLatency() {
Map<String, Double> result = new HashMap<>();
// 查询各表的平均延迟
String[] tables = {"product", "order", "user"};
for (String table : tables) {
String key = "sync:latency:" + table;
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
if (!entries.isEmpty()) {
double avgLatency = entries.values().stream()
.mapToLong(v -> Long.parseLong(v.toString()))
.average()
.orElse(0);
result.put(table, avgLatency);
}
}
return result;
}
/**
* 同步健康检查
*/
public HealthCheckResult checkSyncHealth() {
HealthCheckResult result = new HealthCheckResult();
Map<String, Double> latencyMap = getRealtimeLatency();
boolean healthy = latencyMap.values().stream()
.allMatch(l -> l < 10000); // 所有表延迟<10秒
result.setHealthy(healthy);
result.setLatencyMap(latencyMap);
return result;
}
}
六、双写方案与事务消息
6.1 双写方案实现
/**
* 双写方案 - 同时写入MySQL和ES
* 注意:双写存在一致性问题,需要配合事务消息或补偿机制
*/
@Service
@Slf4j
public class DualWriteService {
@Autowired
private ProductMapper productMapper;
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 简单双写 - 不推荐,存在一致性问题
*/
public void saveProductSimple(Product product) {
// 1. 写MySQL
productMapper.insert(product);
// 2. 写ES
try {
ProductDoc doc = convertToDoc(product);
elasticsearchTemplate.save(doc);
} catch (Exception e) {
// ES写入失败,MySQL已提交,数据不一致!
log.error("ES write failed, data inconsistent!", e);
// 需要补偿机制
}
}
/**
* 改进版双写 - 使用本地事务表 + 异步补偿
*/
@Transactional
public void saveProductWithCompensation(Product product) {
// 1. 写MySQL
productMapper.insert(product);
// 2. 写入同步任务表(本地事务)
SyncTask task = new SyncTask();
task.setTableName("product");
task.setRecordId(product.getId());
task.setOperation("INSERT");
task.setStatus(0); // 待处理
task.setCreateTime(LocalDateTime.now());
syncTaskMapper.insert(task);
}
/**
* 定时任务处理同步任务
*/
@Scheduled(fixedRate = 5000) // 每5秒
public void processSyncTasks() {
// 查询待处理任务
List<SyncTask> tasks = syncTaskMapper.selectPendingTasks(100);
for (SyncTask task : tasks) {
try {
switch (task.getTableName()) {
case "product":
syncProductToES(task.getRecordId(), task.getOperation());
break;
}
// 标记为成功
task.setStatus(1);
task.setUpdateTime(LocalDateTime.now());
syncTaskMapper.updateById(task);
} catch (Exception e) {
log.error("Sync task failed: {}", task, e);
// 重试次数+1
task.setRetryCount(task.getRetryCount() + 1);
if (task.getRetryCount() >= 3) {
task.setStatus(2); // 失败
// 发送告警
sendSyncFailureAlert(task);
}
syncTaskMapper.updateById(task);
}
}
}
}
6.2 事务消息实现最终一致性
/**
* 基于事务消息的可靠数据同步
* 保证本地事务和消息发送的原子性
*/
@Service
@Slf4j
public class TransactionalMessageService {
@Autowired
private TransactionMQProducer producer;
@Autowired
private ProductMapper productMapper;
@Autowired
private TransactionLogMapper transactionLogMapper;
/**
* 保存商品 - 使用事务消息保证一致性
*/
public void saveProductWithTransaction(Product product) {
// 生成事务ID
String transactionId = UUID.randomUUID().toString();
// 构建消息
DataChangeMessage message = new DataChangeMessage();
message.setTransactionId(transactionId);
message.setTable("product");
message.setId(product.getId());
message.setOperation("INSERT");
Message mqMessage = new Message(
"data-sync-topic",
JSON.toJSONBytes(message)
);
mqMessage.setTransactionId(transactionId);
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(
mqMessage,
new LocalTransactionExecutor() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
try {
// 执行本地事务
productMapper.insert(product);
// 记录事务日志
TransactionLog log = new TransactionLog();
log.setTransactionId(transactionId);
log.setStatus(1); // 已提交
transactionLogMapper.insert(log);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("Local transaction failed", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
);
if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
throw new RuntimeException("Transaction failed");
}
}
}
/**
* 事务监听器
*/
@Component
@Slf4j
public class DataSyncTransactionListener implements TransactionListener {
@Autowired
private TransactionLogMapper transactionLogMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地事务已在sendMessageInTransaction中执行
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
// 查询事务日志
TransactionLog log = transactionLogMapper.selectByTransactionId(transactionId);
if (log == null) {
// 未找到记录,可能是本地事务还未执行完
return LocalTransactionState.UNKNOW;
}
if (log.getStatus() == 1) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
七、性能优化与最佳实践
7.1 批量处理优化
/**
* 批量同步优化策略
*/
@Component
@Slf4j
public class BatchSyncOptimizer {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
/**
* 批量导入 - 适用于全量同步
*/
public void bulkImport(List<Product> products) {
List<ProductDoc> docs = products.stream()
.map(this::convertToDoc)
.collect(Collectors.toList());
// 分批处理,每批1000条
List<List<ProductDoc>> batches = Lists.partition(docs, 1000);
for (List<ProductDoc> batch : batches) {
try {
elasticsearchTemplate.save(batch);
log.info("Bulk imported {} products", batch.size());
// 短暂休眠,避免ES压力过大
Thread.sleep(100);
} catch (Exception e) {
log.error("Bulk import failed, batch size: {}", batch.size(), e);
// 降级为单条处理
for (ProductDoc doc : batch) {
try {
elasticsearchTemplate.save(doc);
} catch (Exception ex) {
log.error("Single import failed for product: {}", doc.getId(), ex);
}
}
}
}
}
/**
* 并发批量处理
*/
public void concurrentBulkImport(List<Product> products, int concurrency) {
List<ProductDoc> docs = products.stream()
.map(this::convertToDoc)
.collect(Collectors.toList());
List<List<ProductDoc>> batches = Lists.partition(docs, 1000);
// 使用线程池并发处理
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
List<CompletableFuture<Void>> futures = batches.stream()
.map(batch -> CompletableFuture.runAsync(() -> {
try {
elasticsearchTemplate.save(batch);
} catch (Exception e) {
log.error("Batch import failed", e);
}
}, executor))
.collect(Collectors.toList());
// 等待所有批次完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();
}
}
7.2 失败重试与补偿
/**
* 失败重试与补偿机制
*/
@Component
@Slf4j
public class SyncRetryService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final String RETRY_KEY_PREFIX = "sync:retry:";
private static final int MAX_RETRY_TIMES = 3;
/**
* 记录失败消息,进入重试队列
*/
public void recordFailedMessage(DataChangeMessage message, Exception error) {
String key = RETRY_KEY_PREFIX + message.getId();
// 获取当前重试次数
String retryCountStr = redisTemplate.opsForValue().get(key);
int retryCount = retryCountStr != null ? Integer.parseInt(retryCountStr) : 0;
if (retryCount >= MAX_RETRY_TIMES) {
// 超过最大重试次数,进入死信队列
sendToDeadLetterQueue(message, error);
return;
}
// 增加重试次数
retryCount++;
redisTemplate.opsForValue().set(key, String.valueOf(retryCount), 1, TimeUnit.HOURS);
// 计算延迟时间(指数退避)
long delayTime = (long) (Math.pow(2, retryCount) * 1000); // 2s, 4s, 8s
// 发送延迟消息
Message<DataChangeMessage> delayMessage = MessageBuilder
.withPayload(message)
.setHeader("RETRY_COUNT", retryCount)
.build();
rocketMQTemplate.syncSendDelayTimeSeconds(
"data-sync-retry-topic",
delayMessage,
(int) (delayTime / 1000)
);
}
/**
* 死信队列处理
*/
@RocketMQMessageListener(topic = "data-sync-dlq", consumerGroup = "dlq-consumer")
public class DeadLetterConsumer implements RocketMQListener<DataChangeMessage> {
@Override
public void onMessage(DataChangeMessage message) {
log.error("Message entered dead letter queue: {}", message);
// 记录到数据库,人工介入处理
saveToFailedLog(message);
// 发送告警
sendAlert(message);
}
}
}
八、经验总结与最佳实践
8.1 方案选择建议
┌─────────────────────────────────────────────────────────────────────┐
│ 数据同步方案选择指南 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 【CDC订阅方案 - 推荐】 │
│ 适用场景: │
│ - 已有成熟的MySQL架构 │
│ - 业务代码不希望侵入同步逻辑 │
│ - 需要实时同步(秒级延迟) │
│ │
│ 实施要点: │
│ 1. 部署Canal Server集群,保证高可用 │
│ 2. 配置合理的Binlog过滤规则,减少不必要的数据传输 │
│ 3. 实现幂等消费,防止重复处理 │
│ 4. 建立数据一致性校验机制 │
│ 5. 监控同步延迟,设置告警阈值 │
│ │
│ 【消息队列方案】 │
│ 适用场景: │
│ - 新业务系统,可以统一规划数据变更消息 │
│ - 需要灵活控制同步时机和逻辑 │
│ - 多系统间的数据分发 │
│ │
│ 实施要点: │
│ 1. 设计统一的消息格式,包含完整的变更信息 │
│ 2. 使用事务消息保证本地事务和消息发送一致 │
│ 3. 配置合理的重试策略和死信队列 │
│ 4. 保证消息顺序消费(对顺序敏感的场景) │
│ │
│ 【双写方案 - 谨慎使用】 │
│ 适用场景: │
│ - 简单场景,数据一致性要求不高 │
│ - 快速原型验证 │
│ │
│ 实施要点: │
│ 1. 必须配合事务消息或本地事务表 │
│ 2. 实现完善的补偿机制 │
│ 3. 定期进行数据一致性校验 │
│ │
└─────────────────────────────────────────────────────────────────────┘
8.2 最佳实践清单
| 实践项 | 说明 | 优先级 |
|---|---|---|
| 幂等设计 | 所有消费者必须实现幂等,防止重复消费 | 🔴 必须 |
| 顺序保证 | 对顺序敏感的数据,保证消费顺序 | 🔴 必须 |
| 延迟监控 | 监控同步延迟,及时发现同步异常 | 🔴 必须 |
| 一致性校验 | 定期进行数据一致性校验和修复 | 🟠 强烈建议 |
| 失败重试 | 实现指数退避的重试机制 | 🟠 强烈建议 |
| 死信处理 | 建立死信队列,人工介入处理 | 🟠 强烈建议 |
| 批量处理 | 使用批量操作提升同步性能 | 🟡 建议 |
| 灰度发布 | 同步逻辑变更先灰度验证 | 🟡 建议 |
8.3 常见错误与解决方案
| 错误 | 后果 | 解决方案 |
|---|---|---|
| 无幂等保护 | 重复消费导致数据错误 | 实现幂等检查,使用唯一键或Redis标记 |
| 忽略顺序 | 更新顺序错乱,最终数据错误 | 按业务ID分区,保证单分区顺序消费 |
| 同步无监控 | 同步失败无法及时发现 | 监控同步延迟和失败率,设置告警 |
| 无补偿机制 | 同步失败后数据永久不一致 | 建立定时补偿任务,定期校验修复 |
| 全量字段同步 | 网络开销大,同步慢 | 只同步变更字段,减少数据传输 |
| 忽略异常 | 异常被吞掉,数据丢失 | 完善异常处理,记录失败日志 |
系列上一篇:数据库缓存架构:Redis与数据库一致性
系列下一篇:JSON与文档存储的高级应用
知识点测试
读完文章了?来测试一下你对知识点的掌握程度吧!
评论区
使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。
如果评论系统无法加载,请确保:
- 您的网络可以访问 GitHub
- giscus GitHub App 已安装到仓库
- 仓库已启用 Discussions 功能