Socket.IO 高级特性 #

集群部署 #

为什么需要集群? #

text
┌─────────────────────────────────────────────────────────────┐
│                    单服务器 vs 集群                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  单服务器问题:                                              │
│  ❌ 单点故障:服务器宕机,服务完全不可用                    │
│  ❌ 性能瓶颈:单机连接数有限                                │
│  ❌ 无法扩展:无法应对流量增长                              │
│                                                             │
│  集群优势:                                                  │
│  ✅ 高可用:一台服务器故障,其他继续服务                    │
│  ✅ 负载均衡:分散连接,提高性能                            │
│  ✅ 可扩展:按需添加服务器                                  │
│  ✅ 零停机:滚动更新,服务不中断                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Node.js 集群模式 #

javascript
const cluster = require('cluster');
const os = require('os');

// Public port. Only the primary process binds it; workers receive their
// connections from the primary through @socket.io/sticky.
const PORT = 3001;

// `isPrimary` supersedes the deprecated `isMaster`; fall back to the old name
// so the snippet still runs on Node.js releases that lack `isPrimary`.
const isPrimaryProcess = cluster.isPrimary ?? cluster.isMaster;

if (isPrimaryProcess) {
  const { createServer } = require('http');
  const { setupMaster } = require('@socket.io/sticky');
  const { setupPrimary } = require('@socket.io/cluster-adapter');

  const httpServer = createServer();

  // Sticky routing: every request of a given session goes to the same worker.
  setupMaster(httpServer, { loadBalancingMethod: 'least-connection' });

  // Message channel the cluster adapter uses to relay packets between workers.
  setupPrimary();

  httpServer.listen(PORT);

  const cpuCount = os.cpus().length;

  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }

  // Replace a worker as soon as it exits so capacity stays constant.
  cluster.on('exit', (worker) => {
    console.log(`Worker ${worker.id} 已退出`);
    cluster.fork();
  });
} else {
  const express = require('express');
  const { createServer } = require('http');
  const { Server } = require('socket.io');
  const { createAdapter } = require('@socket.io/cluster-adapter');
  const { setupWorker } = require('@socket.io/sticky');

  const app = express();
  const httpServer = createServer(app);

  const io = new Server(httpServer);
  io.adapter(createAdapter());

  // Accept the connections handed over by the primary process.
  setupWorker(io);

  io.on('connection', (socket) => {
    console.log(`Worker ${process.pid}: 用户连接 ${socket.id}`);
  });

  // No httpServer.listen() here: the primary owns the port.
}

PM2 集群部署 #

javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/cluster-adapter');
const { setupWorker } = require('@socket.io/sticky');

// Honour the PORT injected through ecosystem.config.js; 3001 is the fallback.
const PORT = process.env.PORT || 3001;

const app = express();
const httpServer = createServer(app);

const io = new Server(httpServer);
io.adapter(createAdapter());
setupWorker(io);

io.on('connection', (socket) => {
  console.log('用户连接:', socket.id);
});

// NOTE(review): the Socket.IO PM2 guide runs this kind of worker under the
// @socket.io/pm2 fork and does not call listen() in the worker — confirm
// which process is expected to own the port in this deployment.
httpServer.listen(PORT);

ecosystem.config.js:

javascript
module.exports = {
  apps: [{
    name: 'socket-server',
    script: './server.js',
    instances: 'max',
    exec_mode: 'cluster',
    env: {
      NODE_ENV: 'production',
      PORT: 3001
    }
  }]
};

Sticky Sessions #

text
┌─────────────────────────────────────────────────────────────┐
│                    Sticky Sessions                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  为什么需要 Sticky Sessions?                                │
│  ─────────────────────────────────────────────────────────  │
│  - HTTP 长轮询传输会在同一会话中发起多次 HTTP 请求         │
│  - 这些请求必须路由到同一服务器                            │
│  - 否则会出现 HTTP 400 (Session ID unknown) 错误           │
│                                                             │
│  实现方式:                                                  │
│  ─────────────────────────────────────────────────────────  │
│  1. 基于 Cookie                                            │
│  2. 基于 IP Hash                                           │
│  3. 使用 @socket.io/sticky 模块                            │
│                                                             │
│  Nginx 配置:                                               │
│  ─────────────────────────────────────────────────────────  │
│  upstream socket_servers {                                  │
│    ip_hash;                                                 │
│    server 127.0.0.1:3001;                                   │
│    server 127.0.0.1:3002;                                   │
│    server 127.0.0.1:3003;                                   │
│  }                                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Redis 适配器 #

