返回 筑基・网络云路秘径
后端服务优化策略
博主
大约 29 分钟
后端服务优化策略
一、问题引入:高并发下的服务雪崩
1.1 真实案例:电商秒杀系统崩溃
场景:2024年双11秒杀活动,订单服务全面瘫痪
影响:10分钟内损失订单超5000万,用户投诉激增
事故复盘:
┌─────────────────────────────────────────────────────────────┐
│ 阶段1:流量突增 │
│ - 0点秒杀开始,QPS瞬间从500飙升至50000 │
│ - 数据库连接池迅速耗尽 │
│ - Redis缓存命中率骤降 │
├─────────────────────────────────────────────────────────────┤
│ 阶段2:级联故障 │
│ - 订单服务响应时间从50ms增至8s │
│ - 调用方(购物车、商品服务)超时等待 │
│ - 线程池被占满,新请求被拒绝 │
├─────────────────────────────────────────────────────────────┤
│ 阶段3:雪崩效应 │
│ - 库存服务被拖垮,无法扣减库存 │
│ - 支付服务队列堆积,交易超时 │
│ - 整个交易系统陷入瘫痪 │
├─────────────────────────────────────────────────────────────┤
│ 阶段4:根因分析 │
│ - 缺乏限流保护,未做流量削峰 │
│ - 没有熔断机制,故障级联扩散 │
│ - 数据库无读写分离,单点压力过大 │
│ - 缓存策略不当,缓存穿透导致DB压力 │
│ - 异步处理不足,同步调用链过长 │
├─────────────────────────────────────────────────────────────┤
│ 阶段5:优化措施 │
│ - 接入Sentinel限流熔断 │
│ - 实施读写分离,分库分表 │
│ - 引入消息队列异步处理 │
│ - 优化缓存策略,布隆过滤器防穿透 │
│ - 预扣库存,异步确认 │
├─────────────────────────────────────────────────────────────┤
│ 阶段6:优化效果 │
│ - 系统承载能力:5000 QPS → 100000 QPS │
│ - 平均响应时间:8s → 120ms │
│ - 成功率:23% → 99.9% │
│ - 无服务雪崩,故障隔离 │
└─────────────────────────────────────────────────────────────┘
核心教训:
1. 高并发系统需要多层防护
2. 故障隔离比故障恢复更重要
3. 异步化是提升吞吐量的关键
4. 缓存是双刃剑,需要合理使用
1.2 后端性能优化全景图
后端性能优化层次:
┌──────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 架构层优化 │ │
│ │ • 微服务拆分 │ │
│ │ • 服务网格(Service Mesh) │ │
│ │ • 事件驱动架构 │ │
│ │ • CQRS模式 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼──────────────────────────────┐ │
│ │ 应用层优化 │ │ │
│ │ • 并发模型(线程池/协程)│ │ │
│ │ • 异步非阻塞IO │ │ │
│ │ • 连接池优化 │ │ │
│ │ • 缓存策略 │ │ │
│ │ • 限流熔断 │ │ │
│ └────────────────────────┼──────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼──────────────────────────────┐ │
│ │ 数据层优化 │ │ │
│ │ • 数据库优化(索引/SQL)│ │ │
│ │ • 读写分离 │ │ │
│ │ • 分库分表 │ │ │
│ │ • 缓存集群 │ │ │
│ │ • 消息队列 │ │ │
│ └────────────────────────┼──────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼──────────────────────────────┐ │
│ │ 基础设施优化 │ │ │
│ │ • JVM调优 │ │ │
│ │ • 容器资源限制 │ │ │
│ │ • 网络优化 │ │ │
│ │ • 操作系统参数 │ │ │
│ └────────────────────────┴──────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
二、并发模型与线程池优化
2.1 Java并发模型演进
Java并发模型对比:
┌──────────────────────────────────────────────────────────────┐
│ │
│ 1. 同步阻塞模型(传统Servlet) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 请求 → 分配线程 → 执行业务 → 返回响应 → 释放线程 │ │
│ │ │ │
│ │ 特点: │ │
│ │ - 每个请求一个线程 │ │
│ │ - 线程创建销毁开销大 │ │
│ │ - 线程数受限,并发能力有限 │ │
│ │ - 阻塞等待浪费资源 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ 2. 线程池模型(Tomcat线程池) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 任务队列 │───▶│ 线程池 │───▶ 执行任务 │ │
│ │ │ (有界队列) │ │ (固定大小) │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ 特点: │ │
│ │ - 线程复用,减少创建开销 │ │
│ │ - 队列缓冲,削峰填谷 │ │
│ │ - 拒绝策略保护系统 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ 3. 异步非阻塞模型(WebFlux/Reactor) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 请求 → EventLoop → 注册回调 → 立即返回 → 事件通知 │ │
│ │ │ │
│ │ 特点: │ │
│ │ - 少量线程处理大量连接 │ │
│ │ - 非阻塞IO,资源利用率高 │ │
│ │ - 响应式编程,背压控制 │ │
│ │ - 适合IO密集型场景 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ 4. 协程模型(Project Loom虚拟线程) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 请求 → 虚拟线程 → 挂载到平台线程 → 阻塞时自动让出 │ │
│ │ │ │
│ │ 特点: │ │
│ │ - 轻量级线程(数百万个) │ │
│ │ - 自动管理,无需线程池 │ │
│ │ - 同步代码,异步执行 │ │
│ │ - 兼容现有代码 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
2.2 线程池优化配置
/**
* 线程池优化配置
*/
@Configuration
@Slf4j
public class ThreadPoolConfig {
/**
* CPU密集型任务线程池
* 用于计算密集型任务,如复杂算法、数据处理
*/
@Bean("cpuIntensiveExecutor")
public ThreadPoolExecutor cpuIntensiveExecutor() {
int coreCount = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
coreCount, // 核心线程数 = CPU核心数
coreCount + 1, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(1000), // 有界队列,防止OOM
new ThreadFactoryBuilder()
.setNameFormat("cpu-pool-%d")
.setUncaughtExceptionHandler((t, e) ->
log.error("Uncaught exception in thread {}", t.getName(), e))
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者执行
);
}
/**
* IO密集型任务线程池
* 用于网络请求、数据库操作、文件IO
*/
@Bean("ioIntensiveExecutor")
public ThreadPoolExecutor ioIntensiveExecutor() {
int coreCount = Runtime.getRuntime().availableProcessors();
int maxPoolSize = coreCount * 4; // IO密集型可以更多线程
return new ThreadPoolExecutor(
coreCount * 2, // 核心线程数
maxPoolSize, // 最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000), // 更大的队列
new ThreadFactoryBuilder()
.setNameFormat("io-pool-%d")
.build(),
new ThreadPoolExecutor.DiscardOldestPolicy() // 丢弃最老任务
);
}
/**
* 高优先级任务线程池
* 用于关键业务路径
*/
@Bean("priorityExecutor")
public ThreadPoolExecutor priorityExecutor() {
return new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(), // 优先级队列
new ThreadFactoryBuilder()
.setNameFormat("priority-pool-%d")
.build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
/**
* 定时任务线程池
*/
@Bean("scheduledExecutor")
public ScheduledExecutorService scheduledExecutor() {
return Executors.newScheduledThreadPool(
4,
new ThreadFactoryBuilder()
.setNameFormat("scheduled-pool-%d")
.build()
);
}
/**
* ForkJoinPool(并行流使用)
*/
@Bean("forkJoinPool")
public ForkJoinPool forkJoinPool() {
int parallelism = Runtime.getRuntime().availableProcessors();
return new ForkJoinPool(
parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true // asyncMode,适合事件处理
);
}
}
/**
* 线程池监控
*/
@Component
@Slf4j
public class ThreadPoolMonitor {
@Autowired
private Map<String, ThreadPoolExecutor> executors;
@Scheduled(fixedRate = 60000) // 每分钟监控
public void monitor() {
executors.forEach((name, executor) -> {
ThreadPoolExecutor pool = executor;
log.info("ThreadPool [{}] Status: " +
"Active={}, PoolSize={}, CorePoolSize={}, " +
"LargestPoolSize={}, TaskCount={}, CompletedTaskCount={}, " +
"QueueSize={}",
name,
pool.getActiveCount(),
pool.getPoolSize(),
pool.getCorePoolSize(),
pool.getLargestPoolSize(),
pool.getTaskCount(),
pool.getCompletedTaskCount(),
pool.getQueue().size()
);
// 告警:队列积压过多
if (pool.getQueue().size() > pool.getCorePoolSize() * 10) {
log.warn("ThreadPool [{}] queue is backing up! Size={}",
name, pool.getQueue().size());
}
});
}
}
2.3 虚拟线程(Project Loom)
/**
* 虚拟线程使用示例(Java 21+)
*/
@Service
@Slf4j
public class VirtualThreadService {
/**
* 创建虚拟线程执行器
*/
public ExecutorService createVirtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
/**
* 使用虚拟线程处理并发请求
*/
public List<String> processConcurrentRequests(List<String> urls) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = urls.stream()
.map(url -> executor.submit(() -> fetchUrl(url)))
.toList();
return futures.stream()
.map(future -> {
try {
return future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Failed to fetch", e);
return null;
}
})
.filter(Objects::nonNull)
.toList();
}
}
/**
* 结构化并发(Java 21+)
*/
public String structuredFetch(String url1, String url2) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<String> task1 =
scope.fork(() -> fetchUrl(url1));
StructuredTaskScope.Subtask<String> task2 =
scope.fork(() -> fetchUrl(url2));
scope.join(); // 等待所有任务
scope.throwIfFailed(); // 任一失败则抛出异常
return task1.get() + task2.get();
} catch (Exception e) {
log.error("Structured fetch failed", e);
throw new RuntimeException(e);
}
}
private String fetchUrl(String url) {
// 模拟HTTP请求
try {
Thread.sleep(100); // 虚拟线程会自动让出
return "Response of " + url;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
三、连接池优化
3.1 数据库连接池配置
/**
* HikariCP连接池优化配置
*/
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariConfig config = new HikariConfig();
// 基础配置
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
config.setUsername("user");
config.setPassword("password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
// 连接池大小优化
// 公式:connections = ((core_count * 2) + effective_spindle_count)
int coreCount = Runtime.getRuntime().availableProcessors();
config.setMaximumPoolSize(coreCount * 2 + 1); // 通常10-20
config.setMinimumIdle(coreCount); // 最小空闲连接
// 连接超时配置
config.setConnectionTimeout(30000); // 获取连接等待时间:30s
config.setIdleTimeout(600000); // 空闲连接超时:10min
config.setMaxLifetime(1800000); // 连接最大生命周期:30min
config.setKeepaliveTime(300000); // 保活检测间隔:5min
// 性能优化
config.setAutoCommit(false); // 手动控制事务
config.setCachePrepStmts(true); // 缓存预编译语句
config.setPrepStmtCacheSize(250); // 缓存大小
config.setPrepStmtCacheSqlLimit(2048); // 单条SQL长度限制
config.setUseServerPrepStmts(true); // 使用服务器端预编译
config.setUseLocalSessionState(true); // 使用本地会话状态
config.setRewriteBatchedStatements(true); // 批量重写
config.setCacheResultSetMetadata(true); // 缓存结果集元数据
config.setCacheServerConfiguration(true); // 缓存服务器配置
config.setElideSetAutoCommits(true); // 优化autocommit
config.setMaintainTimeStats(false); // 关闭时间统计
// 监控配置
config.setMetricRegistry(new CodahaleMetrics()); // 指标收集
config.setHealthCheckRegistry(new HealthCheckRegistry());
config.setPoolName("HikariPool-Main");
// 泄漏检测
config.setLeakDetectionThreshold(60000); // 连接泄漏检测:60s
return new HikariDataSource(config);
}
/**
* 读写分离配置
*/
@Bean
public DataSource routingDataSource(
@Qualifier("masterDataSource") DataSource master,
@Qualifier("slaveDataSource") DataSource slave) {
DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", master);
targetDataSources.put("slave", slave);
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(master);
return routingDataSource;
}
}
/**
* 数据源上下文
*/
public class DataSourceContext {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSource(String dataSource) {
contextHolder.set(dataSource);
}
public static String getDataSource() {
return contextHolder.get();
}
public static void clear() {
contextHolder.remove();
}
}
/**
* 动态数据源路由
*/
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContext.getDataSource();
}
}
/**
* 读写分离注解
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}
/**
* 读写分离切面
*/
@Aspect
@Component
public class DataSourceAspect {
@Before("@annotation(readOnly)")
public void setReadDataSource(ReadOnly readOnly) {
DataSourceContext.setDataSource("slave");
}
@After("@annotation(readOnly)")
public void clearDataSource() {
DataSourceContext.clear();
}
}
3.2 HTTP连接池优化
/**
* HTTP连接池优化配置
*/
@Configuration
public class HttpClientConfig {
/**
* OkHttp连接池配置
*/
@Bean
public OkHttpClient okHttpClient() {
// 连接池:复用HTTP/2连接
ConnectionPool connectionPool = new ConnectionPool(
50, // 最大空闲连接数
5, // 连接存活时间(分钟)
TimeUnit.MINUTES
);
return new OkHttpClient.Builder()
.connectionPool(connectionPool)
.connectTimeout(5, TimeUnit.SECONDS) // 连接超时
.readTimeout(30, TimeUnit.SECONDS) // 读取超时
.writeTimeout(30, TimeUnit.SECONDS) // 写入超时
.callTimeout(60, TimeUnit.SECONDS) // 整体调用超时
.retryOnConnectionFailure(true) // 连接失败重试
.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
.addInterceptor(new RetryInterceptor(3)) // 重试拦截器
.addInterceptor(new LoggingInterceptor()) // 日志拦截器
.build();
}
/**
* Apache HttpClient连接池
*/
@Bean
public CloseableHttpClient httpClient() {
PoolingHttpClientConnectionManager connectionManager =
new PoolingHttpClientConnectionManager();
// 连接池配置
connectionManager.setMaxTotal(200); // 最大连接数
connectionManager.setDefaultMaxPerRoute(50); // 每个路由最大连接
connectionManager.setValidateAfterInactivity(30000); // 验证空闲连接
// 连接配置
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(5000)
.setSocketTimeout(30000)
.setConnectionRequestTimeout(5000)
.build();
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.evictIdleConnections(60, TimeUnit.SECONDS) // 定期清理空闲连接
.evictExpiredConnections()
.build();
}
/**
* WebClient配置(响应式)
*/
@Bean
public WebClient webClient() {
ConnectionProvider provider = ConnectionProvider.builder("custom")
.maxConnections(500)
.maxIdleTime(Duration.ofSeconds(20))
.maxLifeTime(Duration.ofSeconds(60))
.pendingAcquireTimeout(Duration.ofSeconds(60))
.evictInBackground(Duration.ofSeconds(120))
.build();
HttpClient httpClient = HttpClient.create(provider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofSeconds(30))
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(30))
.addHandlerLast(new WriteTimeoutHandler(30)));
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(10 * 1024 * 1024)) // 10MB缓冲区
.build();
}
}
四、缓存策略优化
4.1 多级缓存架构
/**
* 多级缓存管理器
*/
@Component
@Slf4j
public class MultiLevelCache {
@Autowired
private CaffeineCache localCache; // L1: 本地缓存
@Autowired
private RedisTemplate<String, Object> redisCache; // L2: Redis缓存
private static final String CACHE_PREFIX = "app:cache:";
private static final long LOCAL_TTL = 60; // 本地缓存60秒
private static final long REDIS_TTL = 600; // Redis缓存10分钟
/**
* 读取缓存(多级缓存)
*/
public <T> T get(String key, Class<T> type, Supplier<T> loader) {
String fullKey = CACHE_PREFIX + key;
// 1. 查询本地缓存
T value = localCache.getIfPresent(fullKey);
if (value != null) {
log.debug("Local cache hit: {}", key);
return value;
}
// 2. 查询Redis缓存
value = (T) redisCache.opsForValue().get(fullKey);
if (value != null) {
log.debug("Redis cache hit: {}", key);
// 回填本地缓存
localCache.put(fullKey, value);
return value;
}
// 3. 加载数据
log.debug("Cache miss, loading: {}", key);
value = loader.get();
if (value != null) {
// 写入多级缓存
put(fullKey, value);
}
return value;
}
/**
* 写入缓存
*/
public void put(String key, Object value) {
String fullKey = CACHE_PREFIX + key;
// 写入本地缓存
localCache.put(fullKey, value);
// 写入Redis(异步)
CompletableFuture.runAsync(() -> {
redisCache.opsForValue().set(fullKey, value, REDIS_TTL, TimeUnit.SECONDS);
});
}
/**
* 删除缓存
*/
public void evict(String key) {
String fullKey = CACHE_PREFIX + key;
localCache.invalidate(fullKey);
redisCache.delete(fullKey);
// 发送缓存失效消息(用于集群同步)
publishEvictEvent(fullKey);
}
/**
* 缓存更新(先更新数据库,再删除缓存)
*/
public <T> T update(String key, Function<T, T> updater, Supplier<T> loader) {
// 1. 加载当前值
T current = get(key, (Class<T>) Object.class, loader);
// 2. 更新值
T updated = updater.apply(current);
// 3. 更新数据库(业务逻辑)
// ...
// 4. 删除缓存(非更新,避免并发问题)
evict(key);
return updated;
}
/**
* 批量读取(防止缓存击穿)
*/
public <T> Map<String, T> batchGet(
List<String> keys,
Class<T> type,
Function<List<String>, Map<String, T>> batchLoader) {
Map<String, T> result = new HashMap<>();
List<String> missingKeys = new ArrayList<>();
// 1. 批量查询缓存
for (String key : keys) {
T value = get(key, type, null);
if (value != null) {
result.put(key, value);
} else {
missingKeys.add(key);
}
}
// 2. 批量加载缺失数据
if (!missingKeys.isEmpty()) {
Map<String, T> loaded = batchLoader.apply(missingKeys);
loaded.forEach((k, v) -> {
put(k, v);
result.put(k, v);
});
}
return result;
}
private void publishEvictEvent(String key) {
// 发布缓存失效消息到Redis Pub/Sub
redisCache.convertAndSend("cache:evict", key);
}
}
/**
* 本地缓存配置(Caffeine)
*/
@Configuration
public class LocalCacheConfig {
@Bean
public CaffeineCache localCache() {
Caffeine<Object, Object> caffeine = Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10000) // 最大条目数
.expireAfterWrite(60, TimeUnit.SECONDS) // 写入后过期
.refreshAfterWrite(50, TimeUnit.SECONDS) // 自动刷新
.recordStats() // 开启统计
.removalListener((key, value, cause) ->
log.debug("Cache entry removed: {}, cause: {}", key, cause));
return new CaffeineCache("local", caffeine.build());
}
}
4.2 缓存问题解决方案
/**
* 缓存问题解决方案
*/
@Component
@Slf4j
public class CacheProblemSolver {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedissonClient redissonClient;
private static final String LOCK_PREFIX = "cache:lock:";
private static final String BLOOM_FILTER = "cache:bloom";
/**
* 解决缓存穿透:布隆过滤器 + 空值缓存
*/
public <T> T solveCachePenetration(
String key,
Class<T> type,
Supplier<T> loader) {
// 1. 布隆过滤器检查
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER);
if (!bloomFilter.contains(key)) {
log.warn("Bloom filter miss, key may not exist: {}", key);
return null; // 数据大概率不存在,直接返回
}
// 2. 查询缓存
T value = (T) redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
// 3. 查询数据库
value = loader.get();
// 4. 缓存结果(即使是null也缓存,防止穿透)
if (value != null) {
redisTemplate.opsForValue().set(key, value, 10, TimeUnit.MINUTES);
} else {
// 缓存空值,短时间过期
redisTemplate.opsForValue().set(key, "NULL", 60, TimeUnit.SECONDS);
}
return value;
}
/**
* 解决缓存击穿:互斥锁
*/
public <T> T solveCacheBreakdown(
String key,
Class<T> type,
Supplier<T> loader) {
// 1. 查询缓存
T value = (T) redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
// 2. 获取分布式锁
String lockKey = LOCK_PREFIX + key;
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试获取锁,等待10秒,锁持有30秒
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (locked) {
try {
// 双重检查
value = (T) redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
// 加载数据
value = loader.get();
if (value != null) {
redisTemplate.opsForValue().set(key, value, 10, TimeUnit.MINUTES);
}
} finally {
lock.unlock();
}
} else {
// 获取锁失败,短暂等待后重试
Thread.sleep(100);
return solveCacheBreakdown(key, type, loader);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return value;
}
/**
* 解决缓存雪崩:随机过期时间 + 热点数据永不过期
*/
public <T> void setWithRandomExpire(String key, T value, long baseExpireSeconds) {
// 添加随机偏移,防止同时过期
long randomOffset = ThreadLocalRandom.current().nextLong(60);
long expireSeconds = baseExpireSeconds + randomOffset;
redisTemplate.opsForValue().set(key, value, expireSeconds, TimeUnit.SECONDS);
}
/**
* 热点数据永不过期 + 异步刷新
*/
public <T> T getHotData(String key, Class<T> type, Supplier<T> loader) {
String json = (String) redisTemplate.opsForValue().get(key);
if (json == null) {
return null;
}
CacheWrapper<T> wrapper = JSON.parseObject(json, new TypeReference<>() {});
T value = wrapper.getData();
// 检查是否需要刷新(逻辑过期)
if (wrapper.getExpireTime() < System.currentTimeMillis()) {
// 异步刷新,不阻塞当前请求
CompletableFuture.runAsync(() -> refreshData(key, loader));
}
return value;
}
private <T> void refreshData(String key, Supplier<T> loader) {
String lockKey = LOCK_PREFIX + key + ":refresh";
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
try {
T newValue = loader.get();
CacheWrapper<T> wrapper = new CacheWrapper<>();
wrapper.setData(newValue);
wrapper.setExpireTime(System.currentTimeMillis() + 60000);
redisTemplate.opsForValue().set(key, JSON.toJSONString(wrapper));
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Data
private static class CacheWrapper<T> {
private T data;
private long expireTime; // 逻辑过期时间
}
}
五、限流熔断与降级
5.1 Sentinel限流配置
/**
* Sentinel限流熔断配置
*/
@Configuration
@Slf4j
public class SentinelConfig {
@PostConstruct
public void initRules() {
// 1. 流量控制规则
List<FlowRule> flowRules = new ArrayList<>();
// QPS限流:每秒100个请求
FlowRule qpsRule = new FlowRule();
qpsRule.setResource("orderQuery");
qpsRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
qpsRule.setCount(100);
qpsRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
qpsRule.setWarmUpPeriodSec(10); // 预热10秒
flowRules.add(qpsRule);
// 并发线程数限流
FlowRule threadRule = new FlowRule();
threadRule.setResource("orderCreate");
threadRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
threadRule.setCount(50); // 最大50个并发线程
flowRules.add(threadRule);
// 2. 熔断降级规则
List<DegradeRule> degradeRules = new ArrayList<>();
// 慢调用比例熔断
DegradeRule slowCallRule = new DegradeRule();
slowCallRule.setResource("paymentService");
slowCallRule.setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType());
slowCallRule.setCount(0.5); // 慢调用比例阈值50%
slowCallRule.setSlowRatioThreshold(0.5);
slowCallRule.setSlowRequestAmount(100); // 最小请求数
slowCallRule.setTimeWindow(30); // 熔断持续时间30秒
degradeRules.add(slowCallRule);
// 异常比例熔断
DegradeRule errorRule = new DegradeRule();
errorRule.setResource("inventoryService");
errorRule.setGrade(CircuitBreakerStrategy.ERROR_RATIO.getType());
errorRule.setCount(0.5); // 异常比例阈值50%
errorRule.setMinRequestAmount(50);
errorRule.setTimeWindow(30);
degradeRules.add(errorRule);
// 3. 系统保护规则
List<SystemRule> systemRules = new ArrayList<>();
SystemRule systemRule = new SystemRule();
systemRule.setHighestSystemLoad(10); // 最大系统负载
systemRule.setHighestCpuUsage(0.8); // 最大CPU使用率
systemRule.setAvgRt(500); // 平均响应时间
systemRule.setMaxThread(500); // 最大线程数
systemRule.setQps(1000); // 入口QPS
systemRules.add(systemRule);
// 加载规则
FlowRuleManager.loadRules(flowRules);
DegradeRuleManager.loadRules(degradeRules);
SystemRuleManager.loadRules(systemRules);
}
/**
* 自定义限流处理
*/
@Bean
public BlockExceptionHandler blockExceptionHandler() {
return (request, response, e) -> {
log.warn("Blocked by Sentinel: {}", e.getRule().getResource());
response.setStatus(429);
response.setContentType("application/json;charset=UTF-8");
Map<String, Object> result = new HashMap<>();
result.put("code", 429);
result.put("message", "系统繁忙,请稍后重试");
result.put("resource", e.getRule().getResource());
response.getWriter().write(JSON.toJSONString(result));
};
}
}
/**
* 使用Sentinel注解
*/
@Service
@Slf4j
public class OrderService {
/**
* 查询订单(限流保护)
*/
@SentinelResource(
value = "orderQuery",
blockHandler = "handleQueryBlock",
fallback = "handleQueryFallback"
)
public Order queryOrder(Long orderId) {
// 查询逻辑
return orderMapper.selectById(orderId);
}
/**
* 限流处理
*/
public Order handleQueryBlock(Long orderId, BlockException ex) {
log.warn("Query order blocked: {}", orderId);
// 返回缓存数据或默认值
return getOrderFromCache(orderId);
}
/**
* 异常降级
*/
public Order handleQueryFallback(Long orderId, Throwable ex) {
log.error("Query order failed: {}", orderId, ex);
// 返回兜底数据
return Order.empty();
}
/**
* 创建订单(热点参数限流)
*/
@SentinelResource("orderCreate")
public Order createOrder(CreateOrderRequest request) {
// 热点参数限流配置
ParamFlowRule rule = new ParamFlowRule("orderCreate")
.setParamIdx(0) // 第一个参数
.setGrade(RuleConstant.FLOW_GRADE_QPS)
.setCount(10); // 每个用户每秒10个请求
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
// 创建逻辑
return doCreateOrder(request);
}
}
5.2 Resilience4j熔断配置
/**
* Resilience4j熔断配置
*/
@Configuration
public class Resilience4jConfig {
@Bean
public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(CircuitBreakerConfig.custom()
// 失败率阈值50%
.failureRateThreshold(50)
// 慢调用比例阈值50%
.slowCallRateThreshold(50)
// 慢调用时间阈值2秒
.slowCallDurationThreshold(Duration.ofSeconds(2))
// 熔断持续时间30秒
.waitDurationInOpenState(Duration.ofSeconds(30))
// 半开状态允许请求数
.permittedNumberOfCallsInHalfOpenState(10)
// 滑动窗口大小100
.slidingWindowSize(100)
// 最小调用数
.minimumNumberOfCalls(20)
// 自动从开状态转到半开
.automaticTransitionFromOpenToHalfOpenEnabled(true)
.build())
.timeLimiterConfig(TimeLimiterConfig.custom()
// 超时时间5秒
.timeoutDuration(Duration.ofSeconds(5))
.build())
.build());
}
@Bean
public Customizer<Resilience4jBulkheadProvider> bulkheadProviderCustomizer() {
return provider -> provider.configure(builder -> builder
.bulkheadConfig(BulkheadConfig.custom()
// 最大并发调用数
.maxConcurrentCalls(50)
// 等待时间
.maxWaitDuration(Duration.ofMillis(500))
.build()),
"orderService", "paymentService");
}
}
/**
* 使用Resilience4j
*/
@Service
@Slf4j
public class PaymentService {
private final CircuitBreakerFactory circuitBreakerFactory;
private final Bulkhead bulkhead;
@Autowired
public PaymentService(CircuitBreakerFactory circuitBreakerFactory,
BulkheadRegistry bulkheadRegistry) {
this.circuitBreakerFactory = circuitBreakerFactory;
this.bulkhead = bulkheadRegistry.bulkhead("payment");
}
/**
* 带熔断的支付调用
*/
public PaymentResult processPayment(PaymentRequest request) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("payment");
return circuitBreaker.run(
() -> doProcessPayment(request),
throwable -> {
log.error("Payment failed, fallback triggered", throwable);
return processPaymentFallback(request);
}
);
}
/**
* 带信号量隔离的支付调用
*/
@Bulkhead(name = "payment", type = Bulkhead.Type.SEMAPHORE)
public PaymentResult processPaymentWithBulkhead(PaymentRequest request) {
return doProcessPayment(request);
}
/**
* 带线程池隔离的支付调用
*/
@Bulkhead(name = "payment", type = Bulkhead.Type.THREADPOOL)
@TimeLimiter(name = "payment")
@CircuitBreaker(name = "payment", fallbackMethod = "processPaymentAsyncFallback")
public CompletableFuture<PaymentResult> processPaymentAsync(PaymentRequest request) {
return CompletableFuture.supplyAsync(() -> doProcessPayment(request));
}
private PaymentResult doProcessPayment(PaymentRequest request) {
// 实际支付逻辑
return new PaymentResult();
}
private PaymentResult processPaymentFallback(PaymentRequest request) {
// 降级逻辑:记录待处理,稍后重试
savePendingPayment(request);
return PaymentResult.pending();
}
private CompletableFuture<PaymentResult> processPaymentAsyncFallback(
PaymentRequest request, Exception ex) {
return CompletableFuture.completedFuture(processPaymentFallback(request));
}
}
六、异步与消息队列
6.1 异步处理架构
/**
* 异步事件处理器
*/
@Service
@Slf4j
public class AsyncEventProcessor {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private ThreadPoolExecutor asyncExecutor;
/**
* 发布异步事件
*/
public void publishAsyncEvent(OrderEvent event) {
CompletableFuture.runAsync(() -> {
eventPublisher.publishEvent(event);
}, asyncExecutor).exceptionally(ex -> {
log.error("Failed to publish event", ex);
// 保存到失败队列,稍后重试
saveToDeadLetterQueue(event);
return null;
});
}
/**
* 事务性事件发布(确保事务提交后才发布)
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderCreatedEvent event) {
// 事务提交后执行
sendNotification(event);
updateStatistics(event);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleOrderRollback(OrderCreatedEvent event) {
// 事务回滚后执行补偿
compensateOrder(event);
}
}
/**
* 事件监听器
*/
@Component
@Slf4j
public class OrderEventListener {
@Async("ioIntensiveExecutor")
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Processing order created event: {}", event.getOrderId());
// 异步处理:发送邮件
sendEmailNotification(event);
// 异步处理:更新库存
updateInventory(event);
// 异步处理:记录日志
auditLog(event);
}
@Async
@EventListener(condition = "#event.amount > 10000")
public void handleLargeOrder(OrderCreatedEvent event) {
// 只处理大额订单
notifyManager(event);
}
}
6.2 RabbitMQ消息队列
/**
* RabbitMQ配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单队列
*/
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx.exchange")
.withArgument("x-dead-letter-routing-key", "order.dlx.routingkey")
.withArgument("x-message-ttl", 300000) // 消息TTL 5分钟
.withArgument("x-max-length", 10000) // 队列最大长度
.build();
}
/**
* 死信队列
*/
@Bean
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable("order.dlx.queue").build();
}
/**
* 延迟队列(使用死信实现)
*/
@Bean
public Queue orderDelayQueue() {
return QueueBuilder.durable("order.delay.queue")
.withArgument("x-dead-letter-exchange", "order.exchange")
.withArgument("x-dead-letter-routing-key", "order.routingkey")
.withArgument("x-message-ttl", 1800000) // 延迟30分钟
.build();
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.routingkey");
}
/**
* 消费者配置
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5); // 并发消费者数
factory.setMaxConcurrentConsumers(20); // 最大并发消费者
factory.setPrefetchCount(50); // 预取数量
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
factory.setDefaultRequeueRejected(false); // 拒绝后不入队
factory.setAdviceChain(
RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000) // 指数退避
.recoverer(new RejectAndDontRequeueRecoverer())
.build()
);
return factory;
}
}
/**
* 消息生产者
*/
@Service
@Slf4j
public class OrderMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送订单消息
*/
public void sendOrderMessage(Order order) {
MessageProperties properties = new MessageProperties();
properties.setMessageId(UUID.randomUUID().toString());
properties.setTimestamp(new Date());
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化
Message message = new Message(
JSON.toJSONBytes(order),
properties
);
// 确认机制
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message sent successfully: {}", correlationData);
} else {
log.error("Message failed to send: {}, cause: {}", correlationData, cause);
// 重试或保存到数据库
}
});
rabbitTemplate.convertAndSend(
"order.exchange",
"order.routingkey",
message
);
}
/**
* 发送延迟消息
*/
public void sendDelayMessage(Order order, int delayMinutes) {
rabbitTemplate.convertAndSend(
"order.delay.exchange",
"order.delay.routingkey",
order,
message -> {
message.getMessageProperties()
.setDelay(delayMinutes * 60 * 1000);
return message;
}
);
}
}
/**
* 消息消费者
*/
@Component
@Slf4j
public class OrderMessageConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrderMessage(
Message message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
Order order = JSON.parseObject(message.getBody(), Order.class);
// 幂等性检查
if (isDuplicate(order.getMessageId())) {
channel.basicAck(deliveryTag, false);
return;
}
// 处理订单
processOrder(order);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("Failed to process order message", e);
try {
// 拒绝消息,进入死信队列
channel.basicNack(deliveryTag, false, false);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
private boolean isDuplicate(String messageId) {
// 使用Redis或数据库检查消息是否已处理
return false;
}
private void processOrder(Order order) {
// 订单处理逻辑
}
}
七、最佳实践与检查清单
┌─────────────────────────────────────────────────────────────────────┐
│ 后端服务优化检查清单 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 【并发优化】 │
│ □ 1. 合理配置线程池大小 │
│ □ 2. 区分CPU密集型和IO密集型任务 │
│ □ 3. 使用CompletableFuture异步编排 │
│ □ 4. 考虑使用虚拟线程(Java 21+) │
│ □ 5. 监控线程池状态,设置告警 │
│ │
│ 【连接池优化】 │
│ □ 1. 数据库连接池大小 = (核心数 * 2) + 有效磁盘数 │
│ □ 2. 启用连接池监控 │
│ □ 3. 配置连接超时和空闲检测 │
│ □ 4. HTTP连接池复用 │
│ □ 5. 实施读写分离 │
│ │
│ 【缓存优化】 │
│ □ 1. 多级缓存架构(本地+分布式) │
│ □ 2. 缓存穿透:布隆过滤器+空值缓存 │
│ □ 3. 缓存击穿:互斥锁保护 │
│ □ 4. 缓存雪崩:随机过期时间 │
│ □ 5. 热点数据:逻辑过期+异步刷新 │
│ │
│ 【稳定性保障】 │
│ □ 1. 限流:QPS限流、并发限流、热点限流 │
│ □ 2. 熔断:慢调用熔断、异常熔断 │
│ □ 3. 降级:优雅降级策略 │
│ □ 4. 隔离:线程池隔离、信号量隔离 │
│ □ 5. 重试:指数退避重试 │
│ │
│ 【异步化】 │
│ □ 1. 非关键路径异步处理 │
│ □ 2. 使用消息队列削峰填谷 │
│ □ 3. 事务消息保证最终一致性 │
│ □ 4. 延迟队列处理定时任务 │
│ □ 5. 死信队列处理失败消息 │
│ │
│ 【监控告警】 │
│ □ 1. 接口响应时间监控 │
│ □ 2. 错误率监控 │
│ □ 3. 吞吐量监控 │
│ □ 4. 资源使用率监控(CPU/内存/连接池) │
│ □ 5. 业务指标监控(订单量/支付成功率) │
│ │
└─────────────────────────────────────────────────────────────────────┘
八、经验总结
8.1 常见性能问题与解决方案
| 问题 | 现象 | 解决方案 |
|---|---|---|
| 线程池耗尽 | 请求被拒绝,响应超时 | 调大线程池,优化慢查询,异步化 |
| 数据库连接池耗尽 | 获取连接超时 | 优化SQL,增加连接池,读写分离 |
| 缓存穿透 | DB压力突增 | 布隆过滤器,空值缓存 |
| 缓存雪崩 | 大量请求打到DB | 随机过期时间,热点数据永不过期 |
| 服务雪崩 | 级联故障 | 熔断降级,限流保护,故障隔离 |
| 消息积压 | 消费延迟增大 | 增加消费者,优化消费逻辑 |
8.2 性能优化决策树
┌─────────────────┐
│ 性能问题定位 │
└────────┬────────┘
│
▼
┌──────────────────────────────┐
│ 是响应慢还是吞吐量低? │
└─────────────┬────────────────┘
│
┌────────────────┼────────────────┐
▼响应慢 ▼吞吐量低
┌───────────────┐ ┌───────────────┐
│ 检查慢查询 │ │ 检查线程池 │
│ 优化数据库 │ │ 异步化处理 │
│ 增加缓存 │ │ 消息队列削峰 │
└───────────────┘ └───────────────┘
系列上一篇:前端性能优化实战
系列下一篇:TCP拥塞控制深度解析
知识点测试
读完文章了?来测试一下你对知识点的掌握程度吧!
评论区
使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。
如果评论系统无法加载,请确保:
- 您的网络可以访问 GitHub
- giscus GitHub App 已安装到仓库
- 仓库已启用 Discussions 功能