安全认证概述 #
认证与授权 #
概念区分 #
text
┌─────────────────────────────────────────────────────────────┐
│ 认证 vs 授权 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 认证(Authentication) │
│ - 你是谁? │
│ - 验证用户身份 │
│ - 登录、Token 验证 │
│ │
│ 授权(Authorization) │
│ - 你能做什么? │
│ - 验证用户权限 │
│ - 角色检查、权限验证 │
│ │
│ 流程: │
│ 用户登录 ────> 认证 ────> 授权 ────> 访问资源 │
│ │
└─────────────────────────────────────────────────────────────┘
认证方式 #
1. API Key #
python
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
@app.get('/items/')
async def read_items(api_key: str = Header(...)):
if api_key != 'secret-api-key':
raise HTTPException(status_code=401, detail='Invalid API Key')
return {'items': []}
2. HTTP Basic #
python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials
app = FastAPI()
security = HTTPBasic()
def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
if credentials.username != 'admin' or credentials.password != 'secret':
raise HTTPException(status_code=401, detail='Invalid credentials')
return credentials.username
@app.get('/users/me')
def read_users_me(username: str = Depends(get_current_user)):
return {'username': username}
3. OAuth2 #
python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
@app.get('/users/me')
async def read_users_me(token: str = Depends(oauth2_scheme)):
return {'token': token}
4. JWT #
python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise HTTPException(status_code=401, detail='Invalid token')
except JWTError:
raise HTTPException(status_code=401, detail='Invalid token')
return username
OAuth2 密码流 #
基本流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ OAuth2 密码流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 用户发送用户名和密码 │
│ POST /token │
│ { "username": "user", "password": "pass" } │
│ │
│ 2. 服务器验证并返回 Token │
│ { "access_token": "xxx", "token_type": "bearer" } │
│ │
│ 3. 客户端携带 Token 访问资源 │
│ GET /users/me │
│ Authorization: Bearer xxx │
│ │
│ 4. 服务器验证 Token 并返回数据 │
│ { "username": "user", "email": "..." } │
│ │
└─────────────────────────────────────────────────────────────┘
实现 #
python
from datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
app = FastAPI()
SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
fake_users_db = {
'john': {
'username': 'john',
'hashed_password': pwd_context.hash('secret'),
'email': 'john@example.com'
}
}
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({'exp': expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = fake_users_db.get(form_data.username)
if not user or not verify_password(form_data.password, user['hashed_password']):
raise HTTPException(
status_code=401,
detail='Incorrect username or password',
headers={'WWW-Authenticate': 'Bearer'}
)
access_token = create_access_token(
data={'sub': user['username']},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {'access_token': access_token, 'token_type': 'bearer'}
@app.get('/users/me')
async def read_users_me(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise HTTPException(status_code=401, detail='Invalid token')
except JWTError:
raise HTTPException(status_code=401, detail='Invalid token')
user = fake_users_db.get(username)
if user is None:
raise HTTPException(status_code=401, detail='User not found')
return {'username': user['username'], 'email': user['email']}
安全最佳实践 #
密码安全 #
python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
Token 安全 #
python
import secrets
SECRET_KEY = secrets.token_urlsafe(32)
HTTPS #
python
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
app.add_middleware(HTTPSRedirectMiddleware)
CORS #
python
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=['https://example.com'],
allow_credentials=True,
allow_methods=['GET', 'POST'],
allow_headers=['*'],
)
输入验证 #
python
from pydantic import BaseModel, Field, validator
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=20)
password: str = Field(..., min_length=8)
@validator('password')
def password_strength(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase')
if not any(c.islower() for c in v):
raise ValueError('Password must contain lowercase')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return v
常见安全威胁 #
1. SQL 注入 #
python
# 危险
query = f"SELECT * FROM users WHERE id = {user_id}"
# 安全
from sqlalchemy import select
stmt = select(User).where(User.id == user_id)
2. XSS #
python
# 使用 Pydantic 自动转义
from pydantic import BaseModel
class Comment(BaseModel):
content: str
3. CSRF #
python
# 使用 SameSite Cookie
response.set_cookie(
'session',
value,
httponly=True,
samesite='strict',
secure=True
)
4. 暴力破解 #
python
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post('/login')
@limiter.limit('5/minute')
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
pass
下一步 #
现在你已经了解了安全认证概述,接下来学习 JWT 认证,深入了解 JWT 实现!
最后更新:2026-03-29