WebSocket 服务端实现 #
服务端架构 #
text
┌─────────────────────────────────────────────────────────────┐
│ WebSocket 服务端架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 路由处理 │ │ 消息处理 │ │ 业务逻辑 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 连接管理层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 连接池 │ │ 心跳检测 │ │ 状态管理 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WebSocket 层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 握手处理 │ │ 帧解析 │ │ 协议扩展 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ HTTP 层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 协议升级 │ │ 认证授权 │ │ CORS │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Node.js 实现 #
使用 ws 库 #
javascript
const WebSocket = require('ws');
// Standalone WebSocket server on port 8080: greets each client, echoes every
// message back with a prefix, and logs connection lifecycle events.
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws, request) => {
  const { remoteAddress: clientIp } = request.socket;
  console.log(`新客户端连接: ${clientIp}`);

  ws.on('message', (data) => {
    console.log('收到消息:', data.toString());
    ws.send(`服务器收到: ${data}`);
  });

  ws.on('close', () => {
    console.log('客户端断开连接');
  });

  ws.on('error', (err) => {
    console.error('WebSocket 错误:', err);
  });

  // Greeting sent once, right after the listeners are attached.
  ws.send('欢迎连接 WebSocket 服务器!');
});

console.log('WebSocket 服务器运行在 ws://localhost:8080');
使用 Express 集成 #
javascript
const express = require('express');
const http = require('http');
const WebSocket = require('ws');
// One HTTP server shared by Express (plain HTTP routes) and ws (upgrade requests).
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

/** Responds to GET / with a plain-text banner. */
function handleIndex(req, res) {
  res.send('WebSocket Server');
}

/** Logs each new WebSocket connection and every message it sends. */
function handleConnection(ws, req) {
  console.log('新连接');
  ws.on('message', function onMessage(message) {
    console.log('收到:', message.toString());
  });
}

app.get('/', handleIndex);
wss.on('connection', handleConnection);

server.listen(8080, function onListening() {
  console.log('服务器运行在 http://localhost:8080');
});
广播消息 #
javascript
const WebSocket = require('ws');
// WebSocket server for the broadcast example; listens on port 8080.
// `broadcast` below iterates this server's `clients` collection.
const wss = new WebSocket.Server({ port: 8080 });
/**
 * Sends `message` to every client of `wss` whose socket is open.
 *
 * @param {*} message - Payload handed unchanged to each client's `send`.
 * @param {?object} [excludeClient=null] - Client to skip (typically the sender).
 */
function broadcast(message, excludeClient = null) {
  for (const peer of wss.clients) {
    const isOpen = peer.readyState === WebSocket.OPEN;
    if (!isOpen || peer === excludeClient) {
      continue;
    }
    peer.send(message);
  }
}
/**
 * Per-connection setup for the chat example: relays each incoming message to
 * every other client, then greets the newcomer.
 */
function handleConnection(ws) {
  ws.on('message', (data) => {
    broadcast(data.toString(), ws);
  });
  ws.send('欢迎加入聊天室!');
}

wss.on('connection', handleConnection);
房间管理 #
javascript
const WebSocket = require('ws');
// WebSocket server for the room example; listens on port 8080.
const wss = new WebSocket.Server({ port: 8080 });
// roomId -> Set of sockets currently in that room (maintained by joinRoom / leaveRoom).
const rooms = new Map();
/**
 * Puts `ws` into room `roomId`, creating the room on first use.
 *
 * A socket belongs to at most one room (tracked in `ws.roomId`). If it is
 * already in a different room it is removed from that room first; otherwise it
 * would stay in the old room's member set, keep receiving its broadcasts, and
 * never be cleaned up on close.
 *
 * @param {object} ws - Client socket; its `roomId` property is overwritten.
 * @param {*} roomId - Key of the room to join.
 */
