SQLx集成 #

SQLx是一个纯Rust编写的SQL工具包,支持编译时查询检查和原生异步操作。本节将介绍如何在Rocket中使用SQLx。

安装配置 #

添加依赖 #

toml
[dependencies]
rocket = { version = "0.5", features = ["json"] }
rocket_db_pools = { version = "0.1", features = ["sqlx_postgres"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
serde = { version = "1.0", features = ["derive"] }

配置数据库 #

toml
[default.databases.postgres]
url = "postgres://user:password@localhost/mydb"

数据库连接 #

配置连接池 #

rust
use rocket_db_pools::{Database, Connection};
use sqlx::PgPool;

#[derive(Database)]
#[database("postgres")]
struct Db(PgPool);

#[launch]
fn rocket() -> _ {
    rocket::build()
        .attach(Db::init())
        .mount("/api", routes![list_users])
}

模型定义 #

rust
use serde::{Serialize, Deserialize};
use sqlx::FromRow;

#[derive(Debug, FromRow, Serialize)]
pub struct User {
    pub id: i32,
    pub username: String,
    pub email: String,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, Deserialize)]
pub struct NewUser {
    pub username: String,
    pub email: String,
    pub password: String,
}

CRUD操作 #

查询所有 #

rust
use rocket_db_pools::Connection;

#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
    let users = sqlx::query_as!(
        User,
        "SELECT id, username, email, created_at FROM users ORDER BY created_at DESC"
    )
    .fetch_all(&mut **db)
    .await
    .unwrap_or_default();
    
    Json(users)
}

查询单个 #

rust
#[get("/users/<user_id>")]
async fn get_user(mut db: Connection<Db>, user_id: i32) -> Option<Json<User>> {
    sqlx::query_as!(
        User,
        "SELECT id, username, email, created_at FROM users WHERE id = $1",
        user_id
    )
    .fetch_optional(&mut **db)
    .await
    .ok()
    .flatten()
    .map(Json)
}

创建 #

rust
#[post("/users", format = "json", data = "<new_user>")]
async fn create_user(
    mut db: Connection<Db>,
    new_user: Json<NewUser>,
) -> Result<Json<User>, Status> {
    let user = sqlx::query_as!(
        User,
        r#"
        INSERT INTO users (username, email, password_hash)
        VALUES ($1, $2, $3)
        RETURNING id, username, email, created_at
        "#,
        new_user.username,
        new_user.email,
        hash_password(&new_user.password),
    )
    .fetch_one(&mut **db)
    .await
    .map_err(|_| Status::InternalServerError)?;
    
    Ok(Json(user))
}

更新 #

rust
#[derive(Deserialize)]
struct UpdateUser {
    username: Option<String>,
    email: Option<String>,
}

#[put("/users/<user_id>", format = "json", data = "<update>")]
async fn update_user(
    mut db: Connection<Db>,
    user_id: i32,
    update: Json<UpdateUser>,
) -> Option<Json<User>> {
    sqlx::query_as!(
        User,
        r#"
        UPDATE users
        SET username = COALESCE($1, username),
            email = COALESCE($2, email),
            updated_at = NOW()
        WHERE id = $3
        RETURNING id, username, email, created_at
        "#,
        update.username,
        update.email,
        user_id,
    )
    .fetch_optional(&mut **db)
    .await
    .ok()
    .flatten()
    .map(Json)
}

删除 #

rust
#[delete("/users/<user_id>")]
async fn delete_user(mut db: Connection<Db>, user_id: i32) -> Status {
    let result = sqlx::query!(
        "DELETE FROM users WHERE id = $1",
        user_id
    )
    .execute(&mut **db)
    .await;
    
    match result {
        Ok(r) if r.rows_affected() > 0 => Status::NoContent,
        _ => Status::NotFound,
    }
}

复杂查询 #

条件查询 #

rust
#[get("/users/search?<q>")]
async fn search_users(mut db: Connection<Db>, q: &str) -> Json<Vec<User>> {
    let pattern = format!("%{}%", q);
    
    let users = sqlx::query_as!(
        User,
        r#"
        SELECT id, username, email, created_at
        FROM users
        WHERE username ILIKE $1 OR email ILIKE $1
        ORDER BY created_at DESC
        "#,
        pattern,
    )
    .fetch_all(&mut **db)
    .await
    .unwrap_or_default();
    
    Json(users)
}

