Elixir进程通信 #

一、消息传递 #

1.1 send #

elixir
iex(1)> send(self(), :hello)
:hello

1.2 receive #

elixir
iex(1)> receive do
...>   :hello -> "Got hello!"
...>   :world -> "Got world!"
...> end
"Got hello!"

1.3 超时 #

elixir
iex(1)> receive do
...>   msg -> msg
...> after
...>   1000 -> {:error, :timeout}
...> end
{:error, :timeout}

1.4 模式匹配 #

elixir
iex(1)> receive do
...>   {:ok, data} -> "Success: #{data}"
...>   {:error, reason} -> "Error: #{reason}"
...>   {from, msg} when is_pid(from) -> "From #{inspect(from)}: #{msg}"
...> end

二、消息队列 #

2.1 消息队列原理 #

每个进程都有一个消息队列,receive 从队列中匹配消息:

text
┌─────────────────┐
│   Message 1     │
├─────────────────┤
│   Message 2     │
├─────────────────┤
│   Message 3     │
├─────────────────┤
│      ...        │
└─────────────────┘

2.2 查看消息队列 #

elixir
iex(1)> send(self(), :msg1)
iex(2)> send(self(), :msg2)
iex(3)> send(self(), :msg3)

iex(4)> Process.info(self(), :messages)
{:messages, [:msg1, :msg2, :msg3]}

2.3 清空消息队列 #

elixir
def flush do
  receive do
    _ -> flush()
  after
    0 -> :ok
  end
end

三、选择性接收 #

3.1 基本选择性接收 #

elixir
receive do
  {:priority, msg} -> handle_priority(msg)
after
  0 -> :no_priority_messages
end

receive do
  {:normal, msg} -> handle_normal(msg)
after
  0 -> :no_normal_messages
end

3.2 实现优先级队列 #

elixir
defmodule PriorityQueue do
  def receive_message do
    receive do
      {:high, msg} -> {:high, msg}
    after
      0 ->
        receive do
          {:medium, msg} -> {:medium, msg}
        after
          0 ->
            receive do
              {:low, msg} -> {:low, msg}
            after
              100 -> :timeout
            end
        end
    end
  end
end

四、进程链接 #

4.1 双向链接 #

elixir
iex(1)> pid1 = spawn(fn -> loop() end)
iex(2)> pid2 = spawn(fn -> loop() end)

iex(3)> Process.link(pid1)
true

4.2 链接传播 #

当链接的进程崩溃时,错误会传播:

elixir
iex(1)> spawn_link(fn -> raise "Error!" end)
** (RuntimeError) Error!

4.3 捕获退出 #

elixir
iex(1)> Process.flag(:trap_exit, true)
false

iex(2)> spawn_link(fn -> raise "Error!" end)
#PID<0.123.0>

iex(3)> receive do
...>   msg -> IO.inspect(msg)
...> end
{:EXIT, #PID<0.123.0>, %RuntimeError{message: "Error!"}}

4.4 退出信号 #

elixir
iex(1)> Process.exit(pid, :kill)
true

iex(2)> Process.exit(pid, :normal)
true

退出原因:

  • :normal - 正常退出
  • :kill - 强制杀死
  • :shutdown - 优雅关闭
  • 其他 - 异常退出

五、进程监控 #

5.1 单向监控 #

elixir
iex(1)> {pid, ref} = spawn_monitor(fn ->
...>   receive do
...>     _ -> :ok
...>   end
...> end)

iex(2)> send(pid, :stop)

iex(3)> receive do
...>   {:DOWN, ^ref, :process, ^pid, reason} ->
...>     IO.puts("Process exited: #{reason}")
...> end
Process exited: normal

5.2 多次监控 #

elixir
iex(1)> pid = spawn(fn -> loop() end)
iex(2)> ref1 = Process.monitor(pid)
iex(3)> ref2 = Process.monitor(pid)

5.3 取消监控 #

elixir
iex(1)> Process.demonitor(ref)
true

iex(2)> Process.demonitor(ref, [:flush])
true

六、命名进程 #

6.1 注册名称 #

elixir
iex(1)> pid = spawn(fn -> loop() end)
iex(2)> Process.register(pid, :my_server)
true

iex(3)> send(:my_server, :hello)
:hello

6.2 查找进程 #

elixir
iex(1)> Process.whereis(:my_server)
#PID<0.123.0>

iex(2)> Process.whereis(:unknown)
nil

6.3 已注册进程 #

elixir
iex(1)> Process.registered()
[:my_server, :code_server, :file_server, ...]

七、节点通信 #

7.1 节点连接 #

elixir
iex(1)> Node.connect(:"node2@192.168.1.2")
true

iex(2)> Node.list()
[:"node2@192.168.1.2"]

7.2 跨节点发送消息 #

elixir
iex(1)> send({:my_server, :"node2@192.168.1.2"}, :hello)
:hello

7.3 远程spawn #

elixir
iex(1)> Node.spawn(:"node2@192.168.1.2", fn -> IO.puts("Remote!") end)

八、进程组 #

8.1 PG2 #

elixir
iex(1)> :pg2.create(:my_group)
:ok

iex(2)> :pg2.join(:my_group, self())
:ok

iex(3)> :pg2.get_members(:my_group)
[#PID<0.1.0>]

iex(4)> :pg2.broadcast(:my_group, :hello)
:hello

8.2 PG (Elixir 1.13+) #

elixir
iex(1)> :pg.start_link()
iex(2)> :pg.join(:my_scope, :my_group, self())
iex(3)> :pg.get_members(:my_scope, :my_group)

九、通信模式 #

9.1 请求-响应模式 #

elixir
defmodule Server do
  def call(pid, request) do
    ref = make_ref()
    send(pid, {self(), ref, request})

    receive do
      {^ref, response} -> response
    after
      5000 -> {:error, :timeout}
    end
  end

  def reply({from, ref}, response) do
    send(from, {ref, response})
  end
end

9.2 发布-订阅模式 #

elixir
defmodule PubSub do
  def start do
    spawn(fn -> loop(%{}) end)
  end

  defp loop(subscribers) do
    receive do
      {:subscribe, pid, topic} ->
        new_subs = Map.update(subscribers, topic, [pid], &[pid | &1])
        loop(new_subs)

      {:unsubscribe, pid, topic} ->
        new_subs = Map.update(subscribers, topic, [], &List.delete(&1, pid))
        loop(new_subs)

      {:publish, topic, msg} ->
        Enum.each(Map.get(subscribers, topic, []), fn pid ->
          send(pid, msg)
        end)
        loop(subscribers)
    end
  end
end

十、总结 #

本章学习了:

特性 用途
send/receive 消息传递
Process.link 双向链接
Process.monitor 单向监控
Process.register 进程命名
:trap_exit 捕获退出信号
Node 跨节点通信

准备好学习Agent与Task了吗?让我们进入下一章。

最后更新:2026-03-27