SQLAlchemy 集成 #
安装 #
bash
pip install sqlalchemy
pip install aiosqlite
pip install asyncpg
同步 SQLAlchemy #
基本配置 #
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# SQLite database file located in the current working directory.
DATABASE_URL = 'sqlite:///./test.db'

# check_same_thread=False is forwarded to the SQLite driver so a connection
# may be used from a thread other than the one that created it.
engine = create_engine(DATABASE_URL, connect_args={'check_same_thread': False})

# Session factory bound to the engine; neither autocommit nor autoflush is on.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
定义模型 #
python
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import declarative_base
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, index=True, primary_key=True)
    # email and username are each unique and indexed.
    email = Column(String, index=True, unique=True)
    username = Column(String, index=True, unique=True)
    # Stores the value produced by hash_password(), never the raw password.
    hashed_password = Column(String)
    # New rows default to active.
    is_active = Column(Boolean, default=True)
创建表 #
python
# Create the tables for every model registered on Base.metadata.
Base.metadata.create_all(engine)
依赖注入 #
python
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
app = FastAPI()


def get_db():
    """Dependency that yields a session and closes it once the consumer is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs even when the code using the session raised.
        session.close()
@app.get('/users/')
def read_users(db: Session = Depends(get_db)):
    # Return every row of the users table.
    # NOTE(review): no response_model is declared, so the ORM objects are
    # serialized as-is -- presumably including hashed_password. Confirm, and
    # consider declaring a response schema that omits it.
    return db.query(User).all()
CRUD 操作 #
python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
app = FastAPI()


class UserCreate(BaseModel):
    # Request body for creating (and, in this section, updating) a user.
    # All three fields are required strings; password is the raw password.
    email: str
    username: str
    password: str
class UserResponse(BaseModel):
    # Response schema: only these four fields are returned to the client, so
    # the stored password hash is never part of a response.
    #
    # from_attributes=True lets the model be built from ORM object attributes.
    # model_config replaces the nested `class Config`, which Pydantic v2
    # deprecates; a plain dict is accepted, so no extra import is needed.
    model_config = {'from_attributes': True}

    id: int
    email: str
    username: str
    is_active: bool
@app.post('/users/', response_model=UserResponse)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    # Create a user and return the stored row.
    # Raises HTTPException(400) when the email or username is already taken.
    #
    # Both columns carry unique constraints, so look for a clash up front;
    # otherwise a duplicate surfaces as an unhandled IntegrityError on commit.
    existing = (
        db.query(User)
        .filter((User.email == user.email) | (User.username == user.username))
        .first()
    )
    if existing is not None:
        if existing.email == user.email:
            detail = 'Email already registered'
        else:
            detail = 'Username already registered'
        raise HTTPException(status_code=400, detail=detail)

    db_user = User(
        email=user.email,
        username=user.username,
        # Only the hash is stored, never the raw password.
        hashed_password=hash_password(user.password),
    )
    db.add(db_user)
    db.commit()
    # Reload so database-generated values (e.g. id) are present on the object.
    db.refresh(db_user)
    return db_user
@app.get('/users/{user_id}', response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    # Fetch one user by primary key; answer 404 when no row matches.
    by_id = db.query(User).filter(User.id == user_id)
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail='User not found')
    return found
@app.put('/users/{user_id}', response_model=UserResponse)
def update_user(user_id: int, user_update: UserCreate, db: Session = Depends(get_db)):
    # Overwrite email, username and password of an existing user.
    # Answers 404 when the user does not exist.
    target = db.query(User).filter(User.id == user_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail='User not found')

    target.email = user_update.email
    target.username = user_update.username
    # Store the hash of the new password, not the password itself.
    target.hashed_password = hash_password(user_update.password)

    db.commit()
    db.refresh(target)
    return target
@app.delete('/users/{user_id}')
def delete_user(user_id: int, db: Session = Depends(get_db)):
    # Delete a user by primary key; answer 404 when no row matches.
    doomed = db.query(User).filter(User.id == user_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail='User not found')

    db.delete(doomed)
    db.commit()
    return {'message': 'User deleted'}
异步 SQLAlchemy #
异步配置 #
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker, declarative_base

# The "+aiosqlite" suffix selects the async SQLite driver.
DATABASE_URL = 'sqlite+aiosqlite:///./test.db'

# echo=True logs the SQL statements the engine emits.
engine = create_async_engine(DATABASE_URL, echo=True)

# async_sessionmaker is the asyncio-specific session factory; it produces
# AsyncSession objects by default, so class_=AsyncSession is not needed.
# expire_on_commit=False keeps already-loaded attributes readable after
# commit() instead of expiring them.
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
异步模型 #
python
# ForeignKey is required by Item.owner_id below; without this import the
# class body fails with NameError.
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship


class User(Base):
    """ORM model mapped to the ``users`` table; owns any number of items."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    # Paired with Item.owner through back_populates.
    items = relationship('Item', back_populates='owner')


