PubSub与Presence #

一、PubSub概述 #

1.1 什么是PubSub #

PubSub(发布订阅)是Phoenix的消息分发系统,支持:

  • 跨进程消息传递
  • 多节点同步
  • 分布式消息广播

1.2 PubSub架构 #

text
Publisher
    │
    ▼
PubSub Server
    │
    ├── Subscriber 1
    ├── Subscriber 2
    └── Subscriber 3

二、配置PubSub #

2.1 应用配置 #

elixir
defmodule Hello.Application do
  @moduledoc """
  OTP application entry point that boots the supervision tree.
  """

  use Application

  # Starts a one-for-one supervisor whose only child is the
  # `Phoenix.PubSub` server registered under the name `Hello.PubSub`.
  @impl true
  def start(_type, _args) do
    pubsub = {Phoenix.PubSub, name: Hello.PubSub}

    Supervisor.start_link([pubsub], strategy: :one_for_one)
  end
end

2.2 Endpoint配置 #

elixir
# config/config.exs
#
# The endpoint's PubSub server is selected through application config.
# A bare `pubsub_server: Hello.PubSub` line inside the module body is not
# valid Elixir, so the option lives here instead.
config :hello, HelloWeb.Endpoint,
  pubsub_server: Hello.PubSub

# lib/hello_web/endpoint.ex
defmodule HelloWeb.Endpoint do
  # `otp_app: :hello` tells the endpoint which application's config to read.
  use Phoenix.Endpoint, otp_app: :hello
end

三、使用PubSub #

3.1 订阅主题 #

elixir
defmodule HelloWeb.RoomChannel do
  @moduledoc """
  Channel for `"room:<id>"` topics that subscribes its own process to the
  matching `Hello.PubSub` topic when a client joins.
  """

  use Phoenix.Channel

  # Subscribes the channel process to the room topic, then accepts the join.
  # The `:ok =` match makes a failed subscription crash the join instead of
  # being silently ignored.
  def join("room:" <> room_id, _params, socket) do
    topic = "room:#{room_id}"
    :ok = Phoenix.PubSub.subscribe(Hello.PubSub, topic)

    {:ok, socket}
  end
end

3.2 发布消息 #

elixir
# Broadcasts `{:new_message, message}` on the `"room:<room_id>"` topic of
# `Hello.PubSub` and returns the result of `Phoenix.PubSub.broadcast/3`.
def broadcast_message(room_id, message) do
  topic = "room:#{room_id}"
  payload = {:new_message, message}

  Phoenix.PubSub.broadcast(Hello.PubSub, topic, payload)
end

# Thin wrapper that forwards `event` and `payload` to `broadcast!/3` for the
# given socket.
def broadcast_from_socket(socket, event, payload),
  do: broadcast!(socket, event, payload)

3.3 直接广播 #

elixir
# Sends a "notification" event carrying `notification` to the per-user topic
# `"user:<user_id>"` through the endpoint.
def notify_user(user_id, notification) do
  topic = "user:#{user_id}"

  HelloWeb.Endpoint.broadcast(topic, "notification", notification)
end

# Sends a "notification" event carrying `notification` to the shared
# "users:all" topic through the endpoint.
def notify_all_users(notification),
  do: HelloWeb.Endpoint.broadcast("users:all", "notification", notification)

四、Presence概述 #

4.1 什么是Presence #

Presence是Phoenix的用户在线状态系统,用于:

  • 追踪用户连接状态
  • 自动处理断线
  • 同步多节点状态

4.2 Presence特点 #

  • 自动清理断线用户
  • 支持分布式追踪
  • 冲突自动解决
  • 低延迟同步

五、配置Presence #

5.1 创建Presence模块 #

elixir
defmodule HelloWeb.Presence do
  @moduledoc """
  Presence tracker for the application, backed by `Hello.PubSub`.
  """

  use Phoenix.Presence,
    otp_app: :hello,
    pubsub_server: Hello.PubSub

  @doc """
  Enriches every presence entry with its user record.

  Loads the users for all presence keys with a single
  `Hello.Accounts.list_users/1` call and returns a map of
  `key => %{metas: metas, user: user}`. `user` is `nil` when none of the
  loaded users matches the key.
  """
  def fetch(_topic, presences) do
    # Index the users once by their stringified id. Phoenix.Presence hands
    # keys over as strings, so comparing them to integer ids with `==` would
    # never match; normalising both sides works for integer and string ids.
    # The index also replaces a linear `Enum.find/2` per presence.
    users_by_key =
      presences
      |> Map.keys()
      |> Hello.Accounts.list_users()
      |> Map.new(fn user -> {to_string(user.id), user} end)

    Map.new(presences, fn {key, %{metas: metas}} ->
      {key, %{metas: metas, user: Map.get(users_by_key, to_string(key))}}
    end)
  end
