Android WebSocket #

一、WebSocket概述 #

WebSocket是一种在单个TCP连接上进行全双工通信的协议,适用于需要实时通信的场景,如即时通讯、实时数据推送等。

1.1 WebSocket特点 #

  • 全双工通信
  • 低延迟
  • 支持跨域
  • 二进制数据支持

1.2 应用场景 #

  • 即时通讯
  • 实时数据推送
  • 在线协作
  • 游戏
  • 股票行情

二、使用OkHttp实现WebSocket #

2.1 添加依赖 #

kotlin
dependencies {
    // OkHttp provides the OkHttpClient / WebSocket / WebSocketListener types used by the manager classes below.
    implementation("com.squareup.okhttp3:okhttp:4.12.0")
}

2.2 创建WebSocket管理类 #

kotlin
/**
 * Singleton wrapper around a single OkHttp WebSocket connection.
 *
 * OkHttp delivers socket callbacks on its own threads, so the connection state lives in
 * `@Volatile` fields and every state transition happens while holding [lock]. Events are
 * forwarded to the caller's [WebSocketListener] on whatever thread OkHttp delivers them on.
 *
 * The manager keeps no reference to the listener or the socket once the connection has
 * finished (closed or failed), so a listener such as an Activity is not pinned by the singleton.
 */
class WebSocketManager private constructor() {

    private val lock = Any()

    // Built once and reused for every connection instead of creating a new client per connect() call.
    private val client: OkHttpClient by lazy {
        OkHttpClient.Builder()
            .pingInterval(30, TimeUnit.SECONDS)
            .build()
    }

    @Volatile
    private var webSocket: WebSocket? = null

    // Identity token of the connection whose callbacks are allowed to change the state; null when idle.
    private var activeSession: Any? = null

    @Volatile
    private var isConnected = false

    /**
     * Opens a connection to [url], replacing (and closing) any connection opened earlier.
     *
     * @param url WebSocket endpoint.
     * @param listener receives the events of this connection only.
     */
    fun connect(url: String, listener: WebSocketListener) {
        // Build the request first, so a failure while building it leaves the current connection untouched.
        val request = Request.Builder()
            .url(url)
            .build()
        val session = Any()

        val socketListener = object : okhttp3.WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: Response) {
                markOpened(session)
                listener.onOpen()
            }

            override fun onMessage(webSocket: WebSocket, text: String) {
                listener.onMessage(text)
            }

            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                listener.onMessage(bytes)
            }

            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                // Answer the peer's close so the closing handshake can complete.
                webSocket.close(1000, null)
                listener.onClosing(code, reason)
            }

            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                markFinished(session)
                listener.onClosed(code, reason)
            }

            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                markFinished(session)
                listener.onFailure(t)
            }
        }

        synchronized(lock) {
            // Close, rather than leak, a connection left over from an earlier connect() call.
            webSocket?.close(1000, null)
            activeSession = session
            isConnected = false
            val created = client.newWebSocket(request, socketListener)
            // If the attempt already finished while being created, markFinished cleared the session; keep it cleared.
            webSocket = if (activeSession === session) created else null
        }
    }

    /** Marks the manager as connected, unless [session] has been superseded or closed in the meantime. */
    private fun markOpened(session: Any) {
        synchronized(lock) {
            if (activeSession === session) {
                isConnected = true
            }
        }
    }

    /** Drops the socket and resets the state when [session] is still the active connection. */
    private fun markFinished(session: Any) {
        synchronized(lock) {
            if (activeSession === session) {
                activeSession = null
                webSocket = null
                isConnected = false
            }
        }
    }

    /** Sends a text frame; returns `false` when there is no socket or the socket rejects the message. */
    fun send(message: String): Boolean {
        return webSocket?.send(message) ?: false
    }

    /** Sends a binary frame; returns `false` when there is no socket or the socket rejects the message. */
    fun send(bytes: ByteString): Boolean {
        return webSocket?.send(bytes) ?: false
    }

    /**
     * Starts closing the current connection and marks the manager as disconnected.
     *
     * The listener passed to [connect] still receives the remaining events of that connection
     * (for example `onClosed`), but they no longer affect [isConnected].
     */
    fun close(code: Int = 1000, reason: String? = null) {
        synchronized(lock) {
            webSocket?.close(code, reason)
            webSocket = null
            activeSession = null
            isConnected = false
        }
    }

    /** `true` between `onOpen` of the active connection and its close, failure or [close]. */
    fun isConnected(): Boolean = isConnected

    /** Callbacks for one connection; invoked on the thread OkHttp delivers the event on. */
    interface WebSocketListener {
        fun onOpen()
        fun onMessage(text: String)
        fun onMessage(bytes: ByteString) {}
        fun onClosing(code: Int, reason: String) {}
        fun onClosed(code: Int, reason: String)
        fun onFailure(t: Throwable)
    }

    companion object {
        val instance: WebSocketManager by lazy { WebSocketManager() }
    }
}

