Socket.IO 高级特性 #
集群部署 #
为什么需要集群? #
text
┌─────────────────────────────────────────────────────────────┐
│ 单服务器 vs 集群 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 单服务器问题: │
│ ❌ 单点故障:服务器宕机,服务完全不可用 │
│ ❌ 性能瓶颈:单机连接数有限 │
│ ❌ 无法扩展:无法应对流量增长 │
│ │
│ 集群优势: │
│ ✅ 高可用:一台服务器故障,其他继续服务 │
│ ✅ 负载均衡:分散连接,提高性能 │
│ ✅ 可扩展:按需添加服务器 │
│ ✅ 零停机:滚动更新,服务不中断 │
│ │
└─────────────────────────────────────────────────────────────┘
Node.js 集群模式 #
javascript
/**
 * Multi-process Socket.IO server built on the Node.js cluster module.
 *
 * Primary process: owns the public port, routes every connection to a fixed
 * worker (sticky sessions) and relays adapter packets between workers.
 * Worker processes: run the Express app and the Socket.IO server, and receive
 * their connections from the primary instead of listening on a port.
 */
const cluster = require('cluster');
const os = require('os');
const { createServer } = require('http');
const { setupMaster, setupWorker } = require('@socket.io/sticky');
const { createAdapter, setupPrimary } = require('@socket.io/cluster-adapter');

if (cluster.isMaster) {
  // Only the primary binds the port; it hands each incoming connection to a
  // worker so that all requests of one session reach the same process.
  const httpServer = createServer();
  setupMaster(httpServer, {
    loadBalancingMethod: 'least-connection'
  });

  // Required by the cluster adapter: lets workers exchange broadcast packets
  // through the primary process.
  setupPrimary();

  httpServer.listen(3001);

  const cpuCount = os.cpus().length;
  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }

  // Replace any worker that exits so the pool keeps its size.
  cluster.on('exit', (worker) => {
    console.log(`Worker ${worker.id} 已退出`);
    cluster.fork();
  });
} else {
  const express = require('express');
  const { Server } = require('socket.io');

  const app = express();
  const httpServer = createServer(app);
  const io = new Server(httpServer);

  io.adapter(createAdapter());

  // Connections are forwarded by the primary; the worker must NOT call
  // httpServer.listen(), otherwise the sticky routing is bypassed.
  setupWorker(io);

  io.on('connection', (socket) => {
    console.log(`Worker ${process.pid}: 用户连接 ${socket.id}`);
  });
}
PM2 集群部署 #
javascript
// server.js — the script each PM2 cluster-mode instance runs.
// NOTE(review): the cluster adapter and setupWorker() expect a primary process
// that forwards connections and adapter packets; with PM2 this presumably
// requires the Socket.IO PM2 fork, and the listen() call below may then be
// unnecessary — confirm against the Socket.IO PM2 documentation.
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/cluster-adapter');
const { setupWorker } = require('@socket.io/sticky');

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer);

// Attach the inter-process adapter, then register this process as a worker.
io.adapter(createAdapter());
setupWorker(io);

// Log the id of every socket that connects.
function logConnection(socket) {
  console.log('用户连接:', socket.id);
}
io.on('connection', logConnection);

