返回 筑基・网络云路秘径

后端服务优化策略

博主
大约 29 分钟

后端服务优化策略

一、问题引入:高并发下的服务雪崩

1.1 真实案例:电商秒杀系统崩溃

场景:2024年双11秒杀活动,订单服务全面瘫痪
影响:10分钟内损失订单超5000万,用户投诉激增

事故复盘:
┌─────────────────────────────────────────────────────────────┐
│ 阶段1:流量突增                                               │
│ - 0点秒杀开始,QPS瞬间从500飙升至50000                       │
│ - 数据库连接池迅速耗尽                                       │
│ - Redis缓存命中率骤降                                        │
├─────────────────────────────────────────────────────────────┤
│ 阶段2:级联故障                                               │
│ - 订单服务响应时间从50ms增至8s                               │
│ - 调用方(购物车、商品服务)超时等待                         │
│ - 线程池被占满,新请求被拒绝                                 │
├─────────────────────────────────────────────────────────────┤
│ 阶段3:雪崩效应                                               │
│ - 库存服务被拖垮,无法扣减库存                               │
│ - 支付服务队列堆积,交易超时                                 │
│ - 整个交易系统陷入瘫痪                                       │
├─────────────────────────────────────────────────────────────┤
│ 阶段4:根因分析                                               │
│ - 缺乏限流保护,未做流量削峰                                 │
│ - 没有熔断机制,故障级联扩散                                 │
│ - 数据库无读写分离,单点压力过大                             │
│ - 缓存策略不当,缓存穿透导致DB压力                           │
│ - 异步处理不足,同步调用链过长                               │
├─────────────────────────────────────────────────────────────┤
│ 阶段5:优化措施                                               │
│ - 接入Sentinel限流熔断                                       │
│ - 实施读写分离,分库分表                                     │
│ - 引入消息队列异步处理                                       │
│ - 优化缓存策略,布隆过滤器防穿透                             │
│ - 预扣库存,异步确认                                         │
├─────────────────────────────────────────────────────────────┤
│ 阶段6:优化效果                                               │
│ - 系统承载能力:5000 QPS → 100000 QPS                        │
│ - 平均响应时间:8s → 120ms                                   │
│ - 成功率:23% → 99.9%                                        │
│ - 无服务雪崩,故障隔离                                       │
└─────────────────────────────────────────────────────────────┘

核心教训:
1. 高并发系统需要多层防护
2. 故障隔离比故障恢复更重要
3. 异步化是提升吞吐量的关键
4. 缓存是双刃剑,需要合理使用

1.2 后端性能优化全景图

后端性能优化层次:
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ 架构层优化                                            │   │
│  │ • 微服务拆分                                          │   │
│  │ • 服务网格(Service Mesh)                              │   │
│  │ • 事件驱动架构                                        │   │
│  │ • CQRS模式                                            │   │
│  └──────────────────────────────────────────────────────┘   │
│                           │                                  │
│  ┌────────────────────────┼──────────────────────────────┐  │
│  │ 应用层优化              │                              │  │
│  │ • 并发模型(线程池/协程)│                             │  │
│  │ • 异步非阻塞IO          │                              │  │
│  │ • 连接池优化            │                              │  │
│  │ • 缓存策略              │                              │  │
│  │ • 限流熔断              │                              │  │
│  └────────────────────────┼──────────────────────────────┘  │
│                           │                                  │
│  ┌────────────────────────┼──────────────────────────────┐  │
│  │ 数据层优化              │                              │  │
│  │ • 数据库优化(索引/SQL)│                              │  │
│  │ • 读写分离              │                              │  │
│  │ • 分库分表              │                              │  │
│  │ • 缓存集群              │                              │  │
│  │ • 消息队列              │                              │  │
│  └────────────────────────┼──────────────────────────────┘  │
│                           │                                  │
│  ┌────────────────────────┼──────────────────────────────┐  │
│  │ 基础设施优化            │                              │  │
│  │ • JVM调优              │                              │  │
│  │ • 容器资源限制          │                              │  │
│  │ • 网络优化              │                              │  │
│  │ • 操作系统参数          │                              │  │
│  └────────────────────────┴──────────────────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

二、并发模型与线程池优化

2.1 Java并发模型演进

Java并发模型对比:
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│  1. 同步阻塞模型(传统Servlet)                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ 请求 → 分配线程 → 执行业务 → 返回响应 → 释放线程     │   │
│  │                                                      │   │
│  │ 特点:                                               │   │
│  │ - 每个请求一个线程                                   │   │
│  │ - 线程创建销毁开销大                                 │   │
│  │ - 线程数受限,并发能力有限                           │   │
│  │ - 阻塞等待浪费资源                                   │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  2. 线程池模型(Tomcat线程池)                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ ┌─────────────┐    ┌─────────────┐                  │   │
│  │ │ 任务队列    │───▶│  线程池     │───▶ 执行任务     │   │
│  │ │ (有界队列)  │    │ (固定大小)  │                  │   │
│  │ └─────────────┘    └─────────────┘                  │   │
│  │                                                      │   │
│  │ 特点:                                               │   │
│  │ - 线程复用,减少创建开销                             │   │
│  │ - 队列缓冲,削峰填谷                                 │   │
│  │ - 拒绝策略保护系统                                   │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  3. 异步非阻塞模型(WebFlux/Reactor)                        │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ 请求 → EventLoop → 注册回调 → 立即返回 → 事件通知   │   │
│  │                                                      │   │
│  │ 特点:                                               │   │
│  │ - 少量线程处理大量连接                               │   │
│  │ - 非阻塞IO,资源利用率高                             │   │
│  │ - 响应式编程,背压控制                               │   │
│  │ - 适合IO密集型场景                                   │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  4. 协程模型(Project Loom虚拟线程)                         │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ 请求 → 虚拟线程 → 挂载到平台线程 → 阻塞时自动让出   │   │
│  │                                                      │   │
│  │ 特点:                                               │   │
│  │ - 轻量级线程(数百万个)                             │   │
│  │ - 自动管理,无需线程池                               │   │
│  │ - 同步代码,异步执行                                 │   │
│  │ - 兼容现有代码                                       │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└──────────────────────────────────────────────────────────────┘

