Elixir Agent与Task #
一、Agent基础 #
1.1 什么是Agent #
Agent是封装状态的进程,提供简单的状态管理接口。
1.2 启动Agent #
elixir
iex(1)> {:ok, pid} = Agent.start_link(fn -> 0 end)
{:ok, #PID<0.123.0>}
iex(2)> {:ok, pid} = Agent.start_link(fn -> 0 end, name: :counter)
{:ok, #PID<0.123.0>}
1.3 获取状态 #
elixir
iex(1)> Agent.get(:counter, fn state -> state end)
0
iex(2)> Agent.get(:counter, & &1)
0
1.4 更新状态 #
elixir
iex(1)> Agent.update(:counter, fn state -> state + 1 end)
:ok
iex(2)> Agent.get(:counter, & &1)
1
1.5 获取并更新 #
elixir
iex(1)> Agent.get_and_update(:counter, fn state -> {state, state + 1} end)
1
iex(2)> Agent.get(:counter, & &1)
2
二、Agent实践 #
2.1 计数器 #
elixir
defmodule Counter do
use Agent
def start_link(initial \\ 0) do
Agent.start_link(fn -> initial end, name: __MODULE__)
end
def increment do
Agent.get_and_update(__MODULE__, fn count -> {count + 1, count + 1} end)
end
def decrement do
Agent.get_and_update(__MODULE__, fn count -> {count - 1, count - 1} end)
end
def get do
Agent.get(__MODULE__, & &1)
end
def reset do
Agent.update(__MODULE__, fn _ -> 0 end)
end
end
2.2 缓存 #
elixir
defmodule Cache do
use Agent
def start_link(_) do
Agent.start_link(fn -> %{} end, name: __MODULE__)
end
def get(key) do
Agent.get(__MODULE__, fn state -> Map.get(state, key) end)
end
def put(key, value) do
Agent.update(__MODULE__, fn state -> Map.put(state, key, value) end)
end
def delete(key) do
Agent.update(__MODULE__, fn state -> Map.delete(state, key) end)
end
def clear do
Agent.update(__MODULE__, fn _ -> %{} end)
end
end
2.3 配置存储 #
elixir
defmodule AppConfig do
use Agent
def start_link(config) do
Agent.start_link(fn -> config end, name: __MODULE__)
end
def get(key, default \\ nil) do
Agent.get(__MODULE__, fn state -> Map.get(state, key, default) end)
end
def put(key, value) do
Agent.update(__MODULE__, fn state -> Map.put(state, key, value) end)
end
def all do
Agent.get(__MODULE__, & &1)
end
end
三、Task基础 #
3.1 什么是Task #
Task是用于执行异步计算的进程。
3.2 启动Task #
elixir
iex(1)> task = Task.async(fn -> 1 + 1 end)
%Task{...}
iex(2)> Task.await(task)
2
3.3 Task.start #
elixir
iex(1)> {:ok, pid} = Task.start(fn -> IO.puts("Background task") end)
{:ok, #PID<0.123.0>}
3.4 Task.start_link #
elixir
iex(1)> {:ok, pid} = Task.start_link(fn -> :timer.sleep(1000) end)
{:ok, #PID<0.123.0>}
四、Task实践 #
4.1 并行计算 #
elixir
defmodule Parallel do
def map(collection, func) do
collection
|> Enum.map(&Task.async(fn -> func.(&1) end))
|> Enum.map(&Task.await/1)
end
def each(collection, func) do
collection
|> Enum.map(&Task.async(fn -> func.(&1) end))
|> Enum.each(&Task.await/1)
end
end
Parallel.map([1, 2, 3, 4, 5], fn x -> x * 2 end)
4.2 超时处理 #
elixir
iex(1)> task = Task.async(fn ->
...> :timer.sleep(2000)
...> :done
...> end)
iex(2)> Task.await(task, 1000)
** (exit) exited in: Task.await(%Task{...}, 1000)
4.3 忽略结果 #
elixir
iex(1)> Task.start(fn ->
...> :timer.sleep(1000)
...> IO.puts("Background work done")
...> end)
4.4 Task.yield #
elixir
iex(1)> task = Task.async(fn -> :timer.sleep(1000); :done end)
iex(2)> case Task.yield(task, 500) || Task.shutdown(task) do
...> nil -> :timeout
...> result -> result
...> end
:timeout
五、Task.Supervisor #
5.1 启动监督器 #
elixir
defmodule MyApp do
use Application
def start(_type, _args) do
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
5.2 启动受监督的任务 #
elixir
iex(1)> Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
...> :timer.sleep(1000)
...> :done
...> end)
5.3 启动子任务 #
elixir
iex(1)> Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
...> IO.puts("Background task")
...> end)
{:ok, #PID<0.123.0>}
六、Agent与Task组合 #
6.1 异步状态更新 #
elixir
defmodule AsyncCounter do
use Agent
def start_link(initial \\ 0) do
Agent.start_link(fn -> initial end, name: __MODULE__)
end
def increment_async do
Task.start(fn ->
Agent.update(__MODULE__, &(&1 + 1))
end)
end
def get do
Agent.get(__MODULE__, & &1)
end
end
6.2 异步缓存 #
elixir
defmodule AsyncCache do
use Agent
def start_link(_) do
Agent.start_link(fn -> %{} end, name: __MODULE__)
end
def get(key, fetch_fn) do
case Agent.get(__MODULE__, &Map.get(&1, key)) do
nil ->
task = Task.async(fn -> fetch_fn.() end)
value = Task.await(task)
Agent.update(__MODULE__, &Map.put(&1, key, value))
value
cached ->
cached
end
end
end
七、最佳实践 #
7.1 Agent命名 #
elixir
defmodule MyApp.Counter do
use Agent
def start_link(_), do: Agent.start_link(fn -> 0 end, name: __MODULE__)
end
7.2 Task超时 #
elixir
task = Task.async(fn -> slow_operation() end)
result = Task.await(task, 30_000)
7.3 使用监督器 #
elixir
children = [
{MyApp.Counter, []},
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
八、总结 #
Agent #
| 函数 | 用途 |
|---|---|
Agent.start_link/2 |
启动Agent |
Agent.get/2 |
获取状态 |
Agent.update/2 |
更新状态 |
Agent.get_and_update/2 |
获取并更新 |
Task #
| 函数 | 用途 |
|---|---|
Task.async/1 |
启动异步任务 |
Task.await/2 |
等待任务结果 |
Task.start/1 |
启动后台任务 |
Task.start_link/1 |
启动链接任务 |
Task.yield/2 |
非阻塞等待 |
准备好学习GenServer了吗?让我们进入下一章。
最后更新:2026-03-27