Socket.IO 中间件 #
中间件概念 #
中间件是在连接建立或事件处理之前执行的函数,可用于认证、授权、日志记录、数据验证等。
中间件类型 #
text
┌─────────────────────────────────────────────────────────────┐
│ 中间件类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 连接中间件 │
│ ───────────────────────────────────────────────────────── │
│ io.use((socket, next) => { ... }) │
│ 在连接建立时执行 │
│ 用于:认证、授权、初始化 │
│ │
│ 事件中间件 │
│ ───────────────────────────────────────────────────────── │
│ socket.use((event, next) => { ... }) │
│ 在每个事件处理前执行 │
│ 用于:日志、验证、权限检查 │
│ │
│ 命名空间中间件 │
│ ───────────────────────────────────────────────────────── │
│ io.of('/namespace').use((socket, next) => { ... }) │
│ 只对特定命名空间生效 │
│ │
└─────────────────────────────────────────────────────────────┘
连接中间件 #
基本用法 #
javascript
// Connection middleware #1: log every incoming connection attempt.
io.use((socket, next) => {
  console.log('新连接:', socket.id);
  next();
});

// Connection middleware #2: reject the handshake unless it carries a token that
// verifyToken() accepts. verifyToken is not defined in this snippet.
io.use((socket, next) => {
  const { token } = socket.handshake.auth;

  let rejection = null;
  if (!token) {
    rejection = '缺少认证令牌';
  } else if (!verifyToken(token)) {
    rejection = '无效的令牌';
  }

  if (rejection !== null) {
    next(new Error(rejection));
    return;
  }
  next();
});

// Connection handler for sockets that were let through by the middlewares above.
io.on('connection', (socket) => {
  console.log('认证通过,连接成功');
});
中间件执行流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ 中间件执行流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端连接 │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 中间件 1 │ │
│ └──────┬──────┘ │
│ │ next() │
│ ▼ │
│ ┌─────────────┐ │
│ │ 中间件 2 │ │
│ └──────┬──────┘ │
│ │ next() │
│ ▼ │
│ ┌─────────────┐ │
│ │ 中间件 3 │ │
│ └──────┬──────┘ │
│ │ next() │
│ ▼ │
│ ┌─────────────┐ │
│ │ connection │ │
│ │ 事件 │ │
│ └─────────────┘ │
│ │
│ 如果中间件调用 next(new Error()),连接将被拒绝 │
│ │
└─────────────────────────────────────────────────────────────┘
认证中间件 #
javascript
const jwt = require('jsonwebtoken');

// Connection middleware: verify the JWT sent in the handshake `auth` payload and
// attach the decoded payload to the socket as `socket.user`.
io.use((socket, next) => {
  const { token } = socket.handshake.auth;
  if (!token) {
    next(new Error('Authentication error'));
    return;
  }

  jwt.verify(token, process.env.JWT_SECRET, (verifyError, payload) => {
    if (verifyError) {
      next(new Error('Invalid token'));
      return;
    }
    socket.user = payload;
    next();
  });
});

// After a successful handshake, echo the decoded user back to the client.
io.on('connection', (socket) => {
  const { user } = socket;
  console.log('用户连接:', user.id);
  socket.emit('authenticated', { user });
});
客户端认证 #
javascript
// Client side: the JWT travels in the handshake `auth` payload.
const socket = io('http://localhost:3001', {
  auth: { token: 'your-jwt-token' },
});

// Only the 'Authentication error' rejection is reported here; other
// connection errors are ignored by this handler.
socket.on('connect_error', (err) => {
  const isAuthFailure = err.message === 'Authentication error';
  if (!isAuthFailure) {
    return;
  }
  console.log('认证失败');
});

// Emitted by the server once the handshake has been accepted.
socket.on('authenticated', (payload) => {
  console.log('认证成功:', payload.user);
});
授权中间件 #
javascript
// Main-namespace guard: only authenticated admins may connect.
// Assumes an earlier middleware has populated socket.user - TODO confirm registration order.
io.use((socket, next) => {
  const { user } = socket;
  if (!user) {
    next(new Error('未认证'));
  } else if (user.role !== 'admin') {
    next(new Error('需要管理员权限'));
  } else {
    next();
  }
});