2.2 线程池优化配置

/**
 * 线程池优化配置
 */
@Configuration
@Slf4j
public class ThreadPoolConfig {

    /**
     * CPU密集型任务线程池
     * 用于计算密集型任务,如复杂算法、数据处理
     */
    @Bean("cpuIntensiveExecutor")
    public ThreadPoolExecutor cpuIntensiveExecutor() {
        int coreCount = Runtime.getRuntime().availableProcessors();
        
        return new ThreadPoolExecutor(
            coreCount,                          // 核心线程数 = CPU核心数
            coreCount + 1,                      // 最大线程数
            60L, TimeUnit.SECONDS,              // 空闲线程存活时间
            new LinkedBlockingQueue<>(1000),    // 有界队列,防止OOM
            new ThreadFactoryBuilder()
                .setNameFormat("cpu-pool-%d")
                .setUncaughtExceptionHandler((t, e) -> 
                    log.error("Uncaught exception in thread {}", t.getName(), e))
                .build(),
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略:调用者执行
        );
    }

    /**
     * IO密集型任务线程池
     * 用于网络请求、数据库操作、文件IO
     */
    @Bean("ioIntensiveExecutor")
    public ThreadPoolExecutor ioIntensiveExecutor() {
        int coreCount = Runtime.getRuntime().availableProcessors();
        int maxPoolSize = coreCount * 4;  // IO密集型可以更多线程
        
        return new ThreadPoolExecutor(
            coreCount * 2,                      // 核心线程数
            maxPoolSize,                        // 最大线程数
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5000),    // 更大的队列
            new ThreadFactoryBuilder()
                .setNameFormat("io-pool-%d")
                .build(),
            new ThreadPoolExecutor.DiscardOldestPolicy()  // 丢弃最老任务
        );
    }

    /**
     * 高优先级任务线程池
     * 用于关键业务路径
     */
    @Bean("priorityExecutor")
    public ThreadPoolExecutor priorityExecutor() {
        return new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new PriorityBlockingQueue<>(),      // 优先级队列
            new ThreadFactoryBuilder()
                .setNameFormat("priority-pool-%d")
                .build(),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }

    /**
     * 定时任务线程池
     */
    @Bean("scheduledExecutor")
    public ScheduledExecutorService scheduledExecutor() {
        return Executors.newScheduledThreadPool(
            4,
            new ThreadFactoryBuilder()
                .setNameFormat("scheduled-pool-%d")
                .build()
        );
    }

    /**
     * ForkJoinPool(并行流使用)
     */
    @Bean("forkJoinPool")
    public ForkJoinPool forkJoinPool() {
        int parallelism = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(
            parallelism,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            true  // asyncMode,适合事件处理
        );
    }
}

/**
 * 线程池监控
 */
@Component
@Slf4j
public class ThreadPoolMonitor {

    @Autowired
    private Map<String, ThreadPoolExecutor> executors;

    @Scheduled(fixedRate = 60000)  // 每分钟监控
    public void monitor() {
        executors.forEach((name, executor) -> {
            ThreadPoolExecutor pool = executor;
            
            log.info("ThreadPool [{}] Status: " +
                    "Active={}, PoolSize={}, CorePoolSize={}, " +
                    "LargestPoolSize={}, TaskCount={}, CompletedTaskCount={}, " +
                    "QueueSize={}",
                name,
                pool.getActiveCount(),
                pool.getPoolSize(),
                pool.getCorePoolSize(),
                pool.getLargestPoolSize(),
                pool.getTaskCount(),
                pool.getCompletedTaskCount(),
                pool.getQueue().size()
            );
            
            // 告警:队列积压过多
            if (pool.getQueue().size() > pool.getCorePoolSize() * 10) {
                log.warn("ThreadPool [{}] queue is backing up! Size={}", 
                    name, pool.getQueue().size());
            }
        });
    }
}

2.3 虚拟线程(Project Loom)

/**
 * 虚拟线程使用示例(Java 21+)
 */
@Service
@Slf4j
public class VirtualThreadService {

    /**
     * 创建虚拟线程执行器
     */
    public ExecutorService createVirtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * 使用虚拟线程处理并发请求
     */
    public List<String> processConcurrentRequests(List<String> urls) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            
            List<Future<String>> futures = urls.stream()
                .map(url -> executor.submit(() -> fetchUrl(url)))
                .toList();
            