end

5.2 启动Presence #

elixir
defmodule Hello.Application do
  @moduledoc """
  OTP application entry point that supervises PubSub and Presence.
  """

  use Application

  # Child order matters: `HelloWeb.Presence` is configured with
  # `Hello.PubSub` as its PubSub server, so the PubSub server is listed first.
  @impl true
  def start(_type, _args) do
    supervisor_opts = [strategy: :one_for_one]

    Supervisor.start_link(
      [
        {Phoenix.PubSub, name: Hello.PubSub},
        HelloWeb.Presence
      ],
      supervisor_opts
    )
  end
end

六、使用Presence #

6.1 追踪用户 #

elixir
defmodule HelloWeb.RoomChannel do
  @moduledoc """
  Room channel that registers every joining user with `HelloWeb.Presence`.
  """

  use Phoenix.Channel
  alias HelloWeb.Presence

  # Accepts the join and defers presence tracking to `handle_info/2` by
  # sending `:after_join` to the channel process itself. The room id is not
  # needed here, hence the underscore (avoids an unused-variable warning).
  @impl true
  def join("room:" <> _room_id, _params, socket) do
    send(self(), :after_join)
    {:ok, socket}
  end

  # Tracks the user under `socket.assigns.user_id` with a timestamp and a
  # device tag, then pushes the current presence list to the joining client
  # as "presence_state".
  @impl true
  def handle_info(:after_join, socket) do
    {:ok, _} =
      Presence.track(socket, socket.assigns.user_id, %{
        online_at: inspect(System.system_time(:second)),
        device: "browser"
      })

    push(socket, "presence_state", Presence.list(socket))
    {:noreply, socket}
  end
end

6.2 获取在线用户 #

elixir
# Returns one `%{id: key, metas: metas}` map per entry that
# `Presence.list/1` reports for `topic`.
def list_present_users(topic) do
  presences = Presence.list(topic)

  Enum.map(presences, fn {key, %{metas: metas}} -> %{id: key, metas: metas} end)
end

# Returns `true` when `user_id` has at least one tracked presence on `topic`.
#
# `Presence.get_by_key/2` returns `[]` (not `nil`) when nothing is tracked
# for the key, so matching on `nil` would report every user as present.
# Matching on a non-empty `metas` list covers both `[]` and `nil`.
def user_present?(topic, user_id) do
  match?(%{metas: [_ | _]}, Presence.get_by_key(topic, user_id))
end

6.3 监听状态变化 #

elixir
# Runs after a successful join: tracks the user with an `online_at`
# timestamp, then pushes the current presence list to this client as
# "presence_state".
def handle_info(:after_join, socket) do
  meta = %{online_at: inspect(System.system_time(:second))}
  {:ok, _} = Presence.track(socket, socket.assigns.user_id, meta)

  presences = Presence.list(socket)
  push(socket, "presence_state", presences)

  {:noreply, socket}
end

# Translates a "presence_diff" broadcast into per-user "user_joined" and
# "user_left" events for the client attached to this socket.
#
# Every channel process subscribed to the topic receives the same diff. If
# each of them re-broadcast to the whole topic, every client would get one
# copy per subscriber, so the events are pushed to this socket only.
def handle_info(
      %Phoenix.Socket.Broadcast{
        event: "presence_diff",
        payload: %{joins: joins, leaves: leaves}
      },
      socket
    ) do
  # `Enum.each/2` rather than `for`: only the side effect matters here.
  Enum.each(joins, fn {user_id, _meta} ->
    push(socket, "user_joined", %{user_id: user_id})
  end)

  Enum.each(leaves, fn {user_id, _meta} ->
    push(socket, "user_left", %{user_id: user_id})
  end)

  {:noreply, socket}
end

七、客户端Presence #

7.1 JavaScript客户端 #

javascript
// Open the room channel and log the server's reply once the join succeeds.
let channel = socket.channel("room:123", {})

function onJoined(resp) {
  console.log("Joined successfully", resp)
}

// Full presence snapshot sent by the server: log it and redraw the list.
function onPresenceState(state) {
  console.log("Current users:", state)
  renderUsers(state)
}