安装配置 #

bash
npm install @socket.io/redis-adapter redis
javascript
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const io = new Server(httpServer, {
  cors: { origin: '*' }
});

// Two connections are needed: one publishes, the other subscribes.
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

// An 'error' event with no listener is thrown by the emitter and would take
// the process down, so both clients get a handler.
pubClient.on('error', (err) => {
  console.error('Redis pub 客户端错误:', err);
});
subClient.on('error', (err) => {
  console.error('Redis sub 客户端错误:', err);
});

Promise.all([
  pubClient.connect(),
  subClient.connect()
]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
  console.log('Redis 适配器已连接');
}).catch((err) => {
  // Surface connection failures instead of leaving an unhandled rejection.
  console.error('Redis 适配器连接失败:', err);
});

io.on('connection', (socket) => {
  socket.join('room-1');
  io.to('room-1').emit('hello', 'world');
});

Redis 集群配置 #

javascript
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createCluster } = require('redis');

const io = new Server(httpServer);

// createCluster() expects an options object whose `rootNodes` lists the seed
// nodes; the cluster client itself is then used as the publisher.
const pubClient = createCluster({
  rootNodes: [
    { url: 'redis://node1:6379' },
    { url: 'redis://node2:6379' },
    { url: 'redis://node3:6379' }
  ]
});
const subClient = pubClient.duplicate();

// An 'error' event with no listener is thrown by the emitter.
pubClient.on('error', (err) => {
  console.error('Redis pub 客户端错误:', err);
});
subClient.on('error', (err) => {
  console.error('Redis sub 客户端错误:', err);
});

Promise.all([
  pubClient.connect(),
  subClient.connect()
]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
}).catch((err) => {
  console.error('Redis 集群连接失败:', err);
});

跨服务器通信 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Redis 适配器架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐   │
│   │  Server 1   │    │  Server 2   │    │  Server 3   │   │
│   │  Socket.IO  │    │  Socket.IO  │    │  Socket.IO  │   │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘   │
│          │                  │                  │           │
│          └──────────────────┼──────────────────┘           │
│                             │                              │
│                    ┌────────▼────────┐                     │
│                    │     Redis       │                     │
│                    │   Pub/Sub       │                     │
│                    └─────────────────┘                     │
│                                                             │
│  工作流程:                                                  │
│  1. Server 1 收到消息                                       │
│  2. 发布到 Redis 频道                                       │
│  3. Server 2 和 Server 3 订阅并收到消息                    │
│  4. 各服务器推送给自己的客户端                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

性能优化 #

连接优化 #

javascript
// Tuning values handed to the Socket.IO Server constructor.
const connectionOptions = {
  pingInterval: 25000,
  pingTimeout: 60000,

  // Only the 'websocket' transport is listed.
  transports: ['websocket'],

  maxHttpBufferSize: 1e5,

  cors: {
    origin: 'https://example.com',
    methods: ['GET', 'POST']
  }
};

const io = new Server(httpServer, connectionOptions);

消息优化 #

javascript
io.on('connection', (socket) => {
  // Updates collected since the last flush, and whether a flush is already
  // scheduled for the next setImmediate turn.
  let pending = [];
  let flushScheduled = false;

  // Send everything collected so far as one 'batch updates' event.
  const flush = () => {
    if (pending.length !== 0) {
      const batch = pending;
      pending = [];
      socket.emit('batch updates', batch);
    }
    flushScheduled = false;
  };

  // Queue one update and make sure exactly one flush is pending.
  const enqueue = (update) => {
    pending.push(update);

    if (flushScheduled) {
      return;
    }
    flushScheduled = true;
    setImmediate(flush);
  };

  socket.on('position', (pos) => {
    enqueue({ type: 'position', data: pos });
  });
});

节流与防抖 #

javascript
/**
 * Wrap `fn` so it runs at most once per `delay` milliseconds.
 * Calls arriving inside the window are dropped, not queued.
 *
 * @param {Function} fn - Function to rate-limit.
 * @param {number} delay - Minimum gap between two invocations, in ms.
 * @returns {Function} Wrapper that forwards `this` and arguments to `fn`
 *   and always returns undefined.
 */
function throttle(fn, delay) {
  let previousRun = 0;

  return function (...args) {
    const timestamp = Date.now();
    const isDue = timestamp - previousRun >= delay;

    if (!isDue) {
      return;
    }

    previousRun = timestamp;
    fn.call(this, ...args);
  };
}

