WebSocket 客户端实现 #
客户端架构 #
text
┌──────────────────────────────────────────────────────────────┐
│                     WebSocket 客户端架构                     │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────────────────────────────────────────────────┐   │
│   │                        应用层                        │   │
│   │   ┌──────────┐      ┌──────────┐      ┌──────────┐   │   │
│   │   │ UI 组件  │      │ 状态管理 │      │ 业务逻辑 │   │   │
│   │   └──────────┘      └──────────┘      └──────────┘   │   │
│   └──────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ▼                               │
│   ┌──────────────────────────────────────────────────────┐   │
│   │                        封装层                        │   │
│   │   ┌──────────┐      ┌──────────┐      ┌──────────┐   │   │
│   │   │ 消息队列 │      │ 事件管理 │      │ 状态同步 │   │   │
│   │   └──────────┘      └──────────┘      └──────────┘   │   │
│   └──────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ▼                               │
│   ┌──────────────────────────────────────────────────────┐   │
│   │                      连接管理层                      │   │
│   │   ┌──────────┐      ┌──────────┐      ┌──────────┐   │   │
│   │   │ 重连机制 │      │ 心跳检测 │      │ 认证授权 │   │   │
│   │   └──────────┘      └──────────┘      └──────────┘   │   │
│   └──────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ▼                               │
│   ┌──────────────────────────────────────────────────────┐   │
│   │                     WebSocket 层                     │   │
│   │   ┌──────────┐      ┌──────────┐      ┌──────────┐   │   │
│   │   │ 原生 API │      │ Socket.IO│      │ 封装库   │   │   │
│   │   └──────────┘      └──────────┘      └──────────┘   │   │
│   └──────────────────────────────────────────────────────┘   │
│                                                              │
└──────────────────────────────────────────────────────────────┘
浏览器客户端 #
基础封装类 #
javascript
/**
 * Browser WebSocket wrapper with automatic reconnection (exponential
 * backoff), a periodic JSON `ping` heartbeat, an outgoing message queue for
 * messages sent while disconnected, and a small event emitter
 * (`open`, `message`, `error`, `close`).
 *
 * The connection is opened immediately by the constructor.
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options]
   * @param {boolean} [options.reconnect=true] - Reconnect after a close.
   * @param {number} [options.reconnectInterval=1000] - Base backoff delay in ms.
   * @param {number} [options.maxReconnectAttempts=5] - Give up after this many attempts.
   * @param {number} [options.heartbeatInterval=30000] - Ping period in ms.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      reconnect: true,
      reconnectInterval: 1000,
      maxReconnectAttempts: 5,
      heartbeatInterval: 30000,
      ...options
    };
    this.ws = null;
    this.reconnectAttempts = 0;
    this.reconnectTimer = null;
    this.heartbeatTimer = null;
    this.messageQueue = [];
    this.eventHandlers = new Map();
    this.connect();
  }

  /** Opens the socket unless one is already open or still connecting. */
  connect() {
    const state = this.ws ? this.ws.readyState : null;
    // Also bail out while CONNECTING, otherwise a second call would create
    // a duplicate socket and orphan the first one.
    if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) {
      return;
    }
    this.clearReconnectTimer();
    this.ws = new WebSocket(this.url);
    this.ws.onopen = (event) => {
      console.log('WebSocket 连接成功');
      this.reconnectAttempts = 0;
      this.flushMessageQueue();
      this.startHeartbeat();
      this.emit('open', event);
    };
    this.ws.onmessage = (event) => {
      this.emit('message', event.data);
    };
    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error);
      this.emit('error', error);
    };
    this.ws.onclose = (event) => {
      console.log('WebSocket 关闭:', event.code, event.reason);
      this.stopHeartbeat();
      this.emit('close', event);
      if (this.options.reconnect && this.reconnectAttempts < this.options.maxReconnectAttempts) {
        this.reconnect();
      }
    };
  }

  /** Schedules the next connection attempt; the delay doubles on each attempt. */
  reconnect() {
    this.reconnectAttempts++;
    const delay = this.options.reconnectInterval * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`尝试重连 (${this.reconnectAttempts}/${this.options.maxReconnectAttempts})...`);
    this.clearReconnectTimer();
    // Keep the handle so close() can cancel a reconnect that is still pending.
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  /** Cancels a scheduled reconnect attempt, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Sends `data` now if the socket is open, otherwise queues it until the
   * next successful connection. Non-string values are JSON-serialized.
   * @param {*} data
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      const message = typeof data === 'string' ? data : JSON.stringify(data);
      this.ws.send(message);
    } else {
      this.messageQueue.push(data);
    }
  }

  /** Sends every queued message in FIFO order while the socket stays open. */
  flushMessageQueue() {
    // Re-check the state on each pass: send() re-queues when the socket is
    // not open, which would otherwise make this loop spin forever.
    while (
      this.messageQueue.length > 0 &&
      this.ws &&
      this.ws.readyState === WebSocket.OPEN
    ) {
      this.send(this.messageQueue.shift());
    }
  }

  /** (Re)starts the periodic `{ type: 'ping' }` heartbeat. */
  startHeartbeat() {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, this.options.heartbeatInterval);
  }

  /** Stops the heartbeat timer if it is running. */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Registers a handler for an event.
   * @param {string} event
   * @param {Function} handler
   */
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }

  /**
   * Removes a previously registered handler (no-op if it is not registered).
   * @param {string} event
   * @param {Function} handler
   */
  off(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Invokes every handler registered for `event` with `data`.
   * @param {string} event
   * @param {*} [data]
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    // Iterate over a snapshot so a handler may call off() during dispatch
    // without causing the next handler to be skipped.
    [...handlers].forEach((handler) => handler(data));
  }

  /**
   * Closes the connection permanently: disables reconnection, cancels any
   * pending reconnect attempt and stops the heartbeat.
   * @param {number} [code=1000]
   * @param {string} [reason='Normal closure']
   */
  close(code = 1000, reason = 'Normal closure') {
    this.options.reconnect = false;
    this.clearReconnectTimer();
    this.stopHeartbeat();
    if (this.ws) {
      this.ws.close(code, reason);
    }
  }
}
// Example usage: create a client that reconnects and allows up to 10 attempts
// (the remaining options keep the defaults set in the constructor).
const client = new WebSocketClient('wss://example.com/ws', {
reconnect: true,
maxReconnectAttempts: 10
});
// Log every incoming message; the handler receives the raw `event.data`.
client.on('message', (data) => {
console.log('收到消息:', data);
});
// Objects are JSON-serialized by send(); if the socket is not open yet the
// message is queued and flushed once the connection opens.
client.send({ type: 'hello', content: 'World' });
使用 Socket.IO 客户端 #
javascript
import { io } from 'socket.io-client';
// Create the Socket.IO client with explicit transport, reconnection,
// timeout and auth settings.
const socket = io('https://example.com', {
path: '/socket.io',
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionAttempts: 10,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
timeout: 20000,
auth: {
token: 'your-auth-token'
}
});
// Connection lifecycle logging.
socket.on('connect', () => {
console.log('连接成功:', socket.id);
});
socket.on('disconnect', (reason) => {
console.log('断开连接:', reason);
});
socket.on('connect_error', (error) => {
console.error('连接错误:', error);
});
// Emit a 'chat message' event and log the ones that are received.
socket.emit('chat message', 'Hello World');
socket.on('chat message', (msg) => {
console.log('收到消息:', msg);
});
// Emit 'join' with a room name and log 'room message' events.
// NOTE(review): what 'join' does is defined by the server — confirm there.
socket.emit('join', 'room1');
socket.on('room message', (data) => {
console.log('房间消息:', data);
});
React 集成 #
自定义 Hook #
javascript
import { useState, useEffect, useCallback, useRef } from 'react';
/**
 * React hook that owns a WebSocket connection for the lifetime of the
 * component, with exponential-backoff reconnection.
 *
 * @param {string} url - WebSocket endpoint; changing it reconnects.
 * @param {object} [options]
 * @param {boolean} [options.reconnect=true] - Reconnect after an unexpected close.
 * @param {number} [options.reconnectInterval=1000] - Base backoff delay in ms.
 * @param {number} [options.maxReconnectAttempts=5]
 * @param {Function} [options.onOpen] - Called with the open event.
 * @param {Function} [options.onMessage] - Called with `event.data`.
 * @param {Function} [options.onClose] - Called with the close event.
 * @param {Function} [options.onError] - Called with the error event.
 * @returns {{readyState: number, lastMessage: *, sendMessage: Function,
 *   disconnect: Function, reconnect: Function}}
 *   `sendMessage` returns `true` when the message was handed to the socket
 *   and `false` when it was dropped because the socket is not open.
 */