httpServer.listen(3001);
ecosystem.config.js:
javascript
module.exports = {
apps: [{
name: 'socket-server',
script: './server.js',
instances: 'max',
exec_mode: 'cluster',
env: {
NODE_ENV: 'production',
PORT: 3001
}
}]
};
Sticky Sessions #
text
┌─────────────────────────────────────────────────────────────┐
│ Sticky Sessions │
├─────────────────────────────────────────────────────────────┤
│ │
│ 为什么需要 Sticky Sessions? │
│ ───────────────────────────────────────────────────────── │
│ - HTTP 长轮询传输在一次会话中会发送多个 HTTP 请求 │
│ - 同一会话的所有请求必须路由到同一服务器(进程) │
│ - 否则服务器找不到该会话,返回 HTTP 400(Session ID unknown) │
│ - 如果客户端只使用 WebSocket 传输,则不需要 Sticky Sessions │
│ │
│ 实现方式: │
│ ───────────────────────────────────────────────────────── │
│ 1. 基于 Cookie │
│ 2. 基于 IP Hash │
│ 3. 使用 @socket.io/sticky 模块 │
│ │
│ Nginx 配置: │
│ ───────────────────────────────────────────────────────── │
│ upstream socket_servers { │
│ ip_hash; │
│ server 127.0.0.1:3001; │
│ server 127.0.0.1:3002; │
│ server 127.0.0.1:3003; │
│ } │
│ │
└─────────────────────────────────────────────────────────────┘
Redis 适配器 #
安装配置 #
bash
npm install @socket.io/redis-adapter redis
javascript
// Socket.IO server whose broadcasts are relayed through Redis Pub/Sub.
// `httpServer` is expected to be created elsewhere.
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const io = new Server(httpServer, {
  cors: { origin: '*' }
});

// The adapter needs two connections: one to publish, one to subscribe.
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

// node-redis emits 'error' on connection problems; an EventEmitter 'error'
// without a listener is thrown and would crash the process.
pubClient.on('error', (err) => {
  console.error('Redis pub 客户端错误:', err);
});
subClient.on('error', (err) => {
  console.error('Redis sub 客户端错误:', err);
});

Promise.all([
  pubClient.connect(),
  subClient.connect()
])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));
    console.log('Redis 适配器已连接');
  })
  .catch((err) => {
    // Without this handler a failed connection is an unhandled rejection.
    console.error('Redis 适配器连接失败:', err);
  });

io.on('connection', (socket) => {
  socket.join('room-1');
  // Emitted to every member of 'room-1', including the socket that just joined.
  io.to('room-1').emit('hello', 'world');
});
Redis 集群配置 #
javascript
// Socket.IO server using a Redis Cluster as the Pub/Sub backend.
// `httpServer` is expected to be created elsewhere.
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createCluster } = require('redis');

const io = new Server(httpServer);

// node-redis expects the seed nodes under `rootNodes`, not as a bare array.
const cluster = createCluster({
  rootNodes: [
    { url: 'redis://node1:6379' },
    { url: 'redis://node2:6379' },
    { url: 'redis://node3:6379' }
  ]
});

// Use the cluster client itself (and a duplicate of it) rather than reaching
// into one master's internal client, so the adapter is not pinned to a node.
const pubClient = cluster;
const subClient = cluster.duplicate();

pubClient.on('error', (err) => {
  console.error('Redis 集群 pub 客户端错误:', err);
});
subClient.on('error', (err) => {
  console.error('Redis 集群 sub 客户端错误:', err);
});

Promise.all([
  pubClient.connect(),
  subClient.connect()
])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));
  })
  .catch((err) => {
    console.error('Redis 集群适配器连接失败:', err);
  });
跨服务器通信 #
text
┌─────────────────────────────────────────────────────────────┐
│ Redis 适配器架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Server 1 │ │ Server 2 │ │ Server 3 │ │
│ │ Socket.IO │ │ Socket.IO │ │ Socket.IO │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Redis │ │
│ │ Pub/Sub │ │
│ └─────────────────┘ │
│ │
│ 工作流程: │
│ 1. Server 1 收到消息 │
│ 2. 发布到 Redis 频道 │
│ 3. Server 2 和 Server 3 订阅并收到消息 │
│ 4. 各服务器推送给自己的客户端 │
│ │
└─────────────────────────────────────────────────────────────┘
性能优化 #
连接优化 #
javascript
// Cross-origin policy: a single allowed origin, GET and POST only.
const corsOptions = {
  origin: 'https://example.com',
  methods: ['GET', 'POST']
};