// Incremental presence change: log it and hand it to the diff handler.
function onPresenceDiff(diff) {
  console.log("Users changed:", diff)
  handlePresenceDiff(diff)
}

channel.join().receive("ok", onJoined)

channel.on("presence_state", onPresenceState)
channel.on("presence_diff", onPresenceDiff)

// Rebuilds the #users list from scratch: one <li> per presence key showing
// the key and how many metas (connections) it currently has.
function renderUsers(presences) {
  const listEl = document.getElementById("users")
  listEl.innerHTML = ""

  Presence.list(presences, (id, presence) => {
    const {metas} = presence
    const item = document.createElement("li")
    // innerText keeps the id from being interpreted as HTML.
    item.innerText = `${id} (${metas.length} connections)`
    listEl.appendChild(item)
  })
}

// Logs one line per user key found in the diff's `joins` and `leaves` maps.
function handlePresenceDiff(diff) {
  for (const userId of Object.keys(diff.joins)) {
    console.log(`${userId} joined`)
  }

  for (const userId of Object.keys(diff.leaves)) {
    console.log(`${userId} left`)
  }
}

7.2 Presence辅助函数 #

javascript
// Minimal client-side presence helpers. State is a plain object of
// `key -> {metas: [...]}`; every function returns a new object and leaves
// its arguments untouched.
let Presence = {
  // Replaces the local state with the server snapshot. The snapshot is
  // authoritative, so keys missing from `newState` (users who left while
  // this client was not listening) must not survive from `currentState`.
  syncState(currentState, newState) {
    return {...newState}
  },

  // Applies a diff of `{joins, leaves}` to the state.
  // A key can have several metas (one per connection), each identified by
  // `phx_ref`. Joins are merged into the existing metas and leaves remove
  // only the metas that left; the key is dropped once no metas remain.
  // Entries without a `metas` array fall back to plain replace/delete.
  syncDiff(currentState, {joins, leaves}) {
    const metasOf = entry => (entry && Array.isArray(entry.metas) ? entry.metas : null)
    const refsOf = metas => new Set(metas.map(meta => meta.phx_ref))
    let state = {...currentState}

    Object.keys(joins).forEach(key => {
      const joinedMetas = metasOf(joins[key])
      const knownMetas = metasOf(state[key])
      if (joinedMetas && knownMetas) {
        const joinedRefs = refsOf(joinedMetas)
        const kept = knownMetas.filter(meta => !joinedRefs.has(meta.phx_ref))
        state[key] = {...joins[key], metas: [...kept, ...joinedMetas]}
      } else {
        state[key] = joins[key]
      }
    })

    Object.keys(leaves).forEach(key => {
      if (!(key in state)) return
      const leftMetas = metasOf(leaves[key])
      const knownMetas = metasOf(state[key])
      if (leftMetas && knownMetas) {
        const leftRefs = refsOf(leftMetas)
        const remaining = knownMetas.filter(meta => !leftRefs.has(meta.phx_ref))
        if (remaining.length === 0) {
          delete state[key]
        } else {
          state[key] = {...state[key], metas: remaining}
        }
      } else {
        delete state[key]
      }
    })

    return state
  },

  // Maps every `key -> presence` pair through `onPresence(key, presence)`
  // and returns the results as an array.
  list(presences, onPresence) {
    return Object.keys(presences).map(key => {
      return onPresence(key, presences[key])
    })
  }
}

八、高级Presence #

8.1 多设备追踪 #

elixir
# Tracks the user together with per-connection details (device type,
# browser, client IP) so that several devices of one user can be told apart.
#
# NOTE(review): presence metadata appears to be delivered to clients (the
# presence list is pushed as "presence_state" elsewhere in this guide), so
# the `ip` field may be exposed to other users. Confirm that is intended.
def handle_info(:after_join, socket) do
  meta = %{
    online_at: inspect(System.system_time(:second)),
    device: get_device_type(socket),
    browser: get_browser(socket),
    ip: get_client_ip(socket)
  }

  {:ok, _} = Presence.track(socket, socket.assigns.user_id, meta)

  {:noreply, socket}
end

8.2 自定义元数据 #

elixir
# Tracks the user with custom metadata: a timestamp, an initial "online"
# status, a `typing` flag that starts as `false`, and the socket's topic.
def handle_info(:after_join, socket) do
  initial_meta = %{
    online_at: inspect(System.system_time(:second)),
    status: "online",
    typing: false,
    room_id: socket.topic
  }

  {:ok, _} = Presence.track(socket, socket.assigns.user_id, initial_meta)

  {:noreply, socket}