            return futures.stream()
                .map(future -> {
                    try {
                        return future.get(10, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        log.error("Failed to fetch", e);
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .toList();
        }
    }

    /**
     * 结构化并发(Java 21+)
     */
    public String structuredFetch(String url1, String url2) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            
            StructuredTaskScope.Subtask<String> task1 = 
                scope.fork(() -> fetchUrl(url1));
            StructuredTaskScope.Subtask<String> task2 = 
                scope.fork(() -> fetchUrl(url2));
            
            scope.join();           // 等待所有任务
            scope.throwIfFailed();  // 任一失败则抛出异常
            
            return task1.get() + task2.get();
            
        } catch (Exception e) {
            log.error("Structured fetch failed", e);
            throw new RuntimeException(e);
        }
    }

    private String fetchUrl(String url) {
        // 模拟HTTP请求
        try {
            Thread.sleep(100);  // 虚拟线程会自动让出
            return "Response of " + url;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

三、连接池优化

3.1 数据库连接池配置

/**
 * HikariCP连接池优化配置
 */
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariConfig config = new HikariConfig();
        
        // 基础配置
        config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
        config.setUsername("user");
        config.setPassword("password");
        config.setDriverClassName("com.mysql.cj.jdbc.Driver");
        
        // 连接池大小优化
        // 公式:connections = ((core_count * 2) + effective_spindle_count)
        int coreCount = Runtime.getRuntime().availableProcessors();
        config.setMaximumPoolSize(coreCount * 2 + 1);  // 通常10-20
        config.setMinimumIdle(coreCount);              // 最小空闲连接
        
        // 连接超时配置
        config.setConnectionTimeout(30000);    // 获取连接等待时间:30s
        config.setIdleTimeout(600000);         // 空闲连接超时:10min
        config.setMaxLifetime(1800000);        // 连接最大生命周期:30min
        config.setKeepaliveTime(300000);       // 保活检测间隔:5min
        
        // 性能优化
        config.setAutoCommit(false);           // 手动控制事务
        config.setCachePrepStmts(true);        // 缓存预编译语句
        config.setPrepStmtCacheSize(250);      // 缓存大小
        config.setPrepStmtCacheSqlLimit(2048); // 单条SQL长度限制
        config.setUseServerPrepStmts(true);    // 使用服务器端预编译
        config.setUseLocalSessionState(true);  // 使用本地会话状态
        config.setRewriteBatchedStatements(true); // 批量重写
        config.setCacheResultSetMetadata(true);   // 缓存结果集元数据
        config.setCacheServerConfiguration(true); // 缓存服务器配置
        config.setElideSetAutoCommits(true);      // 优化autocommit
        config.setMaintainTimeStats(false);       // 关闭时间统计
        
        // 监控配置
        config.setMetricRegistry(new CodahaleMetrics());  // 指标收集
        config.setHealthCheckRegistry(new HealthCheckRegistry());
        config.setPoolName("HikariPool-Main");
        
        // 泄漏检测
        config.setLeakDetectionThreshold(60000);  // 连接泄漏检测:60s
        
        return new HikariDataSource(config);
    }

    /**
     * 读写分离配置
     */
    @Bean
    public DataSource routingDataSource(
            @Qualifier("masterDataSource") DataSource master,
            @Qualifier("slaveDataSource") DataSource slave) {
        
        DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", master);
        targetDataSources.put("slave", slave);
        
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(master);
        
        return routingDataSource;
    }
}

/**
 * 数据源上下文
 */
public class DataSourceContext {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }
    
    public static String getDataSource() {
        return contextHolder.get();
    }
    
    public static void clear() {
        contextHolder.remove();
    }
}

/**
 * 动态数据源路由
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContext.getDataSource();
    }
}

/**
 * 读写分离注解
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

/**
 * 读写分离切面
 */
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("@annotation(readOnly)")
    public void setReadDataSource(ReadOnly readOnly) {
        DataSourceContext.setDataSource("slave");
    }
    
    @After("@annotation(readOnly)")
    public void clearDataSource() {
        DataSourceContext.clear();
    }
}

3.2 HTTP连接池优化

/**
 * HTTP连接池优化配置
 */
@Configuration
public class HttpClientConfig {

    /**
     * OkHttp连接池配置
     */
    @Bean
    public OkHttpClient okHttpClient() {
        // 连接池:复用HTTP/2连接
        ConnectionPool connectionPool = new ConnectionPool(
            50,      // 最大空闲连接数
            5,       // 连接存活时间(分钟)
            TimeUnit.MINUTES
        );

        return new OkHttpClient.Builder()
            .connectionPool(connectionPool)
            .connectTimeout(5, TimeUnit.SECONDS)     // 连接超时
            .readTimeout(30, TimeUnit.SECONDS)       // 读取超时
            .writeTimeout(30, TimeUnit.SECONDS)      // 写入超时
            .callTimeout(60, TimeUnit.SECONDS)       // 整体调用超时
            .retryOnConnectionFailure(true)          // 连接失败重试
            .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
            .addInterceptor(new RetryInterceptor(3))  // 重试拦截器
            .addInterceptor(new LoggingInterceptor()) // 日志拦截器
            .build();
    }