// Tuned connection options for the Socket.IO server.
const serverOptions = {
  pingInterval: 25000,
  pingTimeout: 60000,
  // Only the WebSocket transport is enabled (no HTTP long-polling).
  transports: ['websocket'],
  maxHttpBufferSize: 1e5,
  cors: corsOptions
};

const io = new Server(httpServer, serverOptions);
消息优化 #
javascript
// Coalesces the 'position' events a socket sends within one event-loop turn
// into a single 'batch updates' emit back to that socket.
io.on('connection', (socket) => {
  let pending = [];
  let flushScheduled = false;

  // Send everything collected so far, then allow the next flush to be scheduled.
  const flushUpdates = () => {
    if (pending.length > 0) {
      socket.emit('batch updates', pending);
      pending = [];
    }
    flushScheduled = false;
  };

  // Queue one update; schedule a flush only if none is pending yet.
  const queueUpdate = (update) => {
    pending.push(update);
    if (flushScheduled) {
      return;
    }
    flushScheduled = true;
    setImmediate(flushUpdates);
  };

  socket.on('position', (pos) => {
    queueUpdate({ type: 'position', data: pos });
  });
});
节流与防抖 #
javascript
/**
 * Wraps `fn` so it runs at most once per `delay` milliseconds.
 * Calls arriving inside the window are dropped (no trailing call).
 *
 * @param {Function} fn - Function to rate-limit; invoked with the caller's `this` and arguments.
 * @param {number} delay - Minimum number of milliseconds between two invocations.
 * @returns {Function} The throttled wrapper; it always returns undefined.
 */
function throttle(fn, delay) {
  let lastCall = 0;
  return function (...args) {
    const now = Date.now();
    const elapsed = now - lastCall;
    // Negated ">=" (not "<") so a NaN comparison still suppresses the call.
    if (!(elapsed >= delay)) {
      return;
    }
    lastCall = now;
    Reflect.apply(fn, this, args);
  };
}
/**
 * Wraps `fn` so it runs only after `delay` milliseconds have passed without
 * another call; each new call cancels the previously scheduled run.
 *
 * @param {Function} fn - Function to delay; invoked with the last caller's `this` and arguments.
 * @param {number} delay - Quiet period in milliseconds.
 * @returns {Function} The debounced wrapper.
 */
function debounce(fn, delay) {
  let timeoutId;
  return function (...args) {
    const self = this;
    clearTimeout(timeoutId);
    timeoutId = setTimeout(function () {
      Reflect.apply(fn, self, args);
    }, delay);
  };
}
// Per-socket handlers: relay 'position' at most once every 100 ms and persist
// 'update' payloads only after 1000 ms without a newer one.
io.on('connection', (socket) => {
  const broadcastPosition = (data) => {
    socket.broadcast.emit('position', data);
  };
  socket.on('position', throttle(broadcastPosition, 100));

  const persistUpdate = (data) => {
    saveToDatabase(data);
  };
  socket.on('update', debounce(persistUpdate, 1000));
});
内存管理 #
javascript
// In-memory bookkeeping: connected sockets keyed by socket id, and per-room
// cache entries keyed by room name.
const connectedUsers = new Map();
const roomCache = new Map();

io.on('connection', (socket) => {
  connectedUsers.set(socket.id, {
    id: socket.id,
    joinedAt: Date.now()
  });

  // Room cleanup must run on 'disconnecting': by the time 'disconnect' fires
  // the socket has already left its rooms and socket.rooms is empty.
  socket.on('disconnecting', () => {
    for (const room of socket.rooms) {
      const roomData = roomCache.get(room);
      if (!roomData) {
        continue;
      }
      roomData.users.delete(socket.id);
      // Drop the cache entry once its last user is gone.
      if (roomData.users.size === 0) {
        roomCache.delete(room);
      }
    }
  });

  socket.on('disconnect', () => {
    connectedUsers.delete(socket.id);
  });
});
// Every 60 s, evict room cache entries whose lastAccess is older than one hour.
setInterval(() => {
  const maxAge = 3600000;
  const now = Date.now();
  roomCache.forEach((entry, roomKey) => {
    const isStale = now - entry.lastAccess > maxAge;
    if (isStale) {
      roomCache.delete(roomKey);
    }
  });
}, 60000);
监控与调试 #
连接状态监控 #
javascript
// Process-local counters describing Socket.IO traffic.
const stats = {
  totalConnections: 0,
  currentConnections: 0,
  messagesReceived: 0,
  messagesSent: 0
};

