WebSocket 服务端实现 #

服务端架构 #

text
┌─────────────────────────────────────────────────────────────┐
│                    WebSocket 服务端架构                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   应用层                             │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ 路由处理 │  │ 消息处理 │  │ 业务逻辑 │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   连接管理层                         │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ 连接池   │  │ 心跳检测 │  │ 状态管理 │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   WebSocket 层                       │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ 握手处理 │  │ 帧解析   │  │ 协议扩展 │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   HTTP 层                            │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ 协议升级 │  │ 认证授权 │  │ CORS    │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Node.js 实现 #

使用 ws 库 #

javascript
const WebSocket = require('ws');

// Standalone WebSocket server on port 8080.
const wss = new WebSocket.Server({ port: 8080 });

/**
 * Per-connection setup: logs the peer address, attaches the message, close
 * and error listeners, then sends the greeting.
 * @param {object} ws - The accepted client socket.
 * @param {object} request - The HTTP upgrade request.
 */
const onConnection = (ws, request) => {
  console.log(`新客户端连接: ${request.socket.remoteAddress}`);

  // Log each incoming payload and echo it back to the sender.
  ws.on('message', (data) => {
    console.log('收到消息:', data.toString());

    ws.send(`服务器收到: ${data}`);
  });

  ws.on('close', () => {
    console.log('客户端断开连接');
  });

  ws.on('error', (error) => {
    console.error('WebSocket 错误:', error);
  });

  ws.send('欢迎连接 WebSocket 服务器!');
};

wss.on('connection', onConnection);

console.log('WebSocket 服务器运行在 ws://localhost:8080');

使用 Express 集成 #

javascript
const express = require('express');
const http = require('http');
const WebSocket = require('ws');

const app = express();
// One HTTP server carries both the Express routes and the WebSocket endpoint.
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

/** Plain HTTP route at the site root. */
function serveIndex(req, res) {
  res.send('WebSocket Server');
}

/** Logs the new connection and every message it sends. */
function onConnection(ws, req) {
  console.log('新连接');

  ws.on('message', function onMessage(message) {
    console.log('收到:', message.toString());
  });
}

app.get('/', serveIndex);
wss.on('connection', onConnection);

server.listen(8080, function onListening() {
  console.log('服务器运行在 http://localhost:8080');
});

广播消息 #

javascript
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

/**
 * Send `message` to every client of `wss` whose socket is open.
 * @param {*} message - Payload handed to each client's send().
 * @param {?object} [excludeClient=null] - Client to skip (e.g. the sender).
 */
function broadcast(message, excludeClient = null) {
  for (const client of wss.clients) {
    const isExcluded = client === excludeClient;
    if (isExcluded || client.readyState !== WebSocket.OPEN) {
      continue;
    }
    client.send(message);
  }
}

// Relay every incoming message to all other clients, then greet the newcomer.
wss.on('connection', (ws) => {
  const relayToOthers = (data) => {
    broadcast(data.toString(), ws);
  };
  ws.on('message', relayToOthers);

  ws.send('欢迎加入聊天室!');
});

房间管理 #

javascript
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

// Room registry: room id -> Set of sockets currently in that room.
const rooms = new Map();

/**
 * Put `ws` into `roomId`, creating the room on first use.
 *
 * A socket is tracked in a single room through `ws.roomId`, so any other room
 * it is currently in is left first; otherwise the socket would stay in the
 * old room's set and keep receiving its broadcasts.
 *
 * @param {object} ws - Client socket; its `roomId` property is updated.
 * @param {*} roomId - Key of the room to join.
 */
function joinRoom(ws, roomId) {
  if (ws.roomId !== roomId) {
    leaveRoom(ws);
  }

  let members = rooms.get(roomId);
  if (!members) {
    members = new Set();
    rooms.set(roomId, members);
  }
  members.add(ws);
  ws.roomId = roomId;
}

/**
 * Remove `ws` from the room recorded in `ws.roomId` and delete the room once
 * it is empty. Safe to call repeatedly or for a socket that is in no room.
 *
 * `ws.roomId` is cleared so that later code keyed on it no longer treats the
 * socket as a member of the room it left.
 *
 * @param {object} ws - Client socket.
 */
function leaveRoom(ws) {
  const members = rooms.get(ws.roomId);
  if (members) {
    members.delete(ws);
    if (members.size === 0) {
      rooms.delete(ws.roomId);
    }
  }
  ws.roomId = undefined;
}