// '/admin' namespace guard: a missing user is treated the same as a non-admin user.
io.of('/admin').use((socket, next) => {
  const isAdmin = socket.user?.role === 'admin';
  if (isAdmin) {
    next();
    return;
  }
  next(new Error('禁止访问'));
});
事件中间件 #
基本用法 #
javascript
io.on('connection', (socket) => {
  // Per-socket middleware: `event` is an array whose first element is the event
  // name and whose remaining elements are the arguments sent with it.
  socket.use((event, next) => {
    const [eventName, ...eventArgs] = event;
    console.log(`事件: ${eventName}`);
    console.log('参数:', eventArgs);
    next();
  });

  socket.on('chat message', (msg) => {
    console.log('消息:', msg);
  });
});
事件验证 #
javascript
io.on('connection', (socket) => {
  // Validate 'chat message' payloads before they reach the handler; every other
  // event passes straight through.
  socket.use(([eventName, ...args], next) => {
    if (eventName !== 'chat message') {
      next();
      return;
    }

    const [message] = args;
    // Must be a non-empty string.
    if (typeof message !== 'string' || message === '') {
      next(new Error('消息格式无效'));
      return;
    }
    if (message.length > 1000) {
      next(new Error('消息过长'));
      return;
    }
    next();
  });

  // Rebroadcast validated messages to every connected client.
  socket.on('chat message', (msg) => {
    io.emit('chat message', msg);
  });

  // Logs errors emitted on this socket.
  socket.on('error', (err) => {
    console.log('事件错误:', err.message);
  });
});
速率限制 #
javascript
// NOTE(review): `rateLimit` is not referenced anywhere in this snippet; the
// limiter below is implemented by hand.
const rateLimit = require('express-rate-limit');

io.on('connection', (socket) => {
  const LIMIT = 10; // max 'chat message' events allowed per window
  const WINDOW = 60000; // window length, compared against Date.now() deltas (ms)

  // Usage counter for this connection, created on the first 'chat message'.
  let usage = null;

  socket.use(([eventName], next) => {
    if (eventName !== 'chat message') {
      next();
      return;
    }

    const now = Date.now();
    if (usage === null) {
      usage = { count: 0, startTime: now };
    }
    // Start a fresh window once the previous one has fully elapsed.
    if (now - usage.startTime > WINDOW) {
      usage.count = 0;
      usage.startTime = now;
    }
    usage.count += 1;

    if (usage.count > LIMIT) {
      next(new Error('发送消息过于频繁'));
      return;
    }
    next();
  });
});
权限检查 #
javascript
// Roles allowed to emit each protected event; events not listed are unrestricted.
// Kept in a Map built once at load time: the event name is chosen by the client,
// and looking it up on a plain object would let names such as 'constructor' or
// '__proto__' resolve to inherited Object.prototype members, on which
// `.includes` is not a function and would throw a TypeError.
const EVENT_ROLES = new Map([
  ['admin:ban', ['admin']],
  ['admin:kick', ['admin', 'moderator']],
  ['chat:message', ['user', 'moderator', 'admin']],
  ['room:create', ['user', 'moderator', 'admin']],
]);

io.on('connection', (socket) => {
  // Per-event authorization: reject the packet when the event is protected and
  // the socket's user (if any) does not hold one of the required roles.
  socket.use(([eventName], next) => {
    const requiredRoles = EVENT_ROLES.get(eventName);
    if (requiredRoles && !requiredRoles.includes(socket.user?.role)) {
      return next(new Error('权限不足'));
    }
    next();
  });

  socket.on('admin:ban', (userId) => {
    console.log(`管理员 ${socket.user.id} 封禁用户 ${userId}`);
  });
});
命名空间中间件 #
独立中间件 #
javascript
// Each namespace gets its own middleware chain and its own connection handler.
const chat = io.of('/chat');
const game = io.of('/game');

