返回 筑基・数据元府藏真

时序数据处理与分析

博主
大约 13 分钟

时序数据处理与分析

一、问题引入:系统监控数据爆炸

1.1 真实案例:监控系统的数据困境

场景:大型互联网公司的系统监控平台
规模:
- 服务器:5000台
- 每台采集指标:200个
- 采集频率:每10秒
- 数据量计算:
  5000台 × 200指标 × 6次/分钟 × 1440分钟/天 = 86.4亿点/天
  存储需求:约500GB/天,15TB/月

MySQL方案的困境:
┌─────────────────────────────────────────────────────────────┐
│ 表结构设计:                                                │
│ CREATE TABLE metrics (                                      │
│     id BIGINT PRIMARY KEY AUTO_INCREMENT,                   │
│     host VARCHAR(50),                                       │
│     metric_name VARCHAR(100),                               │
│     value DOUBLE,                                           │
│     timestamp DATETIME,                                     │
│     INDEX idx_time (timestamp),                             │
│     INDEX idx_host_metric (host, metric_name, timestamp)    │
│ );                                                          │
├─────────────────────────────────────────────────────────────┤
│ 遇到的问题:                                                │
│ 1. 写入性能瓶颈                                             │
│    - 每秒写入:5000×200÷10 = 10万点/秒                      │
│    - MySQL单节点写入极限:约2万点/秒                        │
│    - 需要5个MySQL实例才能支撑写入                           │
│                                                             │
│ 2. 查询性能极差                                             │
│    - 查询一天数据:86亿行扫描                               │
│    - 聚合查询(如每小时平均值)需要全表扫描                 │
│    - 查询超时频繁                                           │
│                                                             │
│ 3. 存储成本高昂                                             │
│    - 索引占用大量空间                                       │
│    - 数据压缩率低                                           │
│    - 历史数据清理困难                                       │
│                                                             │
│ 4. 时间范围查询慢                                           │
│    - B+树索引不适合时间范围扫描                             │
│    - 即使有索引,大量数据扫描依然很慢                       │
└─────────────────────────────────────────────────────────────┘

时序数据库解决方案:
- InfluxDB:写入性能100万点/秒,查询快,自动压缩
- 存储优化:列式存储,压缩率可达10:1
- 降采样:自动聚合历史数据,保留趋势

1.2 时序数据特点

时序数据的典型特征:
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│  1. 时间戳为核心                                             │
│     - 每条数据都有时间戳                                     │
│     - 数据按时间顺序写入                                     │
│     - 查询通常基于时间范围                                   │
│                                                              │
│  2. 写入特征                                                 │
│     - 高并发写入                                             │
│     - 几乎不更新(追加为主)                                 │
│     - 近期数据查询频繁,历史数据查询少                       │
│                                                              │
│  3. 数据特点                                                 │
│     - 数据量大                                               │
│     - 存在明显的冷热数据                                     │
│     - 需要聚合分析(avg/max/min等)                          │
│                                                              │
│  4. 典型应用场景                                             │
│     - 系统监控(CPU、内存、磁盘)                            │
│     - 业务指标(QPS、延迟、错误率)                          │
│     - IoT传感器数据                                          │
│     - 金融行情数据                                           │
│     - 日志数据                                               │
│                                                              │
└──────────────────────────────────────────────────────────────┘

二、时序数据库选型

2.1 主流时序数据库对比

数据库特点性能适用场景学习成本
InfluxDB类SQL语法,生态完善监控、IoT
TimescaleDBPostgreSQL扩展需要SQL兼容
TDengine国产,超高性能极高物联网、车联网
VictoriaMetrics云原生,Prometheus生态极高云原生监控
ClickHouseOLAP数据库,支持时序分析型时序
IoTDBApache项目,IoT专用工业物联网

2.2 选型决策树

                    ┌─────────────────┐
                    │  时序数据需求   │
                    └────────┬────────┘
                             │
                             ▼
              ┌──────────────────────────────┐
              │  是否需要SQL兼容?           │
              └─────────────┬────────────────┘
                            │
           ┌────────────────┼────────────────┐
           ▼是                               ▼否
    ┌───────────────┐                ┌───────────────┐
    │ TimescaleDB   │                │ 数据量多大?  │
    │ (PostgreSQL)  │                │               │
    └───────────────┘                └───────┬───────┘
                                             │
                            ┌────────────────┼────────────────┐
                            ▼<100万点/秒                      ▼>100万点/秒
                     ┌───────────────┐                ┌───────────────┐
                     │ InfluxDB      │                │ TDengine      │
                     │ VictoriaMetrics│               │ (国产高性能)  │
                     └───────────────┘                └───────────────┘