分页查询 #

rust
#[derive(Serialize)]
struct PaginatedResponse<T> {
    data: Vec<T>,
    total: i64,
    page: i64,
    per_page: i64,
}

#[get("/users?<page>&<per_page>")]
async fn list_users_paginated(
    mut db: Connection<Db>,
    page: Option<i64>,
    per_page: Option<i64>,
) -> Json<PaginatedResponse<User>> {
    let page = page.unwrap_or(1);
    let per_page = per_page.unwrap_or(10);
    let offset = (page - 1) * per_page;
    
    let users = sqlx::query_as!(
        User,
        r#"
        SELECT id, username, email, created_at
        FROM users
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
        "#,
        per_page,
        offset,
    )
    .fetch_all(&mut **db)
    .await
    .unwrap_or_default();
    
    let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
        .fetch_one(&mut **db)
        .await
        .unwrap_or(0);
    
    Json(PaginatedResponse {
        data: users,
        total,
        page,
        per_page,
    })
}

事务处理 #

手动事务 #

rust
use sqlx::postgres::PgPool;

async fn transfer_money(
    pool: &PgPool,
    from_id: i32,
    to_id: i32,
    amount: f64,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    
    sqlx::query!(
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
        amount,
        from_id
    )
    .execute(&mut *tx)
    .await?;
    
    sqlx::query!(
        "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
        amount,
        to_id
    )
    .execute(&mut *tx)
    .await?;
    
    tx.commit().await
}

编译时检查 #

设置离线模式 #

toml
[dependencies]
sqlx = { version = "0.7", features = ["offline"] }

生成查询元数据 #

bash
cargo sqlx prepare

这会生成 .sqlx 目录,包含查询的编译时检查信息。

完整示例 #

rust
#[macro_use] extern crate rocket;

use rocket::serde::json::Json;
use rocket::serde::{Serialize, Deserialize};
use rocket_db_pools::{Database, Connection};
use sqlx::{PgPool, FromRow};

#[derive(Database)]
#[database("postgres")]
struct Db(PgPool);

#[derive(Debug, FromRow, Serialize)]
struct User {
    id: i32,
    username: String,
    email: String,
    created_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, Deserialize)]
struct NewUser {
    username: String,
    email: String,
}

#[get("/")]
fn index() -> &'static str {
    "API Running"
}

#[get("/users")]
async fn list_users(mut db: Connection<Db>) -> Json<Vec<User>> {
    let users = sqlx::query_as!(
        User,
        "SELECT id, username, email, created_at FROM users ORDER BY id"
    )
    .fetch_all(&mut **db)
    .await
    .unwrap_or_default();
    
    Json(users)
}

#[get("/users/<id>")]
async fn get_user(mut db: Connection<Db>, id: i32) -> Option<Json<User>> {
    sqlx::query_as!(
        User,
        "SELECT id, username, email, created_at FROM users WHERE id = $1",
        id
    )
    .fetch_optional(&mut **db)
    .await
    .ok()
    .flatten()
    .map(Json)
}

#[post("/users", format = "json", data = "<user>")]
async fn create_user(mut db: Connection<Db>, user: Json<NewUser>) -> Json<User> {
    let created = sqlx::query_as!(
        User,
        r#"
        INSERT INTO users (username, email, password_hash)
        VALUES ($1, $2, 'hashed')
        RETURNING id, username, email, created_at
        "#,
        user.username,
        user.email
    )
    .fetch_one(&mut **db)
    .await
    .unwrap();
    
    Json(created)
}

#[delete("/users/<id>")]
async fn delete_user(mut db: Connection<Db>, id: i32) -> &'static str {
    sqlx::query!("DELETE FROM users WHERE id = $1", id)
        .execute(&mut **db)
        .await
        .ok();
    
    "Deleted"
}

#[launch]
fn rocket() -> _ {
    rocket::build()
        .attach(Db::init())
        .mount("/api", routes![index, list_users, get_user, create_user, delete_user])
}

下一步 #

掌握了SQLx集成后,让我们继续学习 连接池管理,了解如何优化数据库连接。

最后更新:2026-03-28