数据插入 #

一、插入概述 #

1.1 TimescaleDB插入特点 #

text
TimescaleDB插入优势:

自动分区
├── 数据自动路由到正确分片
├── 无需手动管理分区
└── 透明化分区操作

高性能写入
├── 并行写入多个分片
├── 批量插入优化
├── 索引维护优化
└── WAL优化

兼容性
├── 标准INSERT语法
├── 支持所有PostgreSQL特性
└── 支持COPY命令

1.2 插入性能考虑 #

text
插入性能因素:

数据量
├── 单行插入 - 适合少量数据
├── 批量插入 - 适合大量数据
└── COPY导入 - 最快方式

索引数量
├── 索引越多,写入越慢
├── 建议不超过5个索引
└── 大批量导入前可临时禁用

分片配置
├── 合适的分区间隔
├── 空间分区数量
└── 并发写入优化

二、基本插入 #

2.1 单行插入 #

sql
-- Create the hypertable used by all examples in this document.
-- IF NOT EXISTS / if_not_exists => TRUE make the setup idempotent so
-- the snippet can be re-run without errors.
CREATE TABLE IF NOT EXISTS sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION
);

SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);

-- Insert a single row stamped with the current instant.
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES (NOW(), 1, 25.5, 60.0);

-- Insert with an explicit timestamptz literal (UTC offset shown).
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES ('2024-01-01 10:00:00+00', 1, 25.5, 60.0);

-- Insert using interval arithmetic on NOW().
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
VALUES (
    NOW() - INTERVAL '1 hour',
    1,
    25.5,
    60.0
);

2.2 多行插入 #

sql
-- Insert several rows in ONE statement (recommended: a single round
-- trip and a single transaction instead of four).
INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES
    (NOW(), 1, 25.5, 60.0),
    (NOW(), 2, 26.0, 61.0),
    (NOW(), 3, 24.8, 59.5),
    (NOW(), 4, 25.2, 60.5);

-- Rows may carry different timestamps; TimescaleDB routes each row to
-- the chunk covering its time value automatically.
INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES
    (NOW() - INTERVAL '1 hour', 1, 24.8, 58.5),
    (NOW() - INTERVAL '2 hours', 1, 24.5, 57.2),
    (NOW() - INTERVAL '3 hours', 1, 24.2, 56.8);

2.3 插入默认值 #

sql
-- Table with column defaults: time defaults to NOW(), event_id is
-- auto-assigned from a SERIAL sequence.
CREATE TABLE events (
    time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    event_id SERIAL,
    event_name VARCHAR(100),
    data JSONB
);

SELECT create_hypertable('events', 'time');

-- Columns omitted from the column list (time, event_id) take their
-- declared defaults.
INSERT INTO events (event_name, data)
VALUES ('system_start', '{"status": "ok"}');

-- Inspect the inserted row.
SELECT * FROM events;

三、批量插入 #

3.1 批量插入优势 #

text
批量插入优势:

性能提升
├── 减少网络往返
├── 减少事务开销
├── 批量索引更新
└── WAL优化

推荐批量大小
├── 1000-10000行/批次
├── 根据行大小调整
├── 不超过10MB/批次
└── 测试最佳大小

3.2 批量插入示例 #

sql
-- Helper that bulk-generates random sensor readings for testing.
-- num_rows:    number of rows to insert
-- num_sensors: sensor ids are drawn uniformly from 1..num_sensors
CREATE OR REPLACE FUNCTION generate_sensor_data(
    num_rows INTEGER,
    num_sensors INTEGER
)
RETURNS VOID AS $$
BEGIN
    INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
    SELECT
        NOW() - (random() * INTERVAL '30 days'),
        -- floor() before the cast: a bare ::INTEGER cast ROUNDS in
        -- PostgreSQL, so (random() * n)::INTEGER + 1 could yield n + 1,
        -- producing a sensor_id outside the intended 1..n range.
        floor(random() * num_sensors)::INTEGER + 1,
        20 + random() * 10,   -- temperature in [20, 30)
        50 + random() * 20    -- humidity in [50, 70)
    FROM generate_series(1, num_rows);
END;
$$ LANGUAGE plpgsql;

-- Generate 100k test rows spread over 10 sensors.
SELECT generate_sensor_data(100000, 10);

-- Verify the per-sensor distribution of the inserted rows.
SELECT
    sensor_id,
    count(*) as reading_count,
    min(time) as first_reading,
    max(time) as last_reading
FROM sensor_data
GROUP BY sensor_id
ORDER BY sensor_id;

3.3 分批插入 #