function useWebSocket(url, options = {}) {
  const [readyState, setReadyState] = useState(WebSocket.CONNECTING);
  const [lastMessage, setLastMessage] = useState(null);
  const wsRef = useRef(null);
  const reconnectTimeoutRef = useRef(null);
  const reconnectAttemptsRef = useRef(0);
  // True once disconnect() was called, so onclose does not schedule a reconnect.
  const manualCloseRef = useRef(false);
  const callbacksRef = useRef({});
  const {
    reconnect = true,
    reconnectInterval = 1000,
    maxReconnectAttempts = 5,
    onOpen,
    onMessage,
    onClose,
    onError
  } = options;

  // Keep the latest callbacks in a ref instead of in connect's dependency
  // list: callers usually pass inline functions, and a new identity on every
  // render would otherwise tear down and reopen the connection each time.
  useEffect(() => {
    callbacksRef.current = { onOpen, onMessage, onClose, onError };
  });

  const clearReconnectTimeout = useCallback(() => {
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }
  }, []);

  const connect = useCallback(() => {
    const existing = wsRef.current;
    if (
      existing &&
      (existing.readyState === WebSocket.OPEN || existing.readyState === WebSocket.CONNECTING)
    ) {
      return;
    }
    clearReconnectTimeout();
    manualCloseRef.current = false;
    const ws = new WebSocket(url);
    wsRef.current = ws;
    setReadyState(WebSocket.CONNECTING);
    // Events from a socket that has since been replaced are ignored.
    const isCurrent = () => wsRef.current === ws;
    ws.onopen = (event) => {
      if (!isCurrent()) return;
      setReadyState(WebSocket.OPEN);
      reconnectAttemptsRef.current = 0;
      callbacksRef.current.onOpen?.(event);
    };
    ws.onmessage = (event) => {
      if (!isCurrent()) return;
      setLastMessage(event.data);
      callbacksRef.current.onMessage?.(event.data);
    };
    ws.onerror = (error) => {
      if (!isCurrent()) return;
      callbacksRef.current.onError?.(error);
    };
    ws.onclose = (event) => {
      if (!isCurrent()) return;
      setReadyState(WebSocket.CLOSED);
      callbacksRef.current.onClose?.(event);
      if (manualCloseRef.current || !reconnect) return;
      if (reconnectAttemptsRef.current >= maxReconnectAttempts) return;
      reconnectAttemptsRef.current++;
      const delay = reconnectInterval * Math.pow(2, reconnectAttemptsRef.current - 1);
      reconnectTimeoutRef.current = setTimeout(() => {
        reconnectTimeoutRef.current = null;
        connect();
      }, delay);
    };
  }, [url, reconnect, reconnectInterval, maxReconnectAttempts, clearReconnectTimeout]);

  const sendMessage = useCallback((data) => {
    if (wsRef.current?.readyState !== WebSocket.OPEN) {
      return false;
    }
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    wsRef.current.send(message);
    return true;
  }, []);

  const disconnect = useCallback(() => {
    // Mark the close as intentional *before* closing so the close handler
    // does not start a reconnect (e.g. after the component unmounted).
    manualCloseRef.current = true;
    clearReconnectTimeout();
    wsRef.current?.close();
  }, [clearReconnectTimeout]);

  useEffect(() => {
    connect();
    return () => {
      disconnect();
    };
  }, [connect, disconnect]);

  return {
    readyState,
    lastMessage,
    sendMessage,
    disconnect,
    reconnect: connect
  };
}
function ChatComponent() {
const [messages, setMessages] = useState([]);
const [input, setInput] = useState('');
const { readyState, lastMessage, sendMessage } = useWebSocket(
'wss://example.com/chat',
{
onMessage: (data) => {
const message = JSON.parse(data);
setMessages(prev => [...prev, message]);
}
}
);
const handleSend = () => {
if (input.trim()) {
sendMessage({ type: 'chat', content: input });
setInput('');
}
};
const connectionStatus = {
[WebSocket.CONNECTING]: '连接中...',
[WebSocket.OPEN]: '已连接',
[WebSocket.CLOSING]: '关闭中...',
[WebSocket.CLOSED]: '已断开'
}[readyState];
return (
<div>
<div>状态: {connectionStatus}</div>
<div>
{messages.map((msg, i) => (
<div key={i}>{msg.content}</div>
))}
</div>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && handleSend()}
/>
<button onClick={handleSend}>发送</button>
</div>
);
}
使用 Context #
javascript
import React, { createContext, useCallback, useContext, useEffect, useRef, useState } from 'react';
const WebSocketContext = createContext(null);
export function WebSocketProvider({ url, children }) {
const [ws, setWs] = useState(null);
const [readyState, setReadyState] = useState(WebSocket.CONNECTING);
const [lastMessage, setLastMessage] = useState(null);
const messageQueueRef = useRef([]);
useEffect(() => {
const socket = new WebSocket(url);
socket.onopen = () => {
setReadyState(WebSocket.OPEN);
while (messageQueueRef.current.length > 0) {
const msg = messageQueueRef.current.shift();
socket.send(msg);
}
};
socket.onmessage = (event) => {
setLastMessage(event.data);
};
socket.onclose = () => {
setReadyState(WebSocket.CLOSED);
};
setWs(socket);
return () => {
socket.close();
};
}, [url]);
const sendMessage = useCallback((data) => {
const message = typeof data === 'string' ? data : JSON.stringify(data);
if (ws?.readyState === WebSocket.OPEN) {
ws.send(message);
} else {
messageQueueRef.current.push(message);
}
}, [ws]);
const value = {
ws,
readyState,
lastMessage,
sendMessage
};
return (
<WebSocketContext.Provider value={value}>
{children}
</WebSocketContext.Provider>
);
}
/**
 * Reads the value supplied by the nearest WebSocketProvider.
 *
 * @returns {object} The current context value.
 * @throws {Error} When the context value is falsy, i.e. the hook is used
 *   outside of a WebSocketProvider.
 */
