Erlang消息传递 #

一、消息传递概述 #

Erlang进程之间通过异步消息传递进行通信,这是Erlang并发模型的核心。

1.1 消息传递特点 #

  • 异步发送
  • 消息是有序的(同一对进程之间,消息保证按发送顺序到达)
  • 消息存储在接收者邮箱
  • 支持选择性接收

二、发送消息 #

2.1 发送操作符 #

erlang
-module(send_message).
-export([demo/0]).

%% Demonstrates the send operator `!`.
%%
%% Posts a {hello, "World"} tuple to the calling process itself, then takes
%% the first message waiting in the mailbox and prints it. The result is
%% whatever io:format/2 returns.
demo() ->
    Greeting = {hello, "World"},
    self() ! Greeting,
    receive
        Incoming ->
            io:format("Received: ~p~n", [Incoming])
    end.

2.2 发送给注册进程 #

erlang
-module(send_registered).
-export([start/0, loop/0, send/1]).

%% Example of addressing a process by registered name instead of by pid.

%% Spawns the server loop, registers it under the name `my_server`, and
%% returns {ok, Pid}.
start() ->
    ServerPid = spawn(?MODULE, loop, []),
    register(my_server, ServerPid),
    {ok, ServerPid}.

%% Server loop: `stop` ends the loop with ok; a {request, From, Msg} tuple is
%% printed, acknowledged with {response, ok}, and the loop continues.
loop() ->
    receive
        stop ->
            ok;
        {request, Client, Payload} ->
            io:format("Received: ~p~n", [Payload]),
            Client ! {response, ok},
            loop()
    end.

%% Sends Msg to `my_server` and waits for the acknowledgement.
%% Returns {ok, Response}, or {error, timeout} when nothing arrives in time.
send(Msg) ->
    Request = {request, self(), Msg},
    my_server ! Request,
    await_response(5000).

%% Waits up to Timeout milliseconds for a {response, _} message.
await_response(Timeout) ->
    receive
        {response, Answer} ->
            {ok, Answer}
    after Timeout ->
        {error, timeout}
    end.

2.3 发送给远程节点 #

erlang
-module(send_remote).
-export([demo/1]).

%% Sends {hello, from, node()} to the process registered as `my_server` on
%% Node, using the {RegisteredName, Node} destination form.
%% The value of the send expression (the message itself) is returned.
demo(Node) ->
    Destination = {my_server, Node},
    Payload = {hello, from, node()},
    Destination ! Payload.

三、接收消息 #

3.1 基本接收 #

erlang
-module(receive_basic).
-export([demo/0]).

%% Smallest possible receive example: post `hello` to ourselves, then take
%% the first message out of the mailbox and print it.
demo() ->
    self() ! hello,
    receive
        Anything ->
            report(Anything)
    end.

%% Prints one received message.
report(Term) ->
    io:format("Received: ~p~n", [Term]).

3.2 模式匹配接收 #

erlang
-module(receive_pattern).
-export([demo/0]).

%% Shows destructuring in a receive clause: a {point, X, Y} tuple is sent to
%% the calling process, matched by shape, and its two coordinates printed.
demo() ->
    Point = {point, 10, 20},
    self() ! Point,
    receive
        {point, Xc, Yc} ->
            io:format("Point: (~p, ~p)~n", [Xc, Yc])
    end.

3.3 选择性接收 #

erlang
-module(selective_receive).
-export([demo/0]).

%% Demonstrates selective receive: the messages are queued in the order
%% b, a, c but are taken out in the order a, b, c, because each receive only
%% matches the one atom it is waiting for and leaves the others queued.
demo() ->
    Me = self(),
    Me ! b,
    Me ! a,
    Me ! c,
    await(a, "Got a~n"),
    await(b, "Got b~n"),
    await(c, "Got c~n").

%% Blocks until a message equal to Tag is in the mailbox, then prints Line.
%% Tag is already bound, so the receive pattern matches only that value.
await(Tag, Line) ->
    receive
        Tag ->
            io:format(Line)
    end.

四、超时处理 #

4.1 基本超时 #

erlang
-module(timeout_basic).
-export([demo/0]).

%% Waits up to one second for any message. Prints the message if one
%% arrives, otherwise prints a timeout notice.
demo() ->
    Outcome =
        receive
            Msg ->
                {received, Msg}
        after 1000 ->
            timed_out
        end,
    report(Outcome).

%% Prints the line that corresponds to the outcome of the wait.
report({received, Msg}) ->
    io:format("Received: ~p~n", [Msg]);
report(timed_out) ->
    io:format("Timeout!~n").

4.2 零超时 #