chat.use(function chatMiddleware(socket, next) {
  console.log('聊天命名空间中间件');
  next();
});
chat.on('connection', function onChatConnection(socket) {
  console.log('聊天连接');
});

game.use(function gameMiddleware(socket, next) {
  console.log('游戏命名空间中间件');
  next();
});
game.on('connection', function onGameConnection(socket) {
  console.log('游戏连接');
});
共享中间件 #
javascript
/**
 * Connection middleware that authenticates a socket from its handshake token.
 *
 * On success the value returned by verifyToken() is stored on `socket.user`
 * and `next()` is called with no argument; otherwise `next` receives an Error.
 *
 * @param {object} socket - Socket whose `handshake.auth.token` is checked.
 * @param {(err?: Error) => void} next - Continuation supplied by Socket.IO.
 */
function authMiddleware(socket, next) {
  const { token } = socket.handshake.auth;
  if (!token) {
    return next(new Error('需要认证'));
  }

  let user;
  try {
    user = verifyToken(token);
  } catch {
    return next(new Error('无效令牌'));
  }

  // next() is deliberately called outside the try block: an exception thrown
  // synchronously by later middleware must not be caught here, reported as an
  // invalid token, and trigger a second call to next().
  socket.user = user;
  next();
}
// Register the same middleware on the main namespace and on each custom namespace.
for (const target of [io, io.of('/chat'), io.of('/game')]) {
  target.use(authMiddleware);
}
错误处理 #
中间件错误 #
javascript
// Demo middleware that rejects every connection attempt.
io.use((socket, next) => {
  const failure = new Error('认证失败');
  next(failure);
});

// Connection handler left empty in this demo.
io.on('connection', (socket) => {});

// NOTE(review): this listens on the underlying engine, not on the namespace.
// Confirm against the Socket.IO docs whether middleware rejections are reported
// here; clients receive them through their own 'connect_error' event.
io.engine.on('connection_error', (engineError) => {
  console.log('连接错误:', engineError.message);
});
客户端错误处理 #
javascript
// Client side: connect with a token the server is expected to reject.
const socket = io('http://localhost:3001', { auth: { token: 'invalid-token' } });

// Log every connection error, then add a hint when the message matches the
// server's authentication-failure text.
socket.on('connect_error', (err) => {
  const { message } = err;
  console.log('连接错误:', message);
  if (message !== '认证失败') {
    return;
  }
  console.log('请重新登录');
});
事件错误处理 #
javascript
io.on('connection', (socket) => {
  // Demo middleware: fails every incoming event.
  socket.use((event, next) => {
    const failure = new Error('事件处理错误');
    next(failure);
  });

  // Log the error on the server and forward its message to the client.
  socket.on('error', (err) => {
    console.log('错误:', err);
    const payload = { message: err.message };
    socket.emit('error', payload);
  });
});
统一错误处理 #
javascript
io.on('connection', (socket) => {
  // Wrap the rest of the chain so an exception thrown synchronously while
  // next() runs is logged and answered with a generic message.
  socket.use((event, proceed) => {
    try {
      proceed();
    } catch (caught) {
      console.error('中间件错误:', caught);
      socket.emit('error', { message: '服务器错误' });
    }
  });

  socket.on('error', (socketError) => {
    console.error('Socket 错误:', socketError);
  });
});

// Process-wide last-resort handlers: both only log.
process.on('uncaughtException', (fatal) => {
  console.error('未捕获异常:', fatal);
});