export function useWebSocketContext() {
  const value = useContext(WebSocketContext);
  if (value) {
    return value;
  }
  throw new Error('useWebSocketContext must be used within WebSocketProvider');
}
// Example root component: wraps the components that need the socket in a
// single WebSocketProvider.
// NOTE(review): NotificationComponent is not defined in this snippet — it is
// presumably a placeholder for another context consumer.
function App() {
return (
<WebSocketProvider url="wss://example.com/ws">
<ChatComponent />
<NotificationComponent />
</WebSocketProvider>
);
}
Vue 集成 #
组合式 API #
javascript
import { ref, onUnmounted, watch } from 'vue';
/**
 * Vue composable that opens a WebSocket immediately and closes it when the
 * owning component is unmounted. Reconnects after an unexpected close with a
 * linearly growing delay (`reconnectInterval * attempt`).
 *
 * @param {string} url - WebSocket endpoint.
 * @param {object} [options]
 * @param {boolean} [options.reconnect=true]
 * @param {number} [options.reconnectInterval=1000] - Base delay in ms.
 * @param {number} [options.maxReconnectAttempts=5]
 * @param {Function} [options.onMessage] - Called with `event.data`.
 * @param {Function} [options.onOpen]
 * @param {Function} [options.onClose]
 * @param {Function} [options.onError]
 * @returns {{data: object, status: object, ws: object, send: Function, close: Function}}
 *   `data` holds the last received payload, `status` is one of
 *   'CONNECTING' | 'OPEN' | 'CLOSED', `ws` holds the current socket.
 */
