WebSocket 高级主题 #
集群方案 #
单机 vs 集群 #
text
┌─────────────────────────────────────────────────────────────┐
│ 单机 vs 集群架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 单机架构: │
│ ───────────────────────────────────────────────────────── │
│ ┌─────────────────────────────────────────┐ │
│ │ WebSocket 服务器 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ C1 │ │ C2 │ │ C3 │ │ C4 │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────────┘ │
│ │
│ 问题: │
│ ❌ 单点故障 │
│ ❌ 连接数受限 │
│ ❌ 无法水平扩展 │
│ │
│ 集群架构: │
│ ───────────────────────────────────────────────────────── │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 负载均衡器 │ │
│ └───────────────────┬─────────────────────────────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Server1 │ │ Server2 │ │ Server3 │ │
│ │ C1, C2 │ │ C3, C4 │ │ C5, C6 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 消息中间件 │ │
│ │ (Redis Pub/Sub)│ │
│ └─────────────────┘ │
│ │
│ 优势: │
│ ✅ 高可用 │
│ ✅ 水平扩展 │
│ ✅ 负载分担 │
│ │
└─────────────────────────────────────────────────────────────┘
Redis Pub/Sub 集群 #
javascript
const WebSocket = require('ws');
const Redis = require('ioredis');
/**
 * One WebSocket node of a cluster. Client messages are not delivered directly;
 * they are published to Redis so that every node (including this one) receives
 * them and forwards them to its own locally connected clients.
 *
 * Client message protocol (JSON text frames):
 *   { type: 'join', room }           put this socket into a room
 *   { type: 'broadcast', data }      deliver to every other client
 *   { type: 'room', room, data }     deliver to the other clients in a room
 *   { type: 'private', to, data }    deliver to the client whose id is `to`
 */
class WebSocketCluster {
  /**
   * @param {number} port - Port the WebSocket server listens on.
   */
  constructor(port) {
    this.port = port;
    this.wss = null;
    // Two connections: one is switched into subscriber mode, the other publishes.
    this.subscriber = new Redis();
    this.publisher = new Redis();
    // clientId -> socket, for the clients connected to THIS node only.
    this.clients = new Map();
    this.init();
  }

  /** Starts the WebSocket server and wires up the Redis subscriptions. */
  init() {
    this.wss = new WebSocket.Server({ port: this.port });
    this.wss.on('connection', (ws) => {
      const clientId = this.generateClientId();
      ws.id = clientId;
      this.clients.set(clientId, ws);
      ws.on('message', (data) => {
        this.handleMessage(ws, data);
      });
      ws.on('close', () => {
        this.clients.delete(clientId);
      });
    });
    this.subscribeToRedis();
    console.log(`WebSocket 服务器运行在端口 ${this.port}`);
  }

  /**
   * Subscribes to every channel this node publishes to.
   *
   * SUBSCRIBE matches channel names literally, so the `ws:room:*` glob has to
   * go through PSUBSCRIBE; pattern matches are reported via 'pmessage'.
   */
  subscribeToRedis() {
    const logFailure = (error) => console.error('Redis 订阅失败:', error);
    Promise.resolve(this.subscriber.subscribe('ws:broadcast', 'ws:private')).catch(logFailure);
    Promise.resolve(this.subscriber.psubscribe('ws:room:*')).catch(logFailure);
    this.subscriber.on('message', (channel, message) => {
      this.handleRedisMessage(channel, message);
    });
    this.subscriber.on('pmessage', (_pattern, channel, message) => {
      this.handleRedisMessage(channel, message);
    });
  }

  /**
   * Publishes a JSON payload and logs (rather than drops) a failed publish.
   *
   * @param {string} channel - Redis channel name.
   * @param {object} payload - Value to serialize as JSON.
   */
  publish(channel, payload) {
    Promise.resolve(this.publisher.publish(channel, JSON.stringify(payload)))
      .catch((error) => console.error('Redis 发布失败:', error));
  }

  /**
   * Handles one frame received from a local client.
   *
   * @param {object} ws - Socket the frame arrived on (carries `id` and `room`).
   * @param {string|Buffer} data - Raw frame; expected to be JSON.
   */
  handleMessage(ws, data) {
    try {
      const msg = JSON.parse(data);
      switch (msg.type) {
        case 'join':
          // broadcastToRoom filters on ws.room, so membership must be recorded.
          ws.room = msg.room;
          break;
        case 'broadcast':
          this.publish('ws:broadcast', { from: ws.id, data: msg.data });
          break;
        case 'room':
          this.publish(`ws:room:${msg.room}`, { from: ws.id, data: msg.data });
          break;
        case 'private':
          this.publish('ws:private', { to: msg.to, data: msg.data });
          break;
      }
    } catch (error) {
      console.error('消息处理错误:', error);
    }
  }

