PubSub与Presence #
一、PubSub概述 #
1.1 什么是PubSub #
PubSub(发布订阅)是Phoenix的消息分发系统,支持:
- 跨进程消息传递
- 多节点同步
- 分布式消息广播
1.2 PubSub架构 #
text
Publisher
│
▼
PubSub Server
│
├── Subscriber 1
├── Subscriber 2
└── Subscriber 3
二、配置PubSub #
2.1 应用配置 #
elixir
defmodule Hello.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.PubSub, name: Hello.PubSub}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
2.2 Endpoint配置 #
elixir
defmodule HelloWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :hello
pubsub_server: Hello.PubSub
end
三、使用PubSub #
3.1 订阅主题 #
elixir
defmodule HelloWeb.RoomChannel do
use Phoenix.Channel
def join("room:" <> room_id, _params, socket) do
:ok = Phoenix.PubSub.subscribe(Hello.PubSub, "room:#{room_id}")
{:ok, socket}
end
end
3.2 发布消息 #
elixir
def broadcast_message(room_id, message) do
Phoenix.PubSub.broadcast(Hello.PubSub, "room:#{room_id}", {:new_message, message})
end
def broadcast_from_socket(socket, event, payload) do
broadcast!(socket, event, payload)
end
3.3 直接广播 #
elixir
def notify_user(user_id, notification) do
HelloWeb.Endpoint.broadcast("user:#{user_id}", "notification", notification)
end
def notify_all_users(notification) do
HelloWeb.Endpoint.broadcast("users:all", "notification", notification)
end
四、Presence概述 #
4.1 什么是Presence #
Presence是Phoenix的用户在线状态系统,用于:
- 追踪用户连接状态
- 自动处理断线
- 同步多节点状态
4.2 Presence特点 #
- 自动清理断线用户
- 支持分布式追踪
- 冲突自动解决
- 低延迟同步
五、配置Presence #
5.1 创建Presence模块 #
elixir
defmodule HelloWeb.Presence do
use Phoenix.Presence,
otp_app: :hello,
pubsub_server: Hello.PubSub
def fetch(_topic, presences) do
users = Hello.Accounts.list_users(Map.keys(presences))
Enum.map(presences, fn {user_id, %{metas: metas}} ->
user = Enum.find(users, fn u -> u.id == user_id end)
{user_id, %{metas: metas, user: user}}
end)
|> Enum.into(%{})
end
end
5.2 启动Presence #
elixir
defmodule Hello.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.PubSub, name: Hello.PubSub},
HelloWeb.Presence
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
六、使用Presence #
6.1 追踪用户 #
elixir
defmodule HelloWeb.RoomChannel do
use Phoenix.Channel
alias HelloWeb.Presence
def join("room:" <> room_id, _params, socket) do
send(self(), :after_join)
{:ok, socket}
end
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second)),
device: "browser"
})
push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
end
6.2 获取在线用户 #
elixir
def list_present_users(topic) do
Presence.list(topic)
|> Enum.map(fn {user_id, %{metas: metas}} ->
%{id: user_id, metas: metas}
end)
end
def user_present?(topic, user_id) do
case Presence.get_by_key(topic, user_id) do
nil -> false
_ -> true
end
end
6.3 监听状态变化 #
elixir
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
def handle_info(%Phoenix.Socket.Broadcast{
event: "presence_diff",
payload: %{joins: joins, leaves: leaves}
}, socket) do
for {user_id, _meta} <- joins do
broadcast!(socket, "user_joined", %{user_id: user_id})
end
for {user_id, _meta} <- leaves do
broadcast!(socket, "user_left", %{user_id: user_id})
end
{:noreply, socket}
end
七、客户端Presence #
7.1 JavaScript客户端 #
javascript
let channel = socket.channel("room:123", {})
channel.join()
.receive("ok", resp => {
console.log("Joined successfully", resp)
})
channel.on("presence_state", state => {
console.log("Current users:", state)
renderUsers(state)
})
channel.on("presence_diff", diff => {
console.log("Users changed:", diff)
handlePresenceDiff(diff)
})
function renderUsers(presences) {
let usersList = document.getElementById("users")
usersList.innerHTML = ""
Presence.list(presences, (id, {metas}) => {
let userElement = document.createElement("li")
userElement.innerText = `${id} (${metas.length} connections)`
usersList.appendChild(userElement)
})
}
function handlePresenceDiff(diff) {
Object.keys(diff.joins).forEach(userId => {
console.log(`${userId} joined`)
})
Object.keys(diff.leaves).forEach(userId => {
console.log(`${userId} left`)
})
}
7.2 Presence辅助函数 #
javascript
let Presence = {
syncState(currentState, newState) {
let state = {...currentState}
Object.keys(newState).forEach(key => {
state[key] = newState[key]
})
return state
},
syncDiff(currentState, {joins, leaves}) {
let state = {...currentState}
Object.keys(joins).forEach(key => {
state[key] = joins[key]
})
Object.keys(leaves).forEach(key => {
delete state[key]
})
return state
},
list(presences, onPresence) {
return Object.keys(presences).map(key => {
return onPresence(key, presences[key])
})
}
}
八、高级Presence #
8.1 多设备追踪 #
elixir
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second)),
device: get_device_type(socket),
browser: get_browser(socket),
ip: get_client_ip(socket)
})
{:noreply, socket}
end
8.2 自定义元数据 #
elixir
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second)),
status: "online",
typing: false,
room_id: socket.topic
})
{:noreply, socket}
end
def handle_in("typing", %{"typing" => typing}, socket) do
{:ok, _} = Presence.update(socket, socket.assigns.user_id, fn meta ->
%{meta | typing: typing}
end)
{:noreply, socket}
end
8.3 获取特定用户状态 #
elixir
def get_user_presence(topic, user_id) do
case Presence.get_by_key(topic, user_id) do
nil -> nil
%{metas: metas} -> metas
end
end
def get_user_connections(topic, user_id) do
case Presence.get_by_key(topic, user_id) do
nil -> 0
%{metas: metas} -> length(metas)
end
end
九、分布式Presence #
9.1 多节点配置 #
elixir
config :hello, HelloWeb.Presence,
pubsub_server: Hello.PubSub
9.2 节点间同步 #
Presence自动处理多节点间的状态同步:
elixir
defmodule HelloWeb.Presence do
use Phoenix.Presence,
otp_app: :hello,
pubsub_server: Hello.PubSub
end
十、最佳实践 #
10.1 合理使用Topic #
elixir
def join("room:" <> room_id, _params, socket) do
if authorized?(room_id, socket) do
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
def join("user:" <> user_id, _params, socket) do
if socket.assigns.user_id == user_id do
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
10.2 清理资源 #
elixir
def terminate(_reason, socket) do
cleanup_user_resources(socket.assigns.user_id)
:ok
end
10.3 错误处理 #
elixir
def handle_in("new_message", %{"body" => body}, socket) do
case create_message(socket, body) do
{:ok, message} ->
broadcast!(socket, "new_message", message)
{:noreply, socket}
{:error, changeset} ->
{:reply, {:error, format_errors(changeset)}, socket}
end
end
十一、总结 #
11.1 PubSub核心 #
| 函数 | 说明 |
|---|---|
| subscribe/2 | 订阅主题 |
| broadcast/3 | 广播消息 |
| unsubscribe/2 | 取消订阅 |
11.2 Presence核心 #
| 函数 | 说明 |
|---|---|
| track/3 | 追踪用户 |
| list/1 | 获取在线列表 |
| get_by_key/2 | 获取用户状态 |
| update/3 | 更新元数据 |
11.3 下一步 #
现在你已经了解了PubSub与Presence,接下来让我们学习 LiveView简介,深入了解实时视图!
最后更新:2026-03-28