io.on('connection', (socket) => {
  stats.totalConnections++;
  stats.currentConnections++;

  socket.on('disconnect', () => {
    stats.currentConnections--;
  });

  // Packet middleware runs for every incoming event of this socket.
  socket.use((event, next) => {
    stats.messagesReceived++;
    next();
  });

  // Count outgoing events too; previously messagesSent was never updated.
  // onAnyOutgoing requires socket.io >= 4.5.0.
  socket.onAnyOutgoing(() => {
    stats.messagesSent++;
  });
});

// Log a snapshot of the counters once a minute.
setInterval(() => {
  console.log('统计信息:', stats);
}, 60000);

// Expose the current counters as JSON.
app.get('/stats', (req, res) => {
  res.json(stats);
});
Prometheus 集成 #
javascript
// Prometheus metrics for Socket.IO, kept in a dedicated registry.
const promClient = require('prom-client');
const register = new promClient.Registry();

const connectedSockets = new promClient.Gauge({
  name: 'socket_io_connected',
  help: '当前连接数',
  registers: [register]
});
const messagesReceived = new promClient.Counter({
  name: 'socket_io_messages_received_total',
  help: '接收消息总数',
  registers: [register]
});
const messagesSent = new promClient.Counter({
  name: 'socket_io_messages_sent_total',
  help: '发送消息总数',
  registers: [register]
});

io.on('connection', (socket) => {
  connectedSockets.inc();

  socket.on('disconnect', () => {
    connectedSockets.dec();
  });

  // Packet middleware runs for every incoming event of this socket.
  socket.use((event, next) => {
    messagesReceived.inc();
    next();
  });

  // Feed the sent counter, which was registered but never incremented.
  // onAnyOutgoing requires socket.io >= 4.5.0.
  socket.onAnyOutgoing(() => {
    messagesSent.inc();
  });
});
// Serves the registry contents in the Prometheus exposition format.
app.get('/metrics', async (req, res) => {
  try {
    const body = await register.metrics();
    res.set('Content-Type', register.contentType);
    res.send(body);
  } catch (err) {
    // A rejection escaping an async handler is not turned into a response by
    // Express 4, so answer explicitly instead of leaving the request hanging.
    console.error(err);
    res.status(500).end();
  }
});
健康检查 #
javascript
// Liveness endpoint: process uptime, memory usage and the number of sockets
// returned by io.fetchSockets().
app.get('/health', async (req, res) => {
  try {
    const sockets = await io.fetchSockets();
    res.json({
      status: 'healthy',
      uptime: process.uptime(),
      connections: sockets.length,
      memory: process.memoryUsage()
    });
  } catch (err) {
    // fetchSockets() can reject (e.g. adapter timeout); report it instead of
    // leaving an unhandled rejection and a request that never completes.
    res.status(503).json({ status: 'unhealthy', error: err.message });
  }
});
// Readiness endpoint: ready only if io.fetchSockets() resolves.
app.get('/ready', async (req, res) => {
  try {
    // The result is irrelevant; only success or failure of the call matters.
    await io.fetchSockets();
    res.json({ ready: true });
  } catch (err) {
    res.status(503).json({ ready: false, error: err.message });
  }
});
安全配置 #
CORS 配置 #
javascript
/**
 * CORS origin check: accepts requests that carry no Origin value and requests
 * from the allow-list; everything else is rejected with an error.
 *
 * @param {string|undefined} origin - Origin value of the incoming request.
 * @param {Function} callback - Node-style callback: (error) or (null, true).
 */
