数据库连接 #
连接池概述 #
数据库连接池是管理数据库连接的组件,它可以复用连接,减少连接创建开销。
text
┌─────────────────────────────────────────────────────────────┐
│ 连接池架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 应用程序 │ │
│ └───────────────────────┬─────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 连接池 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │Conn1│ │Conn2│ │Conn3│ │Conn4│ │Conn5│ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └───────────────────────┬─────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据库 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
SQLx 连接池 #
添加依赖 #
toml
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
创建连接池 #
rust
use sqlx::postgres::PgPoolOptions;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&database_url)
.await
.expect("Failed to create pool");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(pool.clone()))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
连接池配置 #
rust
// `Duration` is required by the timeout settings below. This fragment is meant
// to live inside an async function whose error type can absorb `sqlx::Error`.
use std::time::Duration;

let pool = PgPoolOptions::new()
    .max_connections(10) // maximum number of connections
    .min_connections(2) // minimum number of connections
    .acquire_timeout(Duration::from_secs(30)) // timeout for acquiring a connection
    .idle_timeout(Duration::from_secs(600)) // idle timeout
    .max_lifetime(Duration::from_secs(3600)) // maximum connection lifetime
    .connect(&database_url)
    .await?;
在处理函数中使用 #
基本查询 #
rust
use sqlx::PgPool;
/// Handles `GET /users`: returns all users ordered by id as a JSON array.
///
/// Responds with `200 OK` and an array of `{ id, name, email }` objects, or
/// `500 Internal Server Error` with an `error` message if the query fails.
#[actix_web::get("/users")]
async fn list_users(pool: web::Data<PgPool>) -> impl Responder {
    let rows = sqlx::query!(
        "SELECT id, name, email FROM users ORDER BY id"
    )
    .fetch_all(pool.get_ref())
    .await;

    match rows {
        Ok(rows) => {
            // `query!` produces an anonymous record type that does not
            // implement `Serialize`, so the JSON objects are built explicitly.
            let users: Vec<serde_json::Value> = rows
                .iter()
                .map(|row| {
                    serde_json::json!({
                        "id": row.id,
                        "name": row.name,
                        "email": row.email,
                    })
                })
                .collect();
            HttpResponse::Ok().json(users)
        }
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}
参数化查询 #
rust
/// Handles `GET /users/{id}`: looks up a single user by primary key.
///
/// The id is passed as the bind parameter `$1`, never interpolated into the
/// SQL text. Responds with `200 OK` and a `{ id, name, email }` object,
/// `404 Not Found` when no row matches, or `500 Internal Server Error` with
/// an `error` message if the query fails.
#[actix_web::get("/users/{id}")]
async fn get_user(
    path: web::Path<i32>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    let user = sqlx::query!(
        "SELECT id, name, email FROM users WHERE id = $1",
        path.into_inner()
    )
    .fetch_optional(pool.get_ref())
    .await;

    match user {
        // `query!` produces an anonymous record type that does not implement
        // `Serialize`, so the JSON object is built explicitly.
        Ok(Some(user)) => HttpResponse::Ok().json(serde_json::json!({
            "id": user.id,
            "name": user.name,
            "email": user.email,
        })),
        Ok(None) => HttpResponse::NotFound()
            .json(serde_json::json!({ "error": "User not found" })),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}
插入数据 #
rust
/// JSON request body accepted by the `POST /users` handler.
#[derive(Deserialize)]
struct CreateUser {
/// Value stored in the `name` column of the new row.
name: String,
/// Value stored in the `email` column of the new row.
email: String,
}
/// Handles `POST /users`: inserts a user and returns the created row.
///
/// `name` and `email` from the JSON body are passed as bind parameters, and
/// `created_at` is set by the database via `NOW()`. Responds with
/// `201 Created` and a `{ id, name, email }` object, or
/// `500 Internal Server Error` with an `error` message if the insert fails.
#[actix_web::post("/users")]
async fn create_user(
    body: web::Json<CreateUser>,
    pool: web::Data<PgPool>,
) -> impl Responder {
    let result = sqlx::query!(
        r#"
INSERT INTO users (name, email, created_at)
VALUES ($1, $2, NOW())
RETURNING id, name, email
"#,
        body.name,
        body.email
    )
    .fetch_one(pool.get_ref())
    .await;

    match result {
        // `query!` produces an anonymous record type that does not implement
        // `Serialize`, so the JSON object is built explicitly.
        Ok(user) => HttpResponse::Created().json(serde_json::json!({
            "id": user.id,
            "name": user.name,
            "email": user.email,
        })),
        Err(e) => HttpResponse::InternalServerError()
            .json(serde_json::json!({ "error": e.to_string() })),
    }
}
事务处理 #
基本事务 #
rust
use sqlx::postgres::PgConnection;
async fn transfer_funds(
pool: web::Data<PgPool>,
from_id: i32,
to_id: i32,
amount: i64,
) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
sqlx::query!(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount,
from_id
)
.execute(&mut *tx)
.await?;
sqlx::query!(
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount,
to_id
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
在处理函数中使用事务 #
rust
#[actix_web::post("/transfer")]
async fn transfer(
body: web::Json<TransferRequest>,
pool: web::Data<PgPool>,
) -> impl Responder {
match transfer_funds(pool, body.from_id, body.to_id, body.amount).await {
Ok(_) => HttpResponse::Ok().json(serde_json::json!({ "success": true })),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
完整示例 #
rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, postgres::PgPoolOptions};
/// A row of the `users` table as selected by the handlers in this example.
///
/// The `query_as!` calls map result rows into this type, and it is serialized
/// as the JSON body of successful responses.
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
struct User {
/// Value of the `id` column.
id: i32,
/// Value of the `name` column.
name: String,
/// Value of the `email` column.
email: String,
}
/// JSON request body accepted by `create_user` (`POST /users`).
#[derive(Deserialize)]
struct CreateUser {
/// Value inserted into the `name` column.
name: String,
/// Value inserted into the `email` column.
email: String,
}
/// JSON request body accepted by `update_user` (`PUT /users/{id}`).
///
/// Both fields are optional: the update statement wraps each in `COALESCE`,
/// so a `None` field leaves the stored column value unchanged.
#[derive(Deserialize)]
struct UpdateUser {
/// Replacement for the `name` column, if provided.
name: Option<String>,
/// Replacement for the `email` column, if provided.
email: Option<String>,
}
#[actix_web::get("/users")]
async fn list_users(pool: web::Data<PgPool>) -> impl Responder {
match sqlx::query_as!(User, "SELECT id, name, email FROM users ORDER BY id")
.fetch_all(pool.get_ref())
.await
{
Ok(users) => HttpResponse::Ok().json(users),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
#[actix_web::get("/users/{id}")]
async fn get_user(path: web::Path<i32>, pool: web::Data<PgPool>) -> impl Responder {
match sqlx::query_as!(User, "SELECT id, name, email FROM users WHERE id = $1", path.into_inner())
.fetch_optional(pool.get_ref())
.await
{
Ok(Some(user)) => HttpResponse::Ok().json(user),
Ok(None) => HttpResponse::NotFound()
.json(serde_json::json!({ "error": "User not found" })),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
#[actix_web::post("/users")]
async fn create_user(body: web::Json<CreateUser>, pool: web::Data<PgPool>) -> impl Responder {
match sqlx::query_as!(
User,
r#"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email"#,
body.name,
body.email
)
.fetch_one(pool.get_ref())
.await
{
Ok(user) => HttpResponse::Created().json(user),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
#[actix_web::put("/users/{id}")]
async fn update_user(
path: web::Path<i32>,
body: web::Json<UpdateUser>,
pool: web::Data<PgPool>,
) -> impl Responder {
let id = path.into_inner();
match sqlx::query_as!(
User,
r#"
UPDATE users
SET name = COALESCE($1, name),
email = COALESCE($2, email)
WHERE id = $3
RETURNING id, name, email
"#,
body.name,
body.email,
id
)
.fetch_optional(pool.get_ref())
.await
{
Ok(Some(user)) => HttpResponse::Ok().json(user),
Ok(None) => HttpResponse::NotFound()
.json(serde_json::json!({ "error": "User not found" })),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
#[actix_web::delete("/users/{id}")]
async fn delete_user(path: web::Path<i32>, pool: web::Data<PgPool>) -> impl Responder {
match sqlx::query!("DELETE FROM users WHERE id = $1", path.into_inner())
.execute(pool.get_ref())
.await
{
Ok(result) if result.rows_affected() > 0 => {
HttpResponse::NoContent().finish()
}
Ok(_) => HttpResponse::NotFound()
.json(serde_json::json!({ "error": "User not found" })),
Err(e) => HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": e.to_string() })),
}
}
/// Starts the user CRUD service on `127.0.0.1:8080`.
///
/// Reads the connection string from `DATABASE_URL`, opens a pool capped at
/// five connections, and registers the five user handlers on every `App`
/// instance together with a clone of the pool.
///
/// # Panics
///
/// Panics if `DATABASE_URL` is not set or the pool cannot be created.
///
/// # Errors
///
/// Returns an I/O error if binding the address fails or the server
/// terminates with an error.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await
        .expect("Failed to create pool");

    let server = HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .service(list_users)
            .service(get_user)
            .service(create_user)
            .service(update_user)
            .service(delete_user)
    })
    .bind("127.0.0.1:8080")?;

    // Announce the address only after the bind succeeded, so the message is
    // never printed for a server that failed to start.
    println!("Server running at http://127.0.0.1:8080");

    server.run().await
}
下一步 #
现在你已经掌握了数据库连接,继续学习 数据库概述,深入了解数据库集成!
最后更新:2026-03-29