数据共享 #

并发安全概述 #

在多线程环境中共享数据需要考虑线程安全问题。Rust 提供了多种并发安全机制:

| 类型 | 说明 | 适用场景 |
| --- | --- | --- |
| `Arc<T>` | 原子引用计数 | 多线程共享只读数据 |
| `Mutex<T>` | 互斥锁 | 多线程共享可变数据 |
| `RwLock<T>` | 读写锁 | 读多写少场景 |
| `AtomicXxx` | 原子类型 | 简单计数器 |

Arc - 原子引用计数 #

基本用法 #

rust
use std::sync::Arc;

use actix_web::{web, App, HttpServer, Responder};

/// Read-only application configuration shared between request handlers.
///
/// Deriving `Debug`, `Clone` and `PartialEq` lets the value be logged,
/// duplicated and compared in tests without hand-written impls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
    /// Debug flag, exposed under the `debug` key by the `/config` handler.
    pub debug: bool,
    /// Version string, exposed under the `version` key by the `/config` handler.
    pub version: String,
}

/// Builds the configuration once, then serves it on `127.0.0.1:8080`.
///
/// NOTE(review): `web::Data` is documented as being `Arc`-backed already, so
/// `web::Data::new(Arc<Config>)` double-wraps the value — confirm whether
/// `web::Data::from(arc)` would be preferable before changing handler types.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let shared_config = Arc::new(Config {
        version: "1.0.0".to_string(),
        debug: true,
    });

    // Every call of the factory registers a clone of the `Arc` handle,
    // not a copy of the `Config` value itself.
    let app_factory = move || App::new().app_data(web::Data::new(shared_config.clone()));

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

只读共享 #

rust
/// `GET /config` — returns the shared configuration as a JSON object with
/// `debug` and `version` keys.
#[actix_web::get("/config")]
async fn get_config(config: web::Data<Arc<Config>>) -> impl Responder {
    // Deref coercion peels the `Data` and `Arc` layers down to the plain struct.
    let cfg: &Config = &config;

    let body = serde_json::json!({
        "debug": cfg.debug,
        "version": cfg.version
    });
    web::Json(body)
}

Mutex - 互斥锁 #

基本用法 #

rust
use std::sync::{Arc, Mutex};

/// Mutable application state; each field is guarded by its own mutex.
///
/// `Default` yields a zero counter and an empty user list, and `Debug`
/// makes the state printable for diagnostics.
#[derive(Debug, Default)]
pub struct AppState {
    /// Counter read and incremented by the `/counter` handlers.
    pub counter: Mutex<u64>,
    /// List of user names.
    pub users: Mutex<Vec<String>>,
}

/// Creates the mutex-guarded state once and serves it on `127.0.0.1:8080`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let counter = Mutex::new(0);
    let users = Mutex::new(Vec::new());
    let shared_state = Arc::new(AppState { counter, users });

    // The state is built outside the factory so every clone points at the
    // same `AppState` instance.
    let app_factory = move || App::new().app_data(web::Data::new(shared_state.clone()));

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

读写操作 #

rust
/// `GET /counter` — reports the current counter value as `{"counter": n}`.
///
/// Panics if the mutex is poisoned.
#[actix_web::get("/counter")]
async fn get_counter(state: web::Data<Arc<AppState>>) -> impl Responder {
    // Copy the value out so the temporary guard is released at the end of
    // this statement rather than after the response has been built.
    let current = *state.counter.lock().unwrap();
    web::Json(serde_json::json!({ "counter": current }))
}

/// `POST /counter/increment` — adds one to the counter and returns the new
/// value as `{"counter": n}`.
///
/// Panics if the mutex is poisoned.
#[actix_web::post("/counter/increment")]
async fn increment_counter(state: web::Data<Arc<AppState>>) -> impl Responder {
    // Increment and read back under a single lock acquisition so the reported
    // value is the one this request produced.
    let updated = {
        let mut guard = state.counter.lock().unwrap();
        *guard += 1;
        *guard
    };

    web::Json(serde_json::json!({ "counter": updated }))
}

避免死锁 #

rust
/// `POST /transfer` — appends `"new_user"` to the user list, then bumps the
/// counter, and answers `{"success": true}`.
///
/// The two mutexes are never held at the same time: each guard is a temporary
/// that is dropped at the end of its own statement, so this handler cannot
/// take part in a lock-ordering deadlock.
#[actix_web::post("/transfer")]
async fn transfer(state: web::Data<Arc<AppState>>) -> impl Responder {
    // First lock: released as soon as the push completes.
    state.users.lock().unwrap().push("new_user".to_string());

    // Second lock: only taken after the first one is gone.
    *state.counter.lock().unwrap() += 1;

    web::Json(serde_json::json!({ "success": true }))
}