三、InfluxDB实战

3.1 核心概念

InfluxDB数据模型:
┌──────────────────────────────────────────────────────────────┐
│                                                              │
│  Database(数据库)                                          │
│  └── Measurement(表)                                       │
│      ├── Tag(索引列)- 字符串类型,用于过滤和分组           │
│      │   ├── host = server01                                 │
│      │   ├── region = beijing                                │
│      │   └── app = order-service                             │
│      │                                                       │
│      ├── Field(数据列)- 实际数值,不会被索引               │
│      │   ├── value = 75.5                                    │
│      │   └── temperature = 23.5                              │
│      │                                                       │
│      └── Timestamp(时间戳)- 纳秒精度                       │
│          └── 1625097600000000000                             │
│                                                              │
│  与传统数据库对比:                                           │
│  ┌──────────────┬─────────────────┬──────────────────────┐  │
│  │ 传统数据库   │ InfluxDB        │ 说明                 │  │
│  ├──────────────┼─────────────────┼──────────────────────┤  │
│  │ Database     │ Database        │ 数据库               │  │
│  │ Table        │ Measurement     │ 表/度量              │  │
│  │ Index        │ Tag             │ 索引列               │  │
│  │ Column       │ Field           │ 数据列               │  │
│  │ Primary Key  │ Timestamp       │ 主键(时间戳)       │  │
│  └──────────────┴─────────────────┴──────────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

3.2 基础操作

-- 创建数据库
CREATE DATABASE metrics;

-- 使用数据库
USE metrics;

-- 写入数据(Line Protocol格式)
-- 格式:measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
INSERT cpu_usage,host=server01,region=beijing,env=prod value=75.5,usage_user=45.2 1625097600000000000;
INSERT cpu_usage,host=server02,region=beijing,env=prod value=82.3,usage_user=52.1 1625097600000000000;
INSERT cpu_usage,host=server01,region=beijing,env=prod value=78.1,usage_user=48.5 1625097660000000000;

-- 查询数据
SELECT * FROM cpu_usage WHERE time > now() - 1h;

-- 带条件的查询
SELECT host, mean(value) FROM cpu_usage 
WHERE time > now() - 1h AND region = 'beijing'
GROUP BY host;

-- 时间聚合查询
SELECT mean(value), max(value), min(value) FROM cpu_usage 
WHERE time > now() - 24h 
GROUP BY time(1h), host;

-- 连续查询(降采样)
CREATE CONTINUOUS QUERY cq_5m ON metrics
BEGIN
  SELECT mean(value) AS value_avg, max(value) AS value_max
  INTO metrics.rp_7d.cpu_usage_5m
  FROM metrics.rp_1d.cpu_usage
  GROUP BY time(5m), *
END;

-- 保留策略
CREATE RETENTION POLICY rp_7d ON metrics DURATION 7d REPLICATION 1 DEFAULT;
CREATE RETENTION POLICY rp_30d ON metrics DURATION 30d REPLICATION 1;

-- 删除数据
DROP SERIES FROM cpu_usage WHERE host = 'server01';

3.3 Java客户端操作

/**
 * InfluxDB服务
 */
@Service
@Slf4j
public class InfluxDBService {
    
    private InfluxDB influxDB;
    private String database = "metrics";
    private String retentionPolicy = "rp_7d";
    
    @PostConstruct
    public void init() {
        influxDB = InfluxDBFactory.connect(
            "http://localhost:8086", 
            "admin", 
            "admin"
        );
        
        // 创建数据库(如果不存在)
        influxDB.query(new Query("CREATE DATABASE IF NOT EXISTS " + database));
        influxDB.setDatabase(database);
        influxDB.setRetentionPolicy(retentionPolicy);
        
        // 启用批量写入
        influxDB.enableBatch(BatchOptions.DEFAULTS
            .actions(1000)
            .flushDuration(100)
            .exceptionHandler((points, throwable) -> {
                log.error("InfluxDB写入失败", throwable);
            })
        );
    }
    