/**
 * Send `message` to every open socket in room `roomId`.
 * Does nothing when the room does not exist.
 * @param {*} roomId - Key of the target room.
 * @param {*} message - Payload handed to each member's send().
 * @param {?object} [excludeClient=null] - Member to skip (e.g. the sender).
 */
function broadcastToRoom(roomId, message, excludeClient = null) {
  if (!rooms.has(roomId)) {
    return;
  }

  for (const member of rooms.get(roomId)) {
    const isOpen = member.readyState === WebSocket.OPEN;
    if (isOpen && member !== excludeClient) {
      member.send(message);
    }
  }
}

// Room protocol: each message is JSON with a `type` of 'join', 'message' or
// 'leave'. Other types are ignored; anything that throws is logged.
wss.on('connection', (ws) => {
  const onMessage = (data) => {
    try {
      const msg = JSON.parse(data);
      const kind = msg.type;

      if (kind === 'join') {
        joinRoom(ws, msg.room);
        // The join notice goes to everyone in the room, the joiner included.
        broadcastToRoom(msg.room, `${msg.user} 加入了房间`);
      } else if (kind === 'message') {
        // Chat lines go to the sender's current room, excluding the sender.
        broadcastToRoom(ws.roomId, `${msg.user}: ${msg.content}`, ws);
      } else if (kind === 'leave') {
        leaveRoom(ws);
      }
    } catch (e) {
      console.error('消息解析错误:', e);
    }
  };

  // A closed socket is removed from its room.
  const onClose = () => {
    leaveRoom(ws);
  };

  ws.on('message', onMessage);
  ws.on('close', onClose);
});

心跳检测 #

javascript
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

/**
 * Marks the object it is invoked on as alive by setting `this.isAlive`.
 * Must stay a regular function (not an arrow function) because it relies on
 * the caller-supplied `this`.
 */
function heartbeat() {
  this.isAlive = true;
}

// Every 30000 ms: terminate clients still flagged not-alive from the previous
// round, then flag the remaining ones not-alive and ping them.
const interval = setInterval(() => {
  for (const ws of wss.clients) {
    if (ws.isAlive === false) {
      ws.terminate();
      continue;
    }

    ws.isAlive = false;
    ws.ping();
  }
}, 30000);

// New sockets start out alive; each 'pong' re-marks them through heartbeat().
wss.on('connection', (ws) => {
  ws.isAlive = true;
  ws.on('pong', heartbeat);

  ws.on('message', (data) => {
    console.log('收到消息:', data.toString());
  });
});

// Stop the heartbeat timer when the server closes.
wss.on('close', () => {
  clearInterval(interval);
});

使用 Socket.IO #

javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

const app = express();
const httpServer = createServer(app);

// CORS settings for the Socket.IO endpoint: any origin, GET and POST.
const corsOptions = {
  origin: '*',
  methods: ['GET', 'POST'],
};
const io = new Server(httpServer, { cors: corsOptions });

/**
 * Wire the 'join', 'chat' and 'disconnect' events for one connected socket.
 * @param {object} socket - The connected Socket.IO socket.
 */
function registerSocketHandlers(socket) {
  console.log('用户连接:', socket.id);

  // Join the room and notify the room via socket.to(), i.e. not this socket.
  function onJoin(room) {
    socket.join(room);
    socket.to(room).emit('message', `用户 ${socket.id} 加入了房间`);
  }

  // Chat messages are emitted through io.to(), i.e. to the whole room.
  function onChat(room, message) {
    const payload = { user: socket.id, message: message };
    io.to(room).emit('message', payload);
  }

  function onDisconnect() {
    console.log('用户断开:', socket.id);
  }

  socket.on('join', onJoin);
  socket.on('chat', onChat);
  socket.on('disconnect', onDisconnect);
}

io.on('connection', registerSocketHandlers);