function joinRoom(ws, roomId) {
  if (ws.roomId !== roomId) {
    leaveRoom(ws);
  }

  let members = rooms.get(roomId);
  if (!members) {
    members = new Set();
    rooms.set(roomId, members);
  }
  members.add(ws);
  ws.roomId = roomId;
}
/**
 * Removes `ws` from the room recorded in `ws.roomId` and deletes the room once
 * it is empty.
 *
 * The room is looked up by key rather than by truthiness, so falsy ids such as
 * `0` or `''` are handled too. `ws.roomId` is cleared afterwards so a socket
 * that has left can no longer address its former room.
 *
 * @param {object} ws - Client socket; safe to call when it is in no room.
 */
function leaveRoom(ws) {
  const { roomId } = ws;
  const members = rooms.get(roomId);
  if (members) {
    members.delete(ws);
    if (members.size === 0) {
      rooms.delete(roomId);
    }
  }
  ws.roomId = undefined;
}
/**
 * Sends `message` to every open socket in room `roomId`.
 * Does nothing when the room does not exist.
 *
 * @param {*} roomId - Key into the `rooms` map.
 * @param {*} message - Payload handed unchanged to each member's `send`.
 * @param {?object} [excludeClient=null] - Member to skip (typically the sender).
 */
function broadcastToRoom(roomId, message, excludeClient = null) {
  if (rooms.has(roomId) === false) {
    return;
  }
  for (const member of rooms.get(roomId)) {
    const shouldSend =
      member.readyState === WebSocket.OPEN && member !== excludeClient;
    if (shouldSend) {
      member.send(message);
    }
  }
}
// Room protocol: each frame is a JSON object whose `type` is 'join', 'message'
// or 'leave'. Malformed frames are logged and dropped; the socket stays open.
wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    try {
      const msg = JSON.parse(data);
      // JSON.parse also accepts null and scalars, which carry no `type`.
      if (msg === null || typeof msg !== 'object') {
        throw new TypeError('message must be a JSON object');
      }

      switch (msg.type) {
        case 'join': {
          // Only truthy string/number ids are accepted. A missing or falsy id
          // would be stored under a key that leaveRoom's guard never removes,
          // leaving the socket in that room after it disconnects.
          const { room } = msg;
          const isUsableRoom =
            (typeof room === 'string' || typeof room === 'number') &&
            Boolean(room);
          if (!isUsableRoom) {
            throw new TypeError('join requires a non-empty room');
          }
          joinRoom(ws, room);
          broadcastToRoom(room, `${msg.user} 加入了房间`);
          break;
        }
        case 'message':
          // A socket that has not joined has no roomId; broadcastToRoom then
          // finds no such room and returns without sending.
          broadcastToRoom(ws.roomId, `${msg.user}: ${msg.content}`, ws);
          break;
        case 'leave':
          leaveRoom(ws);
          break;
      }
    } catch (e) {
      console.error('消息解析错误:', e);
    }
  });

  // Drop the room membership when the connection goes away.
  ws.on('close', () => {
    leaveRoom(ws);
  });
});
心跳检测 #
javascript
const WebSocket = require('ws');
// WebSocket server for the heartbeat example; listens on port 8080.
const wss = new WebSocket.Server({ port: 8080 });
/**
 * Marks the object it is invoked on (`this`) as alive.
 * It is registered below as each socket's 'pong' listener, so `this` is
 * expected to be that socket.
 */
function heartbeat() {
this.isAlive = true;
}
// Every 30 s: terminate sockets that did not answer the previous ping, then
// flag the remaining ones as "not alive" and ping them again. The 'pong'
// listener (heartbeat) flips the flag back.
const interval = setInterval(() => {
  for (const ws of wss.clients) {
    if (ws.isAlive === false) {
      ws.terminate();
      continue;
    }
    ws.isAlive = false;
    ws.ping();
  }
}, 30000);