    /**
     * 写入单条数据
     */
    public void writePoint(String measurement, Map<String, String> tags, 
                          Map<String, Object> fields, long timestamp) {
        Point.Builder builder = Point.measurement(measurement)
            .time(timestamp, TimeUnit.MILLISECONDS);
        
        // 添加标签
        tags.forEach(builder::tag);
        
        // 添加字段
        fields.forEach((key, value) -> {
            if (value instanceof Number) {
                builder.addField(key, (Number) value);
            } else if (value instanceof String) {
                builder.addField(key, (String) value);
            } else if (value instanceof Boolean) {
                builder.addField(key, (Boolean) value);
            }
        });
        
        influxDB.write(builder.build());
    }
    
    /**
     * 批量写入
     */
    public void writeBatch(List<MetricPoint> points) {
        BatchPoints batchPoints = BatchPoints.database(database)
            .retentionPolicy(retentionPolicy)
            .build();
        
        for (MetricPoint point : points) {
            Point p = Point.measurement(point.getMeasurement())
                .time(point.getTimestamp(), TimeUnit.MILLISECONDS)
                .tag(point.getTags())
                .fields(point.getFields())
                .build();
            batchPoints.point(p);
        }
        
        influxDB.write(batchPoints);
    }
    
    /**
     * 查询数据
     */
    public List<MetricData> query(String sql) {
        QueryResult result = influxDB.query(new Query(sql, database));
        
        List<MetricData> dataList = new ArrayList<>();
        
        if (result.getResults() != null) {
            for (QueryResult.Result r : result.getResults()) {
                if (r.getSeries() != null) {
                    for (QueryResult.Series series : r.getSeries()) {
                        List<String> columns = series.getColumns();
                        List<List<Object>> values = series.getValues();
                        
                        for (List<Object> value : values) {
                            MetricData data = new MetricData();
                            data.setMeasurement(series.getName());
                            data.setTags(series.getTags());
                            
                            Map<String, Object> fields = new HashMap<>();
                            for (int i = 0; i < columns.size(); i++) {
                                fields.put(columns.get(i), value.get(i));
                            }
                            data.setFields(fields);
                            dataList.add(data);
                        }
                    }
                }
            }
        }
        
        return dataList;
    }
    
    /**
     * 查询最近N分钟的数据
     */
    public List<MetricData> queryRecent(String measurement, int minutes) {
        String sql = String.format(
            "SELECT * FROM %s WHERE time > now() - %dm",
            measurement, minutes
        );
        return query(sql);
    }
    
    /**
     * 聚合查询
     */
    public List<MetricData> queryAggregation(String measurement, 
                                              String aggregation,
                                              String timeWindow,
                                              Map<String, String> tags) {
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT ").append(aggregation).append("(value) ")
           .append("FROM ").append(measurement)
           .append(" WHERE time > now() - 1h");
        
        // 添加标签过滤
        if (tags != null) {
            tags.forEach((k, v) -> sql.append(" AND ").append(k).append("='").append(v).append("'"));
        }
        
        sql.append(" GROUP BY time(").append(timeWindow).append(")");
        
        return query(sql.toString());
    }
    
    @PreDestroy
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }
}

/**
 * 指标数据点
 */
@Data
public class MetricPoint {
    private String measurement;
    private Map<String, String> tags;
    private Map<String, Object> fields;
    private long timestamp;
}

四、TimescaleDB实战

4.1 TimescaleDB简介

TimescaleDB = PostgreSQL + 时序扩展

优势:
- 完全兼容PostgreSQL生态
- 支持SQL标准
- 支持JOIN、窗口函数等复杂查询
- 自动分区(按时间)
- 支持压缩
- 支持连续聚合

4.2 基础操作

-- 创建扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- 创建时序表
CREATE TABLE metrics (
    time TIMESTAMPTZ NOT NULL,
    host VARCHAR(50) NOT NULL,
    metric VARCHAR(50) NOT NULL,
    value DOUBLE PRECISION NULL
);

-- 转换为超表(自动按时间分区)
SELECT create_hypertable('metrics', 'time');

-- 插入数据
INSERT INTO metrics (time, host, metric, value) VALUES
    (NOW(), 'server01', 'cpu_usage', 75.5),
    (NOW(), 'server01', 'memory_usage', 82.3),
    (NOW(), 'server02', 'cpu_usage', 68.2);

-- 批量插入(使用COPY更快)
COPY metrics (time, host, metric, value) FROM '/data/metrics.csv' WITH CSV;

-- 查询数据
SELECT * FROM metrics 
WHERE time > NOW() - INTERVAL '1 hour'
  AND host = 'server01'
