Elixir进程通信 #
一、消息传递 #
1.1 send #
elixir
iex(1)> send(self(), :hello)
:hello
1.2 receive #
elixir
iex(1)> receive do
...> :hello -> "Got hello!"
...> :world -> "Got world!"
...> end
"Got hello!"
1.3 超时 #
elixir
iex(1)> receive do
...> msg -> msg
...> after
...> 1000 -> {:error, :timeout}
...> end
{:error, :timeout}
1.4 模式匹配 #
elixir
iex(1)> receive do
...> {:ok, data} -> "Success: #{data}"
...> {:error, reason} -> "Error: #{reason}"
...> {from, msg} when is_pid(from) -> "From #{inspect(from)}: #{msg}"
...> end
二、消息队列 #
2.1 消息队列原理 #
每个进程都有一个消息队列,receive 从队列中匹配消息:
text
┌─────────────────┐
│ Message 1 │
├─────────────────┤
│ Message 2 │
├─────────────────┤
│ Message 3 │
├─────────────────┤
│ ... │
└─────────────────┘
2.2 查看消息队列 #
elixir
iex(1)> send(self(), :msg1)
iex(2)> send(self(), :msg2)
iex(3)> send(self(), :msg3)
iex(4)> Process.info(self(), :messages)
{:messages, [:msg1, :msg2, :msg3]}
2.3 清空消息队列 #
elixir
def flush do
receive do
_ -> flush()
after
0 -> :ok
end
end
三、选择性接收 #
3.1 基本选择性接收 #
elixir
receive do
{:priority, msg} -> handle_priority(msg)
after
0 -> :no_priority_messages
end
receive do
{:normal, msg} -> handle_normal(msg)
after
0 -> :no_normal_messages
end
3.2 实现优先级队列 #
elixir
defmodule PriorityQueue do
def receive_message do
receive do
{:high, msg} -> {:high, msg}
after
0 ->
receive do
{:medium, msg} -> {:medium, msg}
after
0 ->
receive do
{:low, msg} -> {:low, msg}
after
100 -> :timeout
end
end
end
end
end
四、进程链接 #
4.1 双向链接 #
elixir
iex(1)> pid1 = spawn(fn -> loop() end)
iex(2)> pid2 = spawn(fn -> loop() end)
iex(3)> Process.link(pid1)
true
4.2 链接传播 #
当链接的进程崩溃时,错误会传播:
elixir
iex(1)> spawn_link(fn -> raise "Error!" end)
** (RuntimeError) Error!
4.3 捕获退出 #
elixir
iex(1)> Process.flag(:trap_exit, true)
false
iex(2)> spawn_link(fn -> raise "Error!" end)
#PID<0.123.0>
iex(3)> receive do
...> msg -> IO.inspect(msg)
...> end
{:EXIT, #PID<0.123.0>, %RuntimeError{message: "Error!"}}
4.4 退出信号 #
elixir
iex(1)> Process.exit(pid, :kill)
true
iex(2)> Process.exit(pid, :normal)
true
退出原因:
:normal- 正常退出:kill- 强制杀死:shutdown- 优雅关闭- 其他 - 异常退出
五、进程监控 #
5.1 单向监控 #
elixir
iex(1)> {pid, ref} = spawn_monitor(fn ->
...> receive do
...> _ -> :ok
...> end
...> end)
iex(2)> send(pid, :stop)
iex(3)> receive do
...> {:DOWN, ^ref, :process, ^pid, reason} ->
...> IO.puts("Process exited: #{reason}")
...> end
Process exited: normal
5.2 多次监控 #
elixir
iex(1)> pid = spawn(fn -> loop() end)
iex(2)> ref1 = Process.monitor(pid)
iex(3)> ref2 = Process.monitor(pid)
5.3 取消监控 #
elixir
iex(1)> Process.demonitor(ref)
true
iex(2)> Process.demonitor(ref, [:flush])
true
六、命名进程 #
6.1 注册名称 #
elixir
iex(1)> pid = spawn(fn -> loop() end)
iex(2)> Process.register(pid, :my_server)
true
iex(3)> send(:my_server, :hello)
:hello
6.2 查找进程 #
elixir
iex(1)> Process.whereis(:my_server)
#PID<0.123.0>
iex(2)> Process.whereis(:unknown)
nil
6.3 已注册进程 #
elixir
iex(1)> Process.registered()
[:my_server, :code_server, :file_server, ...]
七、节点通信 #
7.1 节点连接 #
elixir
iex(1)> Node.connect(:"node2@192.168.1.2")
true
iex(2)> Node.list()
[:"node2@192.168.1.2"]
7.2 跨节点发送消息 #
elixir
iex(1)> send({:my_server, :"node2@192.168.1.2"}, :hello)
:hello
7.3 远程spawn #
elixir
iex(1)> Node.spawn(:"node2@192.168.1.2", fn -> IO.puts("Remote!") end)
八、进程组 #
8.1 PG2 #
elixir
iex(1)> :pg2.create(:my_group)
:ok
iex(2)> :pg2.join(:my_group, self())
:ok
iex(3)> :pg2.get_members(:my_group)
[#PID<0.1.0>]
iex(4)> :pg2.broadcast(:my_group, :hello)
:hello
8.2 PG (Elixir 1.13+) #
elixir
iex(1)> :pg.start_link()
iex(2)> :pg.join(:my_scope, :my_group, self())
iex(3)> :pg.get_members(:my_scope, :my_group)
九、通信模式 #
9.1 请求-响应模式 #
elixir
defmodule Server do
def call(pid, request) do
ref = make_ref()
send(pid, {self(), ref, request})
receive do
{^ref, response} -> response
after
5000 -> {:error, :timeout}
end
end
def reply({from, ref}, response) do
send(from, {ref, response})
end
end
9.2 发布-订阅模式 #
elixir
defmodule PubSub do
def start do
spawn(fn -> loop(%{}) end)
end
defp loop(subscribers) do
receive do
{:subscribe, pid, topic} ->
new_subs = Map.update(subscribers, topic, [pid], &[pid | &1])
loop(new_subs)
{:unsubscribe, pid, topic} ->
new_subs = Map.update(subscribers, topic, [], &List.delete(&1, pid))
loop(new_subs)
{:publish, topic, msg} ->
Enum.each(Map.get(subscribers, topic, []), fn pid ->
send(pid, msg)
end)
loop(subscribers)
end
end
end
十、总结 #
本章学习了:
| 特性 | 用途 |
|---|---|
send/receive |
消息传递 |
Process.link |
双向链接 |
Process.monitor |
单向监控 |
Process.register |
进程命名 |
:trap_exit |
捕获退出信号 |
Node |
跨节点通信 |
准备好学习Agent与Task了吗?让我们进入下一章。
最后更新:2026-03-27