wss.on('connection', (ws) => {
  // A fresh connection starts out alive so it survives the first sweep.
  ws.isAlive = true;
  ws.on('pong', heartbeat);
  ws.on('message', (data) => {
    console.log('收到消息:', data.toString());
  });
});

// Stop the sweep timer once the server itself is closed.
wss.on('close', () => {
  clearInterval(interval);
});
使用 Socket.IO #
javascript
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const app = express();
const httpServer = createServer(app);

// NOTE(review): origin '*' accepts requests from any site; acceptable for a
// demo, confirm before using outside one.
const corsOptions = {
  origin: '*',
  methods: ['GET', 'POST'],
};
const io = new Server(httpServer, { cors: corsOptions });

/**
 * Wires up the per-socket events:
 *  - 'join': enter a room and notify the other members;
 *  - 'chat': emit a message to everyone in the given room;
 *  - 'disconnect': log the departure.
 */
function onConnection(socket) {
  console.log('用户连接:', socket.id);

  socket.on('join', function onJoin(room) {
    socket.join(room);
    socket.to(room).emit('message', `用户 ${socket.id} 加入了房间`);
  });

  socket.on('chat', function onChat(room, message) {
    const payload = { user: socket.id, message };
    io.to(room).emit('message', payload);
  });

  socket.on('disconnect', function onDisconnect() {
    console.log('用户断开:', socket.id);
  });
}

io.on('connection', onConnection);
httpServer.listen(8080);
text
┌─────────────────────────────────────────────────────────────┐
│ Socket.IO vs ws │
├─────────────────────────────────────────────────────────────┤
│ │
│ Socket.IO 特性: │
│ ───────────────────────────────────────────────────────── │
│ ✅ 自动重连 │
│ ✅ 降级支持(WebSocket 不可用时使用轮询) │
│ ✅ 房间和命名空间 │
│ ✅ 二进制事件支持 │
│ ✅ 心跳机制内置 │
│ ✅ 广播和房间管理 │
│ │
│ ws 特性: │
│ ───────────────────────────────────────────────────────── │
│ ✅ 轻量级 │
│ ✅ 原生 WebSocket 协议 │
│ ✅ 更高性能 │
│ ✅ 更低开销 │
│ │
│ 选择建议: │
│ ───────────────────────────────────────────────────────── │
│ Socket.IO:需要降级支持、房间管理、快速开发 │
│ ws:追求性能、原生协议、自定义实现 │
│ │
└─────────────────────────────────────────────────────────────┘
Python 实现 #
使用 websockets 库 #
python
import asyncio
import websockets
import json
# Every currently connected client; process_message reads it for broadcasts.
connected_clients = set()


async def handler(websocket, path=None):
    """Serve one client connection until it closes.

    Each incoming frame is logged. A frame that decodes to a JSON object is
    dispatched to ``process_message``; anything else (invalid JSON, or JSON
    that is not an object) is echoed back with a prefix.

    Args:
        websocket: The connection object supplied by ``websockets``.
        path: Request path. Optional because older ``websockets`` releases
            pass it as a second argument while current releases call the
            handler with the connection only. It is not used here.
    """
    connected_clients.add(websocket)
    print(f"客户端连接,当前连接数: {len(connected_clients)}")
    try:
        async for message in websocket:
            print(f"收到消息: {message}")
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                data = None
            if isinstance(data, dict):
                await process_message(websocket, data)
            else:
                # Scalars and arrays are valid JSON but have no 'type' field,
                # so they are treated like plain text.
                await websocket.send(f"服务器收到: {message}")
    except websockets.exceptions.ConnectionClosed:
        # The peer went away; cleanup happens in `finally`.
        pass
    finally:
        # discard() cannot raise, even if the entry is already gone.
        connected_clients.discard(websocket)
        print(f"客户端断开,当前连接数: {len(connected_clients)}")