httpServer.listen(8080);
text
┌─────────────────────────────────────────────────────────────┐
│                    Socket.IO vs ws                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Socket.IO 特性:                                           │
│  ─────────────────────────────────────────────────────────  │
│  ✅ 自动重连                                                │
│  ✅ 降级支持(WebSocket 不可用时使用轮询)                  │
│  ✅ 房间和命名空间                                          │
│  ✅ 二进制事件支持                                          │
│  ✅ 心跳机制内置                                            │
│  ✅ 广播和房间管理                                          │
│                                                             │
│  ws 特性:                                                  │
│  ─────────────────────────────────────────────────────────  │
│  ✅ 轻量级                                                  │
│  ✅ 原生 WebSocket 协议                                     │
│  ✅ 更高性能                                                │
│  ✅ 更低开销                                                │
│                                                             │
│  选择建议:                                                  │
│  ─────────────────────────────────────────────────────────  │
│  Socket.IO:需要降级支持、房间管理、快速开发                │
│  ws:追求性能、原生协议、自定义实现                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Python 实现 #

使用 websockets 库 #

python
import asyncio
import websockets
import json

# Every currently connected client socket.
connected_clients = set()

async def handler(websocket, path=None):
    """Serve one client connection until it closes.

    Registers the socket, then for each incoming message either dispatches
    it to ``process_message`` (when it is a JSON object) or echoes it back
    with a prefix. The socket is always unregistered on exit.

    Args:
        websocket: The client connection; iterated for incoming messages.
        path: Request path. Optional so the handler works both with
            ``websockets`` releases that pass ``(websocket, path)`` and with
            releases that pass only the connection.
    """
    connected_clients.add(websocket)
    print(f"客户端连接,当前连接数: {len(connected_clients)}")

    try:
        async for message in websocket:
            print(f"收到消息: {message}")

            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                data = None

            # Valid JSON that is not an object (e.g. "123") has no .get();
            # treat it like plain text instead of crashing the connection.
            if isinstance(data, dict):
                await process_message(websocket, data)
            else:
                await websocket.send(f"服务器收到: {message}")

    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # discard() keeps cleanup safe even if the socket is already gone.
        connected_clients.discard(websocket)
        print(f"客户端断开,当前连接数: {len(connected_clients)}")

async def process_message(websocket, data):
    """Handle one decoded JSON message from ``websocket``.

    ``{"type": "broadcast", "message": ...}`` is sent to every other
    connected client; ``{"type": "echo", "message": ...}`` is sent back to
    the sender. Other types are ignored.

    Args:
        websocket: The connection the message came from.
        data: The decoded JSON object (a dict).
    """
    msg_type = data.get('type')

    if msg_type == 'broadcast':
        message = data.get('message', '')
        # Iterate over a snapshot: other handlers add/remove clients while
        # this coroutine is suspended in send(), and mutating a set during
        # iteration raises RuntimeError.
        for client in list(connected_clients):
            if client == websocket:
                continue
            try:
                await client.send(message)
            except websockets.exceptions.ConnectionClosed:
                # A recipient that just went away must not abort the
                # broadcast or end the sender's handler; the recipient's
                # own handler unregisters it.
                continue
    elif msg_type == 'echo':
        await websocket.send(data.get('message', ''))

async def main():
    """Serve ``handler`` on localhost:8080 and then wait indefinitely."""
    server = websockets.serve(handler, 'localhost', 8080)
    async with server:
        print("WebSocket 服务器运行在 ws://localhost:8080")
        # Nothing ever resolves this future, so the server context stays open.
        never_done = asyncio.get_running_loop().create_future()
        await never_done

if __name__ == '__main__':
    asyncio.run(main())

使用 FastAPI #

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

class ConnectionManager:
    """Keeps the list of accepted WebSocket connections and sends text to them."""

    def __init__(self):
        # Connections in the order they were accepted.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the connection; a no-op if it is not tracked."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str, exclude: WebSocket = None):
        """Send ``message`` to every tracked connection except ``exclude``.

        Iterates over a snapshot: connections can be added or removed by
        other tasks while this coroutine is suspended in ``send_text``, and
        removing from a list during iteration silently skips elements.
        """
        for connection in list(self.active_connections):
            if connection != exclude:
                await connection.send_text(message)

manager = ConnectionManager()

@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Chat endpoint: relay 'broadcast' messages, echo everything else.

    A text frame that is a JSON object with ``type == 'broadcast'`` has its
    ``content`` sent to every other connection. Any other frame, including
    text that is not valid JSON or not a JSON object, is echoed back to the
    sender with a prefix. On disconnect the connection is unregistered and
    the remaining clients are notified.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()

            # Malformed input must not raise out of the loop: that would end
            # the handler without unregistering the connection.
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                message = None

            if isinstance(message, dict) and message.get('type') == 'broadcast':
                content = message.get('content')
                await manager.broadcast(
                    '' if content is None else str(content),
                    exclude=websocket
                )
            else:
                await manager.send_personal_message(
                    f"收到: {data}",
                    websocket
                )
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast("用户离开聊天室")