2.3 使用WebSocket #

kotlin
/**
 * Example screen that opens the shared WebSocket in [onCreate] and closes it in [onDestroy].
 *
 * The activity is its own [WebSocketManager.WebSocketListener]; every callback hops to the UI
 * thread with `runOnUiThread` before touching the UI.
 */
class MainActivity : AppCompatActivity(), WebSocketManager.WebSocketListener {

    private val socket: WebSocketManager
        get() = WebSocketManager.instance

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        socket.connect("wss://example.com/ws", this)
    }

    /** Sends [message] only while the manager reports an open connection; otherwise does nothing. */
    fun sendMessage(message: String) {
        val manager = socket
        if (!manager.isConnected()) return
        manager.send(message)
    }

    override fun onOpen() {
        runOnUiThread { toast("WebSocket连接成功") }
    }

    override fun onMessage(text: String) {
        runOnUiThread {
            // Handle the received message
            handleMessage(text)
        }
    }

    override fun onClosed(code: Int, reason: String) {
        runOnUiThread { toast("WebSocket已关闭") }
    }

    override fun onFailure(t: Throwable) {
        runOnUiThread { toast("WebSocket连接失败: ${t.message}") }
    }

    override fun onDestroy() {
        super.onDestroy()
        socket.close()
    }

    /** Shows a short toast; callers are expected to already be on the UI thread. */
    private fun toast(text: String) {
        Toast.makeText(this, text, Toast.LENGTH_SHORT).show()
    }
}

三、自动重连机制 #

kotlin
/**
 * WebSocket manager that reconnects automatically after an unexpected close or failure.
 *
 * Up to [maxRetryCount] reconnect attempts are scheduled on the main looper; attempt `n` is
 * delayed by `retryDelay * n` milliseconds. The counter is reset whenever a connection opens.
 * [close] marks the shutdown as manual, which suppresses reconnection and cancels an attempt
 * that is already scheduled.
 *
 * Socket callbacks arrive on OkHttp threads while [connect], [close] and scheduled attempts run
 * elsewhere, so the shared flags are `@Volatile`. Each connection attempt gets its own token;
 * callbacks of an attempt that has been superseded by a newer one are ignored.
 */
class ReconnectingWebSocketManager {

    @Volatile
    private var webSocket: WebSocket? = null

    // Built once on first use and shared by every connection attempt.
    private val client: OkHttpClient by lazy {
        OkHttpClient.Builder()
            .pingInterval(30, TimeUnit.SECONDS)
            .build()
    }

    @Volatile
    private var url: String? = null

    @Volatile
    private var listener: WebSocketListener? = null

    @Volatile
    private var isConnected = false

    @Volatile
    private var isManualClose = false

    @Volatile
    private var retryCount = 0