    /**
     * Apache HttpClient连接池
     */
    @Bean
    public CloseableHttpClient httpClient() {
        PoolingHttpClientConnectionManager connectionManager = 
            new PoolingHttpClientConnectionManager();
        
        // 连接池配置
        connectionManager.setMaxTotal(200);           // 最大连接数
        connectionManager.setDefaultMaxPerRoute(50);  // 每个路由最大连接
        connectionManager.setValidateAfterInactivity(30000); // 验证空闲连接
        
        // 连接配置
        RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(5000)
            .setSocketTimeout(30000)
            .setConnectionRequestTimeout(5000)
            .build();
        
        return HttpClients.custom()
            .setConnectionManager(connectionManager)
            .setDefaultRequestConfig(requestConfig)
            .evictIdleConnections(60, TimeUnit.SECONDS)  // 定期清理空闲连接
            .evictExpiredConnections()
            .build();
    }

    /**
     * WebClient配置(响应式)
     */
    @Bean
    public WebClient webClient() {
        ConnectionProvider provider = ConnectionProvider.builder("custom")
            .maxConnections(500)
            .maxIdleTime(Duration.ofSeconds(20))
            .maxLifeTime(Duration.ofSeconds(60))
            .pendingAcquireTimeout(Duration.ofSeconds(60))
            .evictInBackground(Duration.ofSeconds(120))
            .build();

        HttpClient httpClient = HttpClient.create(provider)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofSeconds(30))
            .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(30))
                .addHandlerLast(new WriteTimeoutHandler(30)));

        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(10 * 1024 * 1024))  // 10MB缓冲区
            .build();
    }
}

四、缓存策略优化

4.1 多级缓存架构

/**
 * 多级缓存管理器
 */
@Component
@Slf4j
public class MultiLevelCache {

    @Autowired
    private CaffeineCache localCache;      // L1: 本地缓存
    
    @Autowired
    private RedisTemplate<String, Object> redisCache;  // L2: Redis缓存

    private static final String CACHE_PREFIX = "app:cache:";
    private static final long LOCAL_TTL = 60;      // 本地缓存60秒
    private static final long REDIS_TTL = 600;     // Redis缓存10分钟

    /**
     * 读取缓存(多级缓存)
     */
    public <T> T get(String key, Class<T> type, Supplier<T> loader) {
        String fullKey = CACHE_PREFIX + key;
        
        // 1. 查询本地缓存
        T value = localCache.getIfPresent(fullKey);
        if (value != null) {
            log.debug("Local cache hit: {}", key);
            return value;
        }
        
        // 2. 查询Redis缓存
        value = (T) redisCache.opsForValue().get(fullKey);
        if (value != null) {
            log.debug("Redis cache hit: {}", key);
            // 回填本地缓存
            localCache.put(fullKey, value);
            return value;
        }
        
        // 3. 加载数据
        log.debug("Cache miss, loading: {}", key);
        value = loader.get();
        
        if (value != null) {
            // 写入多级缓存
            put(fullKey, value);
        }
        
        return value;
    }

    /**
     * 写入缓存
     */
    public void put(String key, Object value) {
        String fullKey = CACHE_PREFIX + key;
        
        // 写入本地缓存
        localCache.put(fullKey, value);
        
        // 写入Redis(异步)
        CompletableFuture.runAsync(() -> {
            redisCache.opsForValue().set(fullKey, value, REDIS_TTL, TimeUnit.SECONDS);
        });
    }

    /**
     * 删除缓存
     */
    public void evict(String key) {
        String fullKey = CACHE_PREFIX + key;
        
        localCache.invalidate(fullKey);
        redisCache.delete(fullKey);
        
        // 发送缓存失效消息(用于集群同步)
        publishEvictEvent(fullKey);
    }

    /**
     * 缓存更新(先更新数据库,再删除缓存)
     */
    public <T> T update(String key, Function<T, T> updater, Supplier<T> loader) {
        // 1. 加载当前值
        T current = get(key, (Class<T>) Object.class, loader);
        
        // 2. 更新值
        T updated = updater.apply(current);
        
        // 3. 更新数据库(业务逻辑)
        // ...
        
        // 4. 删除缓存(非更新,避免并发问题)
        evict(key);
        
        return updated;
    }

    /**
     * 批量读取(防止缓存击穿)
     */
    public <T> Map<String, T> batchGet(
            List<String> keys, 
            Class<T> type,
            Function<List<String>, Map<String, T>> batchLoader) {
        
        Map<String, T> result = new HashMap<>();
        List<String> missingKeys = new ArrayList<>();
        
        // 1. 批量查询缓存
        for (String key : keys) {
            T value = get(key, type, null);
            if (value != null) {
                result.put(key, value);
            } else {
                missingKeys.add(key);
            }
        }
        
        // 2. 批量加载缺失数据
        if (!missingKeys.isEmpty()) {
            Map<String, T> loaded = batchLoader.apply(missingKeys);
            loaded.forEach((k, v) -> {
                put(k, v);
                result.put(k, v);
            });
        }
        
        return result;
    }

    private void publishEvictEvent(String key) {
        // 发布缓存失效消息到Redis Pub/Sub
        redisCache.convertAndSend("cache:evict", key);
    }
}

/**
 * 本地缓存配置(Caffeine)
 */
@Configuration
public class LocalCacheConfig {