RwLock - 读写锁 #

基本用法 #

rust
use std::sync::RwLock;

/// Application state holding a string cache behind a read-write lock.
///
/// `Default` yields an empty cache; `Debug` makes the state printable.
#[derive(Debug, Default)]
pub struct AppState {
    /// Key/value cache; many readers may hold the lock at once, writers are exclusive.
    pub cache: RwLock<std::collections::HashMap<String, String>>,
}

/// Creates the lock-guarded cache once and serves it on `127.0.0.1:8080`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let cache = RwLock::new(std::collections::HashMap::new());
    let shared_state = Arc::new(AppState { cache });

    // Each factory call hands out another handle to the same cache.
    let app_factory = move || App::new().app_data(web::Data::new(shared_state.clone()));

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

读操作 #

rust
#[actix_web::get("/cache/{key}")]
async fn get_cache(
    path: web::Path<String>,
    state: web::Data<Arc<AppState>>,
) -> impl Responder {
    let key = path.into_inner();
    let cache = state.cache.read().unwrap();
    
    match cache.get(&key) {
        Some(value) => web::Json(serde_json::json!({ "value": value })),
        None => web::Json(serde_json::json!({ "error": "Not found" })),
    }
}

写操作 #

rust
/// `PUT /cache/{key}` — stores the raw request body under `key`, replacing any
/// previous entry, and answers `{"success": true}`.
///
/// Panics if the lock is poisoned.
#[actix_web::put("/cache/{key}")]
async fn set_cache(
    path: web::Path<String>,
    body: String,
    state: web::Data<Arc<AppState>>,
) -> impl Responder {
    // The exclusive guard is a temporary, so it is released right after the insert.
    state.cache.write().unwrap().insert(path.into_inner(), body);

    web::Json(serde_json::json!({ "success": true }))
}

原子类型 #

基本用法 #

rust
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};

/// Lock-free application state built from atomic primitives.
///
/// `Default` is deliberately not derived: it would start `is_healthy` as
/// `false`, whereas the server constructs the state as healthy.
#[derive(Debug)]
pub struct AppState {
    /// Counter bumped by the `/health` handler.
    pub request_count: AtomicU64,
    /// Health flag reported by `/health` and flipped by `/health/toggle`.
    pub is_healthy: AtomicBool,
}

/// Creates the atomic state (zero requests, healthy) and serves it on
/// `127.0.0.1:8080`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let request_count = AtomicU64::new(0);
    let is_healthy = AtomicBool::new(true);
    let shared_state = Arc::new(AppState {
        request_count,
        is_healthy,
    });

    // Each factory call hands out another handle to the same atomics.
    let app_factory = move || App::new().app_data(web::Data::new(shared_state.clone()));

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

操作示例 #

rust
/// `GET /health` — counts the request and reports `{"status": "healthy"}` or
/// `{"status": "unhealthy"}` depending on the current flag.
#[actix_web::get("/health")]
async fn health(state: web::Data<Arc<AppState>>) -> impl Responder {
    state.request_count.fetch_add(1, Ordering::SeqCst);

    let status = match state.is_healthy.load(Ordering::SeqCst) {
        true => "healthy",
        false => "unhealthy",
    };

    web::Json(serde_json::json!({ "status": status }))
}

/// `POST /health/toggle` — flips the health flag and returns the new value as
/// `{"healthy": bool}`.
///
/// The flip is one atomic read-modify-write. A separate `load` followed by
/// `store` would let two concurrent requests both read the same old value and
/// both write the same new one, silently losing a toggle.
#[actix_web::post("/health/toggle")]
async fn toggle_health(state: web::Data<Arc<AppState>>) -> impl Responder {
    // XOR with `true` inverts the flag; `fetch_xor` hands back the value it replaced.
    let previous = state.is_healthy.fetch_xor(true, Ordering::SeqCst);

    web::Json(serde_json::json!({ "healthy": !previous }))
}

组合使用 #

Arc + Mutex #

rust
/// State whose field carries its own `Arc`, so the struct itself is cheap to clone.
///
/// Cloning copies only the reference-counted handle; every clone observes the
/// same underlying vector. `Default` yields an empty list.
#[derive(Debug, Clone, Default)]
pub struct AppState {
    /// Shared, mutex-guarded list of strings.
    pub data: Arc<Mutex<Vec<String>>>,
}