    // Identity token of the attempt whose callbacks are currently honoured.
    @Volatile
    private var currentAttempt: Any? = null

    private val maxRetryCount = 5
    private val retryDelay = 3000L

    private val handler = Handler(Looper.getMainLooper())

    // Single reusable task so a scheduled reconnect can be cancelled with removeCallbacks.
    private val reconnectTask = Runnable {
        // close() may have been called after this task was posted.
        if (!isManualClose) {
            doConnect()
        }
    }

    /**
     * Connects to [url] and reports events to [listener].
     *
     * Calling this again supersedes the previous connection: its pending reconnect is
     * cancelled, its socket is closed and its late callbacks are ignored.
     */
    fun connect(url: String, listener: WebSocketListener) {
        handler.removeCallbacks(reconnectTask)
        currentAttempt = null
        webSocket?.close(1000, null)

        this.url = url
        this.listener = listener
        this.isManualClose = false
        this.retryCount = 0
        this.isConnected = false

        doConnect()
    }

    /** Opens a socket to the stored URL and makes it the attempt whose callbacks are honoured. */
    private fun doConnect() {
        val target = checkNotNull(url) { "connect() must be called before a connection attempt" }

        val request = Request.Builder()
            .url(target)
            .build()

        val attempt = Any()
        currentAttempt = attempt
        webSocket = client.newWebSocket(request, AttemptListener(attempt))
    }

    /** Socket callbacks bound to one connection attempt. */
    private inner class AttemptListener(private val attempt: Any) : okhttp3.WebSocketListener() {

        private val isCurrent: Boolean
            get() = currentAttempt === attempt

        override fun onOpen(webSocket: WebSocket, response: Response) {
            if (!isCurrent) return
            isConnected = true
            retryCount = 0
            listener?.onOpen()
        }

        override fun onMessage(webSocket: WebSocket, text: String) {
            if (!isCurrent) return
            listener?.onMessage(text)
        }

        override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
            // Answer the peer's close so the handshake completes and onClosed is delivered.
            webSocket.close(1000, null)
        }

        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            if (!isCurrent) return
            isConnected = false
            listener?.onClosed(code, reason)

            if (!isManualClose) {
                retryConnect()
            }
        }

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            if (!isCurrent) return
            isConnected = false
            listener?.onFailure(t)

            if (!isManualClose) {
                retryConnect()
            }
        }
    }

    /** Schedules the next attempt with a linearly growing delay; does nothing once the retry budget is spent. */
    private fun retryConnect() {
        if (retryCount < maxRetryCount) {
            val attemptNumber = retryCount + 1
            retryCount = attemptNumber
            handler.postDelayed(reconnectTask, retryDelay * attemptNumber)
        }
    }

    /** Sends a text frame; returns `false` when there is no socket or the socket rejects the message. */
    fun send(message: String): Boolean {
        return webSocket?.send(message) ?: false
    }

    /** `true` between `onOpen` of the current connection and its close, failure or [close]. */
    fun isConnected(): Boolean = isConnected

    /** Closes the connection on the user's behalf and stops any further reconnect attempts. */
    fun close() {
        isManualClose = true
        // Without this a reconnect that was already scheduled would reopen the socket after close().
        handler.removeCallbacks(reconnectTask)
        webSocket?.close(1000, "User closed")
        webSocket = null
        isConnected = false
    }
}

四、心跳机制 #

kotlin
/**
 * Application-level heartbeat on top of a WebSocket.
 *
 * While the connection is up, a JSON heartbeat frame is sent every [heartbeatInterval]
 * milliseconds from the main looper.
 *
 * NOTE(review): this is a fragment. `isConnected()` is not defined here, [webSocket] is never
 * assigned and [startHeartbeat] has no caller in this snippet. Presumably the complete manager
 * supplies them and starts the heartbeat once the socket opens. Confirm.
 */
class HeartbeatWebSocketManager {