/**
 * Wrap `fn` so it runs only after `delay` milliseconds have passed without
 * another call; each new call restarts the timer.
 *
 * @param {Function} fn - Function to delay.
 * @param {number} delay - Quiet period in ms before `fn` is invoked.
 * @returns {Function} Wrapper that invokes `fn` with the `this` and arguments
 *   of the most recent call.
 */
function debounce(fn, delay) {
  let pendingTimer;

  return function (...args) {
    const invoke = () => fn.apply(this, args);

    clearTimeout(pendingTimer);
    pendingTimer = setTimeout(invoke, delay);
  };
}

io.on('connection', (socket) => {
  // Relay position updates to the other clients.
  const broadcastPosition = (data) => {
    socket.broadcast.emit('position', data);
  };

  // Persist an update through saveToDatabase.
  const persistUpdate = (data) => {
    saveToDatabase(data);
  };

  // Throttled with a 100 ms window.
  socket.on('position', throttle(broadcastPosition, 100));

  // Debounced with a 1000 ms quiet period.
  socket.on('update', debounce(persistUpdate, 1000));
});

内存管理 #

javascript
// Per-process bookkeeping: connected sockets keyed by socket id, and cached
// room data keyed by room name.
const connectedUsers = new Map();
const roomCache = new Map();

io.on('connection', (socket) => {
  connectedUsers.set(socket.id, {
    id: socket.id,
    joinedAt: Date.now()
  });

  // Room cleanup has to run on 'disconnecting': by the time 'disconnect'
  // fires the socket has already left its rooms and socket.rooms is empty.
  socket.on('disconnecting', () => {
    for (const room of socket.rooms) {
      const roomData = roomCache.get(room);
      if (!roomData) {
        continue;
      }

      // assumes roomData.users is a Set of socket ids — TODO confirm
      roomData.users.delete(socket.id);
      if (roomData.users.size === 0) {
        roomCache.delete(room);
      }
    }
  });

  socket.on('disconnect', () => {
    connectedUsers.delete(socket.id);
  });
});

// Every 60 s, evict roomCache entries whose lastAccess is more than an hour old.
setInterval(() => {
  const expiryMs = 3600000;
  const checkedAt = Date.now();

  roomCache.forEach((entry, roomKey) => {
    const idleFor = checkedAt - entry.lastAccess;
    if (idleFor > expiryMs) {
      roomCache.delete(roomKey);
    }
  });
}, 60000);

监控与调试 #

连接状态监控 #

javascript
// In-memory counters for this process.
const stats = {
  totalConnections: 0,
  currentConnections: 0,
  messagesReceived: 0,
  messagesSent: 0
};

io.on('connection', (socket) => {
  stats.totalConnections++;
  stats.currentConnections++;

  socket.on('disconnect', () => {
    stats.currentConnections--;
  });

  // Count every incoming event before it reaches its handler.
  socket.use((event, next) => {
    stats.messagesReceived++;
    next();
  });

  // Count outgoing packets so messagesSent is not stuck at 0.
  // Requires a Socket.IO release that provides socket.onAnyOutgoing.
  socket.onAnyOutgoing(() => {
    stats.messagesSent++;
  });
});

// Log a snapshot once a minute.
setInterval(() => {
  console.log('统计信息:', stats);
}, 60000);

app.get('/stats', (req, res) => {
  res.json(stats);
});

Prometheus 集成 #

javascript
const promClient = require('prom-client');

// Dedicated registry that holds the Socket.IO metrics below.
const register = new promClient.Registry();

const connectedSockets = new promClient.Gauge({
  name: 'socket_io_connected',
  help: '当前连接数',
  registers: [register]
});

const messagesReceived = new promClient.Counter({
  name: 'socket_io_messages_received_total',
  help: '接收消息总数',
  registers: [register]
});

const messagesSent = new promClient.Counter({
  name: 'socket_io_messages_sent_total',
  help: '发送消息总数',
  registers: [register]
});

io.on('connection', (socket) => {
  connectedSockets.inc();

  socket.on('disconnect', () => {
    connectedSockets.dec();
  });

  // Count every incoming event before it reaches its handler.
  socket.use((event, next) => {
    messagesReceived.inc();
    next();
  });

  // Count outgoing packets so the sent counter is actually fed.
  // Requires a Socket.IO release that provides socket.onAnyOutgoing.
  socket.onAnyOutgoing(() => {
    messagesSent.inc();
  });
});

