Channel基础 #
一、Channel概述 #
1.1 什么是Channel #
Phoenix Channel是以WebSocket为主要传输方式(也可使用长轮询)的实时通信机制,支持:
- 双向通信
- 发布订阅模式
- 自动重连
- 多节点同步
1.2 Channel架构 #
text
客户端 (WebSocket)
│
▼
Endpoint (Socket处理)
│
▼
Socket (连接管理)
│
▼
Channel (消息处理)
│
▼
PubSub (消息分发)
二、创建Channel #
2.1 定义Channel #
elixir
defmodule HelloWeb.RoomChannel do
  @moduledoc """
  Channel serving `"room:<id>"` topics.

  Every join is accepted, and each incoming `"new_message"` event is
  rebroadcast on the topic.
  """
  use Phoenix.Channel

  @doc """
  Accepts any join on a `"room:*"` topic.

  The room id is not needed here, so it is matched into an underscored
  name to avoid an unused-variable warning.
  """
  @impl true
  def join("room:" <> _room_id, _params, socket) do
    {:ok, socket}
  end

  @doc """
  Handles an incoming `"new_message"` event carrying a `"body"` key.

  Broadcasts `%{body: body}` as `"new_message"` on the socket's topic and
  sends no reply to the pushing client.
  """
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    broadcast!(socket, "new_message", %{body: body})
    {:noreply, socket}
  end
end
2.2 注册Channel #
elixir
defmodule HelloWeb.UserSocket do
  @moduledoc """
  Socket module that routes `"room:*"` topics to `HelloWeb.RoomChannel`.
  """
  use Phoenix.Socket

  channel("room:*", HelloWeb.RoomChannel)

  # Accepts every connection; params and connect info are ignored.
  @impl true
  def connect(_params, socket, _connect_info), do: {:ok, socket}

  # Returns nil, so the socket is given no identifier.
  @impl true
  def id(_socket) do
    nil
  end
end
2.3 Endpoint配置 #
elixir
defmodule HelloWeb.Endpoint do
  @moduledoc """
  Endpoint that mounts `HelloWeb.UserSocket` at `"/socket"`.
  """
  use Phoenix.Endpoint, otp_app: :hello

  # WebSocket transport is enabled; the long-polling transport is disabled.
  socket("/socket", HelloWeb.UserSocket, websocket: true, longpoll: false)
end
三、加入Channel #
3.1 join回调 #
elixir
defmodule HelloWeb.RoomChannel do
  @moduledoc """
  Channel for `"room:<id>"` topics that only admits authorised users.
  """
  use Phoenix.Channel

  # Admits the socket when the room id is among the current user's rooms,
  # otherwise rejects the join with an "unauthorized" reason.
  def join("room:" <> room_id, _params, socket) do
    case authorized?(room_id, socket) do
      true -> {:ok, socket}
      false -> {:error, %{reason: "unauthorized"}}
    end
  end

  # Membership test against `room_ids` of the user stored in the socket assigns.
  # NOTE(review): room_id comes from the topic string; confirm room_ids holds
  # values of the same type.
  defp authorized?(room_id, socket) do
    Enum.member?(socket.assigns.current_user.room_ids, room_id)
  end
end
3.2 返回初始数据 #
elixir
# Accepts the join and returns the room's messages in the join reply.
def join("room:" <> room_id, _params, socket) do
  reply = %{messages: Hello.Chat.list_messages(room_id)}
  {:ok, reply, socket}
end
3.3 拒绝加入 #
elixir
# Delegates the decision to check_access/2 and wraps a failure reason in the
# error payload returned to the client.
def join("room:" <> room_id, _params, socket) do
  access = check_access(room_id, socket)

  case access do
    {:error, reason} -> {:error, %{reason: reason}}
    :ok -> {:ok, socket}
  end
end
四、处理消息 #
4.1 handle_in #
elixir
# Persists an incoming message for the room named in the topic. On success
# the stored message is broadcast and no reply is sent; on failure the
# formatted changeset errors are returned as an error reply.
def handle_in("new_message", %{"body" => body}, socket) do
  "room:" <> room_id = socket.topic
  author_id = socket.assigns.user_id

  room_id
  |> Hello.Chat.create_message(author_id, body)
  |> case do
    {:error, changeset} ->
      {:reply, {:error, format_errors(changeset)}, socket}

    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}
  end