    private var webSocket: WebSocket? = null
    private var heartbeatRunnable: Runnable? = null
    private val heartbeatInterval = 30000L

    private val handler = Handler(Looper.getMainLooper())

    /** Cancels any running heartbeat, then posts a new one whose first beat is not delayed. */
    private fun startHeartbeat() {
        stopHeartbeat()

        val beat = object : Runnable {
            override fun run() {
                // Once the connection is down the task does not reschedule itself.
                if (!isConnected()) return

                webSocket?.send("{\"type\":\"heartbeat\"}")
                handler.postDelayed(this, heartbeatInterval)
            }
        }

        heartbeatRunnable = beat
        handler.post(beat)
    }

    /** Removes the pending heartbeat task, if there is one, and forgets it. */
    private fun stopHeartbeat() {
        val pending = heartbeatRunnable ?: return
        handler.removeCallbacks(pending)
        heartbeatRunnable = null
    }

    /** Stops the heartbeat before asking the socket to close with code 1000. */
    fun close() {
        stopHeartbeat()
        webSocket?.close(1000, null)
    }
}

五、消息处理 #

5.1 消息模型 #

kotlin
/** A frame received from or sent over the WebSocket: either text or binary. */
sealed class WebSocketMessage {

    /** Text frame carrying [content]. */
    data class Text(val content: String) : WebSocketMessage()

    /**
     * Binary frame carrying [data].
     *
     * The generated `equals`/`hashCode` of a data class compare an array property by reference,
     * so they are overridden here to compare the bytes themselves.
     */
    data class Binary(val data: ByteArray) : WebSocketMessage() {

        override fun equals(other: Any?): Boolean =
            this === other || (other is Binary && data.contentEquals(other.data))

        override fun hashCode(): Int = data.contentHashCode()
    }
}

/**
 * Chat payload exchanged as JSON over the WebSocket.
 *
 * @property type message kind; `createMessage` uses `"chat"`.
 * @property from sender identifier.
 * @property to recipient identifier.
 * @property content message body.
 * @property timestamp creation time; `createMessage` fills it from `System.currentTimeMillis()`.
 */
data class ChatMessage(
    val type: String,
    val from: String,
    val to: String,
    val content: String,
    val timestamp: Long,
)

5.2 消息解析 #

kotlin
/**
 * Decodes [text] as a JSON [ChatMessage].
 *
 * @return the decoded message, or `null` when decoding throws any [Exception].
 */
fun parseMessage(text: String): ChatMessage? =
    try {
        Gson().fromJson(text, ChatMessage::class.java)
    } catch (ignored: Exception) {
        // Input that cannot be decoded is reported as "no message" rather than as an error.
        null
    }

/**
 * Builds the JSON text of a `"chat"` message from the current user to [to].
 *
 * The sender is read from `UserManager.currentUserId` and the timestamp from the system clock.
 *
 * @param to recipient identifier.
 * @param content message body.
 * @return the message serialized with Gson.
 */
fun createMessage(to: String, content: String): String =
    ChatMessage(
        type = "chat",
        from = UserManager.currentUserId,
        to = to,
        content = content,
        timestamp = System.currentTimeMillis()
    ).let { outgoing -> Gson().toJson(outgoing) }

5.3 消息队列 #

kotlin
/**
 * Outgoing-message buffer in front of [WebSocketManager].
 *
 * [enqueue] stores the message and immediately tries to drain the queue in FIFO order. A
 * message is removed only after the socket accepted it, so a message whose send is rejected
 * stays at the head of the queue instead of being lost. Draining is only triggered by
 * [enqueue]: messages left over while disconnected are sent on the next [enqueue] call that
 * finds the connection open.
 */
class MessageQueue {

    private val queue = ConcurrentLinkedQueue<String>()

    // Drain guard. compareAndSet claims it atomically, so two threads calling enqueue() cannot both drain.
    private val isProcessing = java.util.concurrent.atomic.AtomicBoolean(false)

