JWT 认证 #
安装依赖 #
bash
pip install python-jose[cryptography]
pip install passlib[bcrypt]
pip install python-multipart
JWT 基础 #
JWT 结构 #
text
┌─────────────────────────────────────────────────────────────┐
│ JWT 结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Header.Payload.Signature │
│ │
│ Header: │
│ { │
│ "alg": "HS256", │
│ "typ": "JWT" │
│ } │
│ │
│ Payload: │
│ { │
│ "sub": "user123", │
│ "exp": 1516239022, │
│ "iat": 1516237222 │
│ } │
│ │
│ Signature: │
│ HMACSHA256(base64url(header) + "." + base64url(payload), secret) │
│ │
└─────────────────────────────────────────────────────────────┘
基本实现 #
配置 #
python
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
# Key used to sign and verify tokens. This is a placeholder value: load the
# real key from the environment or a secret store, never from source control.
SECRET_KEY = 'your-secret-key-keep-it-secret'
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = 'HS256'
# Lifetime, in minutes, of access tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# passlib context configured for bcrypt; used to hash and verify passwords.
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
# OAuth2 bearer scheme used as a dependency to obtain the request's token;
# tokenUrl points at the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
app = FastAPI()
模型定义 #
python
class Token(BaseModel):
    # Response body of the token-issuing endpoints.
    access_token: str  # the encoded JWT
    token_type: str  # the endpoints in this file always send 'bearer'
class TokenData(BaseModel):
    # Holder for the username read from a decoded token's 'sub' claim.
    username: Optional[str] = None
class User(BaseModel):
    # Public view of a user; used as response_model so that fields outside
    # this model (such as the password hash) are not returned.
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    # When truthy, get_current_active_user rejects the request.
    disabled: Optional[bool] = None
class UserInDB(User):
    # Stored form of a user: the public fields plus the password hash.
    hashed_password: str
密码处理 #
python
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the stored ``hashed_password``.

    The comparison is delegated to the module-level passlib context.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level passlib context and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
Token 生成 #
python
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed (for example ``{'sub': username}``). The dict is
            copied, so the caller's object is not modified.
        expires_delta: Token lifetime. When omitted (or zero), the token
            expires 15 minutes from now.

    Returns:
        The encoded token string.
    """
    # Local import so this function does not depend on the file-level import
    # list; datetime.utcnow() is deprecated and returns a naive datetime.
    from datetime import timezone

    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    # Timezone-aware UTC timestamp; the encoder converts it to a numeric date.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({'exp': expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
用户验证 #
python
# In-memory stand-in for a user table, keyed by username. Each value carries
# the fields that get_user passes to UserInDB.
fake_users_db = {
    'johndoe': {
        'username': 'johndoe',
        'full_name': 'John Doe',
        'email': 'johndoe@example.com',
        # bcrypt hash of the account's password (never the plain text).
        'hashed_password': '$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW',
        'disabled': False,
    }
}
def get_user(db, username: str):
    """Look up ``username`` in ``db`` and return it as a ``UserInDB``.

    Returns ``None`` when the username is not a key of ``db``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)