app.get('/metrics', async (req, res) => {
  // Catch here: a rejection inside an async handler would otherwise leave the
  // request without a response.
  try {
    res.set('Content-Type', register.contentType);
    res.send(await register.metrics());
  } catch (err) {
    res.status(500).end(err.message);
  }
});

健康检查 #

javascript
// Liveness endpoint: process details plus the number of sockets returned by
// io.fetchSockets().
app.get('/health', async (req, res) => {
  try {
    const sockets = await io.fetchSockets();

    res.json({
      status: 'healthy',
      uptime: process.uptime(),
      connections: sockets.length,
      memory: process.memoryUsage()
    });
  } catch (err) {
    // Without this, a rejected fetchSockets() leaves the request hanging.
    res.status(503).json({ status: 'unhealthy', error: err.message });
  }
});

// Readiness endpoint: ready only if io.fetchSockets() resolves.
app.get('/ready', async (req, res) => {
  try {
    await io.fetchSockets();
    res.json({ ready: true });
  } catch (err) {
    res.status(503).json({ ready: false, error: err.message });
  }
});

安全配置 #

CORS 配置 #

javascript
// Origins allowed to open a connection.
const allowedOrigins = new Set([
  'https://example.com',
  'https://app.example.com'
]);

// CORS origin callback: requests without an Origin header and requests from
// an allowed origin are accepted; everything else is rejected with an error.
function checkOrigin(origin, callback) {
  if (origin && !allowedOrigins.has(origin)) {
    callback(new Error('不允许的来源'));
    return;
  }

  callback(null, true);
}

const io = new Server(httpServer, {
  cors: {
    origin: checkOrigin,
    methods: ['GET', 'POST'],
    credentials: true
  }
});

速率限制 #

javascript
// NOTE(review): confirm this package name and its option names before use.
const rateLimit = require('socket.io-rate-limiter');

const limiterOptions = {
  windowMs: 60000,
  max: 100,
  message: '请求过于频繁,请稍后再试'
};
const limiter = rateLimit(limiterOptions);

io.use(limiter);

io.on('connection', (socket) => {
  // Per-socket window for 'chat message' events: at most 10 per 60 s.
  let sentInWindow = 0;
  let windowEndsAt = Date.now() + 60000;

  socket.use((event, next) => {
    const currentTime = Date.now();

    // Open a fresh window once the previous one has expired.
    if (currentTime > windowEndsAt) {
      sentInWindow = 0;
      windowEndsAt = currentTime + 60000;
    }

    const isChatMessage = event[0] === 'chat message';
    if (isChatMessage) {
      sentInWindow += 1;

      if (sentInWindow > 10) {
        return next(new Error('发送消息过于频繁'));
      }
    }

    next();
  });
});

输入验证 #

javascript
const Joi = require('joi');

// Validation schema per event name; events without an entry are not validated.
const schemas = {
  'chat message': Joi.object({
    room: Joi.string().required(),
    message: Joi.string().max(500).required()
  }),
  'join room': Joi.object({
    roomId: Joi.string().required()
  })
};

io.on('connection', (socket) => {
  socket.use((event, next) => {
    const [eventName, data] = event;

    // The event name comes from the client. Check own properties only, so a
    // name like "constructor" or "toString" cannot resolve to something
    // inherited from Object.prototype that has no validate() method.
    const hasSchema = Object.prototype.hasOwnProperty.call(schemas, eventName);

    if (hasSchema) {
      const { error } = schemas[eventName].validate(data);

      if (error) {
        return next(new Error(`验证失败: ${error.message}`));
      }
    }

    next();
  });
});

完整示例:生产级配置 #

javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const promClient = require('prom-client');
const helmet = require('helmet');
const cors = require('cors');

/**
 * Turn a comma-separated origin list into an array.
 * Entries are trimmed and empty entries dropped, so "a, b" and a trailing
 * comma both work; an unset or empty value falls back to the local dev origin.
 *
 * @param {string | undefined} rawOrigins - Value of ALLOWED_ORIGINS.
 * @returns {string[]} Non-empty list of allowed origins.
 */
function parseAllowedOrigins(rawOrigins) {
  const origins = (rawOrigins ?? '')
    .split(',')
    .map((origin) => origin.trim())
    .filter((origin) => origin.length > 0);

  return origins.length > 0 ? origins : ['http://localhost:3000'];
}

