数据库连接 #

连接池概述 #

数据库连接池是管理数据库连接的组件,它可以复用连接,减少连接创建开销。

text
┌─────────────────────────────────────────────────────────────┐
│                      连接池架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              应用程序                                 │   │
│  └───────────────────────┬─────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              连接池                                   │   │
│  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐           │   │
│  │  │Conn1│ │Conn2│ │Conn3│ │Conn4│ │Conn5│           │   │
│  │  └─────┘ └─────┘ └─────┘ └─────┘ └─────┘           │   │
│  └───────────────────────┬─────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              数据库                                   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

SQLx 连接池 #

添加依赖 #

toml
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }

创建连接池 #

rust
use sqlx::postgres::PgPoolOptions;

/// Entry point: opens a PostgreSQL pool and serves HTTP on `127.0.0.1:8080`.
///
/// # Panics
///
/// Panics when `DATABASE_URL` is not set or when the pool cannot be created.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");

    // The pool is capped at 5 connections.
    let pool_options = PgPoolOptions::new().max_connections(5);
    let pool = pool_options
        .connect(&database_url)
        .await
        .expect("Failed to create pool");

    // Every `App` built by the factory receives its own clone of the pool handle.
    let app_factory = move || App::new().app_data(web::Data::new(pool.clone()));

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

连接池配置 #

rust
// Timeouts are named up front so the builder chain below reads as plain configuration.
let acquire_timeout = Duration::from_secs(30); // timeout for acquiring a connection
let idle_timeout = Duration::from_secs(600); // idle timeout
let max_lifetime = Duration::from_secs(3600); // maximum lifetime of a connection

let pool = PgPoolOptions::new()
    .max_connections(10) // maximum number of connections
    .min_connections(2) // minimum number of connections
    .acquire_timeout(acquire_timeout)
    .idle_timeout(idle_timeout)
    .max_lifetime(max_lifetime)
    .connect(&database_url)
    .await?;

在处理函数中使用 #

基本查询 #

rust
use sqlx::PgPool;