erlang
-module(zero_timeout).
-export([check_mailbox/0]).

%% Non-blocking mailbox poll.
%% Returns {found, Msg} with the first queued message (which is removed from
%% the mailbox), or `empty` when nothing is queued.
check_mailbox() ->
    poll(0).

%% Takes the first message, waiting at most Wait milliseconds for one.
poll(Wait) ->
    receive
        First ->
            {found, First}
    after Wait ->
        empty
    end.

4.3 无限等待 #

erlang
-module(infinite_wait).
-export([wait_forever/0]).

%% Blocks with no `after` clause, i.e. without any time limit, until a
%% message arrives; the message is then printed.
wait_forever() ->
    receive
        Arrived ->
            show(Arrived)
    end.

%% Prints one received message.
show(Term) ->
    io:format("Received: ~p~n", [Term]).

五、消息模式 #

5.1 请求-响应模式 #

erlang
-module(request_response).
-export([start/0, request/2, loop/0]).

%% Request/response pattern: every request carries the caller's pid and a
%% fresh reference, and the reply echoes that reference back.

%% Spawns the server loop and returns its pid.
start() ->
    spawn(?MODULE, loop, []).

%% Sends Msg to the server Pid and waits up to five seconds for the reply
%% tagged with this call's reference.
%% Returns {ok, Response} or {error, timeout}.
request(Pid, Msg) ->
    Tag = make_ref(),
    Pid ! {request, self(), Tag, Msg},
    receive
        {response, Tag, Answer} ->
            {ok, Answer}
    after 5000 ->
        {error, timeout}
    end.

%% Server loop: answers each request with the processed payload, echoing the
%% request's reference, then waits for the next request.
loop() ->
    receive
        {request, Client, Tag, Payload} ->
            reply(Client, Tag, process(Payload)),
            loop()
    end.

%% Delivers one tagged response to the client.
reply(Client, Tag, Result) ->
    Client ! {response, Tag, Result}.

%% The "work" performed for a request: wraps the payload.
process(Msg) ->
    {processed, Msg}.

5.2 广播模式 #

erlang
-module(broadcast).
-export([broadcast/2]).

%% Sends Msg to every destination in Pids, front to back, and returns ok.
broadcast([], _Msg) ->
    ok;
broadcast([Pid | Others], Msg) ->
    Pid ! Msg,
    broadcast(Others, Msg).

5.3 发布-订阅模式 #

erlang
-module(pubsub).
%% publish takes the server and the message, so the exported arity is 2.
%% (Exporting publish/1, which is not defined, makes the module fail to
%% compile.)
-export([start/0, subscribe/1, unsubscribe/1, publish/2, loop/1]).

%% Minimal publish/subscribe server. The server process keeps a list of
%% subscriber pids and forwards every published message to each of them.

%% Spawns the server with an empty subscriber list and returns its pid.
start() ->
    spawn(?MODULE, loop, [[]]).

%% Asks Server to add the calling process to its subscriber list.
%% Fire-and-forget: returns ok without waiting for the server.
subscribe(Server) ->
    Server ! {subscribe, self()},
    ok.

%% Asks Server to remove the calling process from its subscriber list.
%% Fire-and-forget: returns ok without waiting for the server.
unsubscribe(Server) ->
    Server ! {unsubscribe, self()},
    ok.

%% Asks Server to forward Msg, unchanged, to every current subscriber.
%% Fire-and-forget: returns ok without waiting for delivery.
publish(Server, Msg) ->
    Server ! {publish, Msg},
    ok.

%% Server loop; Subscribers is the current list of subscriber pids.
loop(Subscribers) ->
    receive
        {subscribe, Pid} ->
            io:format("~p subscribed~n", [Pid]),
            %% No de-duplication: a pid that subscribes twice is stored
            %% twice and receives each published message twice.
            loop([Pid | Subscribers]);
        {unsubscribe, Pid} ->
            io:format("~p unsubscribed~n", [Pid]),
            %% lists:delete/2 removes only the first occurrence of Pid.
            loop(lists:delete(Pid, Subscribers));
        {publish, Msg} ->
            io:format("Publishing: ~p~n", [Msg]),
            lists:foreach(fun(Pid) -> Pid ! Msg end, Subscribers),
            loop(Subscribers)
    end.

六、消息队列 #

6.1 查看消息队列 #

erlang
-module(message_queue).
-export([demo/0]).

%% Queues three messages in the caller's own mailbox and prints the mailbox
%% contents as reported by process_info/2. The messages are only inspected,
%% not received, so they are still queued when demo/0 returns.
demo() ->
    Me = self(),
    post_all(Me, [msg1, msg2, msg3]),
    {messages, Queued} = erlang:process_info(Me, messages),
    io:format("Messages: ~p~n", [Queued]).

