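Erlang Message Passing
1. Message Passing Overview
Erlang processes communicate with each other through asynchronous message passing, which is the core of Erlang's concurrency model.
1.1 Characteristics of Message Passing
- Sending is asynchronous: the send operator returns immediately without waiting for the receiver.
- Messages are ordered: messages sent from one process to another arrive in the order they were sent.
- Messages are stored in the receiver's mailbox until a receive expression takes them out.
- Selective receive is supported: a receive can take the first message that matches a pattern and leave the others queued.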
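The first three points can be illustrated with a minimal sketch. The module name mailbox_order_demo and the helper next/1 are hypothetical and exist only for this illustration; the messages are tagged with a fresh reference so that unrelated messages in the caller's mailbox are ignored.

```erlang
-module(mailbox_order_demo).
-export([run/0]).

%% Sends three tagged messages to the calling process and reads them back.
run() ->
    Self = self(),
    Tag = make_ref(),
    %% Each send returns at once; the messages simply queue up in the
    %% caller's own mailbox.
    Self ! {Tag, 1},
    Self ! {Tag, 2},
    Self ! {Tag, 3},
    %% Messages between one sender and one receiver keep their send order.
    A = next(Tag),
    B = next(Tag),
    C = next(Tag),
    [A, B, C].

%% Takes the next message carrying Tag out of the mailbox.
next(Tag) ->
    receive
        {Tag, N} -> N
    after 1000 ->
        timeout
    end.
```

Calling mailbox_order_demo:run() should return the values in the order they were sent.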
2. Sending Messages
2.1 The Send Operator
erlang
-module(send_message).
-export([demo/0]).

demo() ->
    Pid = self(),
    %% Pid ! Msg puts Msg into Pid's mailbox and returns immediately.
    Pid ! {hello, "World"},
    receive
        Msg -> io:format("Received: ~p~n", [Msg])
    end.
2.2 Sending to a Registered Process
erlang
-module(send_registered).
-export([start/0, loop/0, send/1]).

%% Spawn the server and register it under the name my_server.
start() ->
    Pid = spawn(?MODULE, loop, []),
    register(my_server, Pid),
    {ok, Pid}.

loop() ->
    receive
        {request, From, Msg} ->
            io:format("Received: ~p~n", [Msg]),
            From ! {response, ok},
            loop();
        stop ->
            ok
    end.

%% Send to the registered name instead of a pid, then wait for the reply.
send(Msg) ->
    my_server ! {request, self(), Msg},
    receive
        {response, Response} -> {ok, Response}
    after 5000 ->
        {error, timeout}
    end.
2.3 Sending to a Remote Node
erlang
-module(send_remote).
-export([demo/1]).

%% {Name, Node} ! Msg sends to the process registered as Name on Node.
demo(Node) ->
    {my_server, Node} ! {hello, from, node()}.
3. Receiving Messages
3.1 Basic Receive
erlang
-module(receive_basic).
-export([demo/0]).

demo() ->
    self() ! hello,
    %% An unbound variable matches any message.
    receive
        Msg -> io:format("Received: ~p~n", [Msg])
    end.
3.2 Receiving with Pattern Matching
erlang
-module(receive_pattern).
-export([demo/0]).

demo() ->
    self() ! {point, 10, 20},
    %% Only a three-element tuple tagged point matches; X and Y are bound.
    receive
        {point, X, Y} ->
            io:format("Point: (~p, ~p)~n", [X, Y])
    end.
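3.3 Selective Receive
erlang
-module(selective_receive).
-export([demo/0]).

demo() ->
    %% The messages are queued in the order b, a, c.
    self() ! b,
    self() ! a,
    self() ! c,
    %% Each receive skips messages that do not match and leaves them queued,
    %% so the output order is a, b, c.
    receive
        a -> io:format("Got a~n")
    end,
    receive
        b -> io:format("Got b~n")
    end,
    receive
        c -> io:format("Got c~n")
    end.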
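To make the skipping behavior visible, the following sketch inspects what is left in the mailbox after one selective receive. The module name selective_leftover is hypothetical; the work runs in a freshly spawned process so that the mailbox is known to start empty.

```erlang
-module(selective_leftover).
-export([run/0]).

%% Returns the messages still queued after a selective receive of `a`.
run() ->
    Parent = self(),
    Ref = make_ref(),
    spawn(fun() ->
        %% Queue b, a, c in a fresh, empty mailbox.
        self() ! b,
        self() ! a,
        self() ! c,
        %% Take only `a`; b and c do not match and stay where they are.
        receive a -> ok end,
        {messages, Remaining} = process_info(self(), messages),
        Parent ! {Ref, Remaining}
    end),
    receive
        {Ref, Result} -> Result
    after 1000 ->
        timeout
    end.
```

The skipped messages keep their original relative order, which is why a later receive for b still finds it before c.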
4. Timeout Handling
4.1 Basic Timeout
erlang
-module(timeout_basic).
-export([demo/0]).

demo() ->
    receive
        Msg -> io:format("Received: ~p~n", [Msg])
    after 1000 ->
        %% Runs if no message arrives within 1000 milliseconds.
        io:format("Timeout!~n")
    end.
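A common use of the after clause is a receive with no message clauses at all, which gives a simple sleep. The sketch below uses a hypothetical module name, sleep_demo; it is an illustration of the idiom rather than a replacement for the standard timer:sleep/1.

```erlang
-module(sleep_demo).
-export([sleep/1]).

%% Blocks the calling process for about Ms milliseconds.
%% With no message clauses nothing in the mailbox can match, so the
%% receive always ends through the after branch and leaves the
%% mailbox untouched.
sleep(Ms) ->
    receive
    after Ms ->
        ok
    end.
```

For example, sleep_demo:sleep(500) pauses the caller for roughly half a second and then returns ok.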
4.2 Zero Timeout
erlang
-module(zero_timeout).
-export([check_mailbox/0]).

%% Non-blocking check: returns at once whether or not a message is queued.
check_mailbox() ->
    receive
        Msg -> {found, Msg}
    after 0 ->
        empty
    end.
4.3 Waiting Indefinitely
erlang
-module(infinite_wait).
-export([wait_forever/0]).

%% Without an after clause, receive blocks until a message arrives.
wait_forever() ->
    receive
        Msg -> io:format("Received: ~p~n", [Msg])
    end.
5. Messaging Patterns
5.1 Request-Response Pattern
erlang
-module(request_response).
-export([start/0, request/2, loop/0]).

start() ->
    spawn(?MODULE, loop, []).

%% Tag the request with a unique reference so that only the matching
%% response is accepted.
request(Pid, Msg) ->
    Ref = make_ref(),
    Pid ! {request, self(), Ref, Msg},
    receive
        {response, Ref, Response} -> {ok, Response}
    after 5000 ->
        {error, timeout}
    end.

loop() ->
    receive
        {request, From, Ref, Msg} ->
            Response = process(Msg),
            From ! {response, Ref, Response},
            loop()
    end.

process(Msg) -> {processed, Msg}.
5.2 Broadcast Pattern
erlang
-module(broadcast).
-export([broadcast/2]).

%% Send the same message to every pid in the list.
broadcast(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! Msg end, Pids).
5.3 Publish-Subscribe Pattern
erlang
-module(pubsub).
%% publish takes the server and the message, so it is exported as publish/2.
-export([start/0, subscribe/1, unsubscribe/1, publish/2, loop/1]).

start() ->
    spawn(?MODULE, loop, [[]]).

subscribe(Server) ->
    Server ! {subscribe, self()},
    ok.

unsubscribe(Server) ->
    Server ! {unsubscribe, self()},
    ok.

publish(Server, Msg) ->
    Server ! {publish, Msg},
    ok.

%% The server state is the list of subscriber pids.
loop(Subscribers) ->
    receive
        {subscribe, Pid} ->
            io:format("~p subscribed~n", [Pid]),
            loop([Pid | Subscribers]);
        {unsubscribe, Pid} ->
            io:format("~p unsubscribed~n", [Pid]),
            loop(lists:delete(Pid, Subscribers));
        {publish, Msg} ->
            io:format("Publishing: ~p~n", [Msg]),
            lists:foreach(fun(Pid) -> Pid ! Msg end, Subscribers),
            loop(Subscribers)
    end.
6. Message Queues
6.1 Inspecting the Message Queue
erlang
-module(message_queue).
-export([demo/0]).

demo() ->
    self() ! msg1,
    self() ! msg2,
    self() ! msg3,
    %% Returns a copy of the queued messages without removing them.
    {messages, Messages} = process_info(self(), messages),
    io:format("Messages: ~p~n", [Messages]).
6.2 Flushing the Message Queue
erlang
-module(flush_queue).
-export([flush/0]).

%% Discard messages one by one until the mailbox is empty.
flush() ->
    receive
        _ -> flush()
    after 0 ->
        ok
    end.
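A flush that discards everything can also throw away messages that other code is still waiting for. As a hedged variation, the sketch below drains only messages carrying a given tag and returns their payloads instead of discarding them. The module name drain_demo and the {Tag, Value} message shape are assumptions made for this illustration.

```erlang
-module(drain_demo).
-export([drain/1, run/0]).

%% Collects the payload of every queued {Tag, Value} message, in arrival
%% order, and leaves all other messages in the mailbox.
drain(Tag) ->
    drain(Tag, []).

drain(Tag, Acc) ->
    receive
        {Tag, Value} -> drain(Tag, [Value | Acc])
    after 0 ->
        %% Nothing left that matches; restore arrival order.
        lists:reverse(Acc)
    end.

%% Small demonstration using a fresh reference as the tag.
run() ->
    Tag = make_ref(),
    self() ! {Tag, 1},
    self() ! {Tag, 2},
    drain(Tag).
```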
6.3 Message Queue Length
erlang
-module(queue_length).
-export([length/0]).

%% length/0 does not clash with the built-in length/1 because the arity differs.
length() ->
    {message_queue_len, Len} = process_info(self(), message_queue_len),
    Len.
7. Message Passing Techniques
7.1 Matching on References
erlang
-module(ref_match).
-export([call/2, loop/0]).

call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {call, self(), Ref, Request},
    %% Ref is already bound, so only the reply to this call matches.
    receive
        {reply, Ref, Reply} -> {ok, Reply}
    after 5000 ->
        {error, timeout}
    end.

loop() ->
    receive
        {call, From, Ref, Request} ->
            Reply = handle(Request),
            From ! {reply, Ref, Reply},
            loop()
    end.

handle(R) -> {handled, R}.
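The value of the reference becomes clear when a stale reply is already sitting in the mailbox. The following sketch simulates that situation inside a single process; the module name ref_demo and the atoms old_answer and new_answer are hypothetical.

```erlang
-module(ref_demo).
-export([run/0]).

run() ->
    Stale = make_ref(),
    Ref = make_ref(),
    %% Simulate a late reply to an earlier call, followed by the reply
    %% that is actually wanted.
    self() ! {reply, Stale, old_answer},
    self() ! {reply, Ref, new_answer},
    Result =
        receive
            %% Ref is bound, so the stale reply is skipped.
            {reply, Ref, Reply} -> Reply
        after 1000 ->
            timeout
        end,
    %% Remove the stale reply so that it does not linger in the mailbox.
    receive
        {reply, Stale, _} -> ok
    after 0 ->
        ok
    end,
    Result.
```

Without the reference, a pattern such as {reply, Reply} would have accepted the stale message first.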
7.2 Message Priority
erlang
-module(priority_receive).
-export([receive_high/0, receive_normal/0]).

%% Look for a high-priority message first; fall back to normal ones only
%% when none is queued.
receive_high() ->
    receive
        {high, Msg} -> {high, Msg}
    after 0 ->
        receive_normal()
    end.

receive_normal() ->
    receive
        {normal, Msg} -> {normal, Msg}
    after 0 ->
        no_message
    end.
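The effect of the two-step receive can be checked with a short sketch in which a normal message is queued before a high-priority one. The module name priority_demo is hypothetical, and next/0 and next_normal/0 repeat the logic of receive_high/0 and receive_normal/0 so that the example is self-contained.

```erlang
-module(priority_demo).
-export([run/0]).

run() ->
    %% The normal message is queued first, the high-priority one second.
    self() ! {normal, n1},
    self() ! {high, h1},
    First = next(),
    Second = next(),
    Third = next(),
    [First, Second, Third].

%% Same logic as receive_high/0: high-priority messages win.
next() ->
    receive
        {high, Msg} -> {high, Msg}
    after 0 ->
        next_normal()
    end.

%% Same logic as receive_normal/0.
next_normal() ->
    receive
        {normal, Msg} -> {normal, Msg}
    after 0 ->
        no_message
    end.
```

The high-priority message is returned first even though it arrived later, and the third call finds nothing left to receive.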
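7.3 Message Filtering
erlang
-module(filter_receive).
-export([receive_if/1]).

%% A guard cannot call a fun, so "Msg when Pred(Msg)" does not compile.
%% Instead, search a snapshot of the mailbox, then receive that exact message.
receive_if(Pred) ->
    {messages, Msgs} = process_info(self(), messages),
    case lists:filter(Pred, Msgs) of
        [Match | _] ->
            receive
                Match -> {ok, Match}
            end;
        [] ->
            no_message
    end.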
8. Practical Applications
8.1 Simple RPC
erlang
-module(simple_rpc).
-export([start/0, call/2, cast/2, loop/0]).

start() ->
    spawn(?MODULE, loop, []).

%% Synchronous call: send the request and wait for the tagged reply.
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {call, self(), Ref, Request},
    receive
        {reply, Ref, Reply} -> {ok, Reply}
    after 5000 ->
        {error, timeout}
    end.

%% Asynchronous cast: send the request and do not wait for a reply.
cast(Pid, Request) ->
    Pid ! {cast, Request},
    ok.

loop() ->
    receive
        {call, From, Ref, Request} ->
            Reply = handle(Request),
            From ! {reply, Ref, Reply},
            loop();
        {cast, Request} ->
            handle(Request),
            loop()
    end.

handle(R) -> {handled, R}.
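8.2 Heartbeat Detection
erlang
-module(heartbeat).
-export([start/1, loop/1]).

start(Interval) ->
    spawn(?MODULE, loop, [Interval]).

%% Answer pings; if no message arrives within Interval milliseconds,
%% print a heartbeat. The timer restarts after every received message.
loop(Interval) ->
    receive
        {ping, From} ->
            From ! pong,
            loop(Interval)
    after Interval ->
        io:format("Heartbeat~n"),
        loop(Interval)
    end.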
8.3 Connection Pool
erlang
-module(connection_pool).
-export([start/1, get_connection/0, return_connection/1, loop/1]).

%% Start Size connection processes and the pool process that hands them out.
%% The pool must be registered as pool because get_connection/0 and
%% return_connection/1 send to that name.
start(Size) ->
    Connections = [spawn_connection() || _ <- lists:seq(1, Size)],
    Pid = spawn(?MODULE, loop, [Connections]),
    register(pool, Pid),
    Pid.

spawn_connection() ->
    spawn(fun() -> connection_loop() end).

get_connection() ->
    pool ! {get, self()},
    receive
        {connection, Pid} -> {ok, Pid}
    after 5000 ->
        {error, timeout}
    end.

return_connection(Pid) ->
    pool ! {return, Pid},
    ok.

%% When no connection is available, get requests do not match the guard
%% and wait in the mailbox until a connection is returned.
loop(Available) ->
    receive
        {get, From} when Available =/= [] ->
            [Pid | Rest] = Available,
            From ! {connection, Pid},
            loop(Rest);
        {return, Pid} ->
            loop([Pid | Available])
    end.

connection_loop() ->
    receive
        {request, From, Data} ->
            From ! {response, Data},
            connection_loop()
    end.
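9. Summary
This chapter covered:
- Sending messages
- Receiving messages
- Selective receive
- Timeout handling
- Messaging patterns
- Message queues
- Message passing techniques
- Practical applications
Ready to learn about process links? Let's move on to the next chapter.
Last updated: 2026-03-27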