/// `GET /users` — returns every user, ordered by `id`, as a JSON array.
///
/// Responds with `200 OK` and the array on success, or `500 Internal Server Error`
/// with `{ "error": <message> }` when the query fails.
#[actix_web::get("/users")]
async fn list_users(pool: web::Data<PgPool>) -> impl Responder {
    let rows = sqlx::query!("SELECT id, name, email FROM users ORDER BY id")
        .fetch_all(pool.get_ref())
        .await;

    match rows {
        Ok(rows) => {
            // The anonymous record type generated by `sqlx::query!` does not implement
            // `serde::Serialize`, so each row is converted to a JSON object explicitly.
            let users: Vec<serde_json::Value> = rows
                .into_iter()
                .map(|row| {
                    serde_json::json!({
                        "id": row.id,
                        "name": row.name,
                        "email": row.email,
                    })
                })
                .collect();
            HttpResponse::Ok().json(users)
        }
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

参数化查询 #

rust
/// `GET /users/{id}` — looks up a single user by primary key.
///
/// Responds with `200 OK` and the user object, `404 Not Found` when no row matches,
/// or `500 Internal Server Error` with `{ "error": <message> }` when the query fails.
#[actix_web::get("/users/{id}")]
async fn get_user(
    path: web::Path<i32>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    let user = sqlx::query!(
        "SELECT id, name, email FROM users WHERE id = $1",
        path.into_inner()
    )
    .fetch_optional(pool.get_ref())
    .await;

    match user {
        // The anonymous record type generated by `sqlx::query!` does not implement
        // `serde::Serialize`, so the row is converted to a JSON object explicitly.
        Ok(Some(user)) => HttpResponse::Ok().json(serde_json::json!({
            "id": user.id,
            "name": user.name,
            "email": user.email,
        })),
        Ok(None) => HttpResponse::NotFound()
            .json(serde_json::json!({ "error": "User not found" })),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

插入数据 #

rust
/// JSON request body accepted by the `POST /users` handler.
#[derive(Deserialize)]
struct CreateUser {
    /// Value inserted into the `name` column.
    name: String,
    /// Value inserted into the `email` column.
    email: String,
}

/// `POST /users` — inserts a new user and echoes the stored row back.
///
/// Responds with `201 Created` and the new user object, or
/// `500 Internal Server Error` with `{ "error": <message> }` when the insert fails.
#[actix_web::post("/users")]
async fn create_user(
    body: web::Json<CreateUser>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    let result = sqlx::query!(
        r#"
        INSERT INTO users (name, email, created_at)
        VALUES ($1, $2, NOW())
        RETURNING id, name, email
        "#,
        body.name,
        body.email
    )
    .fetch_one(pool.get_ref())
    .await;

    match result {
        // The anonymous record type generated by `sqlx::query!` does not implement
        // `serde::Serialize`, so the row is converted to a JSON object explicitly.
        Ok(user) => HttpResponse::Created().json(serde_json::json!({
            "id": user.id,
            "name": user.name,
            "email": user.email,
        })),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

事务处理 #

基本事务 #

rust
use sqlx::postgres::PgConnection;

async fn transfer_funds(
    pool: web::Data<PgPool>,
    from_id: i32,
    to_id: i32,
    amount: i64,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    
    sqlx::query!(
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
        amount,
        from_id
    )
    .execute(&mut *tx)
    .await?;
    
    sqlx::query!(
        "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
        amount,
        to_id
    )
    .execute(&mut *tx)
    .await?;
    
    tx.commit().await?;
    
    Ok(())
}

在处理函数中使用事务 #

rust
#[actix_web::post("/transfer")]
async fn transfer(
    body: web::Json<TransferRequest>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    match transfer_funds(pool, body.from_id, body.to_id, body.amount).await {
        Ok(_) => HttpResponse::Ok().json(serde_json::json!({ "success": true })),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

完整示例 #

rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, postgres::PgPoolOptions};

/// A row of the `users` table as selected by the handlers in this example
/// (`id`, `name`, `email`); also the JSON shape they return.
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
struct User {
    /// Value of the `id` column, used for lookups, updates, and deletes.
    id: i32,
    /// Value of the `name` column.
    name: String,
    /// Value of the `email` column.
    email: String,
}

/// JSON request body accepted by `create_user` (`POST /users`).
#[derive(Deserialize)]
struct CreateUser {
    /// Value inserted into the `name` column.
    name: String,
    /// Value inserted into the `email` column.
    email: String,
}

/// JSON request body accepted by `update_user` (`PUT /users/{id}`).
///
/// Both fields are optional: the update query wraps each in `COALESCE`, so a
/// field that is `None` keeps the value already stored in the row.
#[derive(Deserialize)]
struct UpdateUser {
    /// New value for the `name` column, or `None` to keep the current one.
    name: Option<String>,
    /// New value for the `email` column, or `None` to keep the current one.
    email: Option<String>,
}

#[actix_web::get("/users")]
async fn list_users(pool: web::Data<PgPool>) -> impl Responder {
    match sqlx::query_as!(User, "SELECT id, name, email FROM users ORDER BY id")
        .fetch_all(pool.get_ref())
        .await
    {
        Ok(users) => HttpResponse::Ok().json(users),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

/// `GET /users/{id}` — looks up a single user by `id`.
///
/// Responds with `200 OK` and the user, `404 Not Found` when no row matches, or
/// `500 Internal Server Error` with `{ "error": <message> }` when the query fails.
#[actix_web::get("/users/{id}")]
async fn get_user(path: web::Path<i32>, pool: web::Data<PgPool>) -> impl Responder {
    let user_id = path.into_inner();
    let lookup = sqlx::query_as!(
        User,
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id
    )
    .fetch_optional(pool.get_ref())
    .await;

    match lookup {
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
        Ok(None) => HttpResponse::NotFound()
            .json(serde_json::json!({ "error": "User not found" })),
        Ok(Some(found)) => HttpResponse::Ok().json(found),
    }
}

#[actix_web::post("/users")]
async fn create_user(body: web::Json<CreateUser>, pool: web::Data<PgPool>) -> impl Responder {
    match sqlx::query_as!(
        User,
        r#"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"#,
        body.name,
        body.email
    )
    .fetch_one(pool.get_ref())
    .await
    {
        Ok(user) => HttpResponse::Created().json(user),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}

/// `PUT /users/{id}` — partially updates a user and returns the resulting row.
///
/// Fields that are absent from the body are passed as `NULL`, and the `COALESCE`
/// in the query keeps the stored value for them.
///
/// Responds with `200 OK` and the updated user, `404 Not Found` when no row matches,
/// or `500 Internal Server Error` with `{ "error": <message> }` when the query fails.
#[actix_web::put("/users/{id}")]
async fn update_user(
    path: web::Path<i32>,
    body: web::Json<UpdateUser>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    let user_id = path.into_inner();
    let changes = body.into_inner();

    let updated = sqlx::query_as!(
        User,
        r#"
        UPDATE users
        SET name = COALESCE($1, name),
            email = COALESCE($2, email)
        WHERE id = $3
        RETURNING id, name, email
        "#,
        changes.name,
        changes.email,
        user_id
    )
    .fetch_optional(pool.get_ref())
    .await;

    match updated {
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
        Ok(None) => HttpResponse::NotFound()
            .json(serde_json::json!({ "error": "User not found" })),
        Ok(Some(user)) => HttpResponse::Ok().json(user),
    }
}

/// `DELETE /users/{id}` — removes the user with the given `id`.
///
/// Responds with `204 No Content` when a row was deleted, `404 Not Found` when the
/// statement affected no rows, or `500 Internal Server Error` with
/// `{ "error": <message> }` when the query fails.
#[actix_web::delete("/users/{id}")]
async fn delete_user(path: web::Path<i32>, pool: web::Data<PgPool>) -> impl Responder {
    let user_id = path.into_inner();
    let outcome = sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(pool.get_ref())
        .await;

    match outcome {
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
        // Zero affected rows means nothing matched the id.
        Ok(done) if done.rows_affected() == 0 => HttpResponse::NotFound()
            .json(serde_json::json!({ "error": "User not found" })),
        Ok(_) => HttpResponse::NoContent().finish(),
    }
}

/// Entry point of the complete example: opens the PostgreSQL pool, registers the
/// five user handlers, and serves HTTP on `127.0.0.1:8080`.
///
/// # Panics
///
/// Panics when `DATABASE_URL` is not set or when the pool cannot be created.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");

    // The pool is capped at 5 connections.
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await
        .expect("Failed to create pool");

    println!("Server running at http://127.0.0.1:8080");

    // Every `App` built by the factory receives its own clone of the pool handle.
    let build_app = move || {
        let shared_pool = web::Data::new(pool.clone());
        App::new()
            .app_data(shared_pool)
            .service(list_users)
            .service(get_user)
            .service(create_user)
            .service(update_user)
            .service(delete_user)
    };

    HttpServer::new(build_app)
        .bind("127.0.0.1:8080")?
        .run()
        .await
}

下一步 #

现在你已经掌握了数据库连接,继续学习 数据库概述,深入了解数据库集成!

最后更新:2026-03-29