ORDER BY time DESC;

-- 时间聚合查询
SELECT 
    time_bucket('5 minutes', time) AS bucket,
    host,
    metric,
    avg(value) as avg_value,
    max(value) as max_value,
    min(value) as min_value
FROM metrics
WHERE time > NOW() - INTERVAL '1 day'
GROUP BY bucket, host, metric
ORDER BY bucket DESC;

-- 连续聚合(物化视图)
CREATE MATERIALIZED VIEW metrics_5m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', time) AS bucket,
    host,
    metric,
    avg(value) as avg_value,
    max(value) as max_value,
    min(value) as min_value,
    count(*) as count
FROM metrics
GROUP BY bucket, host, metric;

-- 设置保留策略
SELECT add_retention_policy('metrics', INTERVAL '30 days');

-- 启用压缩
ALTER TABLE metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'host,metric'
);

-- 压缩7天前的数据
SELECT add_compression_policy('metrics', INTERVAL '7 days');

4.3 Java操作TimescaleDB

/**
 * TimescaleDB服务
 */
@Service
@Slf4j
public class TimescaleDBService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    /**
     * 批量写入数据
     */
    public void batchInsert(List<Metric> metrics) {
        String sql = "INSERT INTO metrics (time, host, metric, value) VALUES (?, ?, ?, ?)";
        
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Metric m = metrics.get(i);
                ps.setTimestamp(1, Timestamp.from(m.getTime()));
                ps.setString(2, m.getHost());
                ps.setString(3, m.getMetric());
                ps.setDouble(4, m.getValue());
            }
            
            @Override
            public int getBatchSize() {
                return metrics.size();
            }
        });
    }
    
    /**
     * 使用COPY快速导入
     */
    public void copyInsert(List<Metric> metrics) throws SQLException {
        CopyManager copyManager = new CopyManager((BaseConnection) 
            jdbcTemplate.getDataSource().getConnection());
        
        StringBuilder sb = new StringBuilder();
        for (Metric m : metrics) {
            sb.append(String.format("%s,%s,%s,%.2f\n",
                m.getTime(), m.getHost(), m.getMetric(), m.getValue()));
        }
        
        String sql = "COPY metrics (time, host, metric, value) FROM STDIN WITH CSV";
        copyManager.copyIn(sql, new StringReader(sb.toString()));
    }
    
    /**
     * 查询聚合数据
     */
    public List<MetricAggregation> queryAggregation(String timeWindow, 
                                                     String host,
                                                     LocalDateTime start,
                                                     LocalDateTime end) {
        String sql = String.format(
            "SELECT " +
            "  time_bucket('%s', time) AS bucket, " +
            "  host, " +
            "  metric, " +
            "  avg(value) as avg_value, " +
            "  max(value) as max_value, " +
            "  min(value) as min_value " +
            "FROM metrics " +
            "WHERE time BETWEEN ? AND ? " +
            "  AND host = ? " +
            "GROUP BY bucket, host, metric " +
            "ORDER BY bucket DESC",
            timeWindow
        );
        
        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            MetricAggregation agg = new MetricAggregation();
            agg.setBucket(rs.getTimestamp("bucket").toLocalDateTime());
            agg.setHost(rs.getString("host"));
            agg.setMetric(rs.getString("metric"));
            agg.setAvgValue(rs.getDouble("avg_value"));
            agg.setMaxValue(rs.getDouble("max_value"));
            agg.setMinValue(rs.getDouble("min_value"));
            return agg;
        }, Timestamp.valueOf(start), Timestamp.valueOf(end), host);
    }
}

五、监控数据采集与存储架构

5.1 整体架构

┌─────────────────────────────────────────────────────────────────────┐
│                      监控数据采集架构                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                          │
│  │ 服务器01 │  │ 服务器02 │  │ 服务器N  │     被监控端              │
│  │ Node     │  │ Node     │  │ Node     │                          │
│  │ Exporter │  │ Exporter │  │ Exporter │                          │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                          │
│       │             │             │                                 │
│       └─────────────┼─────────────┘                                 │
│                     ▼                                               │
│           ┌─────────────────┐                                       │
│           │  Kafka / MQ     │     消息队列(削峰填谷)              │
│           └────────┬────────┘                                       │
│                    │                                                │
│       ┌────────────┼────────────┐                                   │
│       ▼            ▼            ▼                                   │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                             │
│  │InfluxDB │  │ClickHouse│  │  ES     │     时序存储层              │
│  │ (热数据)│  │ (温数据) │  │ (日志)  │                             │
│  └────┬────┘  └────┬────┘  └────┬────┘                             │
│       │            │            │                                   │
│       └────────────┼────────────┘                                   │
│                    ▼                                                │
│           ┌─────────────────┐                                       │
│           │  Grafana        │     可视化展示                        │
│           │  / 自建大屏     │                                       │
│           └─────────────────┘                                       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