end
4.2 回复消息 #
elixir
# Replies to the caller with the messages of the room named in the topic.
def handle_in("get_messages", _params, socket) do
  "room:" <> room_id = socket.topic
  payload = %{messages: Hello.Chat.list_messages(room_id)}
  {:reply, {:ok, payload}, socket}
end
4.3 停止Channel #
elixir
# Stops the channel process with the shutdown reason :left.
def handle_in("leave", _params, socket), do: {:stop, {:shutdown, :left}, socket}
五、广播消息 #
5.1 broadcast #
elixir
# Broadcasts the message body together with the sender's name on the topic.
def handle_in("new_message", %{"body" => body}, socket) do
  payload = %{body: body, user: socket.assigns.user.name}
  broadcast!(socket, "new_message", payload)
  {:noreply, socket}
end
5.2 broadcast_from #
elixir
# Announces "user_typing" via broadcast_from!/3, passing the typing user's name.
def handle_in("typing", _params, socket) do
  payload = %{user: socket.assigns.user.name}
  broadcast_from!(socket, "user_typing", payload)
  {:noreply, socket}
end
5.3 直接使用PubSub #
elixir
# Broadcasts `event` with `payload` on the "user:<user_id>" topic through the
# endpoint, and returns the result of that broadcast call.
def notify_user(user_id, event, payload) do
  topic = "user:#{user_id}"
  HelloWeb.Endpoint.broadcast(topic, event, payload)
end
六、Socket Assigns #
6.1 设置Assigns #
elixir
defmodule HelloWeb.UserSocket do
  @moduledoc """
  Socket module that authenticates connections with a token.

  On success the verified user id is stored in the socket assigns under
  `:user_id`, where channels can read it.
  """
  use Phoenix.Socket

  @doc """
  Authenticates the connection from the `"token"` connect param.

  Returns `{:ok, socket}` with `:user_id` assigned when the token verifies,
  and `:error` when verification fails or no token was supplied.
  """
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    # verify_token/1 is not shown in this snippet; it is expected to return
    # {:ok, user_id} or {:error, reason}.
    case verify_token(token) do
      {:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} -> :error
    end
  end

  # Params without a "token" key are refused cleanly instead of raising a
  # FunctionClauseError in the transport process.
  def connect(_params, _socket, _connect_info), do: :error

  @doc """
  Returns `nil`, so the socket is given no identifier.
  """
  @impl true
  def id(_socket), do: nil
end
6.2 在Channel中使用 #
elixir
# Creates a message on behalf of the user id stored in the socket assigns.
#
# The room id is taken from the topic so the call matches the 3-arity
# Hello.Chat.create_message(room_id, user_id, body) used by the other
# handlers. A failed insert is returned to the client as an error reply
# instead of crashing the channel on a failed `{:ok, _}` match.
def handle_in("new_message", %{"body" => body}, socket) do
  "room:" <> room_id = socket.topic
  user_id = socket.assigns.user_id

  case Hello.Chat.create_message(room_id, user_id, body) do
    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}

    {:error, changeset} ->
      {:reply, {:error, format_errors(changeset)}, socket}
  end
end
七、客户端连接 #
7.1 JavaScript客户端 #
javascript
import {Socket} from "phoenix"

// Open the socket, sending the user's token as a connect param.
const socket = new Socket("/socket", {params: {token: userToken}})
socket.connect()

// Join the room topic and log the outcome of the join.
const channel = socket.channel("room:123", {})
const onJoined = resp => { console.log("Joined successfully", resp) }
const onJoinFailed = resp => { console.log("Unable to join", resp) }
channel.join()
  .receive("ok", onJoined)
  .receive("error", onJoinFailed)

// Log every "new_message" event delivered on the channel.
channel.on("new_message", payload => {
  console.log("New message:", payload)
})

// Push a message and log whichever reply comes back.
const onSent = resp => { console.log("Message sent", resp) }
const onSendFailed = resp => { console.log("Failed to send", resp) }
channel.push("new_message", {body: "Hello!"})
  .receive("ok", onSent)
  .receive("error", onSendFailed)