end

# Handles the client's "typing" event by storing the received value in the
# `:typing` field of this user's presence metadata.
def handle_in("typing", %{"typing" => typing}, socket) do
  # `Map.replace!/3` raises if `:typing` is missing, like the update syntax.
  set_typing = fn meta -> Map.replace!(meta, :typing, typing) end

  {:ok, _} = Presence.update(socket, socket.assigns.user_id, set_typing)

  {:noreply, socket}
end

8.3 获取特定用户状态 #

elixir
# Returns the list of metas tracked for `user_id` on `topic`, or `nil` when
# the user is not present.
#
# `Presence.get_by_key/2` returns `[]` for an untracked key, which a
# `nil`-only clause would not match (raising `CaseClauseError`); the
# catch-all clause handles `[]` as well as `nil`.
def get_user_presence(topic, user_id) do
  case Presence.get_by_key(topic, user_id) do
    %{metas: metas} -> metas
    _not_tracked -> nil
  end
end

# Returns how many connections (metas) `user_id` currently has on `topic`;
# `0` when the user is not present.
#
# `Presence.get_by_key/2` returns `[]` for an untracked key, which a
# `nil`-only clause would not match (raising `CaseClauseError`); the
# catch-all clause handles `[]` as well as `nil`.
def get_user_connections(topic, user_id) do
  case Presence.get_by_key(topic, user_id) do
    %{metas: metas} -> length(metas)
    _not_tracked -> 0
  end
end

九、分布式Presence #

9.1 多节点配置 #

elixir
# Tells HelloWeb.Presence which PubSub server to use.
config :hello, HelloWeb.Presence, pubsub_server: Hello.PubSub

9.2 节点间同步 #

只要各节点已组成集群并共用同一个PubSub服务,Presence就会自动处理多节点间的状态同步,无需额外代码:

elixir
defmodule HelloWeb.Presence do
  @moduledoc """
  Presence tracker for the `:hello` application, using `Hello.PubSub` as
  its PubSub server.
  """

  use Phoenix.Presence, otp_app: :hello, pubsub_server: Hello.PubSub
end

十、最佳实践 #

10.1 合理使用Topic #

elixir
# Lets a client join a room topic only when `authorized?/2` returns a truthy
# value for the room and socket; otherwise replies with an "unauthorized"
# error.
def join("room:" <> room_id, _params, socket) do
  cond do
    authorized?(room_id, socket) -> {:ok, socket}
    true -> {:error, %{reason: "unauthorized"}}
  end
end

# Lets a client join only its own "user:<id>" topic; any other id is
# rejected with an "unauthorized" error.
def join("user:" <> user_id, _params, socket) do
  # The id taken from the topic is always a string, while the assigned
  # `user_id` may be an integer. Comparing string forms keeps the check
  # correct for both; a plain `==` would reject every integer id.
  if to_string(socket.assigns.user_id) == user_id do
    {:ok, socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end

10.2 清理资源 #

elixir
# Channel shutdown hook: passes the socket's assigned `user_id` to
# `cleanup_user_resources/1` and returns `:ok`, whatever the reason.
def terminate(_reason, socket) do
  user_id = socket.assigns.user_id
  cleanup_user_resources(user_id)

  :ok
end

10.3 错误处理 #

elixir
# Handles an incoming "new_message" event.
#
# When `create_message/2` succeeds, the message is broadcast on the topic
# and no reply is sent. When it fails, the formatted changeset errors are
# sent back to the sender as an `:error` reply.
def handle_in("new_message", %{"body" => body}, socket) do
  result = create_message(socket, body)

  case result do
    {:error, changeset} ->
      errors = format_errors(changeset)
      {:reply, {:error, errors}, socket}

    {:ok, message} ->
      broadcast!(socket, "new_message", message)
      {:noreply, socket}
  end
end

十一、总结 #

11.1 PubSub核心 #

| 函数 | 说明 |
| --- | --- |
| `subscribe/2` | 订阅主题 |
| `broadcast/3` | 广播消息 |
| `unsubscribe/2` | 取消订阅 |

11.2 Presence核心 #

| 函数 | 说明 |
| --- | --- |
| `track/3` | 追踪用户 |
| `list/1` | 获取在线列表 |
| `get_by_key/2` | 获取用户状态 |
| `update/3` | 更新元数据 |
update/3 更新元数据

11.3 下一步 #

现在你已经了解了PubSub与Presence,接下来让我们学习 LiveView简介,深入了解实时视图!

最后更新:2026-03-28