Android WebSocket #
一、WebSocket概述 #
WebSocket是一种在单个TCP连接上进行全双工通信的协议,适用于需要实时通信的场景,如即时通讯、实时数据推送等。
1.1 WebSocket特点 #
- 全双工通信
- 低延迟
- 支持跨域
- 二进制数据支持
1.2 应用场景 #
- 即时通讯
- 实时数据推送
- 在线协作
- 游戏
- 股票行情
二、使用OkHttp实现WebSocket #
2.1 添加依赖 #
kotlin
dependencies {
implementation("com.squareup.okhttp3:okhttp:4.12.0")
}
2.2 创建WebSocket管理类 #
kotlin
class WebSocketManager private constructor() {
private var webSocket: WebSocket? = null
private var client: OkHttpClient? = null
private var listener: WebSocketListener? = null
private var isConnected = false
companion object {
val instance: WebSocketManager by lazy { WebSocketManager() }
}
fun connect(url: String, listener: WebSocketListener) {
this.listener = listener
client = OkHttpClient.Builder()
.pingInterval(30, TimeUnit.SECONDS)
.build()
val request = Request.Builder()
.url(url)
.build()
webSocket = client?.newWebSocket(request, object : okhttp3.WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
isConnected = true
listener.onOpen()
}
override fun onMessage(webSocket: WebSocket, text: String) {
listener.onMessage(text)
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
listener.onMessage(bytes)
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
webSocket.close(1000, null)
listener.onClosing(code, reason)
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
isConnected = false
listener.onClosed(code, reason)
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
isConnected = false
listener.onFailure(t)
}
})
}
fun send(message: String): Boolean {
return webSocket?.send(message) ?: false
}
fun send(bytes: ByteString): Boolean {
return webSocket?.send(bytes) ?: false
}
fun close(code: Int = 1000, reason: String? = null) {
webSocket?.close(code, reason)
webSocket = null
isConnected = false
}
fun isConnected(): Boolean = isConnected
interface WebSocketListener {
fun onOpen()
fun onMessage(text: String)
fun onMessage(bytes: ByteString) {}
fun onClosing(code: Int, reason: String) {}
fun onClosed(code: Int, reason: String)
fun onFailure(t: Throwable)
}
}
2.3 使用WebSocket #
kotlin
class MainActivity : AppCompatActivity(), WebSocketManager.WebSocketListener {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
WebSocketManager.instance.connect("wss://example.com/ws", this)
}
fun sendMessage(message: String) {
if (WebSocketManager.instance.isConnected()) {
WebSocketManager.instance.send(message)
}
}
override fun onOpen() {
runOnUiThread {
Toast.makeText(this, "WebSocket连接成功", Toast.LENGTH_SHORT).show()
}
}
override fun onMessage(text: String) {
runOnUiThread {
// 处理收到的消息
handleMessage(text)
}
}
override fun onClosed(code: Int, reason: String) {
runOnUiThread {
Toast.makeText(this, "WebSocket已关闭", Toast.LENGTH_SHORT).show()
}
}
override fun onFailure(t: Throwable) {
runOnUiThread {
Toast.makeText(this, "WebSocket连接失败: ${t.message}", Toast.LENGTH_SHORT).show()
}
}
override fun onDestroy() {
super.onDestroy()
WebSocketManager.instance.close()
}
}
三、自动重连机制 #
kotlin
class ReconnectingWebSocketManager {
private var webSocket: WebSocket? = null
private var client: OkHttpClient? = null
private var url: String? = null
private var listener: WebSocketListener? = null
private var isConnected = false
private var isManualClose = false
private var retryCount = 0
private val maxRetryCount = 5
private val retryDelay = 3000L
private val handler = Handler(Looper.getMainLooper())
fun connect(url: String, listener: WebSocketListener) {
this.url = url
this.listener = listener
this.isManualClose = false
doConnect()
}
private fun doConnect() {
if (client == null) {
client = OkHttpClient.Builder()
.pingInterval(30, TimeUnit.SECONDS)
.build()
}
val request = Request.Builder()
.url(url!!)
.build()
webSocket = client?.newWebSocket(request, webSocketListener)
}
private val webSocketListener = object : okhttp3.WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
isConnected = true
retryCount = 0
listener?.onOpen()
}
override fun onMessage(webSocket: WebSocket, text: String) {
listener?.onMessage(text)
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
isConnected = false
listener?.onClosed(code, reason)
if (!isManualClose) {
retryConnect()
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
isConnected = false
listener?.onFailure(t)
if (!isManualClose) {
retryConnect()
}
}
}
private fun retryConnect() {
if (retryCount < maxRetryCount) {
retryCount++
handler.postDelayed({
doConnect()
}, retryDelay * retryCount)
}
}
fun send(message: String): Boolean {
return webSocket?.send(message) ?: false
}
fun close() {
isManualClose = true
webSocket?.close(1000, "User closed")
webSocket = null
isConnected = false
}
}
四、心跳机制 #
kotlin
class HeartbeatWebSocketManager {
private var webSocket: WebSocket? = null
private var heartbeatRunnable: Runnable? = null
private val heartbeatInterval = 30000L
private val handler = Handler(Looper.getMainLooper())
private fun startHeartbeat() {
stopHeartbeat()
heartbeatRunnable = object : Runnable {
override fun run() {
if (isConnected()) {
webSocket?.send("{\"type\":\"heartbeat\"}")
handler.postDelayed(this, heartbeatInterval)
}
}
}
handler.post(heartbeatRunnable!!)
}
private fun stopHeartbeat() {
heartbeatRunnable?.let {
handler.removeCallbacks(it)
heartbeatRunnable = null
}
}
fun close() {
stopHeartbeat()
webSocket?.close(1000, null)
}
}
五、消息处理 #
5.1 消息模型 #
kotlin
sealed class WebSocketMessage {
data class Text(val content: String) : WebSocketMessage()
data class Binary(val data: ByteArray) : WebSocketMessage()
}
data class ChatMessage(
val type: String,
val from: String,
val to: String,
val content: String,
val timestamp: Long
)
5.2 消息解析 #
kotlin
fun parseMessage(text: String): ChatMessage? {
return try {
Gson().fromJson(text, ChatMessage::class.java)
} catch (e: Exception) {
null
}
}
fun createMessage(to: String, content: String): String {
val message = ChatMessage(
type = "chat",
from = UserManager.currentUserId,
to = to,
content = content,
timestamp = System.currentTimeMillis()
)
return Gson().toJson(message)
}
5.3 消息队列 #
kotlin
class MessageQueue {
private val queue = ConcurrentLinkedQueue<String>()
private var isProcessing = false
fun enqueue(message: String) {
queue.offer(message)
processQueue()
}
private fun processQueue() {
if (isProcessing) return
isProcessing = true
while (queue.isNotEmpty() && WebSocketManager.instance.isConnected()) {
val message = queue.poll()
WebSocketManager.instance.send(message)
}
isProcessing = false
}
}
六、与LiveData/Flow结合 #
kotlin
class WebSocketViewModel : ViewModel() {
private val _messages = MutableSharedFlow<ChatMessage>()
val messages: SharedFlow<ChatMessage> = _messages
private val _connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected)
val connectionState: StateFlow<ConnectionState> = _connectionState
private val webSocketManager = WebSocketManager.instance
init {
webSocketManager.connect("wss://example.com/ws", object : WebSocketManager.WebSocketListener {
override fun onOpen() {
viewModelScope.launch {
_connectionState.value = ConnectionState.Connected
}
}
override fun onMessage(text: String) {
viewModelScope.launch {
parseMessage(text)?.let {
_messages.emit(it)
}
}
}
override fun onClosed(code: Int, reason: String) {
viewModelScope.launch {
_connectionState.value = ConnectionState.Disconnected
}
}
override fun onFailure(t: Throwable) {
viewModelScope.launch {
_connectionState.value = ConnectionState.Error(t.message ?: "Unknown error")
}
}
})
}
fun sendMessage(to: String, content: String) {
val message = createMessage(to, content)
webSocketManager.send(message)
}
override fun onCleared() {
super.onCleared()
webSocketManager.close()
}
}
sealed class ConnectionState {
object Connected : ConnectionState()
object Disconnected : ConnectionState()
data class Error(val message: String) : ConnectionState()
}
七、总结 #
本章详细介绍了Android WebSocket:
- WebSocket的基本概念
- 使用OkHttp实现WebSocket
- 自动重连机制
- 心跳机制
- 消息处理
- 与LiveData/Flow结合
WebSocket是实现实时通信的重要技术,掌握它对于开发即时通讯类应用非常重要。
最后更新:2026-03-26