连接池管理 #

数据库连接池是Web应用性能的关键因素。本节将介绍如何配置和优化Rocket中的数据库连接池。

连接池概述 #

为什么需要连接池 #

  • 建立数据库连接开销大
  • 频繁创建销毁连接影响性能
  • 连接池复用连接,提高效率

连接池工作原理 #

text
请求 → 从池中获取连接 → 执行查询 → 归还连接到池

配置连接池 #

rocket_db_pools配置 #

toml
[default.databases.my_db]
url = "postgres://user:password@localhost/mydb"
pool_size = 10
timeout = 5

rocket_sync_db_pools配置 #

toml
[default.databases.my_db]
url = "postgres://user:password@localhost/mydb"
pool_size = 10
timeout = 5

配置参数 #

pool_size #

连接池大小,决定最大并发连接数。

toml
[default.databases.my_db]
pool_size = 20

计算公式:

text
pool_size = CPU核心数 * 2 + 磁盘数

timeout #

获取连接的超时时间(秒)。

toml
[default.databases.my_db]
timeout = 10

完整配置示例 #

toml
[default.databases.postgres]
url = "postgres://user:password@localhost/mydb"
pool_size = 20
timeout = 5

[debug.databases.postgres]
url = "postgres://user:password@localhost/mydb_dev"
pool_size = 5
timeout = 10

[release.databases.postgres]
url = "postgres://user:password@prod-db/mydb"
pool_size = 50
timeout = 3

连接池监控 #

自定义监控 #

rust
use rocket_db_pools::Connection;
use std::time::Instant;

#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
    let start = Instant::now();
    
    let users = sqlx::query_as!(User, "SELECT * FROM users")
        .fetch_all(&mut **db)
        .await
        .unwrap_or_default();
    
    let duration = start.elapsed();
    println!("Query took: {:?}", duration);
    
    Json(users)
}

Fairing监控 #

rust
use rocket::{Request, Data, Response};
use rocket::fairing::{Fairing, Info, Kind};
use std::sync::atomic::{AtomicU64, Ordering};

pub struct DbMetrics {
    active_connections: AtomicU64,
    total_queries: AtomicU64,
}

impl DbMetrics {
    pub fn new() -> Self {
        Self {
            active_connections: AtomicU64::new(0),
            total_queries: AtomicU64::new(0),
        }
    }
}

#[rocket::async_trait]
impl Fairing for DbMetrics {
    fn info(&self) -> Info {
        Info {
            name: "Database Metrics",
            kind: Kind::Request | Kind::Response,
        }
    }

    async fn on_request(&self, _request: &mut Request<'_>, _data: &mut Data<'_>) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }

    async fn on_response<'r>(&self, _request: &'r Request<'_>, _response: &mut Response<'r>) {
        self.active_connections.fetch_sub(1, Ordering::Relaxed);
    }
}

连接池优化 #

根据负载调整 #

场景 pool_size timeout
开发环境 5 10
小型应用 10 5
中型应用 20 5
大型应用 50+ 3

性能调优建议 #

  1. 监控连接使用率

    • 如果经常超时,增加pool_size
    • 如果连接闲置,减少pool_size
  2. 设置合理的timeout

    • 太短:正常请求超时
    • 太长:请求堆积
  3. 数据库服务器配置

    • 确保数据库支持足够的连接数
    • PostgreSQL: max_connections

错误处理 #

连接超时 #

rust
use rocket::http::Status;

#[get("/users")]
async fn list_users(db: Result<Connection<Db>, rocket_db_pools::Error>) -> Result<Json<Vec<User>>, Status> {
    let mut db = db.map_err(|_| Status::ServiceUnavailable)?;
    
    let users = sqlx::query_as!(User, "SELECT * FROM users")
        .fetch_all(&mut **db)
        .await
        .map_err(|_| Status::InternalServerError)?;
    
    Ok(Json(users))
}

重连机制 #

rust
use rocket_db_pools::Connection;

async fn with_retry<F, T>(db: &mut Connection<Db>, f: F) -> Result<T, Status>
where
    F: Fn(&mut Connection<Db>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, sqlx::Error>> + Send>> + Send,
{
    let mut retries = 3;
    
    loop {
        match f(db).await {
            Ok(result) => return Ok(result),
            Err(e) if retries > 0 => {
                retries -= 1;
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            Err(_) => return Err(Status::InternalServerError),
        }
    }
}

多数据库配置 #

配置多个数据库 #

toml
[default.databases.primary]
url = "postgres://user:password@primary-db/mydb"
pool_size = 20

[default.databases.replica]
url = "postgres://user:password@replica-db/mydb"
pool_size = 10

[default.databases.cache]
url = "redis://redis-server:6379"
pool_size = 5

使用多个数据库 #

rust
#[derive(Database)]
#[database("primary")]
struct PrimaryDb(PgPool);

#[derive(Database)]
#[database("replica")]
struct ReplicaDb(PgPool);

#[get("/users")]
async fn list_users(
    primary: Connection<PrimaryDb>,
    replica: Connection<ReplicaDb>,
) -> Json<Vec<User>> {
    // 读操作使用replica
    // 写操作使用primary
    Json(vec![])
}

连接池最佳实践 #

1. 合理设置池大小 #

rust
fn calculate_pool_size() -> u32 {
    let cpu_count = num_cpus::get();
    (cpu_count * 2) as u32
}

2. 监控连接状态 #

rust
#[get("/health/db")]
async fn db_health(mut db: Connection<Db>) -> &'static str {
    match sqlx::query("SELECT 1").fetch_one(&mut **db).await {
        Ok(_) => "Database is healthy",
        Err(_) => "Database connection failed",
    }
}

3. 优雅关闭 #

rust
use rocket::Shutdown;

#[get("/shutdown")]
async fn shutdown_trigger(shutdown: Shutdown) {
    shutdown.notify();
}

完整示例 #

rust
#[macro_use] extern crate rocket;

use rocket::serde::json::Json;
use rocket_db_pools::{Database, Connection, sqlx};
use sqlx::PgPool;

#[derive(Database)]
#[database("postgres")]
struct Db(PgPool);

#[derive(sqlx::FromRow, serde::Serialize)]
struct User {
    id: i32,
    name: String,
}

#[get("/health")]
async fn health(mut db: Connection<Db>) -> &'static str {
    sqlx::query("SELECT 1")
        .fetch_one(&mut **db)
        .await
        .map(|_| "OK")
        .unwrap_or("Error")
}

#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
    let users = sqlx::query_as!(User, "SELECT id, name FROM users")
        .fetch_all(&mut **db)
        .await
        .unwrap_or_default();
    
    Json(users)
}

#[launch]
fn rocket() -> _ {
    rocket::build()
        .attach(Db::init())
        .mount("/api", routes![health, list_users])
}

下一步 #

掌握了连接池管理后,让我们继续学习 Cookie与Session,了解Rocket中的会话管理。

最后更新:2026-03-28