export function useWebSocket(url, options = {}) {
  const data = ref(null);
  const status = ref('CONNECTING');
  const wsRef = ref(null);
  const {
    reconnect = true,
    reconnectInterval = 1000,
    maxReconnectAttempts = 5,
    onMessage,
    onOpen,
    onClose,
    onError
  } = options;
  let reconnectAttempts = 0;
  let reconnectTimer = null;
  // Set by close() so the close handler does not reopen the connection.
  let closedByUser = false;

  const connect = () => {
    const state = wsRef.value?.readyState;
    if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) {
      return;
    }
    const ws = new WebSocket(url);
    wsRef.value = ws;
    // Reflect the new attempt; otherwise status would stay 'CLOSED' for the
    // whole time a reconnect is in progress.
    status.value = 'CONNECTING';
    ws.onopen = (event) => {
      status.value = 'OPEN';
      reconnectAttempts = 0;
      onOpen?.(event);
    };
    ws.onmessage = (event) => {
      data.value = event.data;
      onMessage?.(event.data);
    };
    ws.onerror = (error) => {
      onError?.(error);
    };
    ws.onclose = (event) => {
      status.value = 'CLOSED';
      onClose?.(event);
      if (closedByUser || !reconnect || reconnectAttempts >= maxReconnectAttempts) {
        return;
      }
      reconnectAttempts++;
      reconnectTimer = setTimeout(() => {
        reconnectTimer = null;
        connect();
      }, reconnectInterval * reconnectAttempts);
    };
  };

  // Sends only while the socket is open; non-string values are JSON-serialized.
  const send = (message) => {
    if (wsRef.value?.readyState === WebSocket.OPEN) {
      const payload = typeof message === 'string' ? message : JSON.stringify(message);
      wsRef.value.send(payload);
    }
  };

  // Closes for good: cancels a pending reconnect and prevents new ones.
  const close = () => {
    closedByUser = true;
    if (reconnectTimer !== null) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }
    wsRef.value?.close();
  };

  connect();
  onUnmounted(() => {
    close();
  });

  return {
    data,
    status,
    ws: wsRef,
    send,
    close
  };
}
Vue 组件示例 #
vue
<!-- Chat view: connection status, received messages and an input box. -->
<template>
<div class="chat">
<div class="status">
连接状态: {{ connectionStatus }}
</div>
<div class="messages">
<div v-for="(msg, index) in messages" :key="index" class="message">
{{ msg.user }}: {{ msg.content }}
</div>
</div>
<div class="input-area">
<input
v-model="inputMessage"
@keyup.enter="sendMessage"
placeholder="输入消息..."
/>
<button @click="sendMessage">发送</button>
</div>
</div>
</template>
<script setup>
import { ref, computed } from 'vue';
import { useWebSocket } from './useWebSocket';
// Received messages (parsed objects) and the current input text.
const messages = ref([]);
const inputMessage = ref('');
// Open the chat socket; every incoming frame is parsed as JSON and appended.
// NOTE(review): JSON.parse throws on a non-JSON frame and is not guarded here.
const { status, send } = useWebSocket('wss://example.com/chat', {
onMessage: (data) => {
const message = JSON.parse(data);
messages.value.push(message);
}
});
// Map the composable's status string to a display label; unknown values are
// shown unchanged.
const connectionStatus = computed(() => {
const statusMap = {
'CONNECTING': '连接中...',
'OPEN': '已连接',
'CLOSING': '关闭中...',
'CLOSED': '已断开'
};
return statusMap[status.value] || status.value;
});
// Send the input as a chat message and clear the field; empty or
// whitespace-only input is ignored.
const sendMessage = () => {
if (inputMessage.value.trim()) {
send({
type: 'chat',
content: inputMessage.value
});
inputMessage.value = '';
}
};
</script>
Node.js 客户端 #
使用 ws 库 #
javascript
const WebSocket = require('ws');
/**
 * Node.js WebSocket client built on an EventEmitter-style WebSocket class
 * (`.on('open' | 'message' | 'error' | 'close')`), with custom handshake
 * headers, exponential-backoff reconnection and a small event emitter.
 *
 * The connection is opened immediately by the constructor.
 */
class NodeWebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options]
   * @param {boolean} [options.reconnect=true]
   * @param {number} [options.reconnectInterval=1000] - Base backoff delay in ms.
   * @param {number} [options.maxReconnectAttempts=10]
   * @param {object} [options.headers={}] - Passed to the WebSocket constructor.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      reconnect: true,
      reconnectInterval: 1000,
      maxReconnectAttempts: 10,
      headers: {},
      ...options
    };
    this.ws = null;
    this.reconnectAttempts = 0;
    this.reconnectTimer = null;
    this.eventHandlers = new Map();
    this.connect();
  }

  /** Opens a new socket and wires its events to this client's emitter. */
  connect() {
    this.ws = new WebSocket(this.url, {
      headers: this.options.headers
    });
    this.ws.on('open', () => {
      console.log('连接成功');
      this.reconnectAttempts = 0;
      this.emit('open');
    });
    this.ws.on('message', (data, isBinary) => {
      // Binary frames are passed through untouched; text frames become strings.
      const message = isBinary ? data : data.toString();
      this.emit('message', message);
    });
    this.ws.on('error', (error) => {
      console.error('错误:', error.message);
      this.emit('error', error);
    });
    this.ws.on('close', (code, reason) => {
      console.log('连接关闭:', code, reason.toString());
      this.emit('close', { code, reason: reason.toString() });
      if (this.options.reconnect && this.reconnectAttempts < this.options.maxReconnectAttempts) {
        this.reconnect();
      }
    });
  }

  /** Schedules the next connection attempt; the delay doubles on each attempt. */
  reconnect() {
    this.reconnectAttempts++;
    const delay = this.options.reconnectInterval * Math.pow(2, this.reconnectAttempts - 1);
    this.clearReconnectTimer();
    // Keep the handle so close() can cancel a reconnect that is still pending.
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      console.log(`重连中 (${this.reconnectAttempts}/${this.options.maxReconnectAttempts})...`);
      this.connect();
    }, delay);
  }

  /** Cancels a scheduled reconnect attempt, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Sends `data` if the socket is open; otherwise the message is dropped.
   * Non-string values are JSON-serialized.
   * @param {*} data
   */
  send(data) {
    if (this.ws.readyState === WebSocket.OPEN) {
      const message = typeof data === 'string' ? data : JSON.stringify(data);
      this.ws.send(message);
    }
  }

  /**
   * Registers a handler for an event.
   * @param {string} event
   * @param {Function} handler
   */
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }

  /**
   * Removes a previously registered handler (no-op if it is not registered).
   * @param {string} event
   * @param {Function} handler
   */
  off(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Invokes every handler registered for `event` with `data`.
   * @param {string} event
   * @param {*} [data]
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (handlers) {
      // Snapshot so handlers may unregister themselves during dispatch.
      [...handlers].forEach((handler) => handler(data));
    }
  }

  /**
   * Closes the connection permanently: disables reconnection and cancels
   * any reconnect attempt that is still pending.
   */
  close() {
    this.options.reconnect = false;
    this.clearReconnectTimer();
    this.ws.close();
  }
}
// Example usage: connect to a local server, passing a bearer token in the
// headers option that the client forwards to the WebSocket constructor.
const client = new NodeWebSocketClient('ws://localhost:8080', {
headers: {
'Authorization': 'Bearer your-token'
}
});
// Log each received message (text frames arrive as strings).
client.on('message', (data) => {
console.log('收到:', data);
});
移动端实现 #
React Native #
javascript
import { useEffect, useState, useRef } from 'react';
/**
 * React Native hook that keeps one WebSocket open for `url` and collects
 * every received payload.
 *
 * @param {string} url - WebSocket endpoint; changing it replaces the socket.
 * @returns {{connected: boolean, messages: Array, sendMessage: Function}}
 *   `sendMessage(message)` JSON-serializes `message` and sends it only
 *   while the current socket is open.
 */