7.2 处理事件 #
javascript
// Append each incoming message to the #messages container as "<user>: <body>".
channel.on("new_message", payload => {
  const line = document.createElement("div")
  line.innerText = `${payload.user}: ${payload.body}`
  document.getElementById("messages").appendChild(line)
})

// Log join and leave announcements.
channel.on("user_joined", payload => {
  console.log(`${payload.user} joined the room`)
})
channel.on("user_left", payload => {
  console.log(`${payload.user} left the room`)
})
八、Channel生命周期 #
8.1 join/3 #
elixir
# Accepts the join and sends the current process an {:after_join, room_id}
# message, so the follow-up work happens in handle_info/2.
def join("room:" <> room_id, _params, socket) do
  channel_pid = self()
  send(channel_pid, {:after_join, room_id})
  {:ok, socket}
end
8.2 handle_info #
elixir
# Handles the {:after_join, room_id} message: pushes the room's messages to
# this socket, then announces "user_joined" via broadcast_from!/3.
def handle_info({:after_join, room_id}, socket) do
  history = Hello.Chat.list_messages(room_id)
  push(socket, "messages", %{messages: history})

  joined = %{user: socket.assigns.user.name}
  broadcast_from!(socket, "user_joined", joined)

  {:noreply, socket}
end
8.3 terminate #
elixir
# Announces "user_left" via broadcast_from!/3 when the channel process
# terminates.
#
# The topic was previously destructured into a room id that was never used,
# which only produced an unused-variable warning; broadcast_from!/3 already
# targets the socket's own topic, so the binding is dropped.
def terminate(_reason, socket) do
  broadcast_from!(socket, "user_left", %{user: socket.assigns.user.name})
  :ok
end
九、错误处理 #
9.1 捕获错误 #
elixir
# Creates and broadcasts a message, converting both validation failures and
# unexpected exceptions into error replies.
#
# Returns {:noreply, socket} on success, an error reply with the formatted
# changeset errors on validation failure, and a generic "Internal error"
# reply when an exception is raised.
#
# The enclosing module must `require Logger` for the Logger.error/1 call.
def handle_in("new_message", %{"body" => body}, socket) do
  case create_message(socket, body) do
    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}

    {:error, changeset} ->
      {:reply, {:error, format_errors(changeset)}, socket}
  end
rescue
  e ->
    # Log the formatted exception with its stacktrace; the client only ever
    # sees the generic reason, so this log entry is the sole record of it.
    Logger.error("Failed to create message: " <> Exception.format(:error, e, __STACKTRACE__))
    {:reply, {:error, %{reason: "Internal error"}}, socket}
end
9.2 格式化错误 #
elixir
# Turns a changeset's errors into a map of field => list of messages, with
# the `%{key}` placeholders in each message filled in from the error opts.
#
# Only opts that the message actually references are stringified. Calling
# to_string/1 on every opt raised Protocol.UndefinedError for opts holding
# tuples (for example `type: {:array, :string}`), even when the message
# never mentioned them.
defp format_errors(changeset) do
  Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
    Regex.replace(~r"%{(\w+)}", msg, fn _placeholder, key ->
      # A placeholder with no matching opt falls back to the bare key name.
      opts
      |> Keyword.get(String.to_existing_atom(key), key)
      |> to_string()
    end)
  end)
end
十、总结 #
10.1 核心概念 #
| 概念 | 说明 |
|---|---|
| Socket | WebSocket连接 |
| Channel | 消息通道 |
| Topic | 主题标识 |
| join | 加入Channel |
| handle_in | 处理入站消息 |
| broadcast | 广播消息 |
10.2 常用操作 #
| 操作 | 函数 |
|---|---|
| 加入 | join/3 |
| 处理消息 | handle_in/3 |
| 广播 | broadcast/3 |
| 推送 | push/3 |
| 回复 | 在 handle_in/3 中返回 {:reply, {:ok, payload}, socket} |
10.3 下一步 #
现在你已经了解了Channel基础,接下来让我们学习 PubSub与Presence,深入了解消息分发和用户状态!
最后更新:2026-03-28