返回 筑基・数据元府藏真
高并发场景下的数据库应对策略
博主
大约 24 分钟
高并发场景下的数据库应对策略
问题引入:秒杀系统的数据库噩梦
去年双十一,我们的秒杀系统上线了一款爆款手机,10000台库存吸引了超过500万用户预约。活动开始瞬间,系统直接崩溃,数据库CPU飙升到100%,连接池耗尽,大量用户投诉。事后复盘发现:
场景:100万人抢10000件商品
问题统计:
- 瞬时并发峰值:50万QPS
- 数据库连接池:100个连接瞬间耗尽
- 行锁竞争:每秒产生2000+死锁
- 库存超卖:实际卖出12000+件
- 响应时间:平均15秒,大量超时
- 数据库CPU:100%,持续10分钟
// 问题代码:直接操作数据库
@Service
public class SeckillService {
@Autowired
private ProductMapper productMapper;
@Autowired
private OrderMapper orderMapper;
@Transactional
public void seckill(Long userId, Long productId) {
// 1. 查询库存(直接查数据库)
Integer stock = productMapper.getStock(productId); // 50万QPS打到数据库
if (stock > 0) {
// 2. 扣减库存(行锁竞争)
productMapper.decreaseStock(productId, 1); // 大量线程等待行锁
// 3. 创建订单
orderMapper.insert(new Order(userId, productId));
// 4. 发送短信(同步调用,阻塞事务)
smsService.sendSeckillSuccessSms(userId);
}
}
}
根本原因:
- 无缓存保护,所有请求直接打到数据库
- 长事务持有行锁,导致大量锁等待
- 同步调用外部服务,阻塞数据库连接
- 无限流措施,数据库被流量冲垮
现象描述:高并发下的数据库问题
案例1:连接池耗尽
场景:电商大促期间
数据库连接池状态:
┌─────────────────────────────────────────────────────────────┐
│ 连接池 (max: 100) │
├─────────────────────────────────────────────────────────────┤
│ 活跃连接: 100/100 (100%) │
│ 等待队列: 500+ 线程 │
│ 等待超时: 持续增加 │
├─────────────────────────────────────────────────────────────┤
│ 连接占用分析: │
│ - 订单服务: 40个连接 │
│ - 库存服务: 35个连接 │
│ - 用户服务: 25个连接 │
└─────────────────────────────────────────────────────────────┘
错误日志:
HikariPool-1 - Thread starvation or clock leap detected
java.sql.SQLTimeoutException: Timeout after 30000ms of waiting for a connection
影响:新请求无法获取连接,服务全面不可用。
案例2:行锁竞争与死锁
场景:秒杀库存扣减
-- 事务A
BEGIN;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001; -- 获取行锁
-- 执行业务逻辑(耗时2秒)
UPDATE orders SET status = 'PAID' WHERE order_id = 2001; -- 需要order_id=2001的锁
COMMIT;
-- 事务B
BEGIN;
UPDATE orders SET status = 'PAID' WHERE order_id = 2001; -- 获取order_id=2001的锁
-- 执行业务逻辑(耗时2秒)
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001; -- 需要product_id=1001的锁
COMMIT;
-- 结果:死锁!
-- MySQL检测到死锁,回滚其中一个事务
死锁日志:
LATEST DETECTED DEADLOCK
------------------------
*** (1) TRANSACTION:
TRANSACTION 12345, ACTIVE 2 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 2 lock struct(s), heap size 1136, 1 row lock(s)
MySQL thread id 100, OS thread handle 123456789
UPDATE orders SET status = 'PAID' WHERE order_id = 2001
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 58 page no 3 n bits 72 index PRIMARY
of table `db`.`orders` trx id 12345 lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 12346, ACTIVE 2 sec starting index read
mysql tables in use 1, locked 1
3 lock struct(s), heap size 1136, 2 row lock(s)
MySQL thread id 101, OS thread handle 123456790
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 58 page no 3 n bits 72 index PRIMARY
of table `db`.`orders` trx id 12346 lock_mode X locks rec but not gap
*** WE ROLL BACK TRANSACTION (1)
影响:死锁导致事务回滚,用户体验差,系统吞吐量下降。
案例3:热点数据问题
场景:热门商品库存扣减
热点数据现象:
┌─────────────────────────────────────────────────────────────┐
│ 热点行竞争 │
├─────────────────────────────────────────────────────────────┤
│ 商品ID: 1001 (iPhone 15 Pro) │
│ 初始库存: 10000 │
│ 并发请求: 50000/秒 │
│ │
│ 行锁等待队列: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 事务T1 │→│ 事务T2 │→│ 事务T3 │→│ 事务T4 │→ ... │
│ │ (持有锁)│ │ (等待) │ │ (等待) │ │ (等待) │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 平均等待时间: 50ms │
│ 吞吐量: 20 TPS (理论上可达2000+) │
└─────────────────────────────────────────────────────────────┘
影响:单行热点导致系统吞吐量急剧下降。
原因分析:高并发数据库瓶颈
1. 数据库连接瓶颈
连接池瓶颈分析:
┌─────────────────────────────────────────────────────────────┐
│ 连接池模型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 应用线程 连接池 数据库 │
│ ┌──────┐ ┌──────────┐ ┌──────────┐ │
│ │ T1 │───→│ 连接1 │──→│ │ │
│ │ T2 │───→│ 连接2 │──→│ │ │
│ │ T3 │───→│ 连接3 │──→│ MySQL │ │
│ │ ... │ │ ... │ │ │ │
│ │ Tn │───→│ 连接n │──→│ │ │
│ └──────┘ └──────────┘ └──────────┘ │
│ │
│ 瓶颈点: │
│ 1. 连接数限制(通常100-200) │
│ 2. 每个连接占用内存(约10MB) │
│ 3. 数据库最大连接数限制(max_connections) │
│ 4. 线程上下文切换开销 │
│ │
└─────────────────────────────────────────────────────────────┘
2. 锁竞争分析
锁竞争类型:
1. 行锁竞争(Row Lock Contention)
┌─────────┐ ┌─────────┐
│ 事务A │ ──锁定行R──→ │ 行R │
│ (持有锁)│ │ (被锁定)│
└─────────┘ └────┬────┘
│
┌─────────┐ │ 等待
│ 事务B │ ←─────────────────┘
│ (阻塞) │
└─────────┘
2. 间隙锁竞争(Gap Lock Contention)
范围查询时,锁定索引间隙,阻止其他事务插入
3. 表锁竞争(Table Lock Contention)
全表扫描或DDL操作导致整个表被锁定
4. 元数据锁(Metadata Lock)
DDL操作与DML操作之间的冲突
3. 数据库资源限制
| 资源类型 | 限制因素 | 典型值 | 影响 |
|---|---|---|---|
| CPU | 查询复杂度、并发数 | 32核 | 复杂查询消耗CPU |
| 内存 | 缓冲池大小 | 128GB | 内存不足导致磁盘IO |
| 磁盘IO | IOPS、吞吐量 | 10000 IOPS | IO瓶颈导致查询慢 |
| 网络 | 带宽、延迟 | 1Gbps | 大数据量传输慢 |
| 连接数 | max_connections | 1000 | 连接数超限拒绝服务 |
解决方案:高并发应对策略
1. 连接池优化
1.1 连接池配置优化
# HikariCP优化配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/db?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
# 核心配置
minimum-idle: 20 # 最小空闲连接数
maximum-pool-size: 100 # 最大连接数(根据数据库max_connections设置)
connection-timeout: 30000 # 获取连接等待超时时间(毫秒)
idle-timeout: 600000 # 空闲连接超时时间(毫秒)
max-lifetime: 1800000 # 连接最大生命周期(毫秒)
# 性能优化
auto-commit: false # 关闭自动提交,手动控制事务
connection-test-query: SELECT 1 # 连接测试语句
validation-timeout: 5000 # 连接验证超时
leak-detection-threshold: 60000 # 连接泄漏检测阈值
# 额外配置
data-source-properties:
cachePrepStmts: true # 启用预编译语句缓存
prepStmtCacheSize: 250 # 预编译语句缓存大小
prepStmtCacheSqlLimit: 2048 # 单条SQL缓存长度限制
useServerPrepStmts: true # 使用服务器端预编译
useLocalSessionState: true # 使用本地会话状态
rewriteBatchedStatements: true # 批量操作重写
cacheResultSetMetadata: true # 缓存结果集元数据
cacheServerConfiguration: true # 缓存服务器配置
elideSetAutoCommits: true # 优化自动提交设置
maintainTimeStats: false # 关闭时间统计,减少开销
1.2 连接池监控
@Component
public class ConnectionPoolMetrics {
@Autowired
private HikariDataSource dataSource;
@Scheduled(fixedRate = 60000)
public void reportMetrics() {
HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
HikariConfigMXBean configMXBean = dataSource.getHikariConfigMXBean();
int activeConnections = poolMXBean.getActiveConnections();
int idleConnections = poolMXBean.getIdleConnections();
int totalConnections = poolMXBean.getTotalConnections();
int waitingConnections = poolMXBean.getThreadsAwaitingConnection();
log.info("连接池状态 - 活跃: {}, 空闲: {}, 总计: {}, 等待: {}",
activeConnections, idleConnections, totalConnections, waitingConnections);
// 告警:活跃连接超过80%
if (activeConnections > configMXBean.getMaximumPoolSize() * 0.8) {
alertService.sendAlert("连接池告警", "活跃连接超过80%: " + activeConnections);
}
// 告警:有线程在等待连接
if (waitingConnections > 0) {
alertService.sendAlert("连接池告警", "有线程等待连接: " + waitingConnections);
}
}
}
2. 缓存策略
2.1 多级缓存架构
多级缓存架构:
┌─────────────────────────────────────────────────────────────┐
│ 请求层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CDN缓存 │ │ 浏览器缓存 │ │ Nginx缓存 │ │
│ │ (静态资源) │ │ (页面缓存) │ │ (本地缓存) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 本地缓存 │ │ 分布式缓存 │ │ 数据库 │ │
│ │ (Caffeine) │ │ (Redis) │ │ (MySQL) │ │
│ │ 命中率: 60% │ │ 命中率: 35%│ │ 命中率: 5% │ │
│ │ 延迟: 1μs │ │ 延迟: 1ms │ │ 延迟: 10ms │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
2.2 本地缓存实现
@Configuration
public class LocalCacheConfig {
@Bean
public Cache<String, Object> localCache() {
return Caffeine.newBuilder()
// 初始容量
.initialCapacity(1000)
// 最大容量
.maximumSize(10000)
// 写入后过期时间
.expireAfterWrite(10, TimeUnit.MINUTES)
// 访问后过期时间
.expireAfterAccess(5, TimeUnit.MINUTES)
// 刷新时间(异步刷新)
.refreshAfterWrite(1, TimeUnit.MINUTES)
// 统计命中率
.recordStats()
// 移除监听器
.removalListener((key, value, cause) -> {
log.debug("缓存移除: key={}, cause={}", key, cause);
})
.build();
}
}
@Service
public class ProductServiceWithCache {
@Autowired
private Cache<String, Object> localCache;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
private static final String REDIS_KEY_PREFIX = "product:";
public Product getProduct(Long productId) {
String localKey = "product:local:" + productId;
String redisKey = REDIS_KEY_PREFIX + productId;
// 1. 查询本地缓存
Product product = (Product) localCache.getIfPresent(localKey);
if (product != null) {
return product;
}
// 2. 查询Redis
String productJson = redisTemplate.opsForValue().get(redisKey);
if (productJson != null) {
product = JsonUtils.fromJson(productJson, Product.class);
// 回填本地缓存
localCache.put(localKey, product);
return product;
}
// 3. 查询数据库
product = productMapper.selectById(productId);
if (product != null) {
// 回填Redis
redisTemplate.opsForValue().set(redisKey, JsonUtils.toJson(product), 30, TimeUnit.MINUTES);
// 回填本地缓存
localCache.put(localKey, product);
}
return product;
}
}
2.3 缓存预热与更新
@Component
public class CacheWarmUp {
@Autowired
private ProductService productService;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 秒杀活动开始前预热缓存
*/
@EventListener
public void onSeckillActivityStart(SeckillActivityStartEvent event) {
Long activityId = event.getActivityId();
// 1. 加载活动商品
List<Product> products = productService.getActivityProducts(activityId);
// 2. 预热商品信息缓存
for (Product product : products) {
String key = "seckill:product:" + product.getId();
redisTemplate.opsForValue().set(key, JsonUtils.toJson(product), 1, TimeUnit.HOURS);
}
// 3. 预热库存缓存
for (Product product : products) {
String stockKey = "seckill:stock:" + product.getId();
redisTemplate.opsForValue().set(stockKey, String.valueOf(product.getStock()), 1, TimeUnit.HOURS);
}
log.info("秒杀活动缓存预热完成,活动ID: {}, 商品数: {}", activityId, products.size());
}
}
3. 热点数据优化
3.1 库存扣减优化
@Service
public class OptimizedSeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 方案1:Redis预减库存 + 异步同步
*/
public SeckillResult seckillWithRedis(Long userId, Long productId) {
String stockKey = "seckill:stock:" + productId;
String userKey = "seckill:user:" + productId + ":" + userId;
// 1. 检查用户是否已抢购(幂等性)
Boolean hasBought = redisTemplate.hasKey(userKey);
if (Boolean.TRUE.equals(hasBought)) {
return SeckillResult.fail("您已抢购过该商品");
}
// 2. Redis预减库存
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,回补
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("商品已售罄");
}
// 3. 标记用户已抢购
redisTemplate.opsForValue().set(userKey, "1", 1, TimeUnit.HOURS);
// 4. 发送异步消息,创建订单
SeckillMessage message = new SeckillMessage(userId, productId, System.currentTimeMillis());
kafkaTemplate.send("seckill-order", JsonUtils.toJson(message));
return SeckillResult.success("抢购成功,正在生成订单");
}
/**
* 方案2:分段库存(热点分散)
*/
public SeckillResult seckillWithSegment(Long userId, Long productId) {
// 将库存分成100个段
int segmentCount = 100;
int segmentIndex = (int) (userId % segmentCount);
String stockKey = "seckill:stock:" + productId + ":" + segmentIndex;
// 随机选择段(避免单段热点)
for (int i = 0; i < segmentCount; i++) {
int tryIndex = (segmentIndex + i) % segmentCount;
String tryKey = "seckill:stock:" + productId + ":" + tryIndex;
Long stock = redisTemplate.opsForValue().decrement(tryKey);
if (stock != null && stock >= 0) {
// 扣减成功
sendCreateOrderMessage(userId, productId);
return SeckillResult.success("抢购成功");
}
// 回补
if (stock != null && stock < 0) {
redisTemplate.opsForValue().increment(tryKey);
}
}
return SeckillResult.fail("商品已售罄");
}
/**
* 方案3:令牌桶限流 + 队列削峰
*/
public SeckillResult seckillWithLimiter(Long userId, Long productId) {
String limiterKey = "seckill:limiter:" + productId;
// 1. 限流检查
RRateLimiter rateLimiter = redissonClient.getRateLimiter(limiterKey);
rateLimiter.trySetRate(RateType.OVERALL, 1000, 1, RateIntervalUnit.SECONDS);
if (!rateLimiter.tryAcquire(1, 100, TimeUnit.MILLISECONDS)) {
return SeckillResult.fail("系统繁忙,请重试");
}
// 2. 进入队列
String queueKey = "seckill:queue:" + productId;
SeckillRequest request = new SeckillRequest(userId, productId, System.currentTimeMillis());
RQueue<SeckillRequest> queue = redissonClient.getQueue(queueKey);
if (queue.size() >= 10000) {
return SeckillResult.fail("排队人数过多,请稍后再试");
}
queue.add(request);
// 3. 异步消费队列
return SeckillResult.success("已进入排队,请耐心等待");
}
}
/**
* 队列消费服务
*/
@Component
public class SeckillQueueConsumer {
@Autowired
private RedissonClient redissonClient;
@Autowired
private OrderService orderService;
@PostConstruct
public void startConsumer() {
// 启动多个消费者线程
for (int i = 0; i < 10; i++) {
new Thread(this::consumeQueue).start();
}
}
private void consumeQueue() {
while (true) {
try {
RQueue<SeckillRequest> queue = redissonClient.getQueue("seckill:queue:*");
SeckillRequest request = queue.poll(1, TimeUnit.SECONDS);
if (request != null) {
// 处理订单
processSeckillRequest(request);
}
} catch (Exception e) {
log.error("队列消费异常", e);
}
}
}
private void processSeckillRequest(SeckillRequest request) {
// 1. 再次检查库存
// 2. 创建订单
// 3. 扣减数据库库存
orderService.createSeckillOrder(request);
}
}
4. 限流与降级
4.1 限流策略
@Component
public class RateLimitService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 令牌桶限流
*/
public boolean tryAcquireWithTokenBucket(String key, int rate, int capacity) {
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
rateLimiter.trySetRate(RateType.OVERALL, rate, 1, RateIntervalUnit.SECONDS);
return rateLimiter.tryAcquire(1, 100, TimeUnit.MILLISECONDS);
}
/**
* 滑动窗口限流(Redis实现)
*/
public boolean tryAcquireWithSlidingWindow(String key, int limit, int windowSeconds) {
String redisKey = "rate:limit:" + key;
long now = System.currentTimeMillis();
long windowStart = now - windowSeconds * 1000;
// 使用Redis Sorted Set
ZSetOperations<String, String> zSetOps = redisTemplate.opsForZSet();
// 移除窗口外的请求记录
zSetOps.removeRangeByScore(redisKey, 0, windowStart);
// 获取当前窗口内的请求数
Long currentCount = zSetOps.zCard(redisKey);
if (currentCount != null && currentCount >= limit) {
return false;
}
// 记录当前请求
zSetOps.add(redisKey, String.valueOf(now), now);
redisTemplate.expire(redisKey, windowSeconds, TimeUnit.SECONDS);
return true;
}
/**
* 漏桶限流
*/
public boolean tryAcquireWithLeakyBucket(String key, int rate) {
String redisKey = "leaky:bucket:" + key;
// 使用Redis Lua脚本实现原子操作
String luaScript =
"local key = KEYS[1] " +
"local rate = tonumber(ARGV[1]) " +
"local now = tonumber(ARGV[2]) " +
"local lastTime = redis.call('GET', key .. ':last') " +
"local water = redis.call('GET', key .. ':water') " +
"if lastTime == false then " +
" lastTime = now " +
" water = 0 " +
"else " +
" lastTime = tonumber(lastTime) " +
" water = tonumber(water) " +
"end " +
"local leaked = (now - lastTime) * rate / 1000 " +
"water = math.max(0, water - leaked) " +
"if water < 1 then " +
" water = water + 1 " +
" redis.call('SET', key .. ':last', now) " +
" redis.call('SET', key .. ':water', water) " +
" return 1 " +
"else " +
" redis.call('SET', key .. ':last', now) " +
" redis.call('SET', key .. ':water', water) " +
" return 0 " +
"end";
RedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(redisKey),
String.valueOf(rate), String.valueOf(System.currentTimeMillis()));
return result != null && result == 1;
}
}
4.2 降级策略
@Component
public class DegradeService {
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry;
@Autowired
private ProductService productService;
/**
* 商品查询降级
*/
@CircuitBreaker(name = "productService", fallbackMethod = "getProductFallback")
public Product getProduct(Long productId) {
return productService.getProductFromDB(productId);
}
/**
* 降级方法:返回缓存数据或默认值
*/
public Product getProductFallback(Long productId, Exception ex) {
log.warn("商品查询降级, productId: {}, error: {}", productId, ex.getMessage());
// 1. 尝试从本地缓存获取
Product cachedProduct = getFromLocalCache(productId);
if (cachedProduct != null) {
return cachedProduct;
}
// 2. 返回默认商品信息
Product defaultProduct = new Product();
defaultProduct.setId(productId);
defaultProduct.setName("商品信息暂时不可用");
defaultProduct.setPrice(BigDecimal.ZERO);
defaultProduct.setStock(0);
return defaultProduct;
}
/**
* 库存查询降级
*/
@CircuitBreaker(name = "inventoryService", fallbackMethod = "getStockFallback")
public Integer getStock(Long productId) {
return productService.getStockFromDB(productId);
}
public Integer getStockFallback(Long productId, Exception ex) {
log.warn("库存查询降级, productId: {}", productId);
// 降级时返回0,防止超卖
return 0;
}
/**
* 手动降级开关
*/
private volatile boolean manualDegrade = false;
public void enableManualDegrade() {
this.manualDegrade = true;
log.info("手动降级已开启");
}
public void disableManualDegrade() {
this.manualDegrade = false;
log.info("手动降级已关闭");
}
public boolean isManualDegrade() {
return manualDegrade;
}
}
// Resilience4j配置
resilience4j:
circuitbreaker:
instances:
productService:
registerHealthIndicator: true
slidingWindowSize: 100
minimumNumberOfCalls: 10
permittedNumberOfCallsInHalfOpenState: 3
automaticTransitionFromOpenToHalfOpenEnabled: true
waitDurationInOpenState: 10s
failureRateThreshold: 50
eventConsumerBufferSize: 10
5. 数据库层面优化
5.1 读写分离
# 读写分离配置
spring:
shardingsphere:
datasource:
names: master, slave0, slave1
master:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://master:3306/db
username: root
password: password
slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave0:3306/db
username: root
password: password
slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave1:3306/db
username: root
password: password
rules:
read-write-splitting:
data-sources:
pr_ds:
type: Static
props:
write-data-source-name: master
read-data-source-names: slave0, slave1
load-balancer-name: round_robin
load-balancers:
round_robin:
type: ROUND_ROBIN
5.2 分库分表
# 分库分表配置
spring:
shardingsphere:
rules:
sharding:
tables:
orders:
actual-data-nodes: ds${0..1}.orders${0..15}
table-strategy:
standard:
sharding-column: order_id
sharding-algorithm-name: order-table-algorithm
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: order-database-algorithm
sharding-algorithms:
order-table-algorithm:
type: INLINE
props:
algorithm-expression: orders${order_id % 16}
order-database-algorithm:
type: INLINE
props:
algorithm-expression: ds${user_id % 2}
6. 异步化处理
@Service
public class AsyncOrderService {
@Autowired
private ThreadPoolExecutor orderExecutor;
@Autowired
private OrderMapper orderMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 异步创建订单
*/
@Async("orderExecutor")
public CompletableFuture<Order> createOrderAsync(OrderRequest request) {
try {
Order order = createOrder(request);
return CompletableFuture.completedFuture(order);
} catch (Exception e) {
CompletableFuture<Order> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
/**
* 线程池配置
*/
@Configuration
public class ThreadPoolConfig {
@Bean("orderExecutor")
public ThreadPoolExecutor orderExecutor() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maximumPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
}
实战案例:秒杀系统架构
系统架构图
秒杀系统架构:
┌─────────────────────────────────────────────────────────────┐
│ 接入层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CDN │ │ WAF │ │ 负载均衡 │ │
│ │ (静态资源) │ │ (安全防护) │ │ (Nginx) │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
└───────────────────────────────────────────┼─────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 网关层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 限流 │ │ 鉴权 │ │ 路由 │ │
│ │ (Sentinel) │ │ (JWT) │ │ (Gateway) │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
└───────────────────────────────────────────┼─────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 库存服务 │ │ 订单服务 │ │ 用户服务 │ │
│ │ (Redis) │ │ (异步队列) │ │ (缓存) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 数据层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Redis │ │ MQ │ │ MySQL │ │
│ │ (热点数据) │ │ (消息队列) │ │ (持久化) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
核心代码实现
@RestController
public class SeckillController {
@Autowired
private SeckillService seckillService;
@Autowired
private RateLimitService rateLimitService;
@PostMapping("/seckill/{productId}")
public Result<SeckillResult> seckill(
@PathVariable Long productId,
@RequestHeader("User-Id") Long userId) {
// 1. 参数校验
if (productId == null || userId == null) {
return Result.fail("参数错误");
}
// 2. 限流检查
String limitKey = "seckill:" + productId + ":" + userId;
if (!rateLimitService.tryAcquireWithSlidingWindow(limitKey, 1, 60)) {
return Result.fail("操作过于频繁,请稍后再试");
}
// 3. 全局限流
String globalLimitKey = "seckill:global:" + productId;
if (!rateLimitService.tryAcquireWithTokenBucket(globalLimitKey, 10000, 10000)) {
return Result.fail("系统繁忙,请重试");
}
// 4. 执行秒杀
SeckillResult result = seckillService.seckill(userId, productId);
return Result.success(result);
}
}
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private SeckillActivityMapper activityMapper;
/**
* 秒杀核心逻辑
*/
public SeckillResult seckill(Long userId, Long productId) {
// 1. 检查活动状态
SeckillActivity activity = getActivity(productId);
if (activity == null || !activity.isActive()) {
return SeckillResult.fail("活动不存在或已结束");
}
// 2. 检查用户是否已抢购
String userKey = "seckill:user:" + productId + ":" + userId;
Boolean hasBought = redisTemplate.hasKey(userKey);
if (Boolean.TRUE.equals(hasBought)) {
return SeckillResult.fail("您已抢购过该商品");
}
// 3. Redis预减库存
String stockKey = "seckill:stock:" + productId;
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,回补
if (stock != null && stock < 0) {
redisTemplate.opsForValue().increment(stockKey);
}
return SeckillResult.fail("商品已售罄");
}
// 4. 标记用户已抢购
redisTemplate.opsForValue().set(userKey, "1", 24, TimeUnit.HOURS);
// 5. 发送创建订单消息
SeckillMessage message = new SeckillMessage(userId, productId, activity.getPrice());
kafkaTemplate.send("seckill-order", JsonUtils.toJson(message));
return SeckillResult.success("抢购成功,正在生成订单");
}
private SeckillActivity getActivity(Long productId) {
String key = "seckill:activity:" + productId;
String activityJson = redisTemplate.opsForValue().get(key);
if (activityJson != null) {
return JsonUtils.fromJson(activityJson, SeckillActivity.class);
}
// 查询数据库并缓存
SeckillActivity activity = activityMapper.selectByProductId(productId);
if (activity != null) {
redisTemplate.opsForValue().set(key, JsonUtils.toJson(activity), 1, TimeUnit.HOURS);
}
return activity;
}
}
@Component
public class OrderCreationConsumer {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@KafkaListener(topics = "seckill-order", groupId = "order-service")
public void onSeckillMessage(ConsumerRecord<String, String> record) {
SeckillMessage message = JsonUtils.fromJson(record.value(), SeckillMessage.class);
try {
// 1. 创建订单
Order order = orderService.createSeckillOrder(message);
// 2. 扣减数据库库存
inventoryService.decreaseStock(message.getProductId(), 1);
// 3. 发送支付超时消息(15分钟未支付取消订单)
sendPaymentTimeoutMessage(order.getOrderId(), 15);
log.info("秒杀订单创建成功, orderId: {}, userId: {}",
order.getOrderId(), message.getUserId());
} catch (Exception e) {
log.error("秒杀订单创建失败, userId: {}, productId: {}",
message.getUserId(), message.getProductId(), e);
// 补偿:恢复Redis库存
restoreRedisStock(message.getProductId());
}
}
private void restoreRedisStock(Long productId) {
String stockKey = "seckill:stock:" + productId;
redisTemplate.opsForValue().increment(stockKey);
}
}
性能测试数据
1. 优化前后性能对比
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 峰值QPS | 500 | 50,000 | 100x |
| 平均响应时间 | 15s | 50ms | 300x |
| 数据库连接使用率 | 100% | 30% | 70% |
| 库存超卖 | 20% | 0% | 100% |
| 系统可用性 | 85% | 99.9% | 14.9% |
2. 缓存命中率统计
| 缓存层级 | 命中率 | 平均延迟 | 容量 |
|---|---|---|---|
| 本地缓存(Caffeine) | 65% | 1μs | 10,000 |
| Redis缓存 | 30% | 2ms | 1,000,000 |
| 数据库 | 5% | 10ms | ∞ |
3. 限流效果测试
| 限流算法 | 突发流量处理 | 平滑度 | 复杂度 |
|---|---|---|---|
| 固定窗口 | 差 | 低 | 低 |
| 滑动窗口 | 好 | 中 | 中 |
| 令牌桶 | 好 | 高 | 中 |
| 漏桶 | 好 | 高 | 中 |
经验总结
✅ 最佳实践
-
缓存抗读,队列抗写
- 读多写少场景:多级缓存
- 写多读少场景:消息队列削峰
-
热点数据分散
- 分片存储:将热点数据分散到多个分片
- 读写分离:读请求路由到从库
- 本地缓存:热点数据本地缓存
-
异步化处理
- 非核心操作异步化
- 使用消息队列解耦
- 线程池隔离
-
降级预案
- 熔断降级:服务故障时快速失败
- 限流降级:流量过大时拒绝服务
- 手动降级:大促期间主动降级非核心功能
-
全链路压测
- 生产环境压测
- 模拟真实流量
- 验证系统容量
❌ 常见错误
-
直接打到数据库
// 错误:无缓存保护 @GetMapping("/product/{id}") public Product getProduct(@PathVariable Long id) { return productMapper.selectById(id); // 所有请求直接查库 } // 正确:多级缓存 @GetMapping("/product/{id}") public Product getProduct(@PathVariable Long id) { return productService.getProductWithCache(id); } -
忽略连接池限制
# 错误:连接池配置不合理 maximum-pool-size: 1000 # 超过数据库max_connections # 正确:合理配置 maximum-pool-size: 100 # 根据数据库配置 -
不加锁或锁粒度太大
// 错误:无锁控制 public void deductStock(Long productId) { Integer stock = productMapper.getStock(productId); productMapper.updateStock(productId, stock - 1); // 并发问题 } // 错误:锁粒度太大 @Synchronized public void deductStock(Long productId) { // 锁住了整个方法 // ... } // 正确:细粒度锁 public void deductStock(Long productId) { // 使用数据库行锁或分布式锁 productMapper.decreaseStock(productId, 1); } -
没有降级方案
// 错误:无降级处理 public Product getProduct(Long id) { return productService.getProduct(id); // 服务故障时直接抛异常 } // 正确:有降级方案 @CircuitBreaker(name = "productService", fallbackMethod = "getProductFallback") public Product getProduct(Long id) { return productService.getProduct(id); } public Product getProductFallback(Long id, Exception ex) { return getFromCache(id); // 降级到缓存 }
决策树:高并发场景技术选型
┌─────────────────────────────────────┐
│ 高并发场景分析 │
└─────────────────┬───────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 读多写少 │ │ 写多读少 │ │ 读写均衡 │
│ 商品查询 │ │ 订单创建 │ │ 用户系统 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 多级缓存 │ │ 消息队列 │ │ 读写分离 │
│ CDN+Redis │ │ 异步处理 │ │ 分库分表 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 本地缓存 │ │ 队列削峰 │ │ 主从复制 │
│ 热点数据识别 │ │ 限流保护 │ │ 数据分片 │
└───────────────┘ └───────────────┘ └───────────────┘
检查清单
高并发设计检查清单
- 是否使用了多级缓存?
- 热点数据是否已识别并优化?
- 是否有降级预案?
- 是否配置了限流?
- 连接池配置是否合理?
- 是否有熔断保护?
- 是否进行了全链路压测?
- 是否有监控告警?
- 异步处理是否完善?
- 是否有防刷机制?
秒杀系统专项清单
- 库存预热是否完成?
- 限流策略是否生效?
- 队列消费是否正常?
- 降级开关是否可用?
- 监控大盘是否就绪?
- 应急预案是否演练?
- 对账机制是否完善?
- 数据一致性如何保证?
系列上一篇:分布式事务实战:从本地到分布式的一致性
系列下一篇:主从复制与读写分离实战
知识点测试
读完文章了?来测试一下你对知识点的掌握程度吧!
评论区
使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。
如果评论系统无法加载,请确保:
- 您的网络可以访问 GitHub
- giscus GitHub App 已安装到仓库
- 仓库已启用 Discussions 功能