# When executed directly, serve the app with uvicorn on port 8080.
if __name__ == '__main__':
    import uvicorn
    # NOTE(review): host '0.0.0.0' is not loopback-only; confirm this
    # exposure is intended outside local development.
    uvicorn.run(app, host='0.0.0.0', port=8080)

使用 Django Channels #

python
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    """Chat-room consumer: relays each message to the room's channel group."""

    async def connect(self):
        """Join the group named after the URL's ``room_name`` and accept."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room group."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Forward the ``message`` field of an incoming JSON frame to the group.

        Accepts both keywords so that a binary frame (delivered as
        ``bytes_data``) does not raise a TypeError. Frames that are binary,
        not valid JSON, not a JSON object, or lacking a ``message`` key are
        ignored rather than allowed to crash the consumer.
        """
        if text_data is None:
            return

        try:
            text_data_json = json.loads(text_data)
        except json.JSONDecodeError:
            return

        if not isinstance(text_data_json, dict) or 'message' not in text_data_json:
            return
        message = text_data_json['message']

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                # 'type' names the handler method run on each group member.
                'type': 'chat_message',
                'message': message
            }
        )

    async def chat_message(self, event):
        """Handle a group event by sending its message to this client as JSON."""
        message = event['message']

        await self.send(text_data=json.dumps({
            'message': message
        }))

Go 实现 #

使用 gorilla/websocket #

go
package main

import (
    "log"
    "net/http"
    "github.com/gorilla/websocket"
)

// upgrader upgrades HTTP requests to WebSocket connections, with read and
// write buffer sizes of 1024.
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    // CheckOrigin always returns true, so the request's origin is never
    // rejected. NOTE(review): restrict this before exposing the server to
    // browsers on other sites.
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// handleWebSocket upgrades the request and then echoes every message back
// with the same message type until a read or write fails. The connection is
// closed when the function returns.
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("升级失败:", err)
        return
    }
    defer conn.Close()

    for {
        kind, payload, readErr := conn.ReadMessage()
        if readErr != nil {
            log.Println("读取消息失败:", readErr)
            return
        }

        log.Printf("收到消息: %s", payload)

        if writeErr := conn.WriteMessage(kind, payload); writeErr != nil {
            log.Println("发送消息失败:", writeErr)
            return
        }
    }
}

// main registers the echo handler on /ws and serves HTTP on :8080, exiting
// with a fatal log entry when the server stops.
func main() {
    const addr = ":8080"

    http.HandleFunc("/ws", handleWebSocket)
    log.Println("WebSocket 服务器运行在 :8080")
    if err := http.ListenAndServe(addr, nil); err != nil {
        log.Fatal(err)
    }
}

聊天室实现 #

go
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "sync"
    
    "github.com/gorilla/websocket"
)

// Client is one connected WebSocket peer belonging to a Room.
type Client struct {
    conn *websocket.Conn // underlying WebSocket connection
    send chan []byte     // outbound messages, written to conn by writePump
    room *Room           // room this client reads from and publishes to
}

// Room fans messages out to its clients. Joins, leaves and messages arrive on
// the three channels and are processed by Run.
type Room struct {
    name    string
    clients map[*Client]bool // current members
    join    chan *Client     // clients to add
    leave   chan *Client     // clients to remove
    message chan []byte      // payloads to deliver to every member
    mutex   sync.RWMutex     // locked by Run around accesses to clients
}

// Message is the JSON shape of a chat message; the struct tags give the JSON
// field names.
type Message struct {
    Type    string `json:"type"`
    Content string `json:"content"`
    User    string `json:"user"`
}

