安全认证概述 #

认证与授权 #

概念区分 #

text
┌─────────────────────────────────────────────────────────────┐
│                    认证 vs 授权                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   认证(Authentication)                                     │
│   - 你是谁?                                                │
│   - 验证用户身份                                            │
│   - 登录、Token 验证                                        │
│                                                             │
│   授权(Authorization)                                      │
│   - 你能做什么?                                            │
│   - 验证用户权限                                            │
│   - 角色检查、权限验证                                       │
│                                                             │
│   流程:                                                    │
│   用户登录 ────> 认证 ────> 授权 ────> 访问资源              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

认证方式 #

1. API Key #

python
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()

@app.get('/items/')
async def read_items(api_key: str = Header(...)):
    if api_key != 'secret-api-key':
        raise HTTPException(status_code=401, detail='Invalid API Key')
    return {'items': []}

2. HTTP Basic #

python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()
security = HTTPBasic()

def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    if credentials.username != 'admin' or credentials.password != 'secret':
        raise HTTPException(status_code=401, detail='Invalid credentials')
    return credentials.username

@app.get('/users/me')
def read_users_me(username: str = Depends(get_current_user)):
    return {'username': username}

3. OAuth2 #

python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

@app.get('/users/me')
async def read_users_me(token: str = Depends(oauth2_scheme)):
    return {'token': token}

4. JWT #

python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('sub')
        if username is None:
            raise HTTPException(status_code=401, detail='Invalid token')
    except JWTError:
        raise HTTPException(status_code=401, detail='Invalid token')
    return username

OAuth2 密码流 #

基本流程 #

text
┌─────────────────────────────────────────────────────────────┐
│                    OAuth2 密码流                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 用户发送用户名和密码                                    │
│      POST /token                                            │
│      { "username": "user", "password": "pass" }             │
│                                                             │
│   2. 服务器验证并返回 Token                                  │
│      { "access_token": "xxx", "token_type": "bearer" }      │
│                                                             │
│   3. 客户端携带 Token 访问资源                               │
│      GET /users/me                                          │
│      Authorization: Bearer xxx                              │
│                                                             │
│   4. 服务器验证 Token 并返回数据                             │
│      { "username": "user", "email": "..." }                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实现 #

python
from datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext

app = FastAPI()

SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

fake_users_db = {
    'john': {
        'username': 'john',
        'hashed_password': pwd_context.hash('secret'),
        'email': 'john@example.com'
    }
}

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({'exp': expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = fake_users_db.get(form_data.username)
    if not user or not verify_password(form_data.password, user['hashed_password']):
        raise HTTPException(
            status_code=401,
            detail='Incorrect username or password',
            headers={'WWW-Authenticate': 'Bearer'}
        )
    
    access_token = create_access_token(
        data={'sub': user['username']},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    
    return {'access_token': access_token, 'token_type': 'bearer'}

@app.get('/users/me')
async def read_users_me(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('sub')
        if username is None:
            raise HTTPException(status_code=401, detail='Invalid token')
    except JWTError:
        raise HTTPException(status_code=401, detail='Invalid token')
    
    user = fake_users_db.get(username)
    if user is None:
        raise HTTPException(status_code=401, detail='User not found')
    
    return {'username': user['username'], 'email': user['email']}

安全最佳实践 #

密码安全 #

python
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Token 安全 #

python
import secrets

SECRET_KEY = secrets.token_urlsafe(32)

HTTPS #

python
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app.add_middleware(HTTPSRedirectMiddleware)

CORS #

python
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=['https://example.com'],
    allow_credentials=True,
    allow_methods=['GET', 'POST'],
    allow_headers=['*'],
)

输入验证 #

python
from pydantic import BaseModel, Field, validator

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=20)
    password: str = Field(..., min_length=8)
    
    @validator('password')
    def password_strength(cls, v):
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase')
        if not any(c.islower() for c in v):
            raise ValueError('Password must contain lowercase')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain digit')
        return v

常见安全威胁 #

1. SQL 注入 #

python
# 危险
query = f"SELECT * FROM users WHERE id = {user_id}"

# 安全
from sqlalchemy import select
stmt = select(User).where(User.id == user_id)

2. XSS #

python
# 使用 Pydantic 自动转义
from pydantic import BaseModel

class Comment(BaseModel):
    content: str

3. CSRF #

python
# 使用 SameSite Cookie
response.set_cookie(
    'session',
    value,
    httponly=True,
    samesite='strict',
    secure=True
)

4. 暴力破解 #

python
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post('/login')
@limiter.limit('5/minute')
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    pass

下一步 #

现在你已经了解了安全认证概述,接下来学习 JWT 认证,深入了解 JWT 实现!

最后更新:2026-03-29