    @Bean
    public CaffeineCache localCache() {
        Caffeine<Object, Object> caffeine = Caffeine.newBuilder()
            .initialCapacity(100)
            .maximumSize(10000)              // 最大条目数
            .expireAfterWrite(60, TimeUnit.SECONDS)  // 写入后过期
            .refreshAfterWrite(50, TimeUnit.SECONDS) // 自动刷新
            .recordStats()                   // 开启统计
            .removalListener((key, value, cause) -> 
                log.debug("Cache entry removed: {}, cause: {}", key, cause));
        
        return new CaffeineCache("local", caffeine.build());
    }
}

4.2 缓存问题解决方案

/**
 * 缓存问题解决方案
 */
@Component
@Slf4j
public class CacheProblemSolver {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    private static final String LOCK_PREFIX = "cache:lock:";
    private static final String BLOOM_FILTER = "cache:bloom";

    /**
     * 解决缓存穿透:布隆过滤器 + 空值缓存
     */
    public <T> T solveCachePenetration(
            String key, 
            Class<T> type,
            Supplier<T> loader) {
        
        // 1. 布隆过滤器检查
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER);
        if (!bloomFilter.contains(key)) {
            log.warn("Bloom filter miss, key may not exist: {}", key);
            return null;  // 数据大概率不存在,直接返回
        }
        
        // 2. 查询缓存
        T value = (T) redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        
        // 3. 查询数据库
        value = loader.get();
        
        // 4. 缓存结果(即使是null也缓存,防止穿透)
        if (value != null) {
            redisTemplate.opsForValue().set(key, value, 10, TimeUnit.MINUTES);
        } else {
            // 缓存空值,短时间过期
            redisTemplate.opsForValue().set(key, "NULL", 60, TimeUnit.SECONDS);
        }
        
        return value;
    }

    /**
     * 解决缓存击穿:互斥锁
     */
    public <T> T solveCacheBreakdown(
            String key,
            Class<T> type,
            Supplier<T> loader) {
        
        // 1. 查询缓存
        T value = (T) redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        
        // 2. 获取分布式锁
        String lockKey = LOCK_PREFIX + key;
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试获取锁,等待10秒,锁持有30秒
            boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
            
            if (locked) {
                try {
                    // 双重检查
                    value = (T) redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        return value;
                    }
                    
                    // 加载数据
                    value = loader.get();
                    if (value != null) {
                        redisTemplate.opsForValue().set(key, value, 10, TimeUnit.MINUTES);
                    }
                } finally {
                    lock.unlock();
                }
            } else {
                // 获取锁失败,短暂等待后重试
                Thread.sleep(100);
                return solveCacheBreakdown(key, type, loader);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        
        return value;
    }

    /**
     * 解决缓存雪崩:随机过期时间 + 热点数据永不过期
     */
    public <T> void setWithRandomExpire(String key, T value, long baseExpireSeconds) {
        // 添加随机偏移,防止同时过期
        long randomOffset = ThreadLocalRandom.current().nextLong(60);
        long expireSeconds = baseExpireSeconds + randomOffset;
        
        redisTemplate.opsForValue().set(key, value, expireSeconds, TimeUnit.SECONDS);
    }

    /**
     * 热点数据永不过期 + 异步刷新
     */
    public <T> T getHotData(String key, Class<T> type, Supplier<T> loader) {
        String json = (String) redisTemplate.opsForValue().get(key);
        
        if (json == null) {
            return null;
        }
        
        CacheWrapper<T> wrapper = JSON.parseObject(json, new TypeReference<>() {});
        T value = wrapper.getData();
        
        // 检查是否需要刷新(逻辑过期)
        if (wrapper.getExpireTime() < System.currentTimeMillis()) {
            // 异步刷新,不阻塞当前请求
            CompletableFuture.runAsync(() -> refreshData(key, loader));
        }
        
        return value;
    }

    private <T> void refreshData(String key, Supplier<T> loader) {
        String lockKey = LOCK_PREFIX + key + ":refresh";
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
                try {
                    T newValue = loader.get();
                    CacheWrapper<T> wrapper = new CacheWrapper<>();
                    wrapper.setData(newValue);
                    wrapper.setExpireTime(System.currentTimeMillis() + 60000);
                    
                    redisTemplate.opsForValue().set(key, JSON.toJSONString(wrapper));
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Data
    private static class CacheWrapper<T> {
        private T data;
        private long expireTime;  // 逻辑过期时间
    }
}

五、限流熔断与降级

5.1 Sentinel限流配置

/**
 * Sentinel限流熔断配置
 */
@Configuration
@Slf4j
public class SentinelConfig {

    @PostConstruct
    public void initRules() {
        // 1. 流量控制规则
        List<FlowRule> flowRules = new ArrayList<>();
        
        // QPS限流:每秒100个请求
        FlowRule qpsRule = new FlowRule();
        qpsRule.setResource("orderQuery");
        qpsRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        qpsRule.setCount(100);
        qpsRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
        qpsRule.setWarmUpPeriodSec(10);  // 预热10秒
        flowRules.add(qpsRule);
        
        // 并发线程数限流
        FlowRule threadRule = new FlowRule();
        threadRule.setResource("orderCreate");
        threadRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        threadRule.setCount(50);  // 最大50个并发线程
        flowRules.add(threadRule);
        
        // 2. 熔断降级规则
        List<DegradeRule> degradeRules = new ArrayList<>();
        
        // 慢调用比例熔断
        DegradeRule slowCallRule = new DegradeRule();
        slowCallRule.setResource("paymentService");
        slowCallRule.setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType());
        slowCallRule.setCount(0.5);      // 慢调用比例阈值50%
        slowCallRule.setSlowRatioThreshold(0.5);
        slowCallRule.setSlowRequestAmount(100);  // 最小请求数
        slowCallRule.setTimeWindow(30);  // 熔断持续时间30秒
        degradeRules.add(slowCallRule);
        
        // 异常比例熔断
        DegradeRule errorRule = new DegradeRule();
        errorRule.setResource("inventoryService");
        errorRule.setGrade(CircuitBreakerStrategy.ERROR_RATIO.getType());
        errorRule.setCount(0.5);         // 异常比例阈值50%
        errorRule.setMinRequestAmount(50);
        errorRule.setTimeWindow(30);
        degradeRules.add(errorRule);
        
        // 3. 系统保护规则
        List<SystemRule> systemRules = new ArrayList<>();
        SystemRule systemRule = new SystemRule();
        systemRule.setHighestSystemLoad(10);    // 最大系统负载
        systemRule.setHighestCpuUsage(0.8);     // 最大CPU使用率
        systemRule.setAvgRt(500);               // 平均响应时间
        systemRule.setMaxThread(500);           // 最大线程数
        systemRule.setQps(1000);                // 入口QPS
        systemRules.add(systemRule);
        
        // 加载规则
        FlowRuleManager.loadRules(flowRules);
        DegradeRuleManager.loadRules(degradeRules);
        SystemRuleManager.loadRules(systemRules);
    }

    /**
     * 自定义限流处理
     */
    @Bean
    public BlockExceptionHandler blockExceptionHandler() {
        return (request, response, e) -> {
            log.warn("Blocked by Sentinel: {}", e.getRule().getResource());
            
            response.setStatus(429);
            response.setContentType("application/json;charset=UTF-8");
            
            Map<String, Object> result = new HashMap<>();
            result.put("code", 429);
            result.put("message", "系统繁忙,请稍后重试");
            result.put("resource", e.getRule().getResource());
            
            response.getWriter().write(JSON.toJSONString(result));
        };
    }
}