function checkOrigin(origin, callback) {
  const allowedOrigins = [
    'https://example.com',
    'https://app.example.com'
  ];
  const isAllowed = !origin || allowedOrigins.includes(origin);
  if (!isAllowed) {
    callback(new Error('不允许的来源'));
    return;
  }
  callback(null, true);
}

const io = new Server(httpServer, {
  cors: {
    origin: checkOrigin,
    methods: ['GET', 'POST'],
    credentials: true
  }
});
速率限制 #
javascript
// Server-wide limiter middleware.
// NOTE(review): the option names (windowMs / max / message) are taken as-is;
// confirm them against the 'socket.io-rate-limiter' package documentation.
const rateLimit = require('socket.io-rate-limiter');
const limiter = rateLimit({
  windowMs: 60000,
  max: 100,
  message: '请求过于频繁,请稍后再试'
});
io.use(limiter);

// Per-socket limit: at most 10 'chat message' events per 60-second window.
io.on('connection', (socket) => {
  let count = 0;
  let resetTime = Date.now() + 60000;

  socket.use((event, next) => {
    const now = Date.now();
    // Start a fresh window once the current one has expired.
    if (now > resetTime) {
      count = 0;
      resetTime = now + 60000;
    }

    const isChatMessage = event[0] === 'chat message';
    if (isChatMessage) {
      count += 1;
      if (count > 10) {
        return next(new Error('发送消息过于频繁'));
      }
    }
    next();
  });
});
输入验证 #
javascript
// Joi schemas for incoming events, keyed by event name.
const Joi = require('joi');
const schemas = {
  'chat message': Joi.object({
    room: Joi.string().required(),
    message: Joi.string().max(500).required()
  }),
  'join room': Joi.object({
    roomId: Joi.string().required()
  })
};

// Validates the first argument of every incoming event that has a schema.
// Events without a schema are passed through unvalidated.
io.on('connection', (socket) => {
  socket.use((event, next) => {
    const [eventName, data] = event;

    // The event name is chosen by the client. Look it up as an OWN property
    // only: a plain `schemas[eventName]` would resolve names such as
    // 'constructor' or 'toString' to inherited members, and calling
    // `.validate` on those throws a TypeError inside the middleware.
    const hasSchema = Object.prototype.hasOwnProperty.call(schemas, eventName);
    if (!hasSchema) {
      return next();
    }

    const { error } = schemas[eventName].validate(data);
    if (error) {
      return next(new Error(`验证失败: ${error.message}`));
    }
    next();
  });
});
完整示例:生产级配置 #
javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const promClient = require('prom-client');
const helmet = require('helmet');
const cors = require('cors');
// Express app and Socket.IO server sharing one HTTP server and one CORS policy.

// Allowed origins come from the comma-separated ALLOWED_ORIGINS variable.
// Entries are trimmed and blanks dropped, so "a.com, b.com" works as expected;
// the localhost default applies only when the variable is not set at all.
const allowedOrigins = process.env.ALLOWED_ORIGINS
  ?.split(',')
  .map((origin) => origin.trim())
  .filter((origin) => origin !== '')
  ?? ['http://localhost:3000'];

const app = express();
app.use(helmet());
app.use(cors({
  origin: allowedOrigins,
  credentials: true
}));

const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: {
    origin: allowedOrigins,
    methods: ['GET', 'POST'],
    credentials: true
  },
  pingInterval: 25000,
  pingTimeout: 60000,
  maxHttpBufferSize: 1e5
});
/**
 * Connects the publish/subscribe Redis clients and installs the Redis adapter
 * on `io`. The URL comes from REDIS_URL, falling back to a local instance.
 *
 * @returns {Promise<void>} Resolves once both clients are connected and the
 *   adapter is installed; rejects if either connection attempt fails.
 */