Arc + RwLock #

rust
/// State whose cache carries its own `Arc`, so the struct itself is cheap to clone.
///
/// Cloning copies only the reference-counted handle; every clone observes the
/// same underlying map. `Default` yields an empty cache.
#[derive(Debug, Clone, Default)]
pub struct AppState {
    /// Shared key/value cache behind a read-write lock.
    pub cache: Arc<RwLock<std::collections::HashMap<String, String>>>,
}

完整示例 #

rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// A stored user record; serialized as-is in API responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    /// Identifier assigned by the server from `AppState::next_id`.
    id: u64,
    /// Name taken from the creation request.
    name: String,
    /// E-mail address taken from the creation request.
    email: String,
}

/// In-memory user store shared by all handlers.
struct AppState {
    /// Next identifier to hand out; advanced atomically by `create_user`.
    next_id: AtomicU64,
    /// Users keyed by id, behind a read-write lock.
    users: RwLock<HashMap<u64, User>>,
}

impl AppState {
    /// Creates an empty store whose first assigned id will be `1`.
    fn new() -> Self {
        let next_id = AtomicU64::new(1);
        let users = RwLock::new(HashMap::new());
        Self { next_id, users }
    }
}

/// JSON request body accepted by `POST /users`.
#[derive(Deserialize)]
struct CreateUser {
    /// Name for the new user.
    name: String,
    /// E-mail address for the new user.
    email: String,
}

/// `GET /users` — returns every stored user as a JSON array, ordered by id.
///
/// `HashMap` iteration order is arbitrary, so the entries are sorted before
/// serialising; otherwise the same data could be listed in a different order
/// from one run to the next.
///
/// Panics if the lock is poisoned.
#[actix_web::get("/users")]
async fn list_users(state: web::Data<Arc<AppState>>) -> impl Responder {
    let users = state.users.read().unwrap();

    // Borrow rather than clone: the read guard outlives the serialisation below.
    let mut listing: Vec<&User> = users.values().collect();
    listing.sort_unstable_by_key(|user| user.id);

    HttpResponse::Ok().json(listing)
}

/// `GET /users/{id}` — returns the user with the given id, or `404 Not Found`
/// with `{"error": "User not found"}` when there is none.
///
/// Panics if the lock is poisoned.
#[actix_web::get("/users/{id}")]
async fn get_user(
    path: web::Path<u64>,
    state: web::Data<Arc<AppState>>,
) -> impl Responder {
    let id = path.into_inner();
    let users = state.users.read().unwrap();

    if let Some(user) = users.get(&id) {
        return HttpResponse::Ok().json(user);
    }

    HttpResponse::NotFound().json(serde_json::json!({
        "error": "User not found"
    }))
}

#[actix_web::post("/users")]
async fn create_user(
    body: web::Json<CreateUser>,
    state: web::Data<Arc<AppState>>,
) -> impl Responder {
    let id = state.next_id.fetch_add(1, Ordering::SeqCst);
    
    let user = User {
        id,
        name: body.name.clone(),
        email: body.email.clone(),
    };
    
    state.users.write().unwrap().insert(id, user.clone());
    
    HttpResponse::Created().json(user)
}

/// `DELETE /users/{id}` — removes the user with the given id.
///
/// Answers `204 No Content` when a record was removed and `404 Not Found`
/// with `{"error": "User not found"}` when the id was unknown.
///
/// Panics if the lock is poisoned.
#[actix_web::delete("/users/{id}")]
async fn delete_user(
    path: web::Path<u64>,
    state: web::Data<Arc<AppState>>,
) -> impl Responder {
    let id = path.into_inner();
    // The write guard is a temporary and is released once the removal is done.
    let removed = state.users.write().unwrap().remove(&id);

    if removed.is_none() {
        return HttpResponse::NotFound().json(serde_json::json!({
            "error": "User not found"
        }));
    }

    HttpResponse::NoContent().finish()
}

/// Starts the user-management server on `127.0.0.1:8080` with the four user
/// endpoints registered.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let shared_state = Arc::new(AppState::new());

    println!("Server running at http://127.0.0.1:8080");

    // The state is created once above; the factory only clones the handle,
    // so every `App` instance works on the same user store.
    let app_factory = move || {
        let app_state = web::Data::new(shared_state.clone());
        App::new()
            .app_data(app_state)
            .service(list_users)
            .service(get_user)
            .service(create_user)
            .service(delete_user)
    };

    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

下一步 #

现在你已经掌握了数据共享,继续学习 数据库连接,了解数据库连接池管理!

最后更新:2026-03-29