/**
 * 使用Sentinel注解
 */
@Service
@Slf4j
public class OrderService {

    /**
     * 查询订单(限流保护)
     */
    @SentinelResource(
        value = "orderQuery",
        blockHandler = "handleQueryBlock",
        fallback = "handleQueryFallback"
    )
    public Order queryOrder(Long orderId) {
        // 查询逻辑
        return orderMapper.selectById(orderId);
    }

    /**
     * 限流处理
     */
    public Order handleQueryBlock(Long orderId, BlockException ex) {
        log.warn("Query order blocked: {}", orderId);
        // 返回缓存数据或默认值
        return getOrderFromCache(orderId);
    }

    /**
     * 异常降级
     */
    public Order handleQueryFallback(Long orderId, Throwable ex) {
        log.error("Query order failed: {}", orderId, ex);
        // 返回兜底数据
        return Order.empty();
    }

    /**
     * 创建订单(热点参数限流)
     */
    @SentinelResource("orderCreate")
    public Order createOrder(CreateOrderRequest request) {
        // 热点参数限流配置
        ParamFlowRule rule = new ParamFlowRule("orderCreate")
            .setParamIdx(0)  // 第一个参数
            .setGrade(RuleConstant.FLOW_GRADE_QPS)
            .setCount(10);   // 每个用户每秒10个请求
        
        ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
        
        // 创建逻辑
        return doCreateOrder(request);
    }
}

5.2 Resilience4j熔断配置

/**
 * Resilience4j熔断配置
 */
@Configuration
public class Resilience4jConfig {

    @Bean
    public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
        return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .circuitBreakerConfig(CircuitBreakerConfig.custom()
                // 失败率阈值50%
                .failureRateThreshold(50)
                // 慢调用比例阈值50%
                .slowCallRateThreshold(50)
                // 慢调用时间阈值2秒
                .slowCallDurationThreshold(Duration.ofSeconds(2))
                // 熔断持续时间30秒
                .waitDurationInOpenState(Duration.ofSeconds(30))
                // 半开状态允许请求数
                .permittedNumberOfCallsInHalfOpenState(10)
                // 滑动窗口大小100
                .slidingWindowSize(100)
                // 最小调用数
                .minimumNumberOfCalls(20)
                // 自动从开状态转到半开
                .automaticTransitionFromOpenToHalfOpenEnabled(true)
                .build())
            .timeLimiterConfig(TimeLimiterConfig.custom()
                // 超时时间5秒
                .timeoutDuration(Duration.ofSeconds(5))
                .build())
            .build());
    }

    @Bean
    public Customizer<Resilience4jBulkheadProvider> bulkheadProviderCustomizer() {
        return provider -> provider.configure(builder -> builder
            .bulkheadConfig(BulkheadConfig.custom()
                // 最大并发调用数
                .maxConcurrentCalls(50)
                // 等待时间
                .maxWaitDuration(Duration.ofMillis(500))
                .build()),
            "orderService", "paymentService");
    }
}

/**
 * 使用Resilience4j
 */
@Service
@Slf4j
public class PaymentService {