class Item(Base):
    """ORM model mapped to the ``items`` table; each item references a user."""

    __tablename__ = 'items'

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String)
    # Foreign key to users.id.
    owner_id = Column(Integer, ForeignKey('users.id'))

    # Paired with User.items through back_populates.
    owner = relationship('User', back_populates='items')
异步依赖 #
python
from fastapi import FastAPI, Depends
# select is used by read_users below; it was missing from this snippet.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()


async def get_db():
    """Dependency that yields an AsyncSession for the duration of its use."""
    # The async context manager closes the session on exit, including when
    # the consumer raised, so no explicit close() is needed.
    async with AsyncSessionLocal() as session:
        yield session


@app.get('/users/')
async def read_users(db: AsyncSession = Depends(get_db)):
    # Return every row of the users table.
    # NOTE(review): no response_model is declared, so the ORM objects are
    # serialized as-is -- presumably including hashed_password. Confirm, and
    # consider response_model=list[UserResponse].
    result = await db.execute(select(User))
    return result.scalars().all()
异步 CRUD #
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@app.post('/users/', response_model=UserResponse)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    # Create a user and return the stored row.
    # Raises HTTPException(400) when the email or username is already taken.
    #
    # Both columns carry unique constraints, so look for a clash up front;
    # otherwise a duplicate surfaces as an unhandled IntegrityError on commit.
    clash = await db.execute(
        select(User).where(
            (User.email == user.email) | (User.username == user.username)
        )
    )
    # first(): the email and the username may match two different rows.
    existing = clash.scalars().first()
    if existing is not None:
        if existing.email == user.email:
            detail = 'Email already registered'
        else:
            detail = 'Username already registered'
        raise HTTPException(status_code=400, detail=detail)

    db_user = User(
        email=user.email,
        username=user.username,
        # Only the hash is stored, never the raw password.
        hashed_password=hash_password(user.password),
    )
    db.add(db_user)
    await db.commit()
    # Reload so database-generated values (e.g. id) are present on the object.
    await db.refresh(db_user)
    return db_user
@app.get('/users/{user_id}', response_model=UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Fetch one user by primary key; answer 404 when no row matches.
    stmt = select(User).where(User.id == user_id)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail='User not found')
    return found
@app.put('/users/{user_id}', response_model=UserResponse)
async def update_user(user_id: int, user_update: UserCreate, db: AsyncSession = Depends(get_db)):
    # Overwrite email, username and password of an existing user.
    # Answers 404 when the user does not exist.
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail='User not found')

    target.email = user_update.email
    target.username = user_update.username
    # Store the hash of the new password, not the password itself.
    target.hashed_password = hash_password(user_update.password)

    await db.commit()
    await db.refresh(target)
    return target
@app.delete('/users/{user_id}')
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Delete a user by primary key; answer 404 when no row matches.
    stmt = select(User).where(User.id == user_id)
    doomed = (await db.execute(stmt)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail='User not found')

    await db.delete(doomed)
    await db.commit()
    return {'message': 'User deleted'}
查询操作 #
基本查询 #
python
from sqlalchemy import select
async def get_users(db: AsyncSession):
    """Return all User rows as a list."""
    stmt = select(User)
    rows = await db.execute(stmt)
    return rows.scalars().all()
async def get_user_by_id(db: AsyncSession, user_id: int):
    """Return the User whose id equals *user_id*, or None when there is none."""
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str):
    """Return the User whose email equals *email*, or None when there is none."""
    stmt = select(User).where(User.email == email)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
过滤和排序 #
python
from sqlalchemy import select, or_, and_, desc
async def search_users(db: AsyncSession, query: str):
    """Return users whose username or email contains *query* as a substring.

    *query* is matched literally: autoescape=True escapes the LIKE wildcards
    ``%`` and ``_`` in the search text, so a query such as ``%`` no longer
    matches every row.
    """
    matches_query = or_(
        User.username.contains(query, autoescape=True),
        User.email.contains(query, autoescape=True),
    )
    result = await db.execute(select(User).where(matches_query))
    return result.scalars().all()
async def get_users_sorted(db: AsyncSession):
    """Return all users ordered by id, highest first."""
    newest_first = select(User).order_by(User.id.desc())
    rows = await db.execute(newest_first)
    return rows.scalars().all()
分页 #
python
async def get_users_paginated(db: AsyncSession, skip: int = 0, limit: int = 10):
    """Return one page of users: at most *limit* rows after skipping *skip*.

    Rows are ordered by id. Without an explicit ORDER BY the database may
    return rows in any order, so consecutive pages could overlap or skip rows.
    """
    page = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(page)
    return result.scalars().all()
关联查询 #
python
from sqlalchemy.orm import selectinload
async def get_user_with_items(db: AsyncSession, user_id: int):
    """Return the user with the given id with ``items`` eagerly loaded, or None."""
    # selectinload fetches the related items as part of this call rather
    # than on first attribute access.
    stmt = select(User).where(User.id == user_id).options(selectinload(User.items))
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
完整示例 #
python
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, Column, Integer, String, Boolean
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
# Async SQLite database file in the current working directory.
DATABASE_URL = 'sqlite+aiosqlite:///./test.db'