async def process_message(websocket, data):
    """Handle one decoded JSON message from ``websocket``.

    Supported ``data['type']`` values:
        'broadcast': send ``data['message']`` to every other connected client.
        'echo': send ``data['message']`` back to the sender.
    Any other type is ignored.

    Args:
        websocket: The sender's connection.
        data: The decoded message (a dict).
    """
    msg_type = data.get('type')
    if msg_type == 'broadcast':
        message = data.get('message', '')
        # Snapshot the recipients: other handlers add and remove clients while
        # we are awaiting, and mutating a set during iteration raises.
        recipients = [client for client in connected_clients if client != websocket]
        # return_exceptions=True keeps one dead peer from aborting the whole
        # broadcast or propagating into the sender's handler.
        results = await asyncio.gather(
            *(client.send(message) for client in recipients),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                print(f"广播发送失败: {result!r}")
    elif msg_type == 'echo':
        await websocket.send(data.get('message', ''))
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    server = websockets.serve(handler, 'localhost', 8080)
    async with server:
        print("WebSocket 服务器运行在 ws://localhost:8080")
        # A future that is never resolved keeps the server running.
        forever = asyncio.Future()
        await forever


if __name__ == '__main__':
    asyncio.run(main())
使用 FastAPI #
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
app = FastAPI()


class ConnectionManager:
    """Tracks accepted WebSocket connections and sends text to them."""

    def __init__(self):
        # Accepted connections, in the order they connected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the connection; a no-op if it is not tracked."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str, exclude: WebSocket = None):
        """Send ``message`` to every tracked connection except ``exclude``.

        A connection whose send fails is dropped from the manager and the
        broadcast continues with the remaining ones.
        """
        # Iterate over a copy: connections come and go while we are awaiting.
        for connection in list(self.active_connections):
            if connection != exclude:
                try:
                    await connection.send_text(message)
                except (WebSocketDisconnect, RuntimeError):
                    # The peer is already gone; forget it so later broadcasts
                    # do not keep failing on it.
                    self.disconnect(connection)


manager = ConnectionManager()
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Chat endpoint.

    A frame that is a JSON object with ``type == 'broadcast'`` and a string
    ``content`` is relayed to all other clients. Every other frame, including
    invalid JSON, is acknowledged to the sender only.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                message = None

            is_broadcast = isinstance(message, dict) and message.get('type') == 'broadcast'
            content = message.get('content') if is_broadcast else None
            if isinstance(content, str):
                await manager.broadcast(content, exclude=websocket)
            else:
                await manager.send_personal_message(
                    f"收到: {data}",
                    websocket
                )
    except WebSocketDisconnect:
        pass
    finally:
        # Runs for any exit path, so a failing connection is never left
        # registered in the manager.
        manager.disconnect(websocket)
    # Only reached after a normal disconnect. The socket has already been
    # removed, so the notice goes to the remaining clients only.
    await manager.broadcast("用户离开聊天室")


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8080)
使用 Django Channels #
python
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
    """Chat consumer: one channel-layer group per room.

    Clients send ``{"message": ...}``; the same structure is delivered to
    every member of the room's group.
    """

    async def connect(self):
        # The room name comes from the URL route kwargs.
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Relay a client's chat message to the room group.

        Both parameters are optional because a frame carries either text or
        bytes. Binary frames are ignored. A text frame that is not a JSON
        object with a ``message`` key gets an error reply instead of raising.
        """
        if text_data is None:
            return
        try:
            payload = json.loads(text_data)
        except json.JSONDecodeError:
            payload = None
        message = payload.get('message') if isinstance(payload, dict) else None
        if message is None:
            await self.send(text_data=json.dumps({'error': 'invalid message'}))
            return
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                # Dispatches to the chat_message method below.
                'type': 'chat_message',
                'message': message
            }
        )

    async def chat_message(self, event):
        """Forward a group event's ``message`` to this consumer's client."""
        message = event['message']
        await self.send(text_data=json.dumps({
            'message': message
        }))
Go 实现 #
使用 gorilla/websocket #
go
package main
import (
"log"
"net/http"
"github.com/gorilla/websocket"
)
// upgrader turns an HTTP request into a WebSocket connection using 1 KiB read
// and write buffers.
// NOTE(review): CheckOrigin always returns true, so the handshake is accepted
// regardless of the request's origin. Fine for a demo; confirm before reuse.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// handleWebSocket upgrades the request and then echoes every received message
// back with the same message type until a read or write fails.
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("升级失败:", err)
		return
	}
	defer conn.Close()

	for {
		kind, payload, readErr := conn.ReadMessage()
		if readErr != nil {
			log.Println("读取消息失败:", readErr)
			return
		}
		log.Printf("收到消息: %s", payload)

		if writeErr := conn.WriteMessage(kind, payload); writeErr != nil {
			log.Println("发送消息失败:", writeErr)
			return
		}
	}
}
// main registers the echo handler on /ws and serves HTTP on :8080.
func main() {
	http.HandleFunc("/ws", handleWebSocket)
	log.Println("WebSocket 服务器运行在 :8080")

	// ListenAndServe only returns on failure, and always with a non-nil error.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}
聊天室实现 #
go
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// Client is one WebSocket connection taking part in a Room.
type Client struct {
conn *websocket.Conn // underlying connection; read by readPump, written by writePump
send chan []byte // outbound messages; drained by writePump, closed by Room.Run
room *Room // room that readPump forwards messages to and leaves on exit
}
// Room delivers messages to a set of clients. Its channels are serviced by Run.
type Room struct {
name string // label given to NewRoom
clients map[*Client]bool // current members; used as a set (values are always true)
join chan *Client // clients asking to be added
leave chan *Client // clients asking to be removed
message chan []byte // payloads to deliver to every member
mutex sync.RWMutex // held by Run while it touches clients
}
// Message is the JSON shape expected from clients. readPump decodes into it
// only to check that a frame is well-formed; the raw bytes are what gets relayed.
type Message struct {
Type string `json:"type"`
Content string `json:"content"`
User string `json:"user"`
}
// upgrader turns an HTTP request into a WebSocket connection.
// NOTE(review): CheckOrigin always returns true, so the handshake is accepted
// regardless of the request's origin. Fine for a demo; confirm before reuse.
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// NewRoom returns an empty room called name with unbuffered join, leave and
// message channels. The caller is expected to start Run to service them.
func NewRoom(name string) *Room {
	room := new(Room)
	room.name = name
	room.clients = make(map[*Client]bool)
	room.join = make(chan *Client)
	room.leave = make(chan *Client)
	room.message = make(chan []byte)
	return room
}
// Run is the room's event loop. It never returns and is meant to run in its
// own goroutine. It handles three kinds of events:
//   - join: register the client;
//   - leave: unregister the client and close its send channel;
//   - message: queue the payload for every member, evicting any member whose
//     send buffer is full.
func (r *Room) Run() {
	for {
		select {
		case client := <-r.join:
			r.mutex.Lock()
			r.clients[client] = true
			r.mutex.Unlock()

		case client := <-r.leave:
			r.mutex.Lock()
			// A slow client may already have been evicted by the message
			// branch, which closes its channel. Closing a closed channel
			// panics, so only close while the client is still a member.
			if _, ok := r.clients[client]; ok {
				delete(r.clients, client)
				close(client.send)
			}
			r.mutex.Unlock()

		case message := <-r.message:
			// Write lock, not read lock: the eviction below mutates the map.
			r.mutex.Lock()
			for client := range r.clients {
				select {
				case client.send <- message:
				default:
					// Buffer full: drop the client rather than block the room.
					close(client.send)
					delete(r.clients, client)
				}
			}
			r.mutex.Unlock()
		}
	}
}
// readPump reads frames from the connection and forwards every frame that
// decodes as a Message to the room, unchanged. On the first read error it
// asks the room to remove the client and closes the connection.
func (c *Client) readPump() {
	defer func() {
		c.room.leave <- c
		c.conn.Close()
	}()

	for {
		_, payload, err := c.conn.ReadMessage()
		if err != nil {
			return
		}

		// Decoding is only a validity check; the raw bytes are relayed.
		var decoded Message
		if json.Unmarshal(payload, &decoded) != nil {
			continue
		}
		c.room.message <- payload
	}
}
// writePump writes every message queued on c.send to the connection as a text
// frame. It stops when the channel is closed or a write fails, and closes the
// connection on the way out.
func (c *Client) writePump() {
	defer c.conn.Close()

	for {
		payload, open := <-c.send
		if !open {
			return
		}
		if c.conn.WriteMessage(websocket.TextMessage, payload) != nil {
			return
		}
	}
}
// main starts a single room called "general" and serves /ws on :8080. Every
// upgraded connection becomes a Client of that room.
func main() {
	room := NewRoom("general")
	go room.Run()

	serveWs := func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return
		}

		client := new(Client)
		client.conn = conn
		client.send = make(chan []byte, 256)
		client.room = room

		// Register with the room before the pumps start.
		room.join <- client
		go client.writePump()
		go client.readPump()
	}
	http.HandleFunc("/ws", serveWs)

	log.Println("服务器运行在 :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
Java 实现 #
使用 Spring WebSocket #
java
/**
 * Registers the raw WebSocket endpoint: a {@code ChatHandler} is mapped to
 * {@code /ws} and every origin ("*") is allowed.
 *
 * NOTE(review): the handler is created with {@code new} here while
 * {@code ChatHandler} is also annotated {@code @Component}. Presumably that
 * yields two separate instances; confirm whether the bean should be injected.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ChatHandler(), "/ws")
.setAllowedOrigins("*");
}
}
/**
 * Chat handler: tracks open sessions by id and relays join, text and leave
 * events to every open session.
 */
@Component
public class ChatHandler extends TextWebSocketHandler {
    /** Open sessions keyed by session id. */
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.put(session.getId(), session);
        broadcast("用户 " + session.getId() + " 加入聊天室");
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        broadcast("用户 " + session.getId() + ": " + payload);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session.getId());
        broadcast("用户 " + session.getId() + " 离开聊天室");
    }

    /**
     * Sends {@code message} to every open session.
     *
     * A session whose send fails is dropped from the registry and the loop
     * continues, so one broken peer cannot abort delivery to the others or
     * make a lifecycle callback throw.
     */
    private void broadcast(String message) {
        TextMessage frame = new TextMessage(message);
        for (WebSocketSession session : sessions.values()) {
            if (!session.isOpen()) {
                continue;
            }
            try {
                // Callbacks for different clients run concurrently, and a
                // session must not be written to by two threads at once.
                synchronized (session) {
                    session.sendMessage(frame);
                }
            } catch (java.io.IOException | IllegalStateException e) {
                // The peer is unusable; forget it.
                sessions.remove(session.getId());
            }
        }
    }
}
使用 Spring WebSocket STOMP #
java
/**
 * STOMP-over-WebSocket configuration.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
/**
 * Enables the simple broker for destinations under {@code /topic} and sets
 * {@code /app} as the application destination prefix.
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
/**
 * Exposes the STOMP endpoint at {@code /ws} with SockJS enabled and every
 * origin pattern ("*") allowed.
 */
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
/**
 * STOMP message handlers. Both methods return the incoming message unchanged;
 * the annotations determine the destination it is sent to.
 */
@Controller
public class ChatController {
/** Handles messages mapped to {@code /chat}; the result goes to {@code /topic/messages}. */
@MessageMapping("/chat")
@SendTo("/topic/messages")
public ChatMessage send(ChatMessage message) {
return message;
}
/**
 * Handles messages mapped to {@code /chat/{roomId}}; the result goes to
 * {@code /topic/room/{roomId}}. The bound {@code roomId} is not otherwise
 * used in the method body.
 */
@MessageMapping("/chat/{roomId}")
@SendTo("/topic/room/{roomId}")
public ChatMessage sendToRoom(@DestinationVariable String roomId, ChatMessage message) {
return message;
}
}
/**
 * Chat payload: who sent it and what was said. A plain mutable bean with a
 * getter and setter for each field.
 */
public class ChatMessage {
    /** Sender name. */
    private String user;

    /** Message text. */
    private String content;

    /** @return the sender name, or {@code null} if unset */
    public String getUser() {
        return this.user;
    }

    /** @param user the sender name */
    public void setUser(String user) {
        this.user = user;
    }

    /** @return the message text, or {@code null} if unset */
    public String getContent() {
        return this.content;
    }

    /** @param content the message text */
    public void setContent(String content) {
        this.content = content;
    }
}
text
┌─────────────────────────────────────────────────────────────┐
│ STOMP 协议说明 │
├─────────────────────────────────────────────────────────────┤
│ │
│ STOMP(Simple Text Oriented Messaging Protocol) │
│ 是一种简单的消息协议,用于在 WebSocket 上传输消息 │
│ │
│ 帧格式: │
│ ───────────────────────────────────────────────────────── │
│ COMMAND │
│ header1:value1 │
│ header2:value2 │
│ │
│ body^@ │
│ │
│ 命令类型: │
│ ───────────────────────────────────────────────────────── │
│ CONNECT 建立连接 │
│ SEND 发送消息 │
│ SUBSCRIBE 订阅主题 │
│ UNSUBSCRIBE 取消订阅 │
│ MESSAGE 接收消息 │
│ ERROR 错误 │
│ DISCONNECT 断开连接 │
│ │
│ 优势: │
│ ✅ 发布/订阅模式 │
│ ✅ 消息路由 │
│ ✅ 错误处理 │
│ ✅ 与消息队列集成 │
│ │
└─────────────────────────────────────────────────────────────┘
连接管理 #
连接池设计 #
javascript
/**
 * Registry of live connections with room membership and activity tracking.
 *
 * Two indexes are kept in sync:
 *  - `connections`: id -> { ws, metadata, rooms, lastActivity }
 *  - `rooms`: room -> Set of member ids
 */
class ConnectionPool {
  constructor() {
    this.connections = new Map();
    this.rooms = new Map();
  }

  /**
   * Registers a connection under `id`.
   *
   * If `id` is already registered (for example on reconnect), the old entry is
   * removed first. Otherwise its room memberships would stay in the room index
   * and the new socket would receive broadcasts for rooms it never joined.
   *
   * @param {*} id - Connection key.
   * @param {object} ws - The socket.
   * @param {object} [metadata={}] - Caller-defined data stored with the entry.
   */
  add(id, ws, metadata = {}) {
    this.remove(id);
    this.connections.set(id, {
      ws,
      metadata,
      rooms: new Set(),
      lastActivity: Date.now(),
    });
  }

  /**
   * Removes a connection and all of its room memberships. No-op for unknown ids.
   *
   * @param {*} id - Connection key.
   */
  remove(id) {
    const connection = this.connections.get(id);
    if (!connection) {
      return;
    }
    // Iterate over a copy: leaveRoom deletes from connection.rooms.
    for (const room of [...connection.rooms]) {
      this.leaveRoom(id, room);
    }
    this.connections.delete(id);
  }

  /**
   * @param {*} id - Connection key.
   * @returns {object|undefined} The stored entry, or undefined if unknown.
   */
  get(id) {
    return this.connections.get(id);
  }

  /**
   * Adds a registered connection to `room`, creating the room on first use.
   * No-op for unknown ids.
   */
  joinRoom(id, room) {
    const connection = this.connections.get(id);
    if (!connection) {
      return;
    }
    connection.rooms.add(room);
    if (!this.rooms.has(room)) {
      this.rooms.set(room, new Set());
    }
    this.rooms.get(room).add(id);
  }

  /**
   * Removes a connection from `room` and deletes the room once it is empty.
   * No-op if the connection is unknown or not a member.
   */
  leaveRoom(id, room) {
    const connection = this.connections.get(id);
    if (!connection || !connection.rooms.has(room)) {
      return;
    }
    connection.rooms.delete(room);
    const members = this.rooms.get(room);
    if (members) {
      members.delete(id);
      if (members.size === 0) {
        this.rooms.delete(room);
      }
    }
  }

  /**
   * @param {*} room - Room key.
   * @returns {object[]} Entries of the room's members; empty for unknown rooms.
   */
  getRoomMembers(room) {
    const ids = this.rooms.get(room);
    if (!ids) {
      return [];
    }
    return [...ids].map((id) => this.connections.get(id));
  }

  /**
   * Sends `message` to every member of `room` whose socket is open.
   *
   * @param {*} room - Room key; unknown rooms are ignored.
   * @param {*} message - Payload handed unchanged to each socket's `send`.
   * @param {*} [excludeId=null] - Member id to skip (typically the sender).
   */
  broadcast(room, message, excludeId = null) {
    const ids = this.rooms.get(room);
    if (!ids) {
      return;
    }
    for (const id of ids) {
      if (id === excludeId) {
        continue;
      }
      const connection = this.connections.get(id);
      if (connection && connection.ws.readyState === WebSocket.OPEN) {
        connection.ws.send(message);
      }
    }
  }

  /** Stamps the connection's `lastActivity` with the current time. */
  updateActivity(id) {
    const connection = this.connections.get(id);
    if (connection) {
      connection.lastActivity = Date.now();
    }
  }

  /**
   * @param {number} [timeout=300000] - Idle threshold in milliseconds.
   * @returns {Array} Ids whose last activity is more than `timeout` ms ago.
   */
  getInactiveConnections(timeout = 300000) {
    const now = Date.now();
    const inactive = [];
    for (const [id, connection] of this.connections) {
      if (now - connection.lastActivity > timeout) {
        inactive.push(id);
      }
    }
    return inactive;
  }
}
认证中间件 #
javascript
const WebSocket = require('ws');
const jwt = require('jsonwebtoken');
/**
 * Handshake gate passed to the server as `verifyClient`.
 *
 * Reads the token from the `Sec-WebSocket-Protocol` request header and
 * verifies it as a JWT with the secret in `process.env.JWT_SECRET`. On
 * success the decoded payload is stored on the request as `user`; otherwise
 * the handshake is rejected with HTTP 401.
 *
 * @param {object} info - Handshake info; `info.req` is the HTTP request.
 * @param {Function} callback - Called as (accepted[, httpCode, reason]).
 */
function verifyClient(info, callback) {
  const { req } = info;
  const token = req.headers['sec-websocket-protocol'];

  if (!token) {
    callback(false, 401, '未提供认证令牌');
    return;
  }

  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET);
    req.user = decoded;
    callback(true);
  } catch (error) {
    callback(false, 401, '无效的认证令牌');
  }
}

const wss = new WebSocket.Server({
  port: 8080,
  verifyClient,
});
// Runs only for connections accepted by verifyClient, which stored the decoded
// token payload on the request as `user`.
wss.on('connection', function onConnection(ws, request) {
  const { user } = request;
  console.log(`用户 ${user.id} 已连接`);

  // Keep the identity on the socket so later handlers can reach it.
  ws.user = user;

  ws.on('message', function onMessage(data) {
    console.log(`用户 ${user.id} 发送消息:`, data.toString());
  });
});
下一步 #
现在你已经掌握了 WebSocket 服务端的实现方式。接下来请学习「客户端实现」一章,了解如何在不同平台上实现 WebSocket 客户端!
最后更新:2026-03-29