  /**
   * Dispatches a message received from Redis to the matching local clients.
   *
   * @param {string} channel - Concrete channel the message was published on.
   * @param {string} message - JSON payload produced by handleMessage.
   */
  handleRedisMessage(channel, message) {
    let msg;
    try {
      msg = JSON.parse(message);
    } catch (error) {
      // Anything can publish to Redis; a bad payload must not crash the node.
      console.error('消息处理错误:', error);
      return;
    }
    const roomPrefix = 'ws:room:';
    if (channel === 'ws:broadcast') {
      this.broadcast(msg.data, msg.from);
    } else if (channel.startsWith(roomPrefix)) {
      this.broadcastToRoom(channel.slice(roomPrefix.length), msg.data, msg.from);
    } else if (channel === 'ws:private') {
      this.sendToClient(msg.to, msg.data);
    }
  }

  /**
   * Sends `data` to every open local client except `excludeClient`.
   *
   * @param {*} data - JSON-serializable payload.
   * @param {string} [excludeClient] - Client id to skip (the sender).
   */
  broadcast(data, excludeClient) {
    const message = JSON.stringify(data);
    this.clients.forEach((ws, id) => {
      if (id !== excludeClient && ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * Sends `{ room, data }` to every open local client that joined `room`,
   * except `excludeClient`.
   *
   * @param {string} room - Room name.
   * @param {*} data - JSON-serializable payload.
   * @param {string} [excludeClient] - Client id to skip (the sender).
   */
  broadcastToRoom(room, data, excludeClient) {
    const message = JSON.stringify({ room, data });
    this.clients.forEach((ws, id) => {
      if (id !== excludeClient && ws.room === room && ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * Sends `data` to one client if it is connected to this node and open.
   *
   * @param {string} clientId - Target client id.
   * @param {*} data - JSON-serializable payload.
   */
  sendToClient(clientId, data) {
    const ws = this.clients.get(clientId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data));
    }
  }

  /**
   * @returns {string} Id built from the current time plus a random base-36 suffix.
   */
  generateClientId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Create a cluster node on port 8080; the constructor starts the server immediately.
const server = new WebSocketCluster(8080);
使用 Redis Adapter (Socket.IO) #
javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const Redis = require('ioredis');
// HTTP server that Socket.IO attaches to.
const app = express();
const httpServer = createServer(app);

// Cross-origin settings handed to Socket.IO.
const corsOptions = {
  origin: '*',
  methods: ['GET', 'POST']
};
const io = new Server(httpServer, { cors: corsOptions });

// The Redis adapter takes a publishing connection and a subscribing one.
const pubClient = new Redis({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));

/**
 * Registers the per-connection event handlers.
 *
 * @param {object} socket - The Socket.IO socket of the newly connected client.
 */
function onConnection(socket) {
  console.log('客户端连接:', socket.id);

  // Join a room and tell the other members about it.
  const onJoin = (room) => {
    socket.join(room);
    socket.to(room).emit('message', `用户 ${socket.id} 加入房间`);
  };

  // Relay a chat message to everyone in the room, sender included.
  const onChat = (room, message) => {
    const payload = { user: socket.id, message: message };
    io.to(room).emit('message', payload);
  };

  const onDisconnect = () => {
    console.log('客户端断开:', socket.id);
  };

  socket.on('join', onJoin);
  socket.on('chat', onChat);
  socket.on('disconnect', onDisconnect);
}

io.on('connection', onConnection);
httpServer.listen(8080);
负载均衡 #
Nginx 配置 #
nginx
# Pool of WebSocket backend nodes. least_conn sends each new connection to the
# server that currently has the fewest active connections.
upstream websocket_backend {
    least_conn;
    server 127.0.0.1:8081;
    server 127.0.0.1:8082;
    server 127.0.0.1:8083;
}

server {
    listen 80;
    server_name example.com;

    location /ws {
        proxy_pass http://websocket_backend;

        # The WebSocket handshake is an HTTP/1.1 Upgrade request. Upgrade and
        # Connection are hop-by-hop headers, so they must be forwarded explicitly.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # Pass the original request details on to the backend.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        proxy_connect_timeout 60s;
        # nginx closes a proxied connection when no data is transferred for this
        # long. 60s (the default) drops idle WebSockets; use a long window that
        # matches the HAProxy example, and still send application-level pings.
        proxy_send_timeout 3600s;
        proxy_read_timeout 3600s;

        # Forward frames as they arrive instead of buffering the response.
        proxy_buffering off;
    }
}
text
┌─────────────────────────────────────────────────────────────┐
│ 负载均衡策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 轮询(Round Robin) │
│ ───────────────────────────────────────────────────── │
│ 按顺序分配请求 │
│ 适合服务器性能相近的场景 │
│ │
│ 2. 最少连接(Least Connections) │
│ ───────────────────────────────────────────────────── │
│ 分配给连接数最少的服务器 │
│ 适合 WebSocket 长连接场景 │
│ │
│ 3. IP 哈希(IP Hash) │
│ ───────────────────────────────────────────────────── │
│ 根据客户端 IP 分配服务器 │
│ 确保同一客户端连接同一服务器 │
│ 注意:会话保持不能替代跨服务器通信(集群仍需消息中间件),且 NAT 后的客户端可能导致负载不均 │
│ │
│ 4. 一致性哈希(Consistent Hashing) │
│ ───────────────────────────────────────────────────── │
│ 根据特定键值分配服务器 │
│ 适合需要会话保持的场景 │
│ │
│ WebSocket 推荐配置: │
│ ───────────────────────────────────────────────────────── │
│ - 使用 least_conn 策略 │
│ - 配置健康检查 │
│ - 设置合理的超时时间 │
│ - 关闭代理缓冲 │
│ │
└─────────────────────────────────────────────────────────────┘
HAProxy 配置 #
text
# Entry point: only requests carrying "Upgrade: websocket" are routed to the
# WebSocket backend. No default backend is configured for other requests.
frontend websocket_front
    bind *:80
    # hdr() inspects HTTP headers, so this proxy must run in HTTP mode;
    # in TCP mode the request is not parsed and the ACL cannot match.
    mode http
    # Client-side timeouts belong to the frontend (they are ignored in a backend).
    timeout client 3600s
    acl is_websocket hdr(Upgrade) -i websocket
    use_backend websocket_back if is_websocket

# WebSocket nodes, balanced by least active connections.
backend websocket_back
    mode http
    balance leastconn
    # Health check: a plain TCP connect to each server.
    option tcp-check
    tcp-check connect
    server ws1 127.0.0.1:8081 check
    server ws2 127.0.0.1:8082 check
    server ws3 127.0.0.1:8083 check
    timeout connect 60s
    timeout server 3600s
    # After the Upgrade the connection becomes a tunnel; this inactivity timeout
    # then applies in both directions.
    timeout tunnel 3600s
消息队列集成 #
RabbitMQ 集成 #
javascript
const WebSocket = require('ws');
const amqp = require('amqplib');
/**
 * WebSocket server that fans client messages out through a RabbitMQ topic
 * exchange named 'websocket', so every server instance bound to the exchange
 * can forward them to its own local clients.
 *
 * Client message protocol (JSON text frames):
 *   { type: 'join', room }          put this socket into a room
 *   { type: 'broadcast', data }     published with routing key 'broadcast.message'
 *   { type: 'room', room, data }    published with routing key 'room.<room>'
 */
class WebSocketWithRabbitMQ {
  /**
   * @param {number} wsPort - Port the WebSocket server listens on.
   * @param {string} amqpUrl - AMQP connection URL.
   */
  constructor(wsPort, amqpUrl) {
    this.wsPort = wsPort;
    this.amqpUrl = amqpUrl;
    this.wss = null;
    this.channel = null;
    // clientId -> socket, for the clients connected to THIS instance only.
    this.clients = new Map();
    // init() is async and a constructor cannot await it; report startup
    // failures instead of leaving an unhandled rejection.
    this.init().catch((error) => {
      console.error('初始化失败:', error);
    });
  }

  /** Connects to RabbitMQ first, then starts accepting WebSocket clients. */
  async init() {
    await this.initRabbitMQ();
    this.initWebSocket();
  }

  /**
   * Declares the exchange, creates an exclusive server-named queue for this
   * instance, binds it to broadcast and room traffic, and starts consuming.
   */
  async initRabbitMQ() {
    const connection = await amqp.connect(this.amqpUrl);
    this.channel = await connection.createChannel();
    await this.channel.assertExchange('websocket', 'topic', { durable: false });
    const queue = await this.channel.assertQueue('', { exclusive: true });
    await this.channel.bindQueue(queue.queue, 'websocket', 'broadcast.#');
    await this.channel.bindQueue(queue.queue, 'websocket', 'room.#');
    await this.channel.consume(queue.queue, (msg) => {
      // The callback receives null when the broker cancels the consumer.
      if (msg === null) {
        console.warn('RabbitMQ 消费者已被取消');
        return;
      }
      try {
        const content = JSON.parse(msg.content.toString());
        this.handleRabbitMQMessage(msg.fields.routingKey, content);
      } catch (error) {
        console.error('消息处理错误:', error);
      }
    }, { noAck: true });
  }

  /** Starts the WebSocket server and tracks connected clients. */
  initWebSocket() {
    this.wss = new WebSocket.Server({ port: this.wsPort });
    this.wss.on('connection', (ws) => {
      const clientId = this.generateClientId();
      ws.id = clientId;
      this.clients.set(clientId, ws);
      ws.on('message', (data) => {
        this.handleClientMessage(ws, data);
      });
      ws.on('close', () => {
        this.clients.delete(clientId);
      });
    });
  }

  /**
   * Handles one frame from a local client.
   *
   * @param {object} ws - Socket the frame arrived on.
   * @param {string|Buffer} data - Raw frame; expected to be JSON.
   */
  handleClientMessage(ws, data) {
    try {
      const msg = JSON.parse(data);
      if (msg.type === 'join') {
        // broadcastToRoom filters on ws.room, so membership must be recorded.
        ws.room = msg.room;
      } else if (msg.type === 'broadcast') {
        this.publish('broadcast.message', { from: ws.id, data: msg.data });
      } else if (msg.type === 'room') {
        this.publish(`room.${msg.room}`, { from: ws.id, data: msg.data });
      }
    } catch (error) {
      // Client input is untrusted: a malformed frame must not take the server down.
      console.error('消息处理错误:', error);
    }
  }

  /**
   * Publishes a JSON payload to the 'websocket' exchange.
   *
   * @param {string} routingKey - Topic routing key.
   * @param {object} payload - Value to serialize as JSON.
   */
  publish(routingKey, payload) {
    this.channel.publish('websocket', routingKey, Buffer.from(JSON.stringify(payload)));
  }

  /**
   * Dispatches a message consumed from RabbitMQ to local clients.
   *
   * @param {string} routingKey - 'broadcast.*' or 'room.<room>'.
   * @param {object} content - Parsed message body.
   */
  handleRabbitMQMessage(routingKey, content) {
    const roomPrefix = 'room.';
    if (routingKey.startsWith('broadcast.')) {
      this.broadcast(content);
    } else if (routingKey.startsWith(roomPrefix)) {
      this.broadcastToRoom(routingKey.slice(roomPrefix.length), content);
    }
  }

  /**
   * Sends `data` to every open local client (the sender is not excluded).
   *
   * @param {*} data - JSON-serializable payload.
   */
  broadcast(data) {
    const message = JSON.stringify(data);
    this.clients.forEach((ws) => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * Sends `{ room, ...data }` to every open local client that joined `room`.
   *
   * @param {string} room - Room name.
   * @param {object} data - Message fields merged into the outgoing object.
   */
  broadcastToRoom(room, data) {
    const message = JSON.stringify({ room, ...data });
    this.clients.forEach((ws) => {
      if (ws.room === room && ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * @returns {string} Id built from the current time plus a random base-36 suffix.
   */
  generateClientId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Start the server on port 8080 against the RabbitMQ broker at amqp://localhost.
const server = new WebSocketWithRabbitMQ(8080, 'amqp://localhost');
Kafka 集成 #
javascript
const WebSocket = require('ws');
const { Kafka } = require('kafkajs');
/**
 * WebSocket server that fans client messages out through Kafka topics
 * ('websocket-broadcast' and 'websocket-room') so that every server instance
 * can forward them to its own local clients.
 *
 * Client message protocol (JSON text frames):
 *   { type: 'join', room }          put this socket into a room
 *   { type: 'broadcast', data }     produced to 'websocket-broadcast'
 *   { type: 'room', room, data }    produced to 'websocket-room'
 */
class WebSocketWithKafka {
  /**
   * @param {number} wsPort - Port the WebSocket server listens on.
   * @param {string[]} kafkaBrokers - Kafka broker addresses.
   * @param {string} [groupId] - Consumer group id; must be unique per server
   *   instance. Defaults to a generated id.
   */
  constructor(wsPort, kafkaBrokers, groupId) {
    this.wsPort = wsPort;
    this.kafka = new Kafka({
      brokers: kafkaBrokers,
      clientId: 'websocket-server'
    });
    // Kafka delivers each message to only ONE member of a consumer group. With
    // a shared group id only one server would see a given message, so every
    // instance needs its own group to receive the full stream.
    this.groupId = groupId || this.createGroupId();
    this.producer = this.kafka.producer();
    this.consumer = this.kafka.consumer({ groupId: this.groupId });
    this.wss = null;
    // clientId -> socket, for the clients connected to THIS instance only.
    this.clients = new Map();
    // init() is async and a constructor cannot await it; report startup
    // failures instead of leaving an unhandled rejection.
    this.init().catch((error) => {
      console.error('初始化失败:', error);
    });
  }

  /**
   * @returns {string} A consumer group id that differs per process and per call.
   */
  createGroupId() {
    return `websocket-group-${process.pid}-${this.generateClientId()}`;
  }

  /** Connects producer and consumer, starts consuming, then opens the WebSocket server. */
  async init() {
    await this.producer.connect();
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'websocket-broadcast', fromBeginning: false });
    await this.consumer.subscribe({ topic: 'websocket-room', fromBeginning: false });
    await this.consumer.run({
      eachMessage: async ({ topic, message }) => {
        // A record may carry a null value; there is nothing to deliver then.
        if (!message.value) {
          return;
        }
        try {
          const content = JSON.parse(message.value.toString());
          this.handleKafkaMessage(topic, content);
        } catch (error) {
          console.error('消息处理错误:', error);
        }
      }
    });
    this.initWebSocket();
  }

  /** Starts the WebSocket server and tracks connected clients. */
  initWebSocket() {
    this.wss = new WebSocket.Server({ port: this.wsPort });
    this.wss.on('connection', (ws) => {
      const clientId = this.generateClientId();
      ws.id = clientId;
      this.clients.set(clientId, ws);
      ws.on('message', (data) => {
        this.handleClientMessage(ws, data);
      });
      ws.on('close', () => {
        this.clients.delete(clientId);
      });
    });
  }

  /**
   * Handles one frame from a local client. Never rejects: parse and produce
   * errors are logged, because the caller is an event handler that cannot
   * await the returned promise.
   *
   * @param {object} ws - Socket the frame arrived on.
   * @param {string|Buffer} data - Raw frame; expected to be JSON.
   * @returns {Promise<void>}
   */
  async handleClientMessage(ws, data) {
    try {
      const msg = JSON.parse(data);
      if (msg.type === 'join') {
        // broadcastToRoom filters on ws.room, so membership must be recorded.
        ws.room = msg.room;
      } else if (msg.type === 'broadcast') {
        await this.producer.send({
          topic: 'websocket-broadcast',
          messages: [{
            value: JSON.stringify({ from: ws.id, data: msg.data })
          }]
        });
      } else if (msg.type === 'room') {
        await this.producer.send({
          topic: 'websocket-room',
          messages: [{
            // Keyed by room name.
            key: String(msg.room),
            value: JSON.stringify({ from: ws.id, room: msg.room, data: msg.data })
          }]
        });
      }
    } catch (error) {
      console.error('消息处理错误:', error);
    }
  }

  /**
   * Dispatches a message consumed from Kafka to local clients.
   *
   * @param {string} topic - Topic the message came from.
   * @param {object} content - Parsed message body.
   */
  handleKafkaMessage(topic, content) {
    if (topic === 'websocket-broadcast') {
      this.broadcast(content);
    } else if (topic === 'websocket-room') {
      this.broadcastToRoom(content.room, content);
    }
  }

  /**
   * Sends `data` to every open local client (the sender is not excluded).
   *
   * @param {*} data - JSON-serializable payload.
   */
  broadcast(data) {
    const message = JSON.stringify(data);
    this.clients.forEach((ws) => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * Sends `data` to every open local client that joined `room`.
   *
   * @param {string} room - Room name.
   * @param {*} data - JSON-serializable payload.
   */
  broadcastToRoom(room, data) {
    const message = JSON.stringify(data);
    this.clients.forEach((ws) => {
      if (ws.room === room && ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }

  /**
   * @returns {string} Id built from the current time plus a random base-36 suffix.
   */
  generateClientId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Start the server on port 8080 against the Kafka broker at localhost:9092.
const server = new WebSocketWithKafka(8080, ['localhost:9092']);
性能优化 #
连接数优化 #
javascript
const WebSocket = require('ws');
const http = require('http');
// The WebSocket server shares an externally created HTTP server.
const server = http.createServer();
const wss = new WebSocket.Server({
  server,
  // Compression off: trades bandwidth for lower CPU and memory per connection.
  perMessageDeflate: false,
  // Reject frames larger than 1 MiB.
  maxPayload: 1024 * 1024,
  // Keep wss.clients populated; the shutdown handler below relies on it.
  clientTracking: true
});

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // NOTE(review): handleMessage is not defined in this snippet; it is
    // presumably supplied by the application.
    handleMessage(ws, data);
  });
});

// `backlog` only takes effect on the server that actually calls listen(). ws
// ignores its own `backlog` option when it is given an existing `server`, so
// the value is passed here instead.
server.listen({ port: 8080, backlog: 1024 }, () => {
  console.log('服务器启动');
});

// Graceful shutdown: stop accepting connections, ask every client to close,
// and exit once the WebSocket server reports that it is closed.
process.on('SIGINT', () => {
  server.close();
  wss.clients.forEach((ws) => {
    ws.close(1001, 'Server shutting down');
  });
  wss.close(() => {
    process.exit(0);
  });
});
text
┌─────────────────────────────────────────────────────────────┐
│ 连接数优化策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 系统参数调优 │
│ ───────────────────────────────────────────────────── │
│ # 增加文件描述符限制 │
│ ulimit -n 65535 │
│ │
│ # TCP 参数优化 │
│ net.core.somaxconn = 65535 │
│ net.ipv4.tcp_max_syn_backlog = 65535 │
│ net.ipv4.tcp_tw_reuse = 1 │
│ │
│ 2. WebSocket 配置优化 │
│ ───────────────────────────────────────────────────── │
│ perMessageDeflate: false 禁用压缩减少 CPU 开销 │
│ maxPayload: 限制消息大小防止内存溢出 │
│ clientTracking: true 启用客户端跟踪 │
│ │
│ 3. 内存优化 │
│ ───────────────────────────────────────────────────── │
│ - 使用 Buffer 池 │
│ - 及时清理断开的连接 │
│ - 限制消息队列长度 │
│ │
│ 4. 连接复用 │
│ ───────────────────────────────────────────────────── │
│ - 使用 HTTP/2 多路复用 │
│ - 合并多个 WebSocket 连接 │
│ │
└─────────────────────────────────────────────────────────────┘
消息处理优化 #
javascript
const WebSocket = require('ws');
/**
 * WebSocket server that queues incoming frames and handles them in batches on
 * a later event-loop turn instead of inside the 'message' callback.
 */
class OptimizedWebSocketServer {
  /**
   * @param {number} port - Port the WebSocket server listens on.
   */
  constructor(port) {
    this.wss = new WebSocket.Server({ port });
    this.messageQueue = [];
    this.isProcessing = false;
    // Upper bound on frames handled per batch.
    this.batchSize = 100;
    // Milliseconds between timer-driven drain attempts.
    this.batchInterval = 10;
    this.init();
  }

  /** Hooks up frame queueing and the periodic drain timer. */
  init() {
    // Start a batch unless one is already scheduled.
    const kick = () => {
      if (this.isProcessing) {
        return;
      }
      this.processBatch();
    };

    this.wss.on('connection', (ws) => {
      ws.on('message', (data) => {
        this.messageQueue.push({ ws, data });
        kick();
      });
    });

    setInterval(() => {
      if (this.messageQueue.length > 0) {
        kick();
      }
    }, this.batchInterval);
  }

  /**
   * Takes up to `batchSize` queued frames and handles them via setImmediate;
   * chains another batch while frames remain.
   */
  async processBatch() {
    const pending = this.messageQueue;
    if (!pending.length) {
      this.isProcessing = false;
      return;
    }

    this.isProcessing = true;
    const chunk = pending.splice(0, this.batchSize);

    setImmediate(() => {
      for (const { ws, data } of chunk) {
        try {
          this.handleMessage(ws, data);
        } catch (error) {
          // One bad frame must not abort the rest of the batch.
          console.error('消息处理错误:', error);
        }
      }
      this.isProcessing = false;
      if (this.messageQueue.length > 0) {
        this.processBatch();
      }
    });
  }

  /**
   * Handles a single frame: answers 'ping' with a pong, relays 'broadcast'.
   *
   * @param {object} ws - Socket the frame arrived on.
   * @param {string|Buffer} data - Raw frame; JSON.parse errors propagate.
   */
  handleMessage(ws, data) {
    const { type, data: payload } = JSON.parse(data);
    if (type === 'ping') {
      ws.send(JSON.stringify({ type: 'pong' }));
    } else if (type === 'broadcast') {
      this.broadcast(payload, ws);
    }
  }

  /**
   * Sends `data` to every open client except `excludeWs`.
   *
   * @param {*} data - JSON-serializable payload.
   * @param {object} excludeWs - Socket to skip (the sender).
   */
  broadcast(data, excludeWs) {
    const serialized = JSON.stringify(data);
    for (const peer of this.wss.clients) {
      if (peer === excludeWs || peer.readyState !== WebSocket.OPEN) {
        continue;
      }
      peer.send(serialized);
    }
  }
}
// Start the batching server on port 8080.
const server = new OptimizedWebSocketServer(8080);
内存管理 #
javascript
/**
 * Tracks WebSocket connections with their last-activity time, caps how many
 * are kept, and periodically closes idle ones when heap usage is high.
 */
class MemoryManager {
  /**
   * @param {object} [options]
   * @param {number} [options.maxConnections=10000] - Connection cap.
   * @param {number} [options.maxMemoryMB=512] - Heap usage (MB) that triggers cleanup.
   * @param {number} [options.checkInterval=60000] - Monitor period in milliseconds.
   */
  constructor(options = {}) {
    this.maxConnections = options.maxConnections || 10000;
    this.maxMemoryMB = options.maxMemoryMB || 512;
    this.checkInterval = options.checkInterval || 60000;
    // id -> { ws, createdAt, lastActivity }
    this.connections = new Map();
    this.monitorTimer = null;
    this.startMonitoring();
  }

  /**
   * Registers a connection, evicting the least recently active ones first when
   * the cap has been reached.
   *
   * @param {string} id - Connection id.
   * @param {object} ws - Socket; must provide close(code, reason).
   */
  addConnection(id, ws) {
    if (this.connections.size >= this.maxConnections) {
      this.evictOldConnections();
    }
    const now = Date.now();
    this.connections.set(id, {
      ws,
      createdAt: now,
      lastActivity: now
    });
  }

  /**
   * Forgets a connection without closing it.
   *
   * @param {string} id - Connection id.
   */
  removeConnection(id) {
    this.connections.delete(id);
  }

  /**
   * Marks a connection as active now; unknown ids are ignored.
   *
   * @param {string} id - Connection id.
   */
  updateActivity(id) {
    const connection = this.connections.get(id);
    if (connection) {
      connection.lastActivity = Date.now();
    }
  }

  /**
   * Closes and removes the least recently active connections: 10% of
   * `maxConnections`, but at least one (so small caps still make room) and
   * never more than are currently tracked.
   */
  evictOldConnections() {
    const byIdleness = [...this.connections.entries()]
      .sort((a, b) => a[1].lastActivity - b[1].lastActivity);
    const target = Math.max(1, Math.floor(this.maxConnections * 0.1));
    const toEvict = Math.min(target, byIdleness.length);
    for (const [id, connection] of byIdleness.slice(0, toEvict)) {
      connection.ws.close(1001, 'Server overloaded');
      this.connections.delete(id);
    }
  }

  /**
   * Starts (or restarts) the periodic memory check. The timer is unref'd so it
   * does not by itself keep the process alive.
   */
  startMonitoring() {
    this.stopMonitoring();
    this.monitorTimer = setInterval(() => {
      const memoryUsage = process.memoryUsage();
      const memoryMB = memoryUsage.heapUsed / 1024 / 1024;
      if (memoryMB > this.maxMemoryMB) {
        console.warn(`内存使用过高: ${memoryMB.toFixed(2)}MB`);
        this.cleanupInactiveConnections();
      }
      console.log(`连接数: ${this.connections.size}, 内存: ${memoryMB.toFixed(2)}MB`);
    }, this.checkInterval);
    if (typeof this.monitorTimer.unref === 'function') {
      this.monitorTimer.unref();
    }
  }

  /** Stops the periodic memory check; safe to call when it is not running. */
  stopMonitoring() {
    if (this.monitorTimer !== null) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = null;
    }
  }

  /**
   * Closes and removes every connection idle for longer than `timeout`.
   *
   * @param {number} [timeout=300000] - Idle limit in milliseconds.
   */
  cleanupInactiveConnections(timeout = 300000) {
    const now = Date.now();
    // Deleting the current entry while iterating a Map is well-defined.
    this.connections.forEach((connection, id) => {
      if (now - connection.lastActivity > timeout) {
        connection.ws.close(1001, 'Connection timeout');
        this.connections.delete(id);
      }
    });
  }
}
监控与告警 #
Prometheus 指标 #
javascript
const WebSocket = require('ws');
const client = require('prom-client');
// Every metric below is attached to this dedicated registry.
const register = new client.Registry();

/**
 * Builds a prom-client metric of the given class bound to `register`.
 *
 * @param {Function} MetricType - client.Gauge, client.Counter or client.Histogram.
 * @param {object} config - Metric configuration without the `registers` field.
 * @returns {object} The constructed metric.
 */
const createMetric = (MetricType, config) =>
  new MetricType({ ...config, registers: [register] });

// Number of currently connected clients.
const connectedClients = createMetric(client.Gauge, {
  name: 'websocket_connected_clients',
  help: '当前连接的客户端数量'
});

// Total frames received from clients.
const messagesReceived = createMetric(client.Counter, {
  name: 'websocket_messages_received_total',
  help: '接收的消息总数'
});

// Total frames sent to clients.
const messagesSent = createMetric(client.Counter, {
  name: 'websocket_messages_sent_total',
  help: '发送的消息总数'
});

// Connection lifetime in seconds (buckets from 1 s to 1 h).
const connectionDuration = createMetric(client.Histogram, {
  name: 'websocket_connection_duration_seconds',
  help: '连接持续时间',
  buckets: [1, 5, 10, 30, 60, 300, 600, 1800, 3600]
});

// Per-message handling time in seconds (buckets from 1 ms to 1 s).
const messageLatency = createMetric(client.Histogram, {
  name: 'websocket_message_latency_seconds',
  help: '消息处理延迟',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1]
});
/**
 * Echo WebSocket server that records connection counts, message counts,
 * handling latency and connection lifetime in the Prometheus metrics.
 */
class MonitoredWebSocketServer {
  /**
   * @param {number} port - Port the WebSocket server listens on.
   */
  constructor(port) {
    this.wss = new WebSocket.Server({ port });
    // socket -> timestamp (ms) at which it connected
    this.connectionTimes = new Map();
    this.init();
  }

  /** Registers the connection, message and close instrumentation. */
  init() {
    // Count the frame and time how long handling it takes.
    const onMessage = (ws, data) => {
      const startedAt = Date.now();
      messagesReceived.inc();
      this.handleMessage(ws, data);
      messageLatency.observe((Date.now() - startedAt) / 1000);
    };

    // Drop the gauge and record how long the connection lived.
    const onClose = (ws) => {
      connectedClients.dec();
      const openedAt = this.connectionTimes.get(ws);
      if (!openedAt) {
        return;
      }
      connectionDuration.observe((Date.now() - openedAt) / 1000);
      this.connectionTimes.delete(ws);
    };

    this.wss.on('connection', (ws) => {
      this.connectionTimes.set(ws, Date.now());
      connectedClients.inc();
      ws.on('message', (data) => onMessage(ws, data));
      ws.on('close', () => onClose(ws));
    });
  }

  /**
   * Echoes the frame back to its sender and counts it as sent.
   *
   * @param {object} ws - Socket the frame arrived on.
   * @param {*} data - Frame payload, returned unchanged.
   */
  handleMessage(ws, data) {
    ws.send(data);
    messagesSent.inc();
  }

  /**
   * @returns {*} Whatever `register.metrics()` returns.
   */
  getMetrics() {
    return register.metrics();
  }
}
// WebSocket traffic is served on 8080; metrics are exposed separately on 9090.
const server = new MonitoredWebSocketServer(8080);
const express = require('express');
const app = express();

// Scrape endpoint for Prometheus.
app.get('/metrics', async (req, res) => {
  try {
    // Collect first so a failure cannot leave a half-written response.
    const body = await server.getMetrics();
    res.set('Content-Type', register.contentType);
    res.send(body);
  } catch (error) {
    // The rejection is handled here and turned into a 500 so the client is
    // not left waiting on a response that never comes.
    console.error('指标导出失败:', error);
    res.status(500).end();
  }
});

app.listen(9090);
健康检查 #
javascript
/**
 * Periodically evaluates a WebSocket server's health from three checks:
 * connection count, heap usage, and a live ping round-trip.
 */
class HealthChecker {
  /**
   * @param {object} wss - WebSocket server; `wss.clients.size` is read.
   * @param {object} [options]
   * @param {number} [options.checkInterval=30000] - Check period in milliseconds.
   * @param {number} [options.timeout=5000] - Probe timeout in milliseconds.
   * @param {number} [options.healthyThreshold=0.8] - Fraction of a limit that
   *   still counts as healthy.
   * @param {number} [options.maxConnections=10000] - Connection limit.
   * @param {number} [options.maxMemoryMB=512] - Heap limit in MB.
   * @param {string} [options.url='ws://localhost:8080'] - Endpoint probed by
   *   the responsiveness check.
   */
  constructor(wss, options = {}) {
    this.wss = wss;
    this.checkInterval = options.checkInterval || 30000;
    this.timeout = options.timeout || 5000;
    this.healthyThreshold = options.healthyThreshold || 0.8;
    this.maxConnections = options.maxConnections || 10000;
    this.maxMemoryMB = options.maxMemoryMB || 512;
    this.url = options.url || 'ws://localhost:8080';
    this.isHealthy = true;
    this.lastCheck = null;
    this.timer = null;
    this.startHealthCheck();
  }

  /**
   * Starts (or restarts) the periodic check. The timer is unref'd so it does
   * not by itself keep the process alive.
   */
  startHealthCheck() {
    this.stop();
    this.timer = setInterval(() => {
      // performHealthCheck is async; a rejection here would otherwise go unhandled.
      this.performHealthCheck().catch((error) => {
        this.isHealthy = false;
        console.error('健康检查执行出错:', error);
      });
    }, this.checkInterval);
    if (typeof this.timer.unref === 'function') {
      this.timer.unref();
    }
  }

  /** Stops the periodic check; safe to call when it is not running. */
  stop() {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Runs all checks once and stores the outcome in `isHealthy` / `lastCheck`.
   *
   * @returns {Promise<void>}
   */
  async performHealthCheck() {
    const checks = {
      connections: this.checkConnections(),
      memory: this.checkMemory(),
      responsiveness: await this.checkResponsiveness()
    };
    const allHealthy = Object.values(checks).every((check) => check.healthy);
    this.isHealthy = allHealthy;
    this.lastCheck = {
      timestamp: Date.now(),
      checks,
      healthy: allHealthy
    };
    if (!allHealthy) {
      console.warn('健康检查失败:', this.lastCheck);
    }
  }

  /**
   * @returns {{healthy: boolean, count: number, max: number}} Healthy while the
   *   client count is below `maxConnections * healthyThreshold`.
   */
  checkConnections() {
    const clientCount = this.wss.clients.size;
    return {
      healthy: clientCount < this.maxConnections * this.healthyThreshold,
      count: clientCount,
      max: this.maxConnections
    };
  }

  /**
   * @returns {{healthy: boolean, heapUsedMB: string, maxMemoryMB: number}}
   *   Healthy while used heap is below `maxMemoryMB * healthyThreshold`.
   */
  checkMemory() {
    const heapUsedMB = process.memoryUsage().heapUsed / 1024 / 1024;
    return {
      healthy: heapUsedMB < this.maxMemoryMB * this.healthyThreshold,
      heapUsedMB: heapUsedMB.toFixed(2),
      maxMemoryMB: this.maxMemoryMB
    };
  }

  /**
   * Opens a probe connection to `this.url`, sends a ping and waits for any
   * reply. Always resolves (never rejects).
   *
   * NOTE(review): uses the `.on()` event API, so `WebSocket` here is presumably
   * the `ws` client class — confirm it is in scope where this class is used.
   *
   * @returns {Promise<object>} `{ healthy, latency }` on reply, otherwise
   *   `{ healthy: false, reason | error }`.
   */
  async checkResponsiveness() {
    return new Promise((resolve) => {
      const start = Date.now();
      let testWs;
      try {
        testWs = new WebSocket(this.url);
      } catch (error) {
        // An invalid URL makes the constructor throw synchronously.
        resolve({ healthy: false, error: error.message });
        return;
      }
      const timeout = setTimeout(() => {
        testWs.close();
        resolve({
          healthy: false,
          reason: 'Connection timeout'
        });
      }, this.timeout);
      testWs.on('open', () => {
        testWs.send(JSON.stringify({ type: 'ping' }));
      });
      testWs.on('message', () => {
        clearTimeout(timeout);
        testWs.close();
        const latency = Date.now() - start;
        resolve({
          healthy: latency < 1000,
          latency
        });
      });
      // Also fires after a timeout-triggered close(); the extra resolve() is a no-op.
      testWs.on('error', (error) => {
        clearTimeout(timeout);
        resolve({
          healthy: false,
          error: error.message
        });
      });
    });
  }

  /**
   * @returns {{healthy: boolean, lastCheck: (object|null)}} Latest known status.
   */
  getStatus() {
    return {
      healthy: this.isHealthy,
      lastCheck: this.lastCheck
    };
  }
}
下一步 #
现在你已经掌握了 WebSocket 高级主题,接下来学习 安全实践,了解如何构建安全的 WebSocket 应用!
最后更新:2026-03-29