sql
-- Insert a large volume in fixed-size batches, committing after each
-- batch so WAL volume and memory pressure stay bounded.
-- NOTE: COMMIT inside a DO block requires PostgreSQL 11+ and the block
-- must run outside an explicit transaction (not between BEGIN/END).
DO $$
DECLARE
    batch_size INTEGER := 10000;
    total_rows INTEGER := 1000000;
    batches INTEGER;
    i INTEGER;
BEGIN
    batches := total_rows / batch_size;

    FOR i IN 1..batches LOOP
        INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
        SELECT
            NOW() - (random() * INTERVAL '365 days'),
            -- floor() before the cast: a bare ::INTEGER cast ROUNDS,
            -- which would allow sensor_id = 101 for a 100-sensor range.
            floor(random() * 100)::INTEGER + 1,
            20 + random() * 10,
            50 + random() * 20
        FROM generate_series(1, batch_size);

        -- Commit each batch independently.
        COMMIT;

        -- Progress report.
        RAISE NOTICE 'Batch % of % completed', i, batches;
    END LOOP;
END $$;

四、COPY导入 #

4.1 COPY命令 #

sql
-- Expected CSV layout (data.csv):
-- time,sensor_id,temperature,humidity
-- 2024-01-01 00:00:00+00,1,25.5,60.0
-- 2024-01-01 01:00:00+00,1,25.6,60.5
-- 2024-01-01 02:00:00+00,1,25.4,59.8

-- Server-side import from a CSV file.
-- NOTE(review): COPY FROM '<path>' is read by the database SERVER
-- process and needs superuser / pg_read_server_files; for files on the
-- client machine use psql's \copy or COPY FROM STDIN instead.
COPY sensor_data (time, sensor_id, temperature, humidity)
FROM '/path/to/data.csv'
WITH (
    FORMAT csv,
    HEADER true,
    DELIMITER ','
);

-- Import from standard input (data streamed from the client).
COPY sensor_data (time, sensor_id, temperature, humidity)
FROM STDIN WITH (FORMAT csv, HEADER true);

4.2 COPY导出 #

sql
-- Export the last 7 days to CSV. The target path is written by the
-- SERVER process (server-side); use psql's \copy for a client-side file.
COPY (
    SELECT * FROM sensor_data
    WHERE time > NOW() - INTERVAL '7 days'
) TO '/path/to/export.csv'
WITH (
    FORMAT csv,
    HEADER true,
    DELIMITER ','
);

-- Export a column projection for a single sensor.
COPY (
    SELECT time, sensor_id, temperature
    FROM sensor_data
    WHERE sensor_id = 1
) TO '/path/to/sensor_1.csv'
WITH (FORMAT csv, HEADER true);

4.3 COPY性能优化 #

sql
-- Preparation and cleanup around a large COPY import.
-- 1. Temporarily remove the compression policy so chunks are not
--    compressed while the import is still writing into them.
SELECT remove_compression_policy('sensor_data');

-- 2. Disable triggers (if any). NOTE(review): DISABLE TRIGGER ALL also
--    disables FK enforcement on this table and requires owner/superuser.
ALTER TABLE sensor_data DISABLE TRIGGER ALL;

-- 3. Import the data.
COPY sensor_data FROM '/path/to/large_data.csv' WITH (FORMAT csv);

-- 4. Re-enable triggers.
ALTER TABLE sensor_data ENABLE TRIGGER ALL;

-- 5. Refresh planner statistics after the bulk load.
ANALYZE sensor_data;

-- 6. Restore the compression policy.
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');

五、从查询插入 #

5.1 INSERT SELECT #

sql
-- Clone the schema (columns, defaults, indexes, constraints) of an
-- existing table, then promote the clone to a hypertable.
CREATE TABLE sensor_data_archive (LIKE sensor_data INCLUDING ALL);
SELECT create_hypertable('sensor_data_archive', 'time');

-- Copy aged rows into the archive table. NOTE: this only inserts; the
-- source rows are NOT deleted here.
INSERT INTO sensor_data_archive
SELECT * FROM sensor_data
WHERE time < NOW() - INTERVAL '90 days';

-- Conditional insert. NOTE(review): assumes a high_temp_events table
-- with these columns already exists — it is not created in this file.
INSERT INTO high_temp_events (time, sensor_id, temperature)
SELECT time, sensor_id, temperature
FROM sensor_data
WHERE temperature > 35;

5.2 聚合后插入 #

sql
-- Rollup table: one row per (hour, sensor).
CREATE TABLE hourly_stats (
    hour TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER NOT NULL,
    avg_temp DOUBLE PRECISION,
    max_temp DOUBLE PRECISION,
    min_temp DOUBLE PRECISION,
    reading_count BIGINT
);

SELECT create_hypertable('hourly_stats', 'hour');