function useWebSocket(url) {
  const [connected, setConnected] = useState(false);
  const [messages, setMessages] = useState([]);
  const wsRef = useRef(null);

  useEffect(() => {
    const ws = new WebSocket(url);
    wsRef.current = ws;
    // A new socket is not connected yet, whatever the previous one reported.
    setConnected(false);
    // State is only updated by the socket that is still the current one, so a
    // late event from a replaced socket cannot clobber the new socket's state.
    const isCurrent = () => wsRef.current === ws;
    ws.onopen = () => {
      console.log('WebSocket 连接成功');
      if (isCurrent()) {
        setConnected(true);
      }
    };
    ws.onmessage = (event) => {
      if (isCurrent()) {
        setMessages((prev) => [...prev, event.data]);
      }
    };
    ws.onerror = (error) => {
      console.error('WebSocket 错误:', error);
    };
    ws.onclose = () => {
      console.log('WebSocket 关闭');
      if (isCurrent()) {
        setConnected(false);
      }
    };
    return () => {
      ws.close();
    };
  }, [url]);

  const sendMessage = (message) => {
    const ws = wsRef.current;
    // Check the socket itself rather than the `connected` state: the state
    // can lag behind (e.g. right after a url change) and sending on a socket
    // that is still CONNECTING throws.
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  };

  return { connected, messages, sendMessage };
}
function ChatScreen() {
const { connected, messages, sendMessage } = useWebSocket('wss://example.com/ws');
const [input, setInput] = useState('');
return (
<View style={{ flex: 1 }}>
<Text>状态: {connected ? '已连接' : '未连接'}</Text>
<ScrollView>
{messages.map((msg, i) => (
<Text key={i}>{msg}</Text>
))}
</ScrollView>
<TextInput
value={input}
onChangeText={setInput}
placeholder="输入消息"
/>
<Button
title="发送"
onPress={() => {
sendMessage({ content: input });
setInput('');
}}
disabled={!connected}
/>
</View>
);
}
Flutter #
dart
import 'package:flutter/material.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
/// Demo widget for a WebSocket-backed message list; all state lives in
/// [_WebSocketDemoState].
class WebSocketDemo extends StatefulWidget {
@override
_WebSocketDemoState createState() => _WebSocketDemoState();
}
/// State for [WebSocketDemo]: connects to the WebSocket endpoint, appends
/// every received message to a list and sends the text field's content.
class _WebSocketDemoState extends State<WebSocketDemo> {
  final TextEditingController _controller = TextEditingController();
  final WebSocketChannel _channel = WebSocketChannel.connect(
    Uri.parse('wss://example.com/ws'),
  );
  final List<String> _messages = [];

  @override
  void initState() {
    super.initState();
    _channel.stream.listen(
      (message) {
        // A message can still be delivered after dispose(); calling
        // setState on an unmounted State throws.
        if (!mounted) return;
        setState(() {
          // The stream is dynamically typed; anything that is not already a
          // String is converted so it can be stored in List<String>.
          _messages.add(message is String ? message : message.toString());
        });
      },
      onError: (error) {
        print('WebSocket 错误: $error');
      },
      onDone: () {
        print('WebSocket 关闭');
      },
    );
  }

  /// Sends the current input (if not empty) and clears the text field.
  void _sendMessage() {
    if (_controller.text.isNotEmpty) {
      _channel.sink.add(_controller.text);
      _controller.clear();
    }
  }