    private final CircuitBreakerFactory circuitBreakerFactory;
    private final Bulkhead bulkhead;

    @Autowired
    public PaymentService(CircuitBreakerFactory circuitBreakerFactory, 
                         BulkheadRegistry bulkheadRegistry) {
        this.circuitBreakerFactory = circuitBreakerFactory;
        this.bulkhead = bulkheadRegistry.bulkhead("payment");
    }

    /**
     * 带熔断的支付调用
     */
    public PaymentResult processPayment(PaymentRequest request) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("payment");
        
        return circuitBreaker.run(
            () -> doProcessPayment(request),
            throwable -> {
                log.error("Payment failed, fallback triggered", throwable);
                return processPaymentFallback(request);
            }
        );
    }

    /**
     * 带信号量隔离的支付调用
     */
    @Bulkhead(name = "payment", type = Bulkhead.Type.SEMAPHORE)
    public PaymentResult processPaymentWithBulkhead(PaymentRequest request) {
        return doProcessPayment(request);
    }

    /**
     * 带线程池隔离的支付调用
     */
    @Bulkhead(name = "payment", type = Bulkhead.Type.THREADPOOL)
    @TimeLimiter(name = "payment")
    @CircuitBreaker(name = "payment", fallbackMethod = "processPaymentAsyncFallback")
    public CompletableFuture<PaymentResult> processPaymentAsync(PaymentRequest request) {
        return CompletableFuture.supplyAsync(() -> doProcessPayment(request));
    }

    private PaymentResult doProcessPayment(PaymentRequest request) {
        // 实际支付逻辑
        return new PaymentResult();
    }

    private PaymentResult processPaymentFallback(PaymentRequest request) {
        // 降级逻辑:记录待处理,稍后重试
        savePendingPayment(request);
        return PaymentResult.pending();
    }

    private CompletableFuture<PaymentResult> processPaymentAsyncFallback(
            PaymentRequest request, Exception ex) {
        return CompletableFuture.completedFuture(processPaymentFallback(request));
    }
}

六、异步与消息队列

6.1 异步处理架构

/**
 * 异步事件处理器
 */
@Service
@Slf4j
public class AsyncEventProcessor {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private ThreadPoolExecutor asyncExecutor;

    /**
     * 发布异步事件
     */
    public void publishAsyncEvent(OrderEvent event) {
        CompletableFuture.runAsync(() -> {
            eventPublisher.publishEvent(event);
        }, asyncExecutor).exceptionally(ex -> {
            log.error("Failed to publish event", ex);
            // 保存到失败队列,稍后重试
            saveToDeadLetterQueue(event);
            return null;
        });
    }

    /**
     * 事务性事件发布(确保事务提交后才发布)
     */
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 事务提交后执行
        sendNotification(event);
        updateStatistics(event);
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void handleOrderRollback(OrderCreatedEvent event) {
        // 事务回滚后执行补偿
        compensateOrder(event);
    }
}

/**
 * 事件监听器
 */
@Component
@Slf4j
public class OrderEventListener {

    @Async("ioIntensiveExecutor")
    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        log.info("Processing order created event: {}", event.getOrderId());
        
        // 异步处理:发送邮件
        sendEmailNotification(event);
        
        // 异步处理:更新库存
        updateInventory(event);
        
        // 异步处理:记录日志
        auditLog(event);
    }

    @Async
    @EventListener(condition = "#event.amount > 10000")
    public void handleLargeOrder(OrderCreatedEvent event) {
        // 只处理大额订单
        notifyManager(event);
    }
}

6.2 RabbitMQ消息队列

/**
 * RabbitMQ配置
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 订单队列
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-dead-letter-exchange", "order.dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "order.dlx.routingkey")
            .withArgument("x-message-ttl", 300000)  // 消息TTL 5分钟
            .withArgument("x-max-length", 10000)     // 队列最大长度
            .build();
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable("order.dlx.queue").build();
    }

    /**
     * 延迟队列(使用死信实现)
     */
    @Bean
    public Queue orderDelayQueue() {
        return QueueBuilder.durable("order.delay.queue")
            .withArgument("x-dead-letter-exchange", "order.exchange")
            .withArgument("x-dead-letter-routing-key", "order.routingkey")
            .withArgument("x-message-ttl", 1800000)  // 延迟30分钟
            .build();
    }

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange");
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.routingkey");
    }

    /**
     * 消费者配置
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);      // 并发消费者数
        factory.setMaxConcurrentConsumers(20);  // 最大并发消费者
        factory.setPrefetchCount(50);           // 预取数量
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);  // 手动确认
        factory.setDefaultRequeueRejected(false);  // 拒绝后不入队
        factory.setAdviceChain(
            RetryInterceptorBuilder.stateless()
                .maxAttempts(3)
                .backOffOptions(1000, 2.0, 10000)  // 指数退避
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build()
        );
        return factory;
    }
}

/**
 * 消息生产者
 */