-- Aggregate the last 24 hours into hourly buckets per sensor.
-- (PostgreSQL resolves the output alias "hour" in GROUP BY.)
INSERT INTO hourly_stats (hour, sensor_id, avg_temp, max_temp, min_temp, reading_count)
SELECT 
    time_bucket('1 hour', time) AS hour,
    sensor_id,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature),
    COUNT(*)
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, sensor_id;

六、使用程序插入 #

6.1 Python示例 #

python
import psycopg2
from datetime import datetime, timedelta
import random

# Open one shared connection; the helpers below reuse conn/cur.
conn = psycopg2.connect(
    host="localhost",
    database="tsdb",
    user="postgres",
    password="password"
)
cur = conn.cursor()

# Single-row insert with a parameterized statement (values are bound,
# never interpolated into the SQL string).
# NOTE(review): datetime.now() is timezone-naive; the server interprets
# it in the session time zone when storing into TIMESTAMPTZ — confirm
# that local time is intended here.
def insert_single() -> None:
    cur.execute("""
        INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
        VALUES (%s, %s, %s, %s)
    """, (datetime.now(), 1, 25.5, 60.0))
    conn.commit()

# Batch insert of `rows` random readings via executemany.
# NOTE(review): psycopg2's executemany still issues one statement per
# row; for real workloads psycopg2.extras.execute_values is much faster.
def insert_batch(rows: int) -> None:
    data = [
        (datetime.now() - timedelta(hours=i), 
         random.randint(1, 10), 
         20 + random.random() * 10,
         50 + random.random() * 20)
        for i in range(rows)
    ]
    
    cur.executemany("""
        INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
        VALUES (%s, %s, %s, %s)
    """, data)
    conn.commit()

# Fastest path: stream `rows` CSV lines through COPY ... FROM STDIN.
def insert_copy(rows: int) -> None:
    import io
    
    # Build the whole CSV payload in memory, one line per reading.
    buffer = io.StringIO()
    for i in range(rows):
        time = datetime.now() - timedelta(hours=i)
        sensor_id = random.randint(1, 10)
        temperature = 20 + random.random() * 10
        humidity = 50 + random.random() * 20
        buffer.write(f"{time},{sensor_id},{temperature},{humidity}\n")
    
    # Rewind so copy_expert reads from the start of the buffer.
    buffer.seek(0)
    cur.copy_expert("""
        COPY sensor_data (time, sensor_id, temperature, humidity)
        FROM STDIN WITH (FORMAT csv)
    """, buffer)
    conn.commit()

# Demo run: compare executemany vs COPY for bulk loads.
insert_batch(10000)
insert_copy(100000)

cur.close()
conn.close()

6.2 Java示例 #

java
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Demonstrates batched JDBC inserts into a TimescaleDB hypertable.
 */
public class TimescaleDBInsert {

    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/tsdb";
        // try-with-resources closes the connection even if the insert fails.
        try (Connection conn = DriverManager.getConnection(url, "postgres", "password")) {
            // Required: the PostgreSQL driver throws on commit() while
            // autocommit (the default) is enabled, and per-batch commits
            // need manual transaction control anyway.
            conn.setAutoCommit(false);

            // Batched insert of 10k random readings.
            insertBatch(conn, 10000);
        }
    }

    /**
     * Inserts {@code batchSize} random sensor readings using JDBC batching,
     * flushing and committing every 1000 rows.
     *
     * @param conn      open connection with autocommit disabled
     * @param batchSize total number of rows to insert
     */
    public static void insertBatch(Connection conn, int batchSize) throws SQLException {
        String sql = "INSERT INTO sensor_data (time, sensor_id, temperature, humidity) VALUES (?, ?, ?, ?)";
        Random random = new Random();
        LocalDateTime now = LocalDateTime.now();

        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
            for (int i = 0; i < batchSize; i++) {
                pstmt.setTimestamp(1, Timestamp.valueOf(now.minusHours(i)));
                pstmt.setInt(2, random.nextInt(10) + 1);           // sensor_id in 1..10
                pstmt.setDouble(3, 20 + random.nextDouble() * 10); // temperature [20, 30)
                pstmt.setDouble(4, 50 + random.nextDouble() * 20); // humidity [50, 70)
                pstmt.addBatch();

                // Flush every 1000 rows. Using (i + 1) avoids the original
                // off-by-one where i % 1000 == 0 flushed after the very
                // first row.
                if ((i + 1) % 1000 == 0) {
                    pstmt.executeBatch();
                    conn.commit();
                }
            }

            // Flush and commit any remaining rows.
            pstmt.executeBatch();
            conn.commit();
        }
    }
}

七、插入性能优化 #

7.1 批量大小优化 #

text
批量大小建议:

数据行大小        推荐批量大小
─────────────────────────────
小(<100字节)    5000-10000行
中(100-500字节) 1000-5000行
大(>500字节)    500-1000行