  @override
  void dispose() {
    _channel.sink.close();
    _controller.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('WebSocket Demo')),
      body: Column(
        children: [
          // Received messages, oldest first.
          Expanded(
            child: ListView.builder(
              itemCount: _messages.length,
              itemBuilder: (context, index) {
                return ListTile(
                  title: Text(_messages[index]),
                );
              },
            ),
          ),
          // Input row: text field plus send button.
          Padding(
            padding: EdgeInsets.all(8.0),
            child: Row(
              children: [
                Expanded(
                  child: TextField(
                    controller: _controller,
                    decoration: InputDecoration(
                      hintText: '输入消息',
                    ),
                  ),
                ),
                IconButton(
                  icon: Icon(Icons.send),
                  onPressed: _sendMessage,
                ),
              ],
            ),
          ),
        ],
      ),
    );
  }
}
桌面应用 #
Electron #
javascript
const { ipcMain, ipcRenderer } = require('electron');
/**
 * Main-process bridge that owns a WebSocket on behalf of a renderer window.
 *
 * Renderer -> main IPC channels: `ws-connect` (url), `ws-send` (data),
 * `ws-close`. Main -> renderer channels: `ws-open`, `ws-message` (string),
 * `ws-close`, `ws-error` (message).
 *
 * NOTE(review): the socket is used through an EventEmitter-style API
 * (`.on('open', ...)`), so `WebSocket` here is presumably the `ws` package's
 * class and must be in scope in the main process — confirm.
 */
class ElectronWebSocket {
  constructor() {
    this.ws = null;
    this.window = null;
  }

  /**
   * Remembers the target window and registers the IPC listeners.
   * @param {object} window - Window whose `webContents` receives the events.
   */
  init(window) {
    this.window = window;
    ipcMain.on('ws-connect', (event, url) => {
      this.connect(url);
    });
    ipcMain.on('ws-send', (event, data) => {
      this.send(data);
    });
    ipcMain.on('ws-close', () => {
      this.close();
    });
  }

  /**
   * Opens a connection to `url`, replacing any previous connection.
   * @param {string} url
   */
  connect(url) {
    // Close the previous socket first; otherwise it stays open with live
    // listeners and keeps forwarding events to the renderer.
    this.close();
    const ws = new WebSocket(url);
    this.ws = ws;
    ws.on('open', () => {
      this.notify(ws, 'ws-open');
    });
    ws.on('message', (data) => {
      this.notify(ws, 'ws-message', data.toString());
    });
    ws.on('close', () => {
      this.notify(ws, 'ws-close');
    });
    ws.on('error', (error) => {
      this.notify(ws, 'ws-error', error.message);
    });
  }

  /**
   * Forwards an event of socket `ws` to the renderer. Events from a socket
   * that has been replaced are dropped, and nothing is sent when there is
   * no window or it reports itself as destroyed.
   * @param {object} ws - Socket the event originated from.
   * @param {string} channel - IPC channel name.
   * @param {...*} args - Payload for the channel.
   */
  notify(ws, channel, ...args) {
    if (ws !== this.ws) {
      return;
    }
    const target = this.window;
    if (!target || target.isDestroyed?.()) {
      return;
    }
    target.webContents.send(channel, ...args);
  }

  /**
   * Sends `data` if the socket is open; otherwise it is dropped.
   * @param {*} data
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }
  }

  /** Closes the current socket, if any. */
  close() {
    if (this.ws) {
      this.ws.close();
    }
  }
}
/**
 * Renderer-side helper around `ipcRenderer` for the `ws-*` channels.
 *
 * `connect` / `send` / `close` post a request on the matching channel.
 * `onOpen` / `onClose` register the callback as-is (it receives the IPC
 * arguments unchanged); `onMessage` / `onError` register a wrapper that
 * passes only the payload to the callback. Every method returns whatever
 * the underlying `ipcRenderer` call returns.
 */
const renderer = {
  connect(url) {
    return ipcRenderer.send('ws-connect', url);
  },
  send(data) {
    return ipcRenderer.send('ws-send', data);
  },
  close() {
    return ipcRenderer.send('ws-close');
  },
  onOpen(callback) {
    return ipcRenderer.on('ws-open', callback);
  },
  onMessage(callback) {
    const forwardPayload = (event, data) => callback(data);
    return ipcRenderer.on('ws-message', forwardPayload);
  },
  onClose(callback) {
    return ipcRenderer.on('ws-close', callback);
  },
  onError(callback) {
    const forwardError = (event, error) => callback(error);
    return ipcRenderer.on('ws-error', forwardError);
  },
};
消息协议设计 #
消息格式 #
text
┌─────────────────────────────────────────────────────────────┐
│ 消息格式设计 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 基础消息格式: │
│ ───────────────────────────────────────────────────────── │
│ { │
│ "type": "message_type", │
│ "payload": { ... }, │
│ "timestamp": 1234567890, │
│ "id": "unique-message-id" │
│ } │
│ │
│ 消息类型: │
│ ───────────────────────────────────────────────────────── │
│ - auth 认证 │
│ - ping/pong 心跳 │
│ - message 普通消息 │
│ - event 事件通知 │
│ - error 错误消息 │
│ - ack 确认响应 │
│ │
│ 示例: │
│ ───────────────────────────────────────────────────────── │
│ // 认证消息 │
│ { │
│ "type": "auth", │
│ "payload": { │
│ "token": "jwt-token-here" │
│ } │
│ } │
│ │
│ // 聊天消息 │
│ { │
│ "type": "message", │
│ "payload": { │
│ "room": "general", │
│ "content": "Hello World" │
│ }, │
│ "id": "msg-123" │
│ } │
│ │
│ // 确认响应 │
│ { │
│ "type": "ack", │
│ "payload": { │
│ "messageId": "msg-123", │
│ "status": "received" │
│ } │
│ } │
│ │
└─────────────────────────────────────────────────────────────┘
消息处理器 #
javascript
/**
 * Dispatches incoming protocol messages (`{ type, payload, ... }`) to
 * per-type handlers and sends outgoing messages, optionally waiting for an
 * `ack` that references the message id.
 */