%% Sends each term of the list to Dest, in list order.
post_all(_Dest, []) ->
    ok;
post_all(Dest, [Term | More]) ->
    Dest ! Term,
    post_all(Dest, More).

6.2 清空消息队列 #

erlang
-module(flush_queue).
-export([flush/0]).

%% Discards every message currently in the caller's mailbox and returns ok.
%% Never blocks: it stops as soon as a zero-timeout receive finds nothing.
flush() ->
    case drop_one() of
        dropped -> flush();
        empty -> ok
    end.

%% Removes at most one message; reports whether one was removed.
drop_one() ->
    receive
        _ -> dropped
    after 0 ->
        empty
    end.

6.3 消息队列长度 #

erlang
-module(queue_length).
-export([length/0]).

%% Returns the number of messages currently queued in the caller's mailbox,
%% as reported by process_info/2 under the message_queue_len key.
length() ->
    Info = erlang:process_info(self(), message_queue_len),
    {message_queue_len, Count} = Info,
    Count.

七、消息传递技巧 #

7.1 使用引用匹配 #

erlang
-module(ref_match).
-export([call/2, loop/0]).

%% Shows how a unique reference ties a reply to the call that caused it: the
%% caller only accepts a reply carrying the reference it generated.

%% Sends Request to Pid and waits up to five seconds for the reply that
%% carries this call's reference.
%% Returns {ok, Reply} or {error, timeout}.
call(Pid, Request) ->
    CallRef = make_ref(),
    Envelope = {call, self(), CallRef, Request},
    Pid ! Envelope,
    receive
        {reply, CallRef, Answer} ->
            {ok, Answer}
    after 5000 ->
        {error, timeout}
    end.

%% Server loop: replies to each call with the handled request, echoing the
%% caller's reference, then waits for the next call.
loop() ->
    receive
        {call, Caller, CallRef, Request} ->
            Caller ! {reply, CallRef, handle(Request)},
            loop()
    end.

%% The "work" performed for a request: wraps it.
handle(Request) ->
    {handled, Request}.

7.2 消息优先级 #

erlang
-module(priority_receive).
-export([receive_high/0, receive_normal/0]).

%% Two-level message priority built from zero-timeout selective receives.

%% Returns the first queued {high, Msg} message if there is one; otherwise
%% falls back to receive_normal/0. Never blocks.
receive_high() ->
    case take(high) of
        none -> receive_normal();
        Found -> Found
    end.

%% Returns the first queued {normal, Msg} message, or `no_message` when none
%% is queued. Never blocks.
receive_normal() ->
    case take(normal) of
        none -> no_message;
        Found -> Found
    end.

%% Removes and returns the first queued two-element tuple whose first element
%% is Tag, or returns `none` immediately when there is no such message.
take(Tag) ->
    receive
        {Tag, _} = Tagged ->
            Tagged
    after 0 ->
        none
    end.

7.3 消息过滤 #

erlang
-module(filter_receive).
-export([receive_if/1]).

%% Removes and returns the oldest queued message accepted by Pred.
%%
%% Pred is a fun of one argument that must return true or false. A fun call
%% is not a legal guard expression, so the filter cannot be written as
%% `Msg when Pred(Msg)`. Instead the mailbox is drained without blocking,
%% Pred is applied to the drained messages in arrival order, and every
%% message that was not selected is sent back to the calling process in its
%% original relative order.
%%
%% Returns {ok, Msg} for the first accepted message, or `no_message` when no
%% queued message is accepted. Never blocks.
%%
%% Caveat: a message that another process sends while receive_if/1 is running
%% may end up ahead of the re-queued messages.
receive_if(Pred) ->
    Queued = drain([]),
    try take_first(Pred, Queued, []) of
        {Result, Remaining} ->
            requeue(Remaining),
            Result
    catch
        Class:Reason:Stacktrace ->
            %% Pred failed: put every drained message back before
            %% re-raising, so the failure does not empty the mailbox.
            requeue(Queued),
            erlang:raise(Class, Reason, Stacktrace)
    end.