5.2 采集器实现

/**
 * 系统指标采集器
 */
@Component
@Slf4j
public class SystemMetricsCollector {
    
    @Autowired
    private InfluxDBService influxDBService;
    
    private final OperatingSystemMXBean osBean = 
        (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
    
    /**
     * 定时采集系统指标
     */
    @Scheduled(fixedRate = 10000) // 每10秒
    public void collectSystemMetrics() {
        try {
            String host = InetAddress.getLocalHost().getHostName();
            long timestamp = System.currentTimeMillis();
            
            // CPU使用率
            double cpuUsage = osBean.getProcessCpuLoad() * 100;
            writeMetric("system_cpu", host, "usage", cpuUsage, timestamp);
            
            // 内存使用
            long totalMemory = osBean.getTotalPhysicalMemorySize();
            long freeMemory = osBean.getFreePhysicalMemorySize();
            double memoryUsage = (double) (totalMemory - freeMemory) / totalMemory * 100;
            writeMetric("system_memory", host, "usage", memoryUsage, timestamp);
            writeMetric("system_memory", host, "used", (totalMemory - freeMemory) / 1024 / 1024, timestamp);
            
            // JVM内存
            long jvmTotal = Runtime.getRuntime().totalMemory();
            long jvmFree = Runtime.getRuntime().freeMemory();
            double jvmUsage = (double) (jvmTotal - jvmFree) / jvmTotal * 100;
            writeMetric("jvm_memory", host, "usage", jvmUsage, timestamp);
            
            // 线程数
            int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
            writeMetric("jvm_threads", host, "count", threadCount, timestamp);
            
        } catch (Exception e) {
            log.error("采集系统指标失败", e);
        }
    }
    
    private void writeMetric(String measurement, String host, 
                            String metric, double value, long timestamp) {
        Map<String, String> tags = new HashMap<>();
        tags.put("host", host);
        tags.put("metric", metric);
        
        Map<String, Object> fields = new HashMap<>();
        fields.put("value", value);
        
        influxDBService.writePoint(measurement, tags, fields, timestamp);
    }
}

六、最佳实践与注意事项

6.1 时序数据最佳实践

┌─────────────────────────────────────────────────────────────────────┐
│                      时序数据处理最佳实践                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  【标签设计】                                                       │
│  □ 1. 标签基数不要过高(<10万)                                    │
│  □ 2. 高基数字段作为Field而非Tag                                   │
│  □ 3. 标签命名规范,避免特殊字符                                   │
│                                                                     │
│  【数据写入】                                                       │
│  □ 1. 使用批量写入,减少网络开销                                   │
│  □ 2. 启用客户端缓存和异步写入                                     │
│  □ 3. 时间戳使用统一时区(建议UTC)                                │
│                                                                     │
│  【存储优化】                                                       │
│  □ 1. 设置合理的保留策略                                           │
│  □ 2. 对历史数据进行降采样                                         │
│  □ 3. 启用压缩(支持的数据库)                                     │
│                                                                     │
│  【查询优化】                                                       │
│  □ 1. 查询必须带时间范围过滤                                       │
│  □ 2. 使用连续聚合/物化视图预计算                                  │
│  □ 3. 避免全表扫描查询                                             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

6.2 常见错误

错误问题解决方案
标签基数过高内存占用过大,查询慢高基数字段改为Field
无时间过滤全表扫描,性能极差所有查询必须带时间范围
单点写入写入性能瓶颈批量写入,异步处理
无保留策略存储无限增长设置数据过期策略
时区混乱数据时间不一致统一使用UTC时间戳

系列上一篇地理空间数据处理

系列下一篇数据库监控与性能分析体系

知识点测试

读完文章了?来测试一下你对知识点的掌握程度吧!

评论区

使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。

如果评论系统无法加载,请确保:

  • 您的网络可以访问 GitHub
  • giscus GitHub App 已安装到仓库
  • 仓库已启用 Discussions 功能