async function setupRedisAdapter() {
  const pubClient = createClient({
    url: process.env.REDIS_URL || 'redis://localhost:6379'
  });
  const subClient = pubClient.duplicate();

  // node-redis emits 'error' on connection problems; an EventEmitter 'error'
  // without a listener is thrown and would crash the process.
  pubClient.on('error', (err) => {
    console.error('Redis pub 客户端错误:', err);
  });
  subClient.on('error', (err) => {
    console.error('Redis sub 客户端错误:', err);
  });

  await Promise.all([
    pubClient.connect(),
    subClient.connect()
  ]);
  io.adapter(createAdapter(pubClient, subClient));
  console.log('Redis 适配器已连接');
}

// Best effort: on failure the error is logged and the server keeps running
// with whatever adapter is already installed.
setupRedisAdapter().catch(console.error);
// Dedicated registry holding prom-client's default metrics plus a gauge for
// the number of currently connected sockets.
const register = new promClient.Registry();
promClient.collectDefaultMetrics({ register });

const connectedSocketsConfig = {
  name: 'socket_io_connected',
  help: '当前连接数',
  registers: [register]
};
const connectedSockets = new promClient.Gauge(connectedSocketsConfig);
// Handshake authentication: requires `auth.token`, verifies it with
// verifyToken() and stores the result on socket.user.
io.use((socket, next) => {
  const { token } = socket.handshake.auth;
  if (!token) {
    return next(new Error('需要认证'));
  }

  try {
    const user = verifyToken(token);
    socket.user = user;
    next();
  } catch (err) {
    // Any failure inside the try block is reported as an invalid token.
    next(new Error('无效令牌'));
  }
});
// Keeps the connected-sockets gauge in sync and logs connect/disconnect.
io.on('connection', (socket) => {
  const handleDisconnect = () => {
    connectedSockets.dec();
    console.log(`用户断开: ${socket.user.id}`);
  };

  connectedSockets.inc();
  console.log(`用户连接: ${socket.user.id}`);
  socket.on('disconnect', handleDisconnect);
});
// Liveness endpoint: static status plus process uptime and current time.
app.get('/health', (req, res) => {
  const payload = {
    status: 'healthy',
    uptime: process.uptime(),
    timestamp: new Date().toISOString()
  };
  res.json(payload);
});
// Readiness endpoint: ready while the HTTP server is accepting connections.
// The previous try/catch wrapped nothing that could fail, so the endpoint
// always answered ready; httpServer.listening turns false once close() has
// been called, so requests on still-open connections see 503 while draining.
app.get('/ready', async (req, res) => {
  if (!httpServer.listening) {
    res.status(503).json({ ready: false });
    return;
  }
  res.json({ ready: true });
});
// Serves the registry contents in the Prometheus exposition format.
app.get('/metrics', async (req, res) => {
  try {
    const body = await register.metrics();
    res.set('Content-Type', register.contentType);
    res.send(body);
  } catch (err) {
    // A rejection escaping an async handler is not turned into a response by
    // Express 4, so answer explicitly instead of leaving the request hanging.
    console.error(err);
    res.status(500).end();
  }
});
// Start listening on PORT from the environment, defaulting to 3001.
const PORT = process.env.PORT || 3001;

function announceListening() {
  console.log(`服务器运行在端口 ${PORT}`);
}

httpServer.listen(PORT, announceListening);
// Graceful shutdown on SIGTERM.
process.on('SIGTERM', () => {
  console.log('收到 SIGTERM,正在关闭...');

  // Safety net: exit anyway if shutdown has not finished after 10 seconds.
  // unref() keeps this timer from holding the process open by itself.
  const forceExitTimer = setTimeout(() => {
    console.error('关闭超时,强制退出');
    process.exit(1);
  }, 10000);
  forceExitTimer.unref();

  // httpServer.close() alone waits for open connections to end, and
  // long-lived WebSocket connections never do. io.close() disconnects every
  // client and then closes the underlying HTTP server.
  io.close(() => {
    console.log('服务器已关闭');
    process.exit(0);
  });
});
下一步 #
最后更新:2026-03-29