%% Takes every message currently in the mailbox; returns them oldest first.
drain(Acc) ->
    receive
        Msg ->
            drain([Msg | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.

%% Finds the first message accepted by Pred.
%% Returns {Result, Others}, where Others holds all remaining messages in
%% their original order.
take_first(_Pred, [], Skipped) ->
    {no_message, lists:reverse(Skipped)};
take_first(Pred, [Msg | Rest], Skipped) ->
    case Pred(Msg) of
        true -> {{ok, Msg}, lists:reverse(Skipped) ++ Rest};
        false -> take_first(Pred, Rest, [Msg | Skipped])
    end.

%% Sends the messages back to the calling process, in list order.
requeue(Messages) ->
    lists:foreach(fun(Msg) -> self() ! Msg end, Messages).

八、实际应用 #

8.1 简单RPC #

erlang
-module(simple_rpc).
-export([start/0, call/2, cast/2, loop/0]).

%% Tiny RPC server offering a synchronous call and an asynchronous cast.

%% Spawns the server loop and returns its pid.
start() ->
    spawn(?MODULE, loop, []).

%% Synchronous request: sends Request to Pid and waits up to five seconds for
%% the reply tagged with this call's reference.
%% Returns {ok, Reply} or {error, timeout}.
call(Pid, Request) ->
    Tag = make_ref(),
    Pid ! {call, self(), Tag, Request},
    receive
        {reply, Tag, Answer} ->
            {ok, Answer}
    after 5000 ->
        {error, timeout}
    end.

%% Asynchronous request: sends Request to Pid and returns ok immediately;
%% no reply is ever sent for a cast.
cast(Pid, Request) ->
    Envelope = {cast, Request},
    Pid ! Envelope,
    ok.

%% Server loop. A cast is handled and its result dropped; a call is handled
%% and the result sent back with the caller's reference.
loop() ->
    receive
        {cast, Request} ->
            handle(Request),
            loop();
        {call, Caller, Tag, Request} ->
            Caller ! {reply, Tag, handle(Request)},
            loop()
    end.

%% The "work" performed for a request: wraps it.
handle(Request) ->
    {handled, Request}.

8.2 心跳检测 #

erlang
-module(heartbeat).
-export([start/1, loop/1]).

%% Process that answers pings and prints a heartbeat line whenever no ping
%% has arrived for Interval milliseconds.

%% Spawns the heartbeat loop with the given interval and returns its pid.
start(Interval) ->
    spawn(?MODULE, loop, [Interval]).

%% One iteration either answers a {ping, From} message with `pong` or, when
%% no ping arrives within Interval milliseconds, prints the heartbeat line.
%% The loop then repeats with the same interval.
loop(Interval) ->
    receive
        {ping, Caller} ->
            Caller ! pong
    after Interval ->
        io:format("Heartbeat~n")
    end,
    loop(Interval).

8.3 连接池 #

erlang
-module(connection_pool).
-export([start/1, get_connection/0, return_connection/1, loop/1]).

%% Simple connection pool: a pool process hands out worker pids on request
%% and takes them back when they are returned.

%% Spawns Size connection processes and the pool process that owns them,
%% registers the pool under the name `pool`, and returns the pool's pid.
%%
%% get_connection/0 and return_connection/1 address the pool by that
%% registered name, so without the registration every call to them fails
%% with badarg.
start(Size) ->
    Connections = [spawn_connection() || _ <- lists:seq(1, Size)],
    Pool = spawn(?MODULE, loop, [Connections]),
    true = register(pool, Pool),
    Pool.

%% Spawns one connection process running connection_loop/0.
spawn_connection() ->
    spawn(fun() -> connection_loop() end).

%% Asks the pool for a connection and waits up to five seconds for it.
%% Returns {ok, Pid} or {error, timeout}.
get_connection() ->
    pool ! {get, self()},
    receive
        {connection, Pid} -> {ok, Pid}
    after 5000 -> {error, timeout}
    end.

%% Hands the connection Pid back to the pool. Fire-and-forget: returns ok
%% without waiting for the pool.
return_connection(Pid) ->
    pool ! {return, Pid},
    ok.

%% Pool loop; Available is the list of connections not currently handed out.
loop(Available) ->
    receive
        %% The guard leaves `get` requests queued in the mailbox while the
        %% pool is empty; they are matched again once a connection has been
        %% returned.
        {get, From} when Available =/= [] ->
            [Pid | Rest] = Available,
            From ! {connection, Pid},
            loop(Rest);
        {return, Pid} ->
            loop([Pid | Available])
    end.

%% Connection process loop: echoes the data of each request back to its
%% sender as {response, Data}.
connection_loop() ->
    receive
        {request, From, Data} ->
            From ! {response, Data},
            connection_loop()
    end.

九、总结 #

本章学习了:

  • 发送消息
  • 接收消息
  • 选择性接收
  • 超时处理
  • 消息模式
  • 消息队列
  • 消息传递技巧
  • 实际应用

准备好学习进程链接了吗?让我们进入下一章。

最后更新:2026-03-27