Channel基础 #

一、Channel概述 #

1.1 什么是Channel #

Phoenix Channel是基于WebSocket的实时通信机制,支持:

  • 双向通信
  • 发布订阅模式
  • 自动重连
  • 多节点同步

1.2 Channel架构 #

text
客户端 (WebSocket)
    │
    ▼
Endpoint (Socket处理)
    │
    ▼
Socket (连接管理)
    │
    ▼
Channel (消息处理)
    │
    ▼
PubSub (消息分发)

二、创建Channel #

2.1 定义Channel #

elixir
defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:" <> room_id, _params, socket) do
    {:ok, socket}
  end

  def handle_in("new_message", %{"body" => body}, socket) do
    broadcast!(socket, "new_message", %{body: body})
    {:noreply, socket}
  end
end

2.2 注册Channel #

elixir
defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  channel "room:*", HelloWeb.RoomChannel

  @impl true
  def connect(_params, socket, _connect_info) do
    {:ok, socket}
  end

  @impl true
  def id(_socket), do: nil
end

2.3 Endpoint配置 #

elixir
defmodule HelloWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :hello

  socket "/socket", HelloWeb.UserSocket,
    websocket: true,
    longpoll: false
end

三、加入Channel #

3.1 join回调 #

elixir
defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:" <> room_id, _params, socket) do
    if authorized?(room_id, socket) do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  defp authorized?(room_id, socket) do
    user = socket.assigns.current_user
    room_id in user.room_ids
  end
end

3.2 返回初始数据 #

elixir
def join("room:" <> room_id, _params, socket) do
  messages = Hello.Chat.list_messages(room_id)
  {:ok, %{messages: messages}, socket}
end

3.3 拒绝加入 #

elixir
def join("room:" <> room_id, _params, socket) do
  case check_access(room_id, socket) do
    :ok -> {:ok, socket}
    {:error, reason} -> {:error, %{reason: reason}}
  end
end

四、处理消息 #

4.1 handle_in #

elixir
def handle_in("new_message", %{"body" => body}, socket) do
  "room:" <> room_id = socket.topic

  case Hello.Chat.create_message(room_id, socket.assigns.user_id, body) do
    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}

    {:error, changeset} ->
      {:reply, {:error, format_errors(changeset)}, socket}
  end
end

4.2 回复消息 #

elixir
def handle_in("get_messages", _params, socket) do
  "room:" <> room_id = socket.topic
  messages = Hello.Chat.list_messages(room_id)
  {:reply, {:ok, %{messages: messages}}, socket}
end

4.3 停止Channel #

elixir
def handle_in("leave", _params, socket) do
  {:stop, {:shutdown, :left}, socket}
end

五、广播消息 #

5.1 broadcast #

elixir
def handle_in("new_message", %{"body" => body}, socket) do
  broadcast!(socket, "new_message", %{body: body, user: socket.assigns.user.name})
  {:noreply, socket}
end

5.2 broadcast_from #

elixir
def handle_in("typing", _params, socket) do
  broadcast_from!(socket, "user_typing", %{user: socket.assigns.user.name})
  {:noreply, socket}
end

5.3 直接使用PubSub #

elixir
def notify_user(user_id, event, payload) do
  HelloWeb.Endpoint.broadcast("user:#{user_id}", event, payload)
end

六、Socket Assigns #

6.1 设置Assigns #

elixir
defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  def connect(%{"token" => token}, socket, _connect_info) do
    case verify_token(token) do
      {:ok, user_id} ->
        socket = assign(socket, :user_id, user_id)
        {:ok, socket}

      {:error, _} ->
        :error
    end
  end
end

6.2 在Channel中使用 #

elixir
def handle_in("new_message", %{"body" => body}, socket) do
  user_id = socket.assigns.user_id
  {:ok, message} = Hello.Chat.create_message(user_id, body)
  broadcast!(socket, "new_message", message)
  {:noreply, socket}
end

七、客户端连接 #

7.1 JavaScript客户端 #

javascript
import {Socket} from "phoenix"

let socket = new Socket("/socket", {
  params: {token: userToken}
})

socket.connect()

let channel = socket.channel("room:123", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

channel.on("new_message", payload => {
  console.log("New message:", payload)
})

channel.push("new_message", {body: "Hello!"})
  .receive("ok", resp => { console.log("Message sent", resp) })
  .receive("error", resp => { console.log("Failed to send", resp) })

7.2 处理事件 #

javascript
channel.on("new_message", payload => {
  let messagesContainer = document.getElementById("messages")
  let messageElement = document.createElement("div")
  messageElement.innerText = `${payload.user}: ${payload.body}`
  messagesContainer.appendChild(messageElement)
})

channel.on("user_joined", payload => {
  console.log(`${payload.user} joined the room`)
})

channel.on("user_left", payload => {
  console.log(`${payload.user} left the room`)
})

八、Channel生命周期 #

8.1 join/3 #

elixir
def join("room:" <> room_id, _params, socket) do
  send(self(), {:after_join, room_id})
  {:ok, socket}
end

8.2 handle_info #

elixir
def handle_info({:after_join, room_id}, socket) do
  messages = Hello.Chat.list_messages(room_id)
  push(socket, "messages", %{messages: messages})

  broadcast_from!(socket, "user_joined", %{user: socket.assigns.user.name})

  {:noreply, socket}
end

8.3 terminate #

elixir
def terminate(_reason, socket) do
  "room:" <> room_id = socket.topic

  broadcast_from!(socket, "user_left", %{user: socket.assigns.user.name})

  :ok
end

九、错误处理 #

9.1 捕获错误 #

elixir
def handle_in("new_message", %{"body" => body}, socket) do
  case create_message(socket, body) do
    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}

    {:error, changeset} ->
      {:reply, {:error, format_errors(changeset)}, socket}
  end
rescue
  e ->
    Logger.error("Failed to create message: #{inspect(e)}")
    {:reply, {:error, %{reason: "Internal error"}}, socket}
end

9.2 格式化错误 #

elixir
defp format_errors(changeset) do
  Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
    Enum.reduce(opts, msg, fn {key, value}, acc ->
      String.replace(acc, "%{#{key}}", to_string(value))
    end)
  end)
end

十、总结 #

10.1 核心概念 #

概念 说明
Socket WebSocket连接
Channel 消息通道
Topic 主题标识
join 加入Channel
handle_in 处理入站消息
broadcast 广播消息

10.2 常用操作 #

操作 函数
加入 join/3
处理消息 handle_in/3
广播 broadcast/3
推送 push/3
回复

10.3 下一步 #

现在你已经了解了Channel基础,接下来让我们学习 PubSub与Presence,深入了解消息分发和用户状态!

最后更新:2026-03-28