def authenticate_user(fake_db, username: str, password: str):
    """Check a username/password pair against ``fake_db``.

    Returns the matching user object on success, or ``False`` when the user
    does not exist or the password does not verify.
    """
    candidate = get_user(fake_db, username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
当前用户依赖 #
python
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token of the request to a stored user.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user found in ``fake_users_db`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is a refresh
            token, has no ``sub`` claim, or names an unknown user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    # Refresh tokens are signed with the same key and also carry 'sub'.
    # Without this check a long-lived refresh token would be accepted
    # wherever an access token is required.
    if payload.get('type') == 'refresh':
        raise credentials_exception
    username: str = payload.get('sub')
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(current_user: UserInDB = Depends(get_current_user)):
    """Return the authenticated user, rejecting disabled accounts.

    Raises:
        HTTPException: 400 with detail ``'Inactive user'`` when the user's
            ``disabled`` flag is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail='Inactive user')
路由实现 #
python
@app.post('/token', response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form-encoded credentials for an access token.

    Raises:
        HTTPException: 401 when the username/password pair is rejected by
            ``authenticate_user``.
    """
    account = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        issued = create_access_token(data={'sub': account.username}, expires_delta=lifetime)
        return {'access_token': issued, 'token_type': 'bearer'}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Incorrect username or password',
        headers={'WWW-Authenticate': 'Bearer'},
    )
@app.get('/users/me/', response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the authenticated, non-disabled user.

    The response is shaped by ``response_model=User``.
    """
    return current_user
@app.get('/users/me/items/')
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    """Return a one-element sample item list owned by the authenticated user."""
    sample_item = {'item_id': 'Foo', 'owner': current_user.username}
    return [sample_item]
Token 刷新 #
刷新 Token #
python
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed refresh token carrying ``data``.

    The token gets an ``exp`` claim and a ``type`` claim set to ``'refresh'``
    so it can be told apart from access tokens.

    Args:
        data: Claims to embed (for example ``{'sub': username}``). The dict is
            copied, so the caller's object is not modified.
        expires_delta: Token lifetime. Defaults to 7 days when omitted.

    Returns:
        The encoded token string.
    """
    # Local import so this function does not depend on the file-level import
    # list; datetime.utcnow() is deprecated and returns a naive datetime.
    from datetime import timezone

    lifetime = timedelta(days=7) if expires_delta is None else expires_delta
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({'exp': expire, 'type': 'refresh'})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.post('/refresh', response_model=Token)
async def refresh_token(refresh_token: str):
    """Issue a new access token in exchange for a valid refresh token.

    Args:
        refresh_token: Encoded refresh token.

    Returns:
        A dict with the new ``access_token`` and ``token_type`` ``'bearer'``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not of type
            ``'refresh'``, or has no ``sub`` claim.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail='Invalid token')
    if payload.get('type') != 'refresh':
        raise HTTPException(status_code=401, detail='Invalid token type')
    username = payload.get('sub')
    if username is None:
        raise HTTPException(status_code=401, detail='Invalid token')
    # Use the same lifetime as the /token endpoint rather than the
    # 15-minute fallback inside create_access_token.
    access_token = create_access_token(
        data={'sub': username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {'access_token': access_token, 'token_type': 'bearer'}
完整示例 #
python
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import Column, Integer, String, Boolean, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker
# Key used to sign and verify tokens. Placeholder value: load the real key
# from the environment or a secret store, never from source control.
SECRET_KEY = 'your-secret-key'
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = 'HS256'
# Lifetime, in minutes, of access tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Placeholder connection string: point it at your own database. The URL must
# name an async driver, and that driver must be installed separately.
DATABASE_URL = 'sqlite+aiosqlite:///./app.db'
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
Base = declarative_base()
engine = create_async_engine(DATABASE_URL)
# expire_on_commit=False keeps attributes loaded after commit, so a returned
# ORM object can still be serialized without another round trip.
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def get_db():
    """Yield one ``AsyncSession`` per request and close it afterwards.

    Used through ``Depends(get_db)`` by the route handlers and by
    ``get_current_user``. Tables are not created here: run
    ``Base.metadata.create_all`` (via ``conn.run_sync``) before serving
    requests.
    """
    async with AsyncSessionLocal() as session:
        yield session
class UserDB(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, index=True)
    # username and email both carry a unique constraint and an index.
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    # Password hash produced by get_password_hash; the plain password is not stored.
    hashed_password = Column(String)
    # New rows default to active; get_current_active_user rejects inactive users.
    is_active = Column(Boolean, default=True)
class Token(BaseModel):
    # Response body of the /token endpoint.
    access_token: str  # the encoded JWT
    token_type: str  # the login endpoint always sends 'bearer'
class TokenData(BaseModel):
    # Holder for a username taken from a token.
    # NOTE(review): not referenced by any other code in this example.
    username: Optional[str] = None
class UserCreate(BaseModel):
    # Request body of the /register endpoint.
    username: str
    email: EmailStr  # validated as an e-mail address by pydantic
    password: str  # plain text; hashed by the register handler before storage
class User(BaseModel):
    # Response schema for a user. from_attributes lets pydantic build it
    # directly from a UserDB ORM object returned by a route handler.
    # Declared through model_config because the nested `class Config` form is
    # deprecated in pydantic v2, the version that introduced `from_attributes`.
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: str
    is_active: bool
# Application instance; the route decorators below register handlers on it.
app = FastAPI()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the stored ``hashed_password``.

    The comparison is delegated to the module-level passlib context.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level passlib context and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed (for example ``{'sub': username}``). The dict is
            copied, so the caller's object is not modified.
        expires_delta: Token lifetime. When omitted (or zero), the token
            expires 15 minutes from now.

    Returns:
        The encoded token string.
    """
    to_encode = data.copy()
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
    # returns a naive datetime.
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({'exp': expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)):
    """Resolve the bearer token of the request to a ``UserDB`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that is not in the database.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject = claims.get('sub')
    if subject is None:
        raise unauthorized
    lookup = select(UserDB).where(UserDB.username == subject)
    account = (await db.execute(lookup)).scalar_one_or_none()
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(current_user: UserDB = Depends(get_current_user)):
    """Return the authenticated user, rejecting inactive accounts.

    Raises:
        HTTPException: 400 with detail ``'Inactive user'`` when
            ``is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail='Inactive user')
@app.post('/register', response_model=User, status_code=201)
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
    """Create a new user account.

    Args:
        user: Requested username, e-mail address and plain-text password.
        db: Database session for this request.

    Returns:
        The stored ``UserDB`` row, serialized through ``response_model=User``.

    Raises:
        HTTPException: 400 when the username or the e-mail address is
            already registered.
    """
    result = await db.execute(select(UserDB).where(UserDB.username == user.username))
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail='Username already registered')
    result = await db.execute(select(UserDB).where(UserDB.email == user.email))
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail='Email already registered')
    db_user = UserDB(
        username=user.username,
        email=user.email,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)
    try:
        await db.commit()
    except IntegrityError:
        # A concurrent request can insert the same username or e-mail between
        # the checks above and this commit. The unique constraints then reject
        # the row; report it as a client error instead of a 500.
        await db.rollback()
        raise HTTPException(status_code=400, detail='Username or email already registered')
    await db.refresh(db_user)
    return db_user
@app.post('/token', response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """Exchange form-encoded credentials for an access token.

    Raises:
        HTTPException: 401 when the user does not exist or the password
            does not verify.
    """
    query = select(UserDB).where(UserDB.username == form_data.username)
    account = (await db.execute(query)).scalar_one_or_none()
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Incorrect username or password',
            headers={'WWW-Authenticate': 'Bearer'},
        )
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    issued = create_access_token(data={'sub': account.username}, expires_delta=lifetime)
    return {'access_token': issued, 'token_type': 'bearer'}
@app.get('/users/me', response_model=User)
async def read_users_me(current_user: UserDB = Depends(get_current_active_user)):
    """Return the authenticated, active user.

    The ORM object is serialized through ``response_model=User``.
    """
    return current_user
下一步 #
现在你已经掌握了 JWT 认证,接下来学习 测试基础,了解 FastAPI 的测试方法!
最后更新:2026-03-29