// upgrader upgrades HTTP requests to WebSocket connections.
var upgrader = websocket.Upgrader{
    // CheckOrigin always returns true, so the request's origin is never
    // rejected. NOTE(review): restrict this before exposing the server to
    // browsers on other sites.
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// NewRoom returns an empty room called name. Its join, leave and message
// channels are unbuffered.
func NewRoom(name string) *Room {
    room := new(Room)
    room.name = name
    room.clients = map[*Client]bool{}
    room.join = make(chan *Client)
    room.leave = make(chan *Client)
    room.message = make(chan []byte)
    return room
}

// Run is the room's event loop; it never returns. It adds clients received
// on join, removes clients received on leave, and delivers each payload
// received on message to every member's send channel. A member whose send
// channel is full is dropped from the room instead of blocking the others.
func (r *Room) Run() {
    for {
        select {
        case client := <-r.join:
            r.mutex.Lock()
            r.clients[client] = true
            r.mutex.Unlock()

        case client := <-r.leave:
            r.mutex.Lock()
            r.drop(client)
            r.mutex.Unlock()

        case message := <-r.message:
            // A write lock is required here: slow members are deleted from
            // the map while it is being walked.
            r.mutex.Lock()
            for client := range r.clients {
                select {
                case client.send <- message:
                default:
                    r.drop(client)
                }
            }
            r.mutex.Unlock()
        }
    }
}

// drop removes client from the room and closes its send channel. It is a
// no-op for a client that is no longer a member, which prevents closing the
// channel twice when a client dropped as slow later reports that it left.
// The caller must hold r.mutex for writing.
func (r *Room) drop(client *Client) {
    if _, isMember := r.clients[client]; !isMember {
        return
    }
    delete(r.clients, client)
    close(client.send)
}

// readPump reads messages from the connection until a read fails. Payloads
// that decode as a Message are published to the room unchanged; payloads that
// do not decode are skipped. On exit the client is handed to the room's leave
// channel and then the connection is closed.
func (c *Client) readPump() {
    // Deferred calls run last-in first-out: leave is sent before Close.
    defer c.conn.Close()
    defer func() {
        c.room.leave <- c
    }()

    for {
        _, raw, err := c.conn.ReadMessage()
        if err != nil {
            return
        }

        // Decoding is used only as a validity check; the raw bytes are
        // what gets forwarded.
        var parsed Message
        if json.Unmarshal(raw, &parsed) != nil {
            continue
        }
        c.room.message <- raw
    }
}

// writePump writes every payload received on c.send to the connection as a
// text message. It stops when the channel is closed or a write fails, and
// closes the connection on the way out.
func (c *Client) writePump() {
    defer c.conn.Close()

    for {
        payload, open := <-c.send
        if !open {
            return
        }
        if c.conn.WriteMessage(websocket.TextMessage, payload) != nil {
            return
        }
    }
}

// main starts the "general" room's event loop and serves /ws on :8080. Each
// upgraded connection becomes a Client that joins the room and gets its own
// write and read goroutines.
func main() {
    room := NewRoom("general")
    go room.Run()

    serveWS := func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            return
        }

        // The send channel holds up to 256 pending outbound messages.
        client := &Client{conn: conn, room: room, send: make(chan []byte, 256)}

        room.join <- client

        go client.writePump()
        go client.readPump()
    }
    http.HandleFunc("/ws", serveWS)

    log.Println("服务器运行在 :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Java 实现 #

使用 Spring WebSocket #

java
/**
 * Registers the chat handler for raw WebSocket connections on {@code /ws}.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /**
     * Maps {@code /ws} to a chat handler and allows every origin.
     *
     * NOTE(review): the handler is created here with {@code new}, so it is a
     * separate instance from any ChatHandler bean the container creates.
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        ChatHandler chatHandler = new ChatHandler();
        registry.addHandler(chatHandler, "/ws").setAllowedOrigins("*");
    }
}

/**
 * Chat handler that relays join, leave and text events to every connected
 * session.
 */
@Component
public class ChatHandler extends TextWebSocketHandler {

    /** Connected sessions keyed by session id. */
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    /** Registers the new session and announces it to everyone. */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.put(session.getId(), session);
        broadcast("用户 " + session.getId() + " 加入聊天室");
    }

    /** Relays the text payload, prefixed with the sender's id, to everyone. */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        broadcast("用户 " + session.getId() + ": " + payload);
    }

    /** Unregisters the session and announces its departure to the others. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session.getId());
        broadcast("用户 " + session.getId() + " 离开聊天室");
    }

    /**
     * Sends {@code message} to every open session.
     *
     * Sends to one session are serialised by locking on the session object,
     * because callbacks for different sessions can run on different threads
     * and a session must not be written to concurrently. A failed send does
     * not stop delivery to the remaining sessions; the first failure is
     * rethrown after the loop, with later ones attached as suppressed.
     *
     * @param message text to deliver
     * @throws Exception the first I/O failure encountered, if any
     */
    private void broadcast(String message) throws Exception {
        TextMessage frame = new TextMessage(message);
        java.io.IOException firstFailure = null;

        for (WebSocketSession session : sessions.values()) {
            try {
                synchronized (session) {
                    if (session.isOpen()) {
                        session.sendMessage(frame);
                    }
                }
            } catch (java.io.IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}

使用 Spring WebSocket STOMP #

java
/**
 * STOMP-over-WebSocket configuration: a simple broker for {@code /topic}
 * destinations and an {@code /app} prefix for application destinations.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    /**
     * Enables the simple broker on {@code /topic} and sets {@code /app} as
     * the application destination prefix.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }
    
    /**
     * Registers the {@code /ws} STOMP endpoint with SockJS enabled and the
     * allowed-origin pattern {@code "*"}.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

/**
 * Message-mapping controller for chat; both handlers return the incoming
 * message unchanged.
 */
@Controller
public class ChatController {
    
    /** Mapped to {@code /chat}; the return value is sent to {@code /topic/messages}. */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage send(ChatMessage message) {
        return message;
    }
    
    /**
     * Mapped to {@code /chat/{roomId}}; the return value is sent to
     * {@code /topic/room/{roomId}}. The {@code roomId} argument itself is
     * not used in the method body.
     */
    @MessageMapping("/chat/{roomId}")
    @SendTo("/topic/room/{roomId}")
    public ChatMessage sendToRoom(@DestinationVariable String roomId, ChatMessage message) {
        return message;
    }
}

/**
 * Chat message payload: a user name and the message content.
 */
public class ChatMessage {

    private String user;
    private String content;

    /** @return the user name, or {@code null} if never set */
    public String getUser() {
        return this.user;
    }

    /** @param user the user name to store */
    public void setUser(String user) {
        this.user = user;
    }

    /** @return the message content, or {@code null} if never set */
    public String getContent() {
        return this.content;
    }

    /** @param content the message content to store */
    public void setContent(String content) {
        this.content = content;
    }
}
text
┌─────────────────────────────────────────────────────────────┐
│                    STOMP 协议说明                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  STOMP(Simple Text Oriented Messaging Protocol)           │
│  是一种简单的消息协议,用于在 WebSocket 上传输消息           │
│                                                             │
│  帧格式:                                                    │
│  ─────────────────────────────────────────────────────────  │
│  COMMAND                                                    │
│  header1:value1                                             │
│  header2:value2                                             │
│                                                             │
│  body^@                                                     │
│                                                             │
│  命令类型:                                                  │
│  ─────────────────────────────────────────────────────────  │
│  CONNECT     建立连接                                       │
│  SEND        发送消息                                       │
│  SUBSCRIBE   订阅主题                                       │
│  UNSUBSCRIBE 取消订阅                                       │
│  MESSAGE     接收消息                                       │
│  ERROR       错误                                           │
│  DISCONNECT  断开连接                                       │
│                                                             │
│  优势:                                                     │
│  ✅ 发布/订阅模式                                           │
│  ✅ 消息路由                                                 │
│  ✅ 错误处理                                                 │
│  ✅ 与消息队列集成                                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

连接管理 #

连接池设计 #

javascript
/**
 * In-memory registry of WebSocket connections and their room memberships.
 *
 * Two indexes are kept in step with each other:
 *   - `connections`: id -> { ws, metadata, rooms: Set<room>, lastActivity }
 *   - `rooms`:       room -> Set<id>
 */
class ConnectionPool {
  constructor() {
    this.connections = new Map();
    this.rooms = new Map();
  }

  /**
   * Register a connection under `id`.
   *
   * If `id` is already registered, the previous entry is removed first so
   * that its room memberships do not linger in `rooms` pointing at the new
   * connection.
   *
   * @param {*} id - Unique connection key.
   * @param {object} ws - The socket.
   * @param {object} [metadata={}] - Caller-defined data stored with the entry.
   */
  add(id, ws, metadata = {}) {
    this.remove(id);
    this.connections.set(id, {
      ws,
      metadata,
      rooms: new Set(),
      lastActivity: Date.now(),
    });
  }

  /**
   * Unregister `id` and take it out of every room it joined.
   * Does nothing for an unknown id.
   * @param {*} id
   */
  remove(id) {
    const connection = this.connections.get(id);
    if (!connection) {
      return;
    }
    // Copy first: leaveRoom() deletes from connection.rooms as it goes.
    for (const room of [...connection.rooms]) {
      this.leaveRoom(id, room);
    }
    this.connections.delete(id);
  }

  /**
   * @param {*} id
   * @returns {object|undefined} The stored entry, or undefined if unknown.
   */
  get(id) {
    return this.connections.get(id);
  }

  /**
   * Add connection `id` to `room`, creating the room on first use.
   * Does nothing for an unknown id.
   * @param {*} id
   * @param {*} room
   */
  joinRoom(id, room) {
    const connection = this.connections.get(id);
    if (!connection) {
      return;
    }
    connection.rooms.add(room);

    let memberIds = this.rooms.get(room);
    if (!memberIds) {
      memberIds = new Set();
      this.rooms.set(room, memberIds);
    }
    memberIds.add(id);
  }

  /**
   * Remove connection `id` from `room`; the room is deleted once empty.
   * Does nothing when the id is unknown or is not in the room.
   * @param {*} id
   * @param {*} room
   */
  leaveRoom(id, room) {
    const connection = this.connections.get(id);
    if (!connection || !connection.rooms.has(room)) {
      return;
    }
    connection.rooms.delete(room);

    const memberIds = this.rooms.get(room);
    if (memberIds) {
      memberIds.delete(id);
      if (memberIds.size === 0) {
        this.rooms.delete(room);
      }
    }
  }

  /**
   * @param {*} room
   * @returns {object[]} Entries of the room's members; empty for an unknown room.
   */
  getRoomMembers(room) {
    const memberIds = this.rooms.get(room);
    if (!memberIds) {
      return [];
    }
    return [...memberIds].map((id) => this.connections.get(id));
  }

  /**
   * Send `message` to every member of `room` whose socket is open.
   * @param {*} room
   * @param {*} message - Payload handed to each socket's send().
   * @param {*} [excludeId=null] - Connection id to skip (e.g. the sender).
   */
  broadcast(room, message, excludeId = null) {
    const memberIds = this.rooms.get(room);
    if (!memberIds) {
      return;
    }

    for (const id of memberIds) {
      if (id === excludeId) {
        continue;
      }
      const connection = this.connections.get(id);
      if (connection && connection.ws.readyState === WebSocket.OPEN) {
        connection.ws.send(message);
      }
    }
  }

  /**
   * Stamp `id` with the current time as its last activity.
   * @param {*} id
   */
  updateActivity(id) {
    const connection = this.connections.get(id);
    if (connection) {
      connection.lastActivity = Date.now();
    }
  }

  /**
   * @param {number} [timeout=300000] - Idle threshold in milliseconds.
   * @returns {Array} Ids whose last activity is more than `timeout` ms ago.
   */
  getInactiveConnections(timeout = 300000) {
    const now = Date.now();
    const inactive = [];

    for (const [id, connection] of this.connections) {
      if (now - connection.lastActivity > timeout) {
        inactive.push(id);
      }
    }

    return inactive;
  }
}

认证中间件 #

javascript
const WebSocket = require('ws');
const jwt = require('jsonwebtoken');

// WebSocket server on port 8080 that verifies a JWT during the handshake.
const wss = new WebSocket.Server({
  port: 8080,
  /**
   * Handshake gate: rejects with 401 unless the request carries a token
   * that jwt.verify accepts; on success the decoded payload is stored on
   * `info.req.user`.
   *
   * NOTE(review): the entire `sec-websocket-protocol` header value is used
   * as the token.
   */
  verifyClient(info, callback) {
    const { req } = info;
    const token = req.headers['sec-websocket-protocol'];

    if (!token) {
      callback(false, 401, '未提供认证令牌');
      return;
    }

    try {
      req.user = jwt.verify(token, process.env.JWT_SECRET);
      callback(true);
    } catch (error) {
      callback(false, 401, '无效的认证令牌');
    }
  },
});

// Attach the `user` found on the request to the socket and log its messages.
wss.on('connection', (ws, request) => {
  const { user } = request;
  console.log(`用户 ${user.id} 已连接`);

  ws.user = user;

  const logMessage = (data) => {
    console.log(`用户 ${user.id} 发送消息:`, data.toString());
  };
  ws.on('message', logMessage);
});

下一步 #

现在你已经掌握了 WebSocket 服务端实现。接下来学习「客户端实现」,了解如何在不同平台上实现 WebSocket 客户端!

最后更新:2026-03-29