返回 筑基・数据元府藏真
时序数据处理与分析
博主
大约 13 分钟
时序数据处理与分析
一、问题引入:系统监控数据爆炸
1.1 真实案例:监控系统的数据困境
场景:大型互联网公司的系统监控平台
规模:
- 服务器:5000台
- 每台采集指标:200个
- 采集频率:每10秒
- 数据量计算:
5000台 × 200指标 × 6次/分钟 × 1440分钟/天 = 86.4亿点/天
存储需求:约500GB/天,15TB/月
MySQL方案的困境:
┌─────────────────────────────────────────────────────────────┐
│ 表结构设计: │
│ CREATE TABLE metrics ( │
│ id BIGINT PRIMARY KEY AUTO_INCREMENT, │
│ host VARCHAR(50), │
│ metric_name VARCHAR(100), │
│ value DOUBLE, │
│ timestamp DATETIME, │
│ INDEX idx_time (timestamp), │
│ INDEX idx_host_metric (host, metric_name, timestamp) │
│ ); │
├─────────────────────────────────────────────────────────────┤
│ 遇到的问题: │
│ 1. 写入性能瓶颈 │
│ - 每秒写入:5000×200÷10 = 10万点/秒 │
│ - MySQL单节点写入极限:约2万点/秒 │
│ - 需要5个MySQL实例才能支撑写入 │
│ │
│ 2. 查询性能极差 │
│ - 查询一天数据:86亿行扫描 │
│ - 聚合查询(如每小时平均值)需要全表扫描 │
│ - 查询超时频繁 │
│ │
│ 3. 存储成本高昂 │
│ - 索引占用大量空间 │
│ - 数据压缩率低 │
│ - 历史数据清理困难 │
│ │
│ 4. 时间范围查询慢 │
│ - B+树索引不适合时间范围扫描 │
│ - 即使有索引,大量数据扫描依然很慢 │
└─────────────────────────────────────────────────────────────┘
时序数据库解决方案:
- InfluxDB:写入性能100万点/秒,查询快,自动压缩
- 存储优化:列式存储,压缩率可达10:1
- 降采样:自动聚合历史数据,保留趋势
1.2 时序数据特点
时序数据的典型特征:
┌──────────────────────────────────────────────────────────────┐
│ │
│ 1. 时间戳为核心 │
│ - 每条数据都有时间戳 │
│ - 数据按时间顺序写入 │
│ - 查询通常基于时间范围 │
│ │
│ 2. 写入特征 │
│ - 高并发写入 │
│ - 几乎不更新(追加为主) │
│ - 近期数据查询频繁,历史数据查询少 │
│ │
│ 3. 数据特点 │
│ - 数据量大 │
│ - 存在明显的冷热数据 │
│ - 需要聚合分析(avg/max/min等) │
│ │
│ 4. 典型应用场景 │
│ - 系统监控(CPU、内存、磁盘) │
│ - 业务指标(QPS、延迟、错误率) │
│ - IoT传感器数据 │
│ - 金融行情数据 │
│ - 日志数据 │
│ │
└──────────────────────────────────────────────────────────────┘
二、时序数据库选型
2.1 主流时序数据库对比
| 数据库 | 特点 | 性能 | 适用场景 | 学习成本 |
|---|---|---|---|---|
| InfluxDB | 类SQL语法,生态完善 | 高 | 监控、IoT | 低 |
| TimescaleDB | PostgreSQL扩展 | 高 | 需要SQL兼容 | 低 |
| TDengine | 国产,超高性能 | 极高 | 物联网、车联网 | 中 |
| VictoriaMetrics | 云原生,Prometheus生态 | 极高 | 云原生监控 | 中 |
| ClickHouse | OLAP数据库,支持时序 | 高 | 分析型时序 | 中 |
| IoTDB | Apache项目,IoT专用 | 高 | 工业物联网 | 中 |
2.2 选型决策树
┌─────────────────┐
│ 时序数据需求 │
└────────┬────────┘
│
▼
┌──────────────────────────────┐
│ 是否需要SQL兼容? │
└─────────────┬────────────────┘
│
┌────────────────┼────────────────┐
▼是 ▼否
┌───────────────┐ ┌───────────────┐
│ TimescaleDB │ │ 数据量多大? │
│ (PostgreSQL) │ │ │
└───────────────┘ └───────┬───────┘
│
┌────────────────┼────────────────┐
▼<100万点/秒 ▼>100万点/秒
┌───────────────┐ ┌───────────────┐
│ InfluxDB │ │ TDengine │
│ VictoriaMetrics│ │ (国产高性能) │
└───────────────┘ └───────────────┘
三、InfluxDB实战
3.1 核心概念
InfluxDB数据模型:
┌──────────────────────────────────────────────────────────────┐
│ │
│ Database(数据库) │
│ └── Measurement(表) │
│ ├── Tag(索引列)- 字符串类型,用于过滤和分组 │
│ │ ├── host = server01 │
│ │ ├── region = beijing │
│ │ └── app = order-service │
│ │ │
│ ├── Field(数据列)- 实际数值,不会被索引 │
│ │ ├── value = 75.5 │
│ │ └── temperature = 23.5 │
│ │ │
│ └── Timestamp(时间戳)- 纳秒精度 │
│ └── 1625097600000000000 │
│ │
│ 与传统数据库对比: │
│ ┌──────────────┬─────────────────┬──────────────────────┐ │
│ │ 传统数据库 │ InfluxDB │ 说明 │ │
│ ├──────────────┼─────────────────┼──────────────────────┤ │
│ │ Database │ Database │ 数据库 │ │
│ │ Table │ Measurement │ 表/度量 │ │
│ │ Index │ Tag │ 索引列 │ │
│ │ Column │ Field │ 数据列 │ │
│ │ Primary Key │ Timestamp │ 主键(时间戳) │ │
│ └──────────────┴─────────────────┴──────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
3.2 基础操作
-- 创建数据库
CREATE DATABASE metrics;
-- 使用数据库
USE metrics;
-- 写入数据(Line Protocol格式)
-- 格式:measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
INSERT cpu_usage,host=server01,region=beijing,env=prod value=75.5,usage_user=45.2 1625097600000000000;
INSERT cpu_usage,host=server02,region=beijing,env=prod value=82.3,usage_user=52.1 1625097600000000000;
INSERT cpu_usage,host=server01,region=beijing,env=prod value=78.1,usage_user=48.5 1625097660000000000;
-- 查询数据
SELECT * FROM cpu_usage WHERE time > now() - 1h;
-- 带条件的查询
SELECT host, mean(value) FROM cpu_usage
WHERE time > now() - 1h AND region = 'beijing'
GROUP BY host;
-- 时间聚合查询
SELECT mean(value), max(value), min(value) FROM cpu_usage
WHERE time > now() - 24h
GROUP BY time(1h), host;
-- 连续查询(降采样)
CREATE CONTINUOUS QUERY cq_5m ON metrics
BEGIN
SELECT mean(value) AS value_avg, max(value) AS value_max
INTO metrics.rp_7d.cpu_usage_5m
FROM metrics.rp_1d.cpu_usage
GROUP BY time(5m), *
END;
-- 保留策略
CREATE RETENTION POLICY rp_7d ON metrics DURATION 7d REPLICATION 1 DEFAULT;
CREATE RETENTION POLICY rp_30d ON metrics DURATION 30d REPLICATION 1;
-- 删除数据
DROP SERIES FROM cpu_usage WHERE host = 'server01';
3.3 Java客户端操作
/**
* InfluxDB服务
*/
@Service
@Slf4j
public class InfluxDBService {
private InfluxDB influxDB;
private String database = "metrics";
private String retentionPolicy = "rp_7d";
@PostConstruct
public void init() {
influxDB = InfluxDBFactory.connect(
"http://localhost:8086",
"admin",
"admin"
);
// 创建数据库(如果不存在)
influxDB.query(new Query("CREATE DATABASE IF NOT EXISTS " + database));
influxDB.setDatabase(database);
influxDB.setRetentionPolicy(retentionPolicy);
// 启用批量写入
influxDB.enableBatch(BatchOptions.DEFAULTS
.actions(1000)
.flushDuration(100)
.exceptionHandler((points, throwable) -> {
log.error("InfluxDB写入失败", throwable);
})
);
}
/**
* 写入单条数据
*/
public void writePoint(String measurement, Map<String, String> tags,
Map<String, Object> fields, long timestamp) {
Point.Builder builder = Point.measurement(measurement)
.time(timestamp, TimeUnit.MILLISECONDS);
// 添加标签
tags.forEach(builder::tag);
// 添加字段
fields.forEach((key, value) -> {
if (value instanceof Number) {
builder.addField(key, (Number) value);
} else if (value instanceof String) {
builder.addField(key, (String) value);
} else if (value instanceof Boolean) {
builder.addField(key, (Boolean) value);
}
});
influxDB.write(builder.build());
}
/**
* 批量写入
*/
public void writeBatch(List<MetricPoint> points) {
BatchPoints batchPoints = BatchPoints.database(database)
.retentionPolicy(retentionPolicy)
.build();
for (MetricPoint point : points) {
Point p = Point.measurement(point.getMeasurement())
.time(point.getTimestamp(), TimeUnit.MILLISECONDS)
.tag(point.getTags())
.fields(point.getFields())
.build();
batchPoints.point(p);
}
influxDB.write(batchPoints);
}
/**
* 查询数据
*/
public List<MetricData> query(String sql) {
QueryResult result = influxDB.query(new Query(sql, database));
List<MetricData> dataList = new ArrayList<>();
if (result.getResults() != null) {
for (QueryResult.Result r : result.getResults()) {
if (r.getSeries() != null) {
for (QueryResult.Series series : r.getSeries()) {
List<String> columns = series.getColumns();
List<List<Object>> values = series.getValues();
for (List<Object> value : values) {
MetricData data = new MetricData();
data.setMeasurement(series.getName());
data.setTags(series.getTags());
Map<String, Object> fields = new HashMap<>();
for (int i = 0; i < columns.size(); i++) {
fields.put(columns.get(i), value.get(i));
}
data.setFields(fields);
dataList.add(data);
}
}
}
}
}
return dataList;
}
/**
* 查询最近N分钟的数据
*/
public List<MetricData> queryRecent(String measurement, int minutes) {
String sql = String.format(
"SELECT * FROM %s WHERE time > now() - %dm",
measurement, minutes
);
return query(sql);
}
/**
* 聚合查询
*/
public List<MetricData> queryAggregation(String measurement,
String aggregation,
String timeWindow,
Map<String, String> tags) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT ").append(aggregation).append("(value) ")
.append("FROM ").append(measurement)
.append(" WHERE time > now() - 1h");
// 添加标签过滤
if (tags != null) {
tags.forEach((k, v) -> sql.append(" AND ").append(k).append("='").append(v).append("'"));
}
sql.append(" GROUP BY time(").append(timeWindow).append(")");
return query(sql.toString());
}
@PreDestroy
public void close() {
if (influxDB != null) {
influxDB.close();
}
}
}
/**
* 指标数据点
*/
@Data
public class MetricPoint {
private String measurement;
private Map<String, String> tags;
private Map<String, Object> fields;
private long timestamp;
}
四、TimescaleDB实战
4.1 TimescaleDB简介
TimescaleDB = PostgreSQL + 时序扩展
优势:
- 完全兼容PostgreSQL生态
- 支持SQL标准
- 支持JOIN、窗口函数等复杂查询
- 自动分区(按时间)
- 支持压缩
- 支持连续聚合
4.2 基础操作
-- 创建扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- 创建时序表
CREATE TABLE metrics (
time TIMESTAMPTZ NOT NULL,
host VARCHAR(50) NOT NULL,
metric VARCHAR(50) NOT NULL,
value DOUBLE PRECISION NULL
);
-- 转换为超表(自动按时间分区)
SELECT create_hypertable('metrics', 'time');
-- 插入数据
INSERT INTO metrics (time, host, metric, value) VALUES
(NOW(), 'server01', 'cpu_usage', 75.5),
(NOW(), 'server01', 'memory_usage', 82.3),
(NOW(), 'server02', 'cpu_usage', 68.2);
-- 批量插入(使用COPY更快)
COPY metrics (time, host, metric, value) FROM '/data/metrics.csv' WITH CSV;
-- 查询数据
SELECT * FROM metrics
WHERE time > NOW() - INTERVAL '1 hour'
AND host = 'server01'
ORDER BY time DESC;
-- 时间聚合查询
SELECT
time_bucket('5 minutes', time) AS bucket,
host,
metric,
avg(value) as avg_value,
max(value) as max_value,
min(value) as min_value
FROM metrics
WHERE time > NOW() - INTERVAL '1 day'
GROUP BY bucket, host, metric
ORDER BY bucket DESC;
-- 连续聚合(物化视图)
CREATE MATERIALIZED VIEW metrics_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS bucket,
host,
metric,
avg(value) as avg_value,
max(value) as max_value,
min(value) as min_value,
count(*) as count
FROM metrics
GROUP BY bucket, host, metric;
-- 设置保留策略
SELECT add_retention_policy('metrics', INTERVAL '30 days');
-- 启用压缩
ALTER TABLE metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'host,metric'
);
-- 压缩7天前的数据
SELECT add_compression_policy('metrics', INTERVAL '7 days');
4.3 Java操作TimescaleDB
/**
* TimescaleDB服务
*/
@Service
@Slf4j
public class TimescaleDBService {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 批量写入数据
*/
public void batchInsert(List<Metric> metrics) {
String sql = "INSERT INTO metrics (time, host, metric, value) VALUES (?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Metric m = metrics.get(i);
ps.setTimestamp(1, Timestamp.from(m.getTime()));
ps.setString(2, m.getHost());
ps.setString(3, m.getMetric());
ps.setDouble(4, m.getValue());
}
@Override
public int getBatchSize() {
return metrics.size();
}
});
}
/**
* 使用COPY快速导入
*/
public void copyInsert(List<Metric> metrics) throws SQLException {
CopyManager copyManager = new CopyManager((BaseConnection)
jdbcTemplate.getDataSource().getConnection());
StringBuilder sb = new StringBuilder();
for (Metric m : metrics) {
sb.append(String.format("%s,%s,%s,%.2f\n",
m.getTime(), m.getHost(), m.getMetric(), m.getValue()));
}
String sql = "COPY metrics (time, host, metric, value) FROM STDIN WITH CSV";
copyManager.copyIn(sql, new StringReader(sb.toString()));
}
/**
* 查询聚合数据
*/
public List<MetricAggregation> queryAggregation(String timeWindow,
String host,
LocalDateTime start,
LocalDateTime end) {
String sql = String.format(
"SELECT " +
" time_bucket('%s', time) AS bucket, " +
" host, " +
" metric, " +
" avg(value) as avg_value, " +
" max(value) as max_value, " +
" min(value) as min_value " +
"FROM metrics " +
"WHERE time BETWEEN ? AND ? " +
" AND host = ? " +
"GROUP BY bucket, host, metric " +
"ORDER BY bucket DESC",
timeWindow
);
return jdbcTemplate.query(sql, (rs, rowNum) -> {
MetricAggregation agg = new MetricAggregation();
agg.setBucket(rs.getTimestamp("bucket").toLocalDateTime());
agg.setHost(rs.getString("host"));
agg.setMetric(rs.getString("metric"));
agg.setAvgValue(rs.getDouble("avg_value"));
agg.setMaxValue(rs.getDouble("max_value"));
agg.setMinValue(rs.getDouble("min_value"));
return agg;
}, Timestamp.valueOf(start), Timestamp.valueOf(end), host);
}
}
五、监控数据采集与存储架构
5.1 整体架构
┌─────────────────────────────────────────────────────────────────────┐
│ 监控数据采集架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 服务器01 │ │ 服务器02 │ │ 服务器N │ 被监控端 │
│ │ Node │ │ Node │ │ Node │ │
│ │ Exporter │ │ Exporter │ │ Exporter │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Kafka / MQ │ 消息队列(削峰填谷) │
│ └────────┬────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │InfluxDB │ │ClickHouse│ │ ES │ 时序存储层 │
│ │ (热数据)│ │ (温数据) │ │ (日志) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └────────────┼────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Grafana │ 可视化展示 │
│ │ / 自建大屏 │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
5.2 采集器实现
/**
* 系统指标采集器
*/
@Component
@Slf4j
public class SystemMetricsCollector {
@Autowired
private InfluxDBService influxDBService;
private final OperatingSystemMXBean osBean =
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
/**
* 定时采集系统指标
*/
@Scheduled(fixedRate = 10000) // 每10秒
public void collectSystemMetrics() {
try {
String host = InetAddress.getLocalHost().getHostName();
long timestamp = System.currentTimeMillis();
// CPU使用率
double cpuUsage = osBean.getProcessCpuLoad() * 100;
writeMetric("system_cpu", host, "usage", cpuUsage, timestamp);
// 内存使用
long totalMemory = osBean.getTotalPhysicalMemorySize();
long freeMemory = osBean.getFreePhysicalMemorySize();
double memoryUsage = (double) (totalMemory - freeMemory) / totalMemory * 100;
writeMetric("system_memory", host, "usage", memoryUsage, timestamp);
writeMetric("system_memory", host, "used", (totalMemory - freeMemory) / 1024 / 1024, timestamp);
// JVM内存
long jvmTotal = Runtime.getRuntime().totalMemory();
long jvmFree = Runtime.getRuntime().freeMemory();
double jvmUsage = (double) (jvmTotal - jvmFree) / jvmTotal * 100;
writeMetric("jvm_memory", host, "usage", jvmUsage, timestamp);
// 线程数
int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
writeMetric("jvm_threads", host, "count", threadCount, timestamp);
} catch (Exception e) {
log.error("采集系统指标失败", e);
}
}
private void writeMetric(String measurement, String host,
String metric, double value, long timestamp) {
Map<String, String> tags = new HashMap<>();
tags.put("host", host);
tags.put("metric", metric);
Map<String, Object> fields = new HashMap<>();
fields.put("value", value);
influxDBService.writePoint(measurement, tags, fields, timestamp);
}
}
六、最佳实践与注意事项
6.1 时序数据最佳实践
┌─────────────────────────────────────────────────────────────────────┐
│ 时序数据处理最佳实践 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 【标签设计】 │
│ □ 1. 标签基数不要过高(<10万) │
│ □ 2. 高基数字段作为Field而非Tag │
│ □ 3. 标签命名规范,避免特殊字符 │
│ │
│ 【数据写入】 │
│ □ 1. 使用批量写入,减少网络开销 │
│ □ 2. 启用客户端缓存和异步写入 │
│ □ 3. 时间戳使用统一时区(建议UTC) │
│ │
│ 【存储优化】 │
│ □ 1. 设置合理的保留策略 │
│ □ 2. 对历史数据进行降采样 │
│ □ 3. 启用压缩(支持的数据库) │
│ │
│ 【查询优化】 │
│ □ 1. 查询必须带时间范围过滤 │
│ □ 2. 使用连续聚合/物化视图预计算 │
│ □ 3. 避免全表扫描查询 │
│ │
└─────────────────────────────────────────────────────────────────────┘
6.2 常见错误
| 错误 | 问题 | 解决方案 |
|---|---|---|
| 标签基数过高 | 内存占用过大,查询慢 | 高基数字段改为Field |
| 无时间过滤 | 全表扫描,性能极差 | 所有查询必须带时间范围 |
| 单点写入 | 写入性能瓶颈 | 批量写入,异步处理 |
| 无保留策略 | 存储无限增长 | 设置数据过期策略 |
| 时区混乱 | 数据时间不一致 | 统一使用UTC时间戳 |
系列上一篇:地理空间数据处理
系列下一篇:数据库监控与性能分析体系
知识点测试
读完文章了?来测试一下你对知识点的掌握程度吧!
评论区
使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。
如果评论系统无法加载,请确保:
- 您的网络可以访问 GitHub
- giscus GitHub App 已安装到仓库
- 仓库已启用 Discussions 功能