process.on('unhandledRejection', (reason) => {
  console.error('未处理的 Promise 拒绝:', reason);
});
日志中间件 #
连接日志 #
javascript
// Connection middleware: remember when the middleware ran and log the elapsed
// time together with the disconnect reason when the socket disconnects.
io.use((socket, next) => {
  const connectedAt = Date.now();

  socket.on('disconnect', (reason) => {
    const elapsed = Date.now() - connectedAt;
    const entry = {
      event: 'disconnect',
      socketId: socket.id,
      userId: socket.user?.id,
      duration: `${elapsed}ms`,
      reason,
    };
    console.log(entry);
  });

  next();
});
事件日志 #
javascript
io.on('connection', (socket) => {
  // Log outgoing events by wrapping socket.emit. The wrapper is installed once
  // per connection rather than inside the middleware below: wrapping on every
  // incoming event would stack one wrapper per packet, so each emit would be
  // logged repeatedly and the wrapper chain would keep growing.
  const originalEmit = socket.emit;
  socket.emit = function (name, ...emitArgs) {
    console.log({
      emit: name,
      socketId: socket.id,
      args: JSON.stringify(emitArgs),
    });
    return originalEmit.call(this, name, ...emitArgs);
  };

  // Log every incoming event before it reaches its handler.
  socket.use(([eventName, ...args], next) => {
    console.log({
      event: eventName,
      socketId: socket.id,
      userId: socket.user?.id,
      args: JSON.stringify(args),
      timestamp: new Date().toISOString(),
    });
    next();
  });
});
结构化日志 #
javascript
const winston = require('winston');

// JSON logger that writes to socket.log.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.File({ filename: 'socket.log' })],
});

// Connection middleware: record who is connecting.
io.use((socket, next) => {
  const { address, headers } = socket.handshake;
  logger.info('connection', {
    socketId: socket.id,
    ip: address,
    userAgent: headers['user-agent'],
  });
  next();
});

// Per-socket middleware: record every incoming event with its arguments.
io.on('connection', (socket) => {
  socket.use((event, next) => {
    const [eventName, ...eventArgs] = event;
    logger.info('event', {
      socketId: socket.id,
      event: eventName,
      args: eventArgs,
    });
    next();
  });
});
完整示例:带认证的聊天室 #
javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const jwt = require('jsonwebtoken');
// HTTP server plus the Socket.IO server attached to it.
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: { origin: '*' },
});

// Per-socket chat rate limiting: at most RATE_LIMIT messages per RATE_WINDOW
// (compared against Date.now() deltas), tracked by socket id.
const messageLimiter = new Map();
const RATE_LIMIT = 10;
const RATE_WINDOW = 60000;

// Connection middleware #1: verify the handshake JWT and attach the decoded
// payload to the socket as `socket.user`.
io.use((socket, next) => {
  const { token } = socket.handshake.auth;
  if (!token) {
    next(new Error('需要认证令牌'));
    return;
  }

  jwt.verify(token, process.env.JWT_SECRET, (verifyError, payload) => {
    if (verifyError) {
      next(new Error('无效的令牌'));
      return;
    }
    socket.user = payload;
    next();
  });
});

// Connection middleware #2: refuse users whose decoded payload marks them as banned.
io.use((socket, next) => {
  const isBanned = socket.user.status === 'banned';
  if (isBanned) {
    next(new Error('账号已被封禁'));
    return;
  }
  next();
});
/**
 * Tell the other members of the socket's current room that this user has left.
 * Does nothing when the socket has no current room.
 */
function announceDeparture(socket) {
  if (!socket.currentRoom) {
    return;
  }
  socket.to(socket.currentRoom).emit('user left', {
    userId: socket.user.id,
    username: socket.user.name,
  });
}

/**
 * Announce the departure, leave the current room and clear `socket.currentRoom`.
 * Does nothing when the socket has no current room.
 */
function leaveCurrentRoom(socket) {
  if (!socket.currentRoom) {
    return;
  }
  announceDeparture(socket);
  socket.leave(socket.currentRoom);
  socket.currentRoom = null;
}