# Declarative base class for the models below.
Base = declarative_base()

engine = create_async_engine(DATABASE_URL)
# Session factory; loaded attributes are not expired when a session commits.
AsyncSessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False)
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, index=True, primary_key=True)
    # email and username are each unique and indexed.
    email = Column(String, index=True, unique=True)
    username = Column(String, index=True, unique=True)
    # Stores the value produced by hash_password(), never the raw password.
    hashed_password = Column(String)
    # New rows default to active.
    is_active = Column(Boolean, default=True)
class UserCreate(BaseModel):
    # Request body for POST /users/: all three fields are required strings.
    # password is the raw password; it is hashed before being stored.
    email: str
    username: str
    password: str
class UserUpdate(BaseModel):
    # Request body for PUT /users/{user_id}: every field is optional and
    # defaults to None, so a client may send only the fields it changes.
    email: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None
class UserResponse(BaseModel):
    # Response schema: only these four fields are returned to the client, so
    # the stored password hash is never part of a response.
    #
    # from_attributes=True lets the model be built from ORM object attributes.
    # model_config replaces the nested `class Config`, which Pydantic v2
    # deprecates; a plain dict is accepted, so no extra import is needed.
    model_config = {'from_attributes': True}

    id: int
    email: str
    username: str
    is_active: bool
app = FastAPI()


async def get_db():
    """Dependency that yields an AsyncSession for the duration of its use."""
    # The async context manager closes the session on exit, including when
    # the consumer raised, so no explicit close() is needed.
    async with AsyncSessionLocal() as session:
        yield session
@app.on_event('startup')
async def startup():
    # Create the tables for every model registered on Base.metadata.
    # create_all is a synchronous API, hence run_sync on the async connection.
    # NOTE(review): recent FastAPI releases appear to favour a lifespan
    # handler over on_event -- confirm against the installed version.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
@app.post('/users/', response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    # Create a user and return the stored row with status 201.
    # Raises HTTPException(400) when the email or username is already taken.
    #
    # NOTE(review): hash_password is neither defined nor imported anywhere in
    # this example; it has to be supplied before the code can run.
    #
    # Both columns carry unique constraints. Checking only the email would
    # let a duplicate username surface as an unhandled IntegrityError on commit.
    clash = await db.execute(
        select(User).where(
            (User.email == user.email) | (User.username == user.username)
        )
    )
    # first(): the email and the username may match two different rows.
    existing = clash.scalars().first()
    if existing is not None:
        if existing.email == user.email:
            detail = 'Email already registered'
        else:
            detail = 'Username already registered'
        raise HTTPException(status_code=400, detail=detail)

    db_user = User(
        email=user.email,
        username=user.username,
        # Only the hash is stored, never the raw password.
        hashed_password=hash_password(user.password),
    )
    db.add(db_user)
    await db.commit()
    # Reload so database-generated values (e.g. id) are present on the object.
    await db.refresh(db_user)
    return db_user
@app.get('/users/', response_model=list[UserResponse])
async def read_users(
    skip: int = 0,
    limit: int = 10,
    db: AsyncSession = Depends(get_db),
):
    # Return one page of users: at most `limit` rows after skipping `skip`.
    #
    # Rows are ordered by id. Without an explicit ORDER BY the database may
    # return rows in any order, so consecutive pages could overlap or skip rows.
    page = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(page)
    return result.scalars().all()
@app.get('/users/{user_id}', response_model=UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Fetch one user by primary key; answer 404 when no row matches.
    stmt = select(User).where(User.id == user_id)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail='User not found')
    return found
@app.put('/users/{user_id}', response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    db: AsyncSession = Depends(get_db),
):
    # Apply a partial update: only fields the client sent with a non-null
    # value are changed. Answers 404 when the user does not exist.
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail='User not found')

    # exclude_unset drops fields the client omitted. exclude_none also drops
    # fields sent as an explicit JSON null, which would otherwise write None
    # into email/username or call hash_password(None).
    changes = user_update.model_dump(exclude_unset=True, exclude_none=True)
    for field, value in changes.items():
        if field == 'password':
            # The model stores a hash under a different attribute name.
            user.hashed_password = hash_password(value)
        else:
            setattr(user, field, value)

    await db.commit()
    await db.refresh(user)
    return user
@app.delete('/users/{user_id}')
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Delete a user by primary key; answer 404 when no row matches.
    stmt = select(User).where(User.id == user_id)
    doomed = (await db.execute(stmt)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail='User not found')

    await db.delete(doomed)
    await db.commit()
    return {'message': 'User deleted'}
下一步 #
现在你已经掌握了 SQLAlchemy 集成,接下来学习 JWT 认证,了解 FastAPI 的安全机制!
最后更新:2026-03-29