    /** Appends [message] and attempts to send everything that is queued. */
    fun enqueue(message: String) {
        queue.offer(message)
        processQueue()
    }

    /** Drains the queue unless another thread is already doing so. */
    private fun processQueue() {
        while (isProcessing.compareAndSet(false, true)) {
            val drained = try {
                drainWhileSendable()
            } finally {
                isProcessing.set(false)
            }
            // Go around again only when a message slipped in while the guard was being released.
            if (!drained || queue.isEmpty()) return
        }
    }

    /**
     * Sends queued messages until the queue is empty or sending stalls.
     *
     * @return `true` when the queue was emptied, `false` when the socket is not connected or rejected a message.
     */
    private fun drainWhileSendable(): Boolean {
        val socket = WebSocketManager.instance
        while (true) {
            val next = queue.peek() ?: return true
            if (!socket.isConnected() || !socket.send(next)) return false
            // Only the holder of the guard removes elements, so the head is still `next`.
            queue.poll()
        }
    }
}

六、与LiveData/Flow结合 #

kotlin
/**
 * Exposes the shared WebSocket to the UI as flows.
 *
 * [messages] emits every incoming text frame that `parseMessage` can decode, and
 * [connectionState] tracks the connection, starting at [ConnectionState.Disconnected]. The
 * socket is connected when the view model is created and closed in [onCleared]. Every socket
 * callback is handled in a coroutine launched in `viewModelScope`.
 */
class WebSocketViewModel : ViewModel() {

    private val _messages = MutableSharedFlow<ChatMessage>()
    val messages: SharedFlow<ChatMessage> = _messages

    private val _connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected)
    val connectionState: StateFlow<ConnectionState> = _connectionState

    private val webSocketManager = WebSocketManager.instance

    // Bridges socket callbacks into the flows above.
    private val socketEvents = object : WebSocketManager.WebSocketListener {
        override fun onOpen() {
            publish(ConnectionState.Connected)
        }

        override fun onMessage(text: String) {
            viewModelScope.launch {
                // Frames that cannot be decoded are dropped.
                val parsed = parseMessage(text) ?: return@launch
                _messages.emit(parsed)
            }
        }

        override fun onClosed(code: Int, reason: String) {
            publish(ConnectionState.Disconnected)
        }

        override fun onFailure(t: Throwable) {
            viewModelScope.launch {
                _connectionState.value = ConnectionState.Error(t.message ?: "Unknown error")
            }
        }
    }

    init {
        webSocketManager.connect("wss://example.com/ws", socketEvents)
    }

    /** Serializes a chat message addressed to [to] and hands it to the socket; the send result is not checked. */
    fun sendMessage(to: String, content: String) {
        webSocketManager.send(createMessage(to, content))
    }

    override fun onCleared() {
        super.onCleared()
        webSocketManager.close()
    }

    /** Stores [state] as the current connection state from a coroutine in `viewModelScope`. */
    private fun publish(state: ConnectionState) {
        viewModelScope.launch {
            _connectionState.value = state
        }
    }
}

/** Connection status published by WebSocketViewModel through its `connectionState` flow. */
sealed class ConnectionState {
    /** Set when the socket reports `onOpen`. */
    object Connected : ConnectionState()
    /** Initial state, and the state set when the socket reports `onClosed`. */
    object Disconnected : ConnectionState()
    /** Set when the socket reports `onFailure`; [message] is the throwable's message or "Unknown error". */
    data class Error(val message: String) : ConnectionState()
}

七、总结 #

本章详细介绍了Android WebSocket:

  1. WebSocket的基本概念
  2. 使用OkHttp实现WebSocket
  3. 自动重连机制
  4. 心跳机制
  5. 消息处理
  6. 与LiveData/Flow结合

WebSocket是实现实时通信的重要技术,掌握它对于开发即时通讯类应用非常重要。

最后更新:2026-03-26