io.on('connection', (socket) => {
  console.log(`用户连接: ${socket.user.name} (${socket.user.id})`);

  // Rate-limit 'chat message' events; all other events pass through.
  socket.use((event, next) => {
    if (event[0] === 'chat message') {
      const now = Date.now();
      const limit = messageLimiter.get(socket.id) || { count: 0, startTime: now };
      // Start a fresh window once the previous one has fully elapsed.
      if (now - limit.startTime > RATE_WINDOW) {
        limit.count = 0;
        limit.startTime = now;
      }
      limit.count++;
      messageLimiter.set(socket.id, limit);
      if (limit.count > RATE_LIMIT) {
        return next(new Error('发送消息过于频繁,请稍后再试'));
      }
    }
    next();
  });

  socket.on('join room', (roomId) => {
    // Only one room is tracked per socket, and 'leave room' only knows about
    // currentRoom. Leave the previous room first so it does not stay joined
    // with no way of leaving it.
    if (socket.currentRoom && socket.currentRoom !== roomId) {
      leaveCurrentRoom(socket);
    }
    socket.join(roomId);
    socket.currentRoom = roomId;
    socket.to(roomId).emit('user joined', {
      userId: socket.user.id,
      username: socket.user.name,
    });
    socket.emit('joined room', roomId);
  });

  socket.on('chat message', (message) => {
    // The payload is client-controlled: calling .trim() on a non-string value
    // would throw, so the type is checked before any string method is used.
    if (typeof message !== 'string' || message.trim() === '') {
      return socket.emit('error', { message: '消息不能为空' });
    }
    if (message.length > 500) {
      return socket.emit('error', { message: '消息过长' });
    }
    // Without a joined room there is nobody to deliver the message to.
    if (!socket.currentRoom) {
      return socket.emit('error', { message: '请先加入房间' });
    }
    io.to(socket.currentRoom).emit('chat message', {
      userId: socket.user.id,
      username: socket.user.name,
      message: message.trim(),
      timestamp: Date.now(),
    });
  });

  socket.on('leave room', () => {
    leaveCurrentRoom(socket);
  });

  socket.on('disconnect', () => {
    announceDeparture(socket);
    // Drop the rate-limit entry so the Map does not grow with dead sockets.
    messageLimiter.delete(socket.id);
    console.log(`用户断开: ${socket.user.name}`);
  });

  // Log the error on the server and forward its message to the client.
  socket.on('error', (err) => {
    console.error('Socket 错误:', err);
    socket.emit('error', { message: err.message });
  });
});
// Demo login endpoint: signs a token for a hard-coded user without checking
// any credentials.
app.post('/login', (req, res) => {
  const demoUser = { id: 1, name: 'John', role: 'user' };
  res.json({
    token: jwt.sign(demoUser, process.env.JWT_SECRET, { expiresIn: '1h' }),
  });
});

const onListening = () => {
  console.log('服务器运行在 http://localhost:3001');
};
httpServer.listen(3001, onListening);
中间件最佳实践 #
text
┌─────────────────────────────────────────────────────────────┐
│ 中间件最佳实践 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 认证放在最前面 │
│ ─────────────────────────────────────────────────────── │
│ io.use(authMiddleware); │
│ io.use(loggingMiddleware); │
│ │
│ 2. 错误信息不要泄露敏感信息 │
│ ─────────────────────────────────────────────────────── │
│ return next(new Error('认证失败')); │
│ 而非: return next(new Error(`Token ${token} 无效`)); │
│ │
│ 3. 使用类型化的错误 │
│ ─────────────────────────────────────────────────────── │
│ class AuthError extends Error { │
│ constructor(message) { │
│ super(message); │
│ this.name = 'AuthError'; │
│ } │
│ } │
│ │
│ 4. 记录所有认证失败 │
│ ─────────────────────────────────────────────────────── │
│ 用于安全审计和入侵检测 │
│ │
│ 5. 速率限制保护 │
│ ─────────────────────────────────────────────────────── │
│ 防止消息轰炸和资源滥用 │
│ │
│ 6. 清理资源 │
│ ─────────────────────────────────────────────────────── │
│ 在 disconnect 事件中清理中间件创建的资源 │
│ │
└─────────────────────────────────────────────────────────────┘
下一步 #
最后更新:2026-03-29