// Parsed once so Express and Socket.IO always agree on the allowed origins.
const allowedOrigins = parseAllowedOrigins(process.env.ALLOWED_ORIGINS);

const app = express();

app.use(helmet());
app.use(cors({
  origin: allowedOrigins,
  credentials: true
}));

const httpServer = createServer(app);

const io = new Server(httpServer, {
  cors: {
    origin: allowedOrigins,
    methods: ['GET', 'POST'],
    credentials: true
  },
  pingInterval: 25000,
  pingTimeout: 60000,
  maxHttpBufferSize: 1e5
});

/**
 * Connect the publish and subscribe Redis clients and attach the Redis
 * adapter to `io`.
 *
 * @returns {Promise<void>} Resolves once the adapter is attached; rejects if
 *   either client fails to connect.
 */
async function setupRedisAdapter() {
  const pubClient = createClient({
    url: process.env.REDIS_URL || 'redis://localhost:6379'
  });
  const subClient = pubClient.duplicate();

  // An 'error' event with no listener is thrown by the emitter and would take
  // the process down, so both clients get a handler before connecting.
  pubClient.on('error', (err) => {
    console.error('Redis pub 客户端错误:', err);
  });
  subClient.on('error', (err) => {
    console.error('Redis sub 客户端错误:', err);
  });

  await Promise.all([
    pubClient.connect(),
    subClient.connect()
  ]);

  io.adapter(createAdapter(pubClient, subClient));
  console.log('Redis 适配器已连接');
}

setupRedisAdapter().catch(console.error);

// Dedicated Prometheus registry; the default metrics are collected into it too.
const register = new promClient.Registry();
promClient.collectDefaultMetrics({ register });

const connectedSockets = new promClient.Gauge({
  name: 'socket_io_connected',
  help: '当前连接数',
  registers: [register]
});

// Handshake middleware: reject sockets without a token, otherwise attach the
// result of verifyToken() as socket.user.
function authenticateSocket(socket, next) {
  const { token } = socket.handshake.auth;

  if (!token) {
    return next(new Error('需要认证'));
  }

  try {
    socket.user = verifyToken(token);
    next();
  } catch (err) {
    next(new Error('无效令牌'));
  }
}

// Keep the gauge in step with connects and disconnects, and log both.
function trackConnection(socket) {
  connectedSockets.inc();

  console.log(`用户连接: ${socket.user.id}`);

  socket.on('disconnect', () => {
    connectedSockets.dec();
    console.log(`用户断开: ${socket.user.id}`);
  });
}

io.use(authenticateSocket);

io.on('connection', trackConnection);

// Liveness: answers as long as the process can serve HTTP.
app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    uptime: process.uptime(),
    timestamp: new Date().toISOString()
  });
});

// Readiness: ready only if io.fetchSockets() resolves. The previous version
// had nothing inside the try block that could fail, so it always said ready.
app.get('/ready', async (req, res) => {
  try {
    await io.fetchSockets();
    res.json({ ready: true });
  } catch (err) {
    res.status(503).json({ ready: false });
  }
});

app.get('/metrics', async (req, res) => {
  // Catch here: a rejection inside an async handler would otherwise leave the
  // request without a response.
  try {
    res.set('Content-Type', register.contentType);
    res.send(await register.metrics());
  } catch (err) {
    res.status(500).end(err.message);
  }
});

const PORT = process.env.PORT || 3001;

// Upper bound on how long a graceful shutdown may take before forcing exit.
const SHUTDOWN_TIMEOUT_MS = 10000;

httpServer.listen(PORT, () => {
  console.log(`服务器运行在端口 ${PORT}`);
});

process.on('SIGTERM', () => {
  console.log('收到 SIGTERM,正在关闭...');

  // httpServer.close() alone waits for every open connection to end, which
  // long-lived WebSocket connections never do. io.close() disconnects the
  // Socket.IO clients and closes the attached HTTP server.
  io.close(() => {
    console.log('服务器已关闭');
    process.exit(0);
  });

  // Safety net for connections that still refuse to close; unref() keeps this
  // timer from holding the process open on its own.
  setTimeout(() => {
    process.exit(1);
  }, SHUTDOWN_TIMEOUT_MS).unref();
});

下一步 #

现在你已经掌握了 Socket.IO 的高级特性,接下来学习 最佳实践,了解生产环境中的最佳实践和常见问题解决方案!

最后更新:2026-03-29