连接池管理 #
数据库连接池是Web应用性能的关键因素。本节将介绍如何配置和优化Rocket中的数据库连接池。
连接池概述 #
为什么需要连接池 #
- 建立数据库连接开销大
- 频繁创建销毁连接影响性能
- 连接池复用连接,提高效率
连接池工作原理 #
text
请求 → 从池中获取连接 → 执行查询 → 归还连接到池
配置连接池 #
rocket_db_pools配置 #
toml
[default.databases.my_db]
url = "postgres://user:password@localhost/mydb"
pool_size = 10
timeout = 5
rocket_sync_db_pools配置 #
toml
[default.databases.my_db]
url = "postgres://user:password@localhost/mydb"
pool_size = 10
timeout = 5
配置参数 #
pool_size #
连接池大小,决定最大并发连接数。
toml
[default.databases.my_db]
pool_size = 20
计算公式:
text
pool_size = CPU核心数 * 2 + 磁盘数
timeout #
获取连接的超时时间(秒)。
toml
[default.databases.my_db]
timeout = 10
完整配置示例 #
toml
[default.databases.postgres]
url = "postgres://user:password@localhost/mydb"
pool_size = 20
timeout = 5
[debug.databases.postgres]
url = "postgres://user:password@localhost/mydb_dev"
pool_size = 5
timeout = 10
[release.databases.postgres]
url = "postgres://user:password@prod-db/mydb"
pool_size = 50
timeout = 3
连接池监控 #
自定义监控 #
rust
use rocket_db_pools::Connection;
use std::time::Instant;
#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
let start = Instant::now();
let users = sqlx::query_as!(User, "SELECT * FROM users")
.fetch_all(&mut **db)
.await
.unwrap_or_default();
let duration = start.elapsed();
println!("Query took: {:?}", duration);
Json(users)
}
Fairing监控 #
rust
use rocket::{Request, Data, Response};
use rocket::fairing::{Fairing, Info, Kind};
use std::sync::atomic::{AtomicU64, Ordering};
pub struct DbMetrics {
active_connections: AtomicU64,
total_queries: AtomicU64,
}
impl DbMetrics {
pub fn new() -> Self {
Self {
active_connections: AtomicU64::new(0),
total_queries: AtomicU64::new(0),
}
}
}
#[rocket::async_trait]
impl Fairing for DbMetrics {
fn info(&self) -> Info {
Info {
name: "Database Metrics",
kind: Kind::Request | Kind::Response,
}
}
async fn on_request(&self, _request: &mut Request<'_>, _data: &mut Data<'_>) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
}
async fn on_response<'r>(&self, _request: &'r Request<'_>, _response: &mut Response<'r>) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
}
}
连接池优化 #
根据负载调整 #
| 场景 | pool_size | timeout |
|---|---|---|
| 开发环境 | 5 | 10 |
| 小型应用 | 10 | 5 |
| 中型应用 | 20 | 5 |
| 大型应用 | 50+ | 3 |
性能调优建议 #
-
监控连接使用率
- 如果经常超时,增加pool_size
- 如果连接闲置,减少pool_size
-
设置合理的timeout
- 太短:正常请求超时
- 太长:请求堆积
-
数据库服务器配置
- 确保数据库支持足够的连接数
- PostgreSQL:
max_connections
错误处理 #
连接超时 #
rust
use rocket::http::Status;
#[get("/users")]
async fn list_users(db: Result<Connection<Db>, rocket_db_pools::Error>) -> Result<Json<Vec<User>>, Status> {
let mut db = db.map_err(|_| Status::ServiceUnavailable)?;
let users = sqlx::query_as!(User, "SELECT * FROM users")
.fetch_all(&mut **db)
.await
.map_err(|_| Status::InternalServerError)?;
Ok(Json(users))
}
重连机制 #
rust
use rocket_db_pools::Connection;
async fn with_retry<F, T>(db: &mut Connection<Db>, f: F) -> Result<T, Status>
where
F: Fn(&mut Connection<Db>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, sqlx::Error>> + Send>> + Send,
{
let mut retries = 3;
loop {
match f(db).await {
Ok(result) => return Ok(result),
Err(e) if retries > 0 => {
retries -= 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Err(_) => return Err(Status::InternalServerError),
}
}
}
多数据库配置 #
配置多个数据库 #
toml
[default.databases.primary]
url = "postgres://user:password@primary-db/mydb"
pool_size = 20
[default.databases.replica]
url = "postgres://user:password@replica-db/mydb"
pool_size = 10
[default.databases.cache]
url = "redis://redis-server:6379"
pool_size = 5
使用多个数据库 #
rust
#[derive(Database)]
#[database("primary")]
struct PrimaryDb(PgPool);
#[derive(Database)]
#[database("replica")]
struct ReplicaDb(PgPool);
#[get("/users")]
async fn list_users(
primary: Connection<PrimaryDb>,
replica: Connection<ReplicaDb>,
) -> Json<Vec<User>> {
// 读操作使用replica
// 写操作使用primary
Json(vec![])
}
连接池最佳实践 #
1. 合理设置池大小 #
rust
fn calculate_pool_size() -> u32 {
let cpu_count = num_cpus::get();
(cpu_count * 2) as u32
}
2. 监控连接状态 #
rust
#[get("/health/db")]
async fn db_health(mut db: Connection<Db>) -> &'static str {
match sqlx::query("SELECT 1").fetch_one(&mut **db).await {
Ok(_) => "Database is healthy",
Err(_) => "Database connection failed",
}
}
3. 优雅关闭 #
rust
use rocket::Shutdown;
#[get("/shutdown")]
async fn shutdown_trigger(shutdown: Shutdown) {
shutdown.notify();
}
完整示例 #
rust
#[macro_use] extern crate rocket;
use rocket::serde::json::Json;
use rocket_db_pools::{Database, Connection, sqlx};
use sqlx::PgPool;
#[derive(Database)]
#[database("postgres")]
struct Db(PgPool);
#[derive(sqlx::FromRow, serde::Serialize)]
struct User {
id: i32,
name: String,
}
#[get("/health")]
async fn health(mut db: Connection<Db>) -> &'static str {
sqlx::query("SELECT 1")
.fetch_one(&mut **db)
.await
.map(|_| "OK")
.unwrap_or("Error")
}
#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
let users = sqlx::query_as!(User, "SELECT id, name FROM users")
.fetch_all(&mut **db)
.await
.unwrap_or_default();
Json(users)
}
#[launch]
fn rocket() -> _ {
rocket::build()
.attach(Db::init())
.mount("/api", routes![health, list_users])
}
下一步 #
掌握了连接池管理后,让我们继续学习 Cookie与Session,了解Rocket中的会话管理。
最后更新:2026-03-28