class MessageHandler {
  /**
   * @param {{send: Function}} wsClient - Transport used for outgoing messages.
   * @param {object} [options]
   * @param {number} [options.ackTimeout=5000] - How long `send(..., true)`
   *   waits for an ack before rejecting, in ms.
   */
  constructor(wsClient, options = {}) {
    this.wsClient = wsClient;
    this.ackTimeout = options.ackTimeout ?? 5000;
    this.handlers = new Map();
    // message id -> { resolve, reject, timer }
    this.pendingAcks = new Map();
    this.registerDefaultHandlers();
  }

  /** Registers the built-in handlers for auth, ping, ack and error. */
  registerDefaultHandlers() {
    this.on('auth', this.handleAuth.bind(this));
    this.on('ping', this.handlePing.bind(this));
    this.on('ack', this.handleAck.bind(this));
    this.on('error', this.handleError.bind(this));
  }

  /**
   * Sets the handler for a message type, replacing any existing one.
   * @param {string} type
   * @param {Function} handler - Called as `handler(payload, message)`.
   */
  on(type, handler) {
    this.handlers.set(type, handler);
  }

  /**
   * Parses (if given a string) and dispatches one incoming message.
   * Parse errors and handler errors are logged, never thrown.
   * @param {string|object} message
   */
  handle(message) {
    try {
      const data = typeof message === 'string' ? JSON.parse(message) : message;
      const handler = this.handlers.get(data.type);
      if (handler) {
        handler(data.payload, data);
      } else {
        console.warn('未知的消息类型:', data.type);
      }
    } catch (error) {
      console.error('消息处理错误:', error);
    }
  }

  /**
   * Sends a message with a generated id and timestamp.
   * @param {string} type
   * @param {*} payload
   * @param {boolean} [requireAck=false] - When true, returns a promise that
   *   resolves with the ack payload or rejects with `Error('ACK timeout')`.
   * @returns {Promise<*>|undefined}
   */
  send(type, payload, requireAck = false) {
    const message = {
      type,
      payload,
      timestamp: Date.now(),
      id: this.generateId()
    };
    if (!requireAck) {
      this.wsClient.send(message);
      return undefined;
    }
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        if (this.pendingAcks.delete(message.id)) {
          reject(new Error('ACK timeout'));
        }
      }, this.ackTimeout);
      this.pendingAcks.set(message.id, { resolve, reject, timer });
      try {
        this.wsClient.send(message);
      } catch (error) {
        // Nothing was sent, so no ack can arrive: clean up and fail now.
        clearTimeout(timer);
        this.pendingAcks.delete(message.id);
        reject(error);
      }
    });
  }

  /** Logs the authentication response. */
  handleAuth(payload) {
    console.log('认证响应:', payload);
  }

  /** Answers a ping with a pong that echoes the payload. */
  handlePing(payload) {
    this.wsClient.send({ type: 'pong', payload });
  }

  /**
   * Resolves the pending `send` promise matching `payload.messageId`.
   * Acks for unknown ids are ignored.
   */
  handleAck(payload) {
    const messageId = payload?.messageId;
    const pending = this.pendingAcks.get(messageId);
    if (pending) {
      // Cancel the timeout so it does not linger after the ack arrived.
      clearTimeout(pending.timer);
      this.pendingAcks.delete(messageId);
      pending.resolve(payload);
    }
  }

  /** Logs an error reported by the server. */
  handleError(payload) {
    console.error('服务器错误:', payload);
  }

  /**
   * Builds an id of the form `<ms timestamp>-<random base36 suffix>`.
   * @returns {string}
   */
  generateId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}
下一步 #
现在你已经掌握了 WebSocket 客户端实现，接下来请学习「高级主题」，深入了解重连机制、负载均衡、集群方案等进阶内容！
最后更新:2026-03-29