测试方法
├── 测试不同批量大小
├── 监控插入速度
├── 监控内存使用
└── 选择最佳大小

7.2 索引优化 #

sql
-- List the table's indexes.
-- (The original used `\di sensor_data`, but that psql pattern matches
-- an INDEX named sensor_data, not the indexes ON the table; query
-- pg_indexes — or use `\d sensor_data` — instead.)
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'sensor_data';

-- Temporarily drop secondary indexes before a large import.
DROP INDEX IF EXISTS idx_sensor_id;

-- Import the data.
COPY sensor_data FROM '/path/to/data.csv' WITH (FORMAT csv);

-- Recreate the index after the load (bulk index build is faster than
-- maintaining the index row-by-row during COPY).
CREATE INDEX idx_sensor_id ON sensor_data (sensor_id);

-- Refresh planner statistics.
ANALYZE sensor_data;

7.3 并发写入优化 #

sql
-- Create the hypertable with an additional SPACE dimension so
-- concurrent writers for different sensors land in different chunks.
-- NOTE(review): this form only applies when first creating the
-- hypertable; for a table that is already a hypertable use
-- add_dimension() instead, and a plain table that already contains
-- data needs migrate_data => TRUE — confirm which case applies here.
SELECT create_hypertable(
    'sensor_data',
    'time',
    partitioning_column => 'sensor_id',
    number_partitions => 4
);

-- Concurrent writes from multiple connections:
-- give each connection a disjoint sensor_id range
-- to reduce lock contention on shared chunks.

八、插入监控 #

8.1 查看插入统计 #

sql
-- Cumulative insert/update/delete counters from the statistics collector.
SELECT
    schemaname,
    tablename,
    n_tup_ins as inserts,
    n_tup_upd as updates,
    n_tup_del as deletes
FROM pg_stat_user_tables
WHERE tablename = 'sensor_data';

-- Largest chunks by on-disk size.
-- chunk_relation_size() was removed in TimescaleDB 2.x; use
-- chunks_detailed_size() instead.
SELECT
    chunk_name,
    pg_size_pretty(total_bytes) as size
FROM chunks_detailed_size('sensor_data')
ORDER BY total_bytes DESC
LIMIT 10;

8.2 监控插入性能 #

sql
-- Measure insert throughput with EXPLAIN ANALYZE.
EXPLAIN ANALYZE
INSERT INTO sensor_data (time, sensor_id, temperature, humidity)
SELECT
    NOW() - (random() * INTERVAL '30 days'),
    -- floor() before the cast: a bare ::INTEGER cast ROUNDS in
    -- PostgreSQL, which would make sensor_id = 11 possible for a
    -- 10-sensor range.
    floor(random() * 10)::INTEGER + 1,
    20 + random() * 10,
    50 + random() * 20
FROM generate_series(1, 10000);

-- Most recently created chunks (newest first) — a proxy for the
-- current write frontier.
SELECT
    hypertable_name,
    chunk_name,
    range_start,
    range_end
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data'
ORDER BY range_start DESC
LIMIT 5;

九、常见问题 #

9.1 插入性能慢 #

sql
-- Count the table's indexes; every index adds write amplification to
-- each insert.
SELECT count(*) FROM pg_indexes
WHERE tablename = 'sensor_data';

-- Remedies
-- 1. Reduce the number of indexes
-- 2. Use multi-row (batch) INSERTs
-- 3. Use COPY for bulk loads
-- 4. Tune the chunk (partition) interval

9.2 分片不均匀 #

sql
-- Check chunk size skew.
-- chunk_relation_size() was removed in TimescaleDB 2.x; use
-- chunks_detailed_size() instead.
SELECT
    chunk_name,
    pg_size_pretty(total_bytes) as size
FROM chunks_detailed_size('sensor_data')
ORDER BY total_bytes DESC;

-- Remedies
-- 1. Adjust the chunk (partition) interval
-- 2. Add a space-partitioning dimension
-- 3. Merge undersized chunks

9.3 内存不足 #

sql
-- Split very large imports into batches (keep each batch under ~10 MB)
-- so no single statement buffers the whole dataset.

-- Per-session memory budget for sorts and hash operations.
SET work_mem = '256MB';

-- Memory budget for maintenance operations (CREATE INDEX, etc.).
SET maintenance_work_mem = '1GB';

十、总结 #

插入操作要点:

方法 适用场景 性能
单行INSERT 少量数据 低
多行INSERT 中量数据 中
COPY 大量数据 最高
INSERT SELECT 数据迁移 高

最佳实践:

  1. 批量插入:使用多行INSERT或COPY
  2. 索引优化:大批量导入前临时删除索引
  3. 分片配置:合理配置分区间隔和空间分区
  4. 监控调优:监控插入性能,持续优化

下一步,让我们学习数据查询操作!

最后更新:2026-03-27