数据插入 #
一、插入概述 #
1.1 TimescaleDB插入特点 #
text
TimescaleDB插入优势:
自动分区
├── 数据自动路由到正确分片
├── 无需手动管理分区
└── 透明化分区操作
高性能写入
├── 并行写入多个分片
├── 批量插入优化
├── 索引维护优化
└── WAL优化
兼容性
├── 标准INSERT语法
├── 支持所有PostgreSQL特性
└── 支持COPY命令
1.2 插入性能考虑 #
text
插入性能因素:
数据量
├── 单行插入 - 适合少量数据
├── 批量插入 - 适合大量数据
└── COPY导入 - 最快方式
索引数量
├── 索引越多,写入越慢
├── 建议不超过5个索引
└── 大批量导入前可临时禁用
分片配置
├── 合适的分区间隔
├── 空间分区数量
└── 并发写入优化
二、基本插入 #
2.1 单行插入 #
sql
-- Create the hypertable used throughout the examples.
CREATE TABLE sensor_data (
time TIMESTAMPTZ NOT NULL,
sensor_id INTEGER NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
);
SELECT create_hypertable('sensor_data', 'time');
-- Insert a single row stamped with the current time.
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES (NOW(), 1, 25.5, 60.0);
-- Insert with an explicit timestamp (UTC offset given).
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES ('2024-01-01 10:00:00+00', 1, 25.5, 60.0);
-- Insert using a time expression (one hour in the past).
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES (
NOW() - INTERVAL '1 hour',
1,
25.5,
60.0
);
2.2 多行插入 #
sql
-- Multi-row INSERT (recommended over repeated single-row statements:
-- one round trip and one statement for all rows).
INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES
(NOW(), 1, 25.5, 60.0),
(NOW(), 2, 26.0, 61.0),
(NOW(), 3, 24.8, 59.5),
(NOW(), 4, 25.2, 60.5);
-- Rows carrying different timestamps for the same sensor.
INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES
(NOW() - INTERVAL '1 hour', 1, 24.8, 58.5),
(NOW() - INTERVAL '2 hours', 1, 24.5, 57.2),
(NOW() - INTERVAL '3 hours', 1, 24.2, 56.8);
2.3 插入默认值 #
sql
-- Table with column defaults: time defaults to NOW(), event_id is serial.
CREATE TABLE events (
time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
event_id SERIAL,
event_name VARCHAR(100),
data JSONB
);
SELECT create_hypertable('events', 'time');
-- Insert relying on the defaults for time and event_id.
INSERT INTO events (event_name, data)
VALUES ('system_start', '{"status": "ok"}');
-- Inspect the inserted row.
SELECT * FROM events;
三、批量插入 #
3.1 批量插入优势 #
text
批量插入优势:
性能提升
├── 减少网络往返
├── 减少事务开销
├── 批量索引更新
└── WAL优化
推荐批量大小
├── 1000-10000行/批次
├── 根据行大小调整
├── 不超过10MB/批次
└── 测试最佳大小
3.2 批量插入示例 #
sql
-- Helper that bulk-generates random sensor readings.
-- num_rows: number of rows to insert; num_sensors: upper bound of sensor_id.
CREATE OR REPLACE FUNCTION generate_sensor_data(
num_rows INTEGER,
num_sensors INTEGER
)
RETURNS VOID AS $$
BEGIN
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
SELECT
NOW() - (random() * INTERVAL '30 days'),
(random() * num_sensors)::INTEGER + 1,
20 + random() * 10,
50 + random() * 20
FROM generate_series(1, num_rows);
END;
$$ LANGUAGE plpgsql;
-- Generate 100,000 test rows spread across 10 sensors.
SELECT generate_sensor_data(100000, 10);
-- Verify the per-sensor distribution of the generated data.
SELECT
sensor_id,
count(*) as reading_count,
min(time) as first_reading,
max(time) as last_reading
FROM sensor_data
GROUP BY sensor_id
ORDER BY sensor_id;
3.3 分批插入 #
sql
-- Insert a large volume of data in batches.
-- NOTE: COMMIT inside a DO block requires PostgreSQL 11+, and only works
-- when the block is NOT executed inside an outer explicit transaction.
DO $$
DECLARE
batch_size INTEGER := 10000;
total_rows INTEGER := 1000000;
batches INTEGER;
i INTEGER;
BEGIN
batches := total_rows / batch_size;
FOR i IN 1..batches LOOP
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
SELECT
NOW() - (random() * INTERVAL '365 days'),
(random() * 100)::INTEGER + 1,
20 + random() * 10,
50 + random() * 20
FROM generate_series(1, batch_size);
-- Commit after each batch so locks and WAL are released incrementally.
COMMIT;
-- Progress report.
RAISE NOTICE 'Batch % of % completed', i, batches;
END LOOP;
END $$;
四、COPY导入 #
4.1 COPY命令 #
sql
-- Sample CSV file
-- data.csv contents:
-- time,sensor_id,temperature,humidity
-- 2024-01-01 00:00:00+00,1,25.5,60.0
-- 2024-01-01 01:00:00+00,1,25.6,60.5
-- 2024-01-01 02:00:00+00,1,25.4,59.8
-- Import from a CSV file.
-- NOTE: COPY ... FROM 'file' reads a path on the DATABASE SERVER and needs
-- superuser or pg_read_server_files; from a client, use psql's \copy instead.
COPY sensor_data (time, sensor_id, temperature, humidity)
FROM '/path/to/data.csv'
WITH (
FORMAT csv,
HEADER true,
DELIMITER ','
);
-- Import from standard input (e.g. via a client driver's COPY API).
COPY sensor_data (time, sensor_id, temperature, humidity)
FROM STDIN WITH (FORMAT csv, HEADER true);
4.2 COPY导出 #
sql
-- Export the last 7 days of data to CSV.
-- NOTE: COPY ... TO 'file' writes on the database server; use \copy client-side.
COPY (
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '7 days'
) TO '/path/to/export.csv'
WITH (
FORMAT csv,
HEADER true,
DELIMITER ','
);
-- Export selected columns for a single sensor.
COPY (
SELECT time, sensor_id, temperature
FROM sensor_data
WHERE sensor_id = 1
) TO '/path/to/sensor_1.csv'
WITH (FORMAT csv, HEADER true);
4.3 COPY性能优化 #
sql
-- Preparation before a large bulk import.
-- 1. Temporarily remove the compression policy.
SELECT remove_compression_policy('sensor_data');
-- 2. Disable triggers, if any (requires appropriate privileges).
ALTER TABLE sensor_data DISABLE TRIGGER ALL;
-- 3. Load the data.
COPY sensor_data FROM '/path/to/large_data.csv' WITH (FORMAT csv);
-- 4. Re-enable triggers.
ALTER TABLE sensor_data ENABLE TRIGGER ALL;
-- 5. Refresh planner statistics.
ANALYZE sensor_data;
-- 6. Re-add the compression policy.
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');
五、从查询插入 #
5.1 INSERT SELECT #
sql
-- Create an archive table with the same structure, then make it a hypertable.
CREATE TABLE sensor_data_archive (LIKE sensor_data INCLUDING ALL);
SELECT create_hypertable('sensor_data_archive', 'time');
-- Copy rows older than 90 days into the archive.
-- (This copies only; the source rows are NOT deleted by this statement.)
INSERT INTO sensor_data_archive
SELECT * FROM sensor_data
WHERE time < NOW() - INTERVAL '90 days';
-- Conditional insert.
-- NOTE(review): high_temp_events is not created anywhere in this document;
-- it must already exist with (time, sensor_id, temperature) columns.
INSERT INTO high_temp_events (time, sensor_id, temperature)
SELECT time, sensor_id, temperature
FROM sensor_data
WHERE temperature > 35;
5.2 聚合后插入 #
sql
-- Target table for hourly aggregates.
CREATE TABLE hourly_stats (
hour TIMESTAMPTZ NOT NULL,
sensor_id INTEGER NOT NULL,
avg_temp DOUBLE PRECISION,
max_temp DOUBLE PRECISION,
min_temp DOUBLE PRECISION,
reading_count BIGINT
);
SELECT create_hypertable('hourly_stats', 'hour');
-- Insert hourly aggregates for the last 24 hours.
-- (PostgreSQL accepts the output alias "hour" in GROUP BY.)
INSERT INTO hourly_stats (hour, sensor_id, avg_temp, max_temp, min_temp, reading_count)
SELECT
time_bucket('1 hour', time) AS hour,
sensor_id,
AVG(temperature),
MAX(temperature),
MIN(temperature),
COUNT(*)
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, sensor_id;
六、使用程序插入 #
6.1 Python示例 #
python
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import random

# Connect to the database.
conn = psycopg2.connect(
    host="localhost",
    database="tsdb",
    user="postgres",
    password="password"
)
cur = conn.cursor()

def insert_single():
    """Insert one reading; fine for occasional writes, slow for bulk loads."""
    cur.execute("""
        INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
        VALUES (%s, %s, %s, %s)
    """, (datetime.now(), 1, 25.5, 60.0))
    conn.commit()

def insert_batch(rows):
    """Bulk-insert `rows` random readings with a multi-row INSERT.

    Uses psycopg2's execute_values(), which expands up to page_size rows
    into a single statement. cursor.executemany() performs one server
    round trip per row in psycopg2 and is far too slow for bulk loading.
    """
    data = [
        (datetime.now() - timedelta(hours=i),
         random.randint(1, 10),
         20 + random.random() * 10,
         50 + random.random() * 20)
        for i in range(rows)
    ]
    execute_values(cur, """
        INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
        VALUES %s
    """, data, page_size=1000)
    conn.commit()

def insert_copy(rows):
    """Load `rows` random readings via COPY — the fastest bulk path."""
    import io
    buffer = io.StringIO()
    for i in range(rows):
        time = datetime.now() - timedelta(hours=i)
        sensor_id = random.randint(1, 10)
        temperature = 20 + random.random() * 10
        humidity = 50 + random.random() * 20
        buffer.write(f"{time},{sensor_id},{temperature},{humidity}\n")
    buffer.seek(0)
    cur.copy_expert("""
        COPY sensor_data (time, sensor_id, temperature, humidity)
        FROM STDIN WITH (FORMAT csv)
    """, buffer)
    conn.commit()

# Run the examples.
insert_batch(10000)
insert_copy(100000)
cur.close()
conn.close()
6.2 Java示例 #
java
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TimescaleDBInsert {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/tsdb";
        Connection conn = DriverManager.getConnection(url, "postgres", "password");
        // Manual commit control is required: calling commit() while
        // auto-commit is enabled (the JDBC default) throws an SQLException
        // in the PostgreSQL driver.
        conn.setAutoCommit(false);
        // Bulk insert demo.
        insertBatch(conn, 10000);
        conn.close();
    }

    /**
     * Inserts {@code batchSize} random readings using JDBC batching,
     * flushing and committing every 1000 rows.
     */
    public static void insertBatch(Connection conn, int batchSize) throws SQLException {
        String sql = "INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES (?, ?, ?, ?)";
        PreparedStatement pstmt = conn.prepareStatement(sql);
        Random random = new Random();
        LocalDateTime now = LocalDateTime.now();
        for (int i = 0; i < batchSize; i++) {
            pstmt.setTimestamp(1, Timestamp.valueOf(now.minusHours(i)));
            pstmt.setInt(2, random.nextInt(10) + 1);
            pstmt.setDouble(3, 20 + random.nextDouble() * 10);
            pstmt.setDouble(4, 50 + random.nextDouble() * 20);
            pstmt.addBatch();
            // Flush every 1000 rows; (i + 1) avoids executing a
            // one-row batch on the very first iteration (i == 0).
            if ((i + 1) % 1000 == 0) {
                pstmt.executeBatch();
                conn.commit();
            }
        }
        // Flush any remainder smaller than a full batch.
        pstmt.executeBatch();
        conn.commit();
        pstmt.close();
    }
}
七、插入性能优化 #
7.1 批量大小优化 #
text
批量大小建议:
数据行大小 推荐批量大小
─────────────────────────────
小(<100字节) 5000-10000行
中(100-500字节) 1000-5000行
大(>500字节) 500-1000行
测试方法
├── 测试不同批量大小
├── 监控插入速度
├── 监控内存使用
└── 选择最佳大小
7.2 索引优化 #
sql
-- List indexes on the table.
-- (psql's \di takes an INDEX-name pattern, so `\di sensor_data` would look
-- for an index literally named "sensor_data"; query pg_indexes instead,
-- or run `\d sensor_data` to show the table together with its indexes.)
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'sensor_data';
-- Temporarily drop secondary indexes before a large bulk load.
DROP INDEX idx_sensor_id;
-- Load the data.
COPY sensor_data FROM '/path/to/data.csv' WITH (FORMAT csv);
-- Recreate the index afterwards.
CREATE INDEX idx_sensor_id ON sensor_data (sensor_id);
-- Refresh planner statistics.
ANALYZE sensor_data;
7.3 并发写入优化 #
sql
-- Add a space-partitioning dimension for parallel writes.
-- NOTE: create_hypertable() errors on a table that is already a hypertable
-- (sensor_data was converted earlier in this document), so an existing
-- hypertable gets its space dimension via add_dimension(); the hypertable
-- should still be empty when the dimension is added.
SELECT add_dimension(
'sensor_data',
'sensor_id',
number_partitions => 4
);
-- For a brand-new table, declare both dimensions up front instead:
-- SELECT create_hypertable('new_table', 'time',
--     partitioning_column => 'sensor_id',
--     number_partitions => 4);
-- Multiple connections writing different sensor_id values then land in
-- different partitions, reducing lock contention.
八、插入监控 #
8.1 查看插入统计 #
sql
-- Per-table write statistics from the statistics collector.
SELECT
schemaname,
tablename,
n_tup_ins as inserts,
n_tup_upd as updates,
n_tup_del as deletes
FROM pg_stat_user_tables
WHERE tablename = 'sensor_data';
-- Chunk write distribution.
-- chunks_detailed_size() is the TimescaleDB 2.x API; the 1.x function
-- chunk_relation_size() used here originally was removed in 2.0.
SELECT
chunk_name,
pg_size_pretty(total_bytes) as size
FROM chunks_detailed_size('sensor_data')
ORDER BY total_bytes DESC
LIMIT 10;
8.2 监控插入性能 #
sql
-- Measure insert throughput with EXPLAIN ANALYZE on a generated batch.
EXPLAIN ANALYZE
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
SELECT
NOW() - (random() * INTERVAL '30 days'),
(random() * 10)::INTEGER + 1,
20 + random() * 10,
50 + random() * 20
FROM generate_series(1, 10000);
-- Most recently created chunks (newest time ranges first).
SELECT
hypertable_name,
chunk_name,
range_start,
range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data'
ORDER BY range_start DESC
LIMIT 5;
九、常见问题 #
9.1 插入性能慢 #
sql
-- Count indexes on the table; every index adds work to each insert.
SELECT count(*) FROM pg_indexes
WHERE tablename = 'sensor_data';
-- Remedies:
-- 1. Reduce the number of indexes
-- 2. Use multi-row batch inserts
-- 3. Use COPY for bulk loads
-- 4. Tune the chunk time interval
9.2 分片不均匀 #
sql
-- Inspect chunk sizes to spot skew.
-- chunks_detailed_size() is the TimescaleDB 2.x API; the 1.x function
-- chunk_relation_size() used here originally was removed in 2.0.
SELECT
chunk_name,
pg_size_pretty(total_bytes) as size
FROM chunks_detailed_size('sensor_data')
ORDER BY total_bytes DESC;
-- Remedies:
-- 1. Adjust the chunk time interval
-- 2. Add a space-partitioning dimension
-- 3. Merge undersized chunks
9.3 内存不足 #
sql
-- For very large imports, load in batches of no more than ~10 MB each.
-- Increase per-operation working memory (SET is session-scoped).
SET work_mem = '256MB';
-- Memory for maintenance operations such as index builds.
SET maintenance_work_mem = '1GB';
十、总结 #
插入操作要点:
| 方法 | 适用场景 | 性能 |
|---|---|---|
| 单行INSERT | 少量数据 | 低 |
| 多行INSERT | 中量数据 | 中 |
| COPY | 大量数据 | 高 |
| INSERT SELECT | 数据迁移 | 中 |
最佳实践:
- 批量插入:使用多行INSERT或COPY
- 索引优化:大批量导入前临时删除索引
- 分片配置:合理配置分区间隔和空间分区
- 监控调优:监控插入性能,持续优化
下一步,让我们学习数据查询操作!
最后更新:2026-03-27