@Service
@Slf4j
public class OrderMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送订单消息
     */
    public void sendOrderMessage(Order order) {
        MessageProperties properties = new MessageProperties();
        properties.setMessageId(UUID.randomUUID().toString());
        properties.setTimestamp(new Date());
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);  // 持久化
        
        Message message = new Message(
            JSON.toJSONBytes(order), 
            properties
        );
        
        // 确认机制
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message sent successfully: {}", correlationData);
            } else {
                log.error("Message failed to send: {}, cause: {}", correlationData, cause);
                // 重试或保存到数据库
            }
        });
        
        rabbitTemplate.convertAndSend(
            "order.exchange", 
            "order.routingkey", 
            message
        );
    }

    /**
     * 发送延迟消息
     */
    public void sendDelayMessage(Order order, int delayMinutes) {
        rabbitTemplate.convertAndSend(
            "order.delay.exchange",
            "order.delay.routingkey",
            order,
            message -> {
                message.getMessageProperties()
                    .setDelay(delayMinutes * 60 * 1000);
                return message;
            }
        );
    }
}

/**
 * 消息消费者
 */
@Component
@Slf4j
public class OrderMessageConsumer {

    @RabbitListener(queues = "order.queue")
    public void handleOrderMessage(
            Message message, 
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        try {
            Order order = JSON.parseObject(message.getBody(), Order.class);
            
            // 幂等性检查
            if (isDuplicate(order.getMessageId())) {
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 处理订单
            processOrder(order);
            
            // 手动确认
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("Failed to process order message", e);
            
            try {
                // 拒绝消息,进入死信队列
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }

    private boolean isDuplicate(String messageId) {
        // 使用Redis或数据库检查消息是否已处理
        return false;
    }

    private void processOrder(Order order) {
        // 订单处理逻辑
    }
}

七、最佳实践与检查清单

┌─────────────────────────────────────────────────────────────────────┐
│                    后端服务优化检查清单                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  【并发优化】                                                       │
│  □ 1. 合理配置线程池大小                                           │
│  □ 2. 区分CPU密集型和IO密集型任务                                  │
│  □ 3. 使用CompletableFuture异步编排                                │
│  □ 4. 考虑使用虚拟线程(Java 21+)                                 │
│  □ 5. 监控线程池状态,设置告警                                     │
│                                                                     │
│  【连接池优化】                                                     │
│  □ 1. 数据库连接池大小 = (核心数 * 2) + 有效磁盘数                 │
│  □ 2. 启用连接池监控                                               │
│  □ 3. 配置连接超时和空闲检测                                       │
│  □ 4. HTTP连接池复用                                               │
│  □ 5. 实施读写分离                                                 │
│                                                                     │
│  【缓存优化】                                                       │
│  □ 1. 多级缓存架构(本地+分布式)                                  │
│  □ 2. 缓存穿透:布隆过滤器+空值缓存                                │
│  □ 3. 缓存击穿:互斥锁保护                                         │
│  □ 4. 缓存雪崩:随机过期时间                                       │
│  □ 5. 热点数据:逻辑过期+异步刷新                                  │
│                                                                     │
│  【稳定性保障】                                                     │
│  □ 1. 限流:QPS限流、并发限流、热点限流                            │
│  □ 2. 熔断:慢调用熔断、异常熔断                                   │
│  □ 3. 降级:优雅降级策略                                           │
│  □ 4. 隔离:线程池隔离、信号量隔离                                 │
│  □ 5. 重试:指数退避重试                                           │
│                                                                     │
│  【异步化】                                                         │
│  □ 1. 非关键路径异步处理                                           │
│  □ 2. 使用消息队列削峰填谷                                         │
│  □ 3. 事务消息保证最终一致性                                       │
│  □ 4. 延迟队列处理定时任务                                         │
│  □ 5. 死信队列处理失败消息                                         │
│                                                                     │
│  【监控告警】                                                       │
│  □ 1. 接口响应时间监控                                             │
│  □ 2. 错误率监控                                                   │
│  □ 3. 吞吐量监控                                                   │
│  □ 4. 资源使用率监控(CPU/内存/连接池)                            │
│  □ 5. 业务指标监控(订单量/支付成功率)                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

八、经验总结

8.1 常见性能问题与解决方案

问题现象解决方案
线程池耗尽请求被拒绝,响应超时调大线程池,优化慢查询,异步化
数据库连接池耗尽获取连接超时优化SQL,增加连接池,读写分离
缓存穿透DB压力突增布隆过滤器,空值缓存
缓存雪崩大量请求打到DB随机过期时间,热点数据永不过期
服务雪崩级联故障熔断降级,限流保护,故障隔离
消息积压消费延迟增大增加消费者,优化消费逻辑

8.2 性能优化决策树

                    ┌─────────────────┐
                    │  性能问题定位   │
                    └────────┬────────┘
                             │
                             ▼
              ┌──────────────────────────────┐
              │  是响应慢还是吞吐量低?      │
              └─────────────┬────────────────┘
                            │
           ┌────────────────┼────────────────┐
           ▼响应慢                           ▼吞吐量低
    ┌───────────────┐                ┌───────────────┐
    │ 检查慢查询    │                │ 检查线程池    │
    │ 优化数据库    │                │ 异步化处理    │
    │ 增加缓存      │                │ 消息队列削峰  │
    └───────────────┘                └───────────────┘

系列上一篇前端性能优化实战

系列下一篇TCP拥塞控制深度解析

知识点测试

读完文章了?来测试一下你对知识点的掌握程度吧!

评论区

使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。

如果评论系统无法加载,请确保:

  • 您的网络可以访问 GitHub
  • giscus GitHub App 已安装到仓库
  • 仓库已启用 Discussions 功能