JWT 实战实现 #
Express + JWT 完整实现 #
项目结构 #
text
express-jwt-auth/
├── src/
│ ├── controllers/
│ │ └── authController.js
│ ├── middleware/
│ │ └── authMiddleware.js
│ ├── routes/
│ │ └── authRoutes.js
│ ├── services/
│ │ └── tokenService.js
│ ├── models/
│ │ └── User.js
│ └── app.js
├── .env
└── package.json
安装依赖 #
bash
npm init -y
npm install express jsonwebtoken bcryptjs dotenv cookie-parser cors helmet
npm install --save-dev nodemon
环境配置 #
env
PORT=3000
JWT_SECRET=your-super-secret-key-at-least-32-characters
JWT_REFRESH_SECRET=your-refresh-secret-key-at-least-32-characters
JWT_EXPIRES_IN=15m
JWT_REFRESH_EXPIRES_IN=7d
NODE_ENV=development
Token 服务 #
javascript
const jwt = require('jsonwebtoken');
class TokenService {
generateAccessToken(user) {
return jwt.sign(
{
sub: user.id,
email: user.email,
role: user.role
},
process.env.JWT_SECRET,
{
algorithm: 'HS256',
expiresIn: process.env.JWT_EXPIRES_IN,
issuer: 'express-jwt-auth',
audience: 'api'
}
);
}
generateRefreshToken(user) {
return jwt.sign(
{ sub: user.id, type: 'refresh' },
process.env.JWT_REFRESH_SECRET,
{
algorithm: 'HS256',
expiresIn: process.env.JWT_REFRESH_EXPIRES_IN,
issuer: 'express-jwt-auth'
}
);
}
verifyAccessToken(token) {
return jwt.verify(token, process.env.JWT_SECRET, {
algorithms: ['HS256'],
issuer: 'express-jwt-auth',
audience: 'api'
});
}
verifyRefreshToken(token) {
return jwt.verify(token, process.env.JWT_REFRESH_SECRET, {
algorithms: ['HS256'],
issuer: 'express-jwt-auth'
});
}
}
module.exports = new TokenService();
认证中间件 #
javascript
const tokenService = require('../services/tokenService');
function authenticate(req, res, next) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({
error: 'Unauthorized',
message: 'No token provided'
});
}
const token = authHeader.split(' ')[1];
try {
const decoded = tokenService.verifyAccessToken(token);
req.user = decoded;
next();
} catch (err) {
if (err.name === 'TokenExpiredError') {
return res.status(401).json({
error: 'TokenExpired',
message: 'Token has expired'
});
}
return res.status(401).json({
error: 'InvalidToken',
message: 'Invalid token'
});
}
}
function authorize(...roles) {
return (req, res, next) => {
if (!req.user) {
return res.status(401).json({
error: 'Unauthorized',
message: 'User not authenticated'
});
}
if (!roles.includes(req.user.role)) {
return res.status(403).json({
error: 'Forbidden',
message: 'Insufficient permissions'
});
}
next();
};
}
module.exports = { authenticate, authorize };
认证控制器 #
javascript
const bcrypt = require('bcryptjs');
const tokenService = require('../services/tokenService');
const User = require('../models/User');
class AuthController {
async register(req, res) {
try {
const { email, password, name } = req.body;
const existingUser = await User.findByEmail(email);
if (existingUser) {
return res.status(400).json({
error: 'UserExists',
message: 'Email already registered'
});
}
const hashedPassword = await bcrypt.hash(password, 12);
const user = await User.create({
email,
password: hashedPassword,
name,
role: 'user'
});
const accessToken = tokenService.generateAccessToken(user);
const refreshToken = tokenService.generateRefreshToken(user);
res.status(201).json({
message: 'User registered successfully',
user: {
id: user.id,
email: user.email,
name: user.name,
role: user.role
},
accessToken,
refreshToken
});
} catch (err) {
console.error('Register error:', err);
res.status(500).json({
error: 'ServerError',
message: 'Registration failed'
});
}
}
async login(req, res) {
try {
const { email, password } = req.body;
const user = await User.findByEmail(email);
if (!user) {
return res.status(401).json({
error: 'InvalidCredentials',
message: 'Invalid email or password'
});
}
const isValidPassword = await bcrypt.compare(password, user.password);
if (!isValidPassword) {
return res.status(401).json({
error: 'InvalidCredentials',
message: 'Invalid email or password'
});
}
const accessToken = tokenService.generateAccessToken(user);
const refreshToken = tokenService.generateRefreshToken(user);
res.json({
message: 'Login successful',
user: {
id: user.id,
email: user.email,
name: user.name,
role: user.role
},
accessToken,
refreshToken
});
} catch (err) {
console.error('Login error:', err);
res.status(500).json({
error: 'ServerError',
message: 'Login failed'
});
}
}
async refresh(req, res) {
try {
const { refreshToken } = req.body;
if (!refreshToken) {
return res.status(400).json({
error: 'MissingToken',
message: 'Refresh token is required'
});
}
const decoded = tokenService.verifyRefreshToken(refreshToken);
if (decoded.type !== 'refresh') {
return res.status(400).json({
error: 'InvalidToken',
message: 'Invalid refresh token'
});
}
const user = await User.findById(decoded.sub);
if (!user) {
return res.status(401).json({
error: 'UserNotFound',
message: 'User not found'
});
}
const accessToken = tokenService.generateAccessToken(user);
const newRefreshToken = tokenService.generateRefreshToken(user);
res.json({
accessToken,
refreshToken: newRefreshToken
});
} catch (err) {
if (err.name === 'TokenExpiredError') {
return res.status(401).json({
error: 'TokenExpired',
message: 'Refresh token has expired'
});
}
return res.status(401).json({
error: 'InvalidToken',
message: 'Invalid refresh token'
});
}
}
async me(req, res) {
try {
const user = await User.findById(req.user.sub);
if (!user) {
return res.status(404).json({
error: 'UserNotFound',
message: 'User not found'
});
}
res.json({
user: {
id: user.id,
email: user.email,
name: user.name,
role: user.role
}
});
} catch (err) {
console.error('Get user error:', err);
res.status(500).json({
error: 'ServerError',
message: 'Failed to get user'
});
}
}
async logout(req, res) {
res.json({
message: 'Logout successful'
});
}
}
module.exports = new AuthController();
路由配置 #
javascript
const express = require('express');
const router = express.Router();
const authController = require('../controllers/authController');
const { authenticate, authorize } = require('../middleware/authMiddleware');
router.post('/register', authController.register);
router.post('/login', authController.login);
router.post('/refresh', authController.refresh);
router.post('/logout', authController.logout);
router.get('/me', authenticate, authController.me);
router.get('/admin', authenticate, authorize('admin'), (req, res) => {
res.json({ message: 'Admin only content' });
});
module.exports = router;
应用入口 #
javascript
require('dotenv').config();
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const cookieParser = require('cookie-parser');
const authRoutes = require('./routes/authRoutes');
const app = express();
app.use(helmet());
app.use(cors({
origin: process.env.CORS_ORIGIN || 'http://localhost:3000',
credentials: true
}));
app.use(express.json());
app.use(cookieParser());
app.use('/api/auth', authRoutes);
app.get('/health', (req, res) => {
res.json({ status: 'ok' });
});
app.use((err, req, res, next) => {
console.error(err.stack);
res.status(500).json({
error: 'ServerError',
message: 'Something went wrong'
});
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
NestJS + JWT 实现 #
安装依赖 #
bash
npm install @nestjs/jwt @nestjs/passport passport passport-jwt bcrypt
npm install class-validator class-transformer
JWT 策略 #
typescript
import { ExtractJwt, Strategy } from 'passport-jwt';
import { PassportStrategy } from '@nestjs/passport';
import { Injectable, UnauthorizedException } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { UsersService } from '../users/users.service';
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
constructor(
private configService: ConfigService,
private usersService: UsersService
) {
super({
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
ignoreExpiration: false,
secretOrKey: configService.get<string>('JWT_SECRET'),
issuer: 'nestjs-jwt-auth',
audience: 'api'
});
}
async validate(payload: any) {
const user = await this.usersService.findById(payload.sub);
if (!user) {
throw new UnauthorizedException('User not found');
}
return {
id: payload.sub,
email: payload.email,
role: payload.role
};
}
}
认证服务 #
typescript
import { Injectable, UnauthorizedException } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
import { UsersService } from '../users/users.service';
import * as bcrypt from 'bcrypt';
import { LoginDto } from './dto/login.dto';
import { RegisterDto } from './dto/register.dto';
@Injectable()
export class AuthService {
constructor(
private usersService: UsersService,
private jwtService: JwtService
) {}
async register(registerDto: RegisterDto) {
const hashedPassword = await bcrypt.hash(registerDto.password, 12);
const user = await this.usersService.create({
...registerDto,
password: hashedPassword
});
const tokens = await this.generateTokens(user);
return {
user: {
id: user.id,
email: user.email,
name: user.name,
role: user.role
},
...tokens
};
}
async login(loginDto: LoginDto) {
const user = await this.usersService.findByEmail(loginDto.email);
if (!user) {
throw new UnauthorizedException('Invalid credentials');
}
const isPasswordValid = await bcrypt.compare(
loginDto.password,
user.password
);
if (!isPasswordValid) {
throw new UnauthorizedException('Invalid credentials');
}
const tokens = await this.generateTokens(user);
return {
user: {
id: user.id,
email: user.email,
name: user.name,
role: user.role
},
...tokens
};
}
async refresh(refreshToken: string) {
try {
const payload = this.jwtService.verify(refreshToken, {
secret: process.env.JWT_REFRESH_SECRET
});
if (payload.type !== 'refresh') {
throw new UnauthorizedException('Invalid token type');
}
const user = await this.usersService.findById(payload.sub);
if (!user) {
throw new UnauthorizedException('User not found');
}
return this.generateTokens(user);
} catch (err) {
throw new UnauthorizedException('Invalid refresh token');
}
}
private async generateTokens(user: any) {
const payload = {
sub: user.id,
email: user.email,
role: user.role
};
const accessToken = this.jwtService.sign(payload, {
expiresIn: '15m',
issuer: 'nestjs-jwt-auth',
audience: 'api'
});
const refreshToken = this.jwtService.sign(
{ sub: user.id, type: 'refresh' },
{
secret: process.env.JWT_REFRESH_SECRET,
expiresIn: '7d',
issuer: 'nestjs-jwt-auth'
}
);
return { accessToken, refreshToken };
}
}
认证控制器 #
typescript
import { Controller, Post, Body, UseGuards, Get, Request } from '@nestjs/common';
import { AuthService } from './auth.service';
import { LoginDto } from './dto/login.dto';
import { RegisterDto } from './dto/register.dto';
import { JwtAuthGuard } from './guards/jwt-auth.guard';
@Controller('auth')
export class AuthController {
constructor(private authService: AuthService) {}
@Post('register')
async register(@Body() registerDto: RegisterDto) {
return this.authService.register(registerDto);
}
@Post('login')
async login(@Body() loginDto: LoginDto) {
return this.authService.login(loginDto);
}
@Post('refresh')
async refresh(@Body('refreshToken') refreshToken: string) {
return this.authService.refresh(refreshToken);
}
@UseGuards(JwtAuthGuard)
@Get('me')
async me(@Request() req) {
return { user: req.user };
}
}
Spring Boot + JWT 实现 #
依赖配置 #
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
<version>0.12.3</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<version>0.12.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<version>0.12.3</version>
<scope>runtime</scope>
</dependency>
</dependencies>
JWT 工具类 #
java
import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@Component
public class JwtTokenUtil {
@Value("${jwt.secret}")
private String secret;
@Value("${jwt.expiration}")
private Long expiration;
private SecretKey getSigningKey() {
return Keys.hmacShaKeyFor(secret.getBytes(StandardCharsets.UTF_8));
}
public String generateToken(UserDetails userDetails) {
Map<String, Object> claims = new HashMap<>();
return createToken(claims, userDetails.getUsername());
}
public String generateToken(String subject, Map<String, Object> claims) {
return createToken(claims, subject);
}
private String createToken(Map<String, Object> claims, String subject) {
return Jwts.builder()
.claims(claims)
.subject(subject)
.issuedAt(new Date(System.currentTimeMillis()))
.expiration(new Date(System.currentTimeMillis() + expiration * 1000))
.issuer("spring-boot-jwt-auth")
.audience().add("api").and()
.signWith(getSigningKey())
.compact();
}
public String extractUsername(String token) {
return extractClaim(token, Claims::getSubject);
}
public Date extractExpiration(String token) {
return extractClaim(token, Claims::getExpiration);
}
public <T> T extractClaim(String token, Function<Claims, T> claimsResolver) {
final Claims claims = extractAllClaims(token);
return claimsResolver.apply(claims);
}
private Claims extractAllClaims(String token) {
return Jwts.parser()
.verifyWith(getSigningKey())
.build()
.parseSignedClaims(token)
.getPayload();
}
public Boolean isTokenExpired(String token) {
return extractExpiration(token).before(new Date());
}
public Boolean validateToken(String token, UserDetails userDetails) {
final String username = extractUsername(token);
return (username.equals(userDetails.getUsername()) && !isTokenExpired(token));
}
}
JWT 过滤器 #
java
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.web.authentication.WebAuthenticationDetailsSource;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {
private final JwtTokenUtil jwtTokenUtil;
private final UserDetailsService userDetailsService;
public JwtAuthenticationFilter(JwtTokenUtil jwtTokenUtil, UserDetailsService userDetailsService) {
this.jwtTokenUtil = jwtTokenUtil;
this.userDetailsService = userDetailsService;
}
@Override
protected void doFilterInternal(
HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain
) throws ServletException, IOException {
final String authHeader = request.getHeader("Authorization");
String username = null;
String jwtToken = null;
if (authHeader != null && authHeader.startsWith("Bearer ")) {
jwtToken = authHeader.substring(7);
try {
username = jwtTokenUtil.extractUsername(jwtToken);
} catch (Exception e) {
logger.error("Error extracting username from token", e);
}
}
if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
UserDetails userDetails = userDetailsService.loadUserByUsername(username);
if (jwtTokenUtil.validateToken(jwtToken, userDetails)) {
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(
userDetails, null, userDetails.getAuthorities());
authentication.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
SecurityContextHolder.getContext().setAuthentication(authentication);
}
}
filterChain.doFilter(request, response);
}
}
FastAPI + JWT 实现 #
安装依赖 #
bash
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart
JWT 配置 #
python
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
SECRET_KEY = "your-secret-key-at-least-32-characters"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"iss": "fastapi-jwt-auth",
"aud": "api"
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: Dict[str, Any]) -> str:
to_encode = data.copy()
to_encode["type"] = "refresh"
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"iss": "fastapi-jwt-auth"
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Dict[str, Any]:
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM],
issuer="fastapi-jwt-auth",
audience="api"
)
return payload
except JWTError:
return None
FastAPI 应用 #
python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from typing import Optional
app = FastAPI(title="JWT Auth API")
security = HTTPBearer()
class UserCreate(BaseModel):
email: EmailStr
password: str
name: str
class UserLogin(BaseModel):
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class UserResponse(BaseModel):
id: str
email: str
name: str
role: str
fake_users_db = {}
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
payload = decode_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
user = fake_users_db.get(user_id)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
@app.post("/auth/register", response_model=dict)
async def register(user_data: UserCreate):
if any(u["email"] == user_data.email for u in fake_users_db.values()):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
user_id = f"user_{len(fake_users_db) + 1}"
hashed_password = get_password_hash(user_data.password)
user = {
"id": user_id,
"email": user_data.email,
"password": hashed_password,
"name": user_data.name,
"role": "user"
}
fake_users_db[user_id] = user
access_token = create_access_token(
{"sub": user_id, "email": user["email"], "role": user["role"]}
)
refresh_token = create_refresh_token({"sub": user_id})
return {
"message": "User registered successfully",
"user": UserResponse(**user),
"access_token": access_token,
"refresh_token": refresh_token
}
@app.post("/auth/login", response_model=Token)
async def login(user_data: UserLogin):
user = next(
(u for u in fake_users_db.values() if u["email"] == user_data.email),
None
)
if not user or not verify_password(user_data.password, user["password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
access_token = create_access_token(
{"sub": user["id"], "email": user["email"], "role": user["role"]}
)
refresh_token = create_refresh_token({"sub": user["id"]})
return Token(
access_token=access_token,
refresh_token=refresh_token
)
@app.post("/auth/refresh", response_model=Token)
async def refresh(refresh_token: str):
payload = decode_token(refresh_token)
if payload is None or payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
user_id = payload.get("sub")
user = fake_users_db.get(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
new_access_token = create_access_token(
{"sub": user["id"], "email": user["email"], "role": user["role"]}
)
new_refresh_token = create_refresh_token({"sub": user["id"]})
return Token(
access_token=new_access_token,
refresh_token=new_refresh_token
)
@app.get("/auth/me", response_model=UserResponse)
async def me(current_user: dict = Depends(get_current_user)):
return UserResponse(**current_user)
@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
return {"message": "This is a protected route", "user": current_user["email"]}
总结 #
通过本章的学习,你已经掌握了:
text
┌─────────────────────────────────────────────────────────────┐
│ 实战要点总结 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Token 服务 │
│ - 访问令牌和刷新令牌分离 │
│ - 统一的生成和验证逻辑 │
│ │
│ 2. 认证中间件 │
│ - Token 提取和验证 │
│ - 错误处理 │
│ │
│ 3. 权限控制 │
│ - 基于角色的访问控制 │
│ - 细粒度权限检查 │
│ │
│ 4. 安全实践 │
│ - 密码哈希 │
│ - HTTPS 传输 │
│ - 安全头配置 │
│ │
│ 5. 多框架实现 │
│ - Express │
│ - NestJS │
│ - Spring Boot │
│ - FastAPI │
│ │
└─────────────────────────────────────────────────────────────┘
恭喜你完成了 JWT 文档的学习!现在你已经从入门到精通,可以安全地在项目中使用 JWT 了。
最后更新:2026-03-28