Elixir Agent与Task #

一、Agent基础 #

1.1 什么是Agent #

Agent是封装状态的进程,提供简单的状态管理接口。

1.2 启动Agent #

elixir
iex(1)> {:ok, pid} = Agent.start_link(fn -> 0 end)
{:ok, #PID<0.123.0>}

iex(2)> {:ok, pid} = Agent.start_link(fn -> 0 end, name: :counter)
{:ok, #PID<0.125.0>}

1.3 获取状态 #

elixir
iex(1)> Agent.get(:counter, fn state -> state end)
0

iex(2)> Agent.get(:counter, & &1)
0

1.4 更新状态 #

elixir
iex(1)> Agent.update(:counter, fn state -> state + 1 end)
:ok

iex(2)> Agent.get(:counter, & &1)
1

1.5 获取并更新 #

elixir
iex(1)> Agent.get_and_update(:counter, fn state -> {state, state + 1} end)
1

iex(2)> Agent.get(:counter, & &1)
2

二、Agent实践 #

2.1 计数器 #

elixir
defmodule Counter do
  @moduledoc """
  An integer counter kept in an Agent registered under this module's name.
  """

  use Agent

  @doc """
  Starts the counter Agent with `initial` as its state (defaults to `0`).
  """
  def start_link(initial \\ 0), do: Agent.start_link(fn -> initial end, name: __MODULE__)

  @doc """
  Adds one to the counter and returns the new value.
  """
  def increment, do: shift(1)

  @doc """
  Subtracts one from the counter and returns the new value.
  """
  def decrement, do: shift(-1)

  @doc """
  Returns the current counter value.
  """
  def get, do: Agent.get(__MODULE__, fn current -> current end)

  @doc """
  Sets the counter back to `0` and returns `:ok`.
  """
  def reset, do: Agent.update(__MODULE__, fn _previous -> 0 end)

  # Applies `delta` to the stored value; the updated value is both the reply
  # and the new Agent state.
  defp shift(delta) do
    Agent.get_and_update(__MODULE__, fn current ->
      updated = current + delta
      {updated, updated}
    end)
  end
end

2.2 缓存 #

elixir
defmodule Cache do
  @moduledoc """
  A key-value cache stored as a map inside an Agent registered under this
  module's name.
  """

  use Agent

  @doc """
  Starts the cache Agent with an empty map. The argument is ignored.
  """
  def start_link(_), do: Agent.start_link(&Map.new/0, name: __MODULE__)

  @doc """
  Returns the value stored under `key`, or `nil` when the key is absent.
  """
  def get(key), do: Agent.get(__MODULE__, &Map.get(&1, key))

  @doc """
  Stores `value` under `key`, replacing any previous entry. Returns `:ok`.
  """
  def put(key, value), do: Agent.update(__MODULE__, &Map.put(&1, key, value))

  @doc """
  Removes `key` from the cache. Returns `:ok`.
  """
  def delete(key), do: Agent.update(__MODULE__, &Map.delete(&1, key))

  @doc """
  Drops every entry by replacing the state with an empty map. Returns `:ok`.
  """
  def clear, do: Agent.update(__MODULE__, fn _entries -> Map.new() end)
end

2.3 配置存储 #

elixir
defmodule AppConfig do
  @moduledoc """
  Configuration store backed by an Agent registered under this module's name.

  The state is read and written with `Map` functions, so `config` passed to
  `start_link/1` is expected to be a map.
  """

  use Agent

  @doc """
  Starts the Agent with `config` as its initial state.
  """
  def start_link(config), do: Agent.start_link(fn -> config end, name: __MODULE__)

  @doc """
  Returns the value stored under `key`, or `default` (`nil` unless given)
  when the key is absent.
  """
  def get(key, default \\ nil) do
    Agent.get(__MODULE__, &Map.get(&1, key, default))
  end

  @doc """
  Stores `value` under `key`. Returns `:ok`.
  """
  def put(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end

  @doc """
  Returns the entire configuration state.
  """
  def all, do: Agent.get(__MODULE__, fn settings -> settings end)
end

三、Task基础 #

3.1 什么是Task #

Task是用于执行异步计算的进程。

3.2 启动Task #

elixir
iex(1)> task = Task.async(fn -> 1 + 1 end)
%Task{...}

iex(2)> Task.await(task)
2

3.3 Task.start #

elixir
iex(1)> {:ok, pid} = Task.start(fn -> IO.puts("Background task") end)
{:ok, #PID<0.123.0>}

Task.start_link与Task.start类似,但任务进程会链接到调用进程:

elixir
iex(1)> {:ok, pid} = Task.start_link(fn -> :timer.sleep(1000) end)
{:ok, #PID<0.123.0>}

四、Task实践 #

4.1 并行计算 #

elixir
defmodule Parallel do
  @moduledoc """
  Helpers that run a function over a collection with one `Task` per element.
  """

  @doc """
  Applies `func` to every element of `collection` in its own task and returns
  the results as a list in the same order as the input.

  Each task is awaited with `Task.await/1`, so its default timeout applies.
  """
  def map(collection, func) do
    # Start every task first so they all run concurrently before any await.
    tasks = for item <- collection, do: Task.async(fn -> func.(item) end)
    for task <- tasks, do: Task.await(task)
  end

  @doc """
  Applies `func` to every element of `collection` in its own task, waits for
  all of them, and returns `:ok`. The results are discarded.
  """
  def each(collection, func) do
    tasks = for item <- collection, do: Task.async(fn -> func.(item) end)
    Enum.each(tasks, fn task -> Task.await(task) end)
  end
end

# Doubles every element in its own task; results keep the input order.
# => [2, 4, 6, 8, 10]
Parallel.map([1, 2, 3, 4, 5], fn x -> x * 2 end)

4.2 超时处理 #

elixir
iex(1)> task = Task.async(fn ->
...>   :timer.sleep(2000)
...>   :done
...> end)

iex(2)> Task.await(task, 1000)
** (exit) exited in: Task.await(%Task{...}, 1000)
    ** (EXIT) time out

4.3 忽略结果 #

elixir
iex(1)> Task.start(fn ->
...>   :timer.sleep(1000)
...>   IO.puts("Background work done")
...> end)

4.4 Task.yield #

elixir
iex(1)> task = Task.async(fn -> :timer.sleep(1000); :done end)

iex(2)> case Task.yield(task, 500) || Task.shutdown(task) do
...>   nil -> :timeout
...>   result -> result
...> end
:timeout

五、Task.Supervisor #

5.1 启动监督器 #

elixir
defmodule MyApp do
  @moduledoc """
  Application callback module that starts a supervision tree containing a
  `Task.Supervisor` registered as `MyApp.TaskSupervisor`.
  """

  use Application

  @doc """
  Starts the top-level supervisor with a `:one_for_one` strategy.

  Both arguments are ignored.
  """
  @impl true
  def start(_type, _args) do
    task_supervisor = {Task.Supervisor, name: MyApp.TaskSupervisor}

    Supervisor.start_link([task_supervisor], strategy: :one_for_one)
  end
end

5.2 启动受监督的任务 #

elixir
iex(1)> Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
...>   :timer.sleep(1000)
...>   :done
...> end)

5.3 启动子任务 #

elixir
iex(1)> Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
...>   IO.puts("Background task")
...> end)
{:ok, #PID<0.123.0>}

六、Agent与Task组合 #

6.1 异步状态更新 #

elixir
defmodule AsyncCounter do
  @moduledoc """
  A counter Agent, registered under this module's name, whose increments are
  issued from a separate `Task` process.
  """

  use Agent

  @doc """
  Starts the counter Agent with `initial` as its state (defaults to `0`).
  """
  def start_link(initial \\ 0), do: Agent.start_link(fn -> initial end, name: __MODULE__)

  @doc """
  Starts a task that increments the counter by one.

  Returns the result of `Task.start/1` right away; the increment itself is
  performed inside the started task.
  """
  def increment_async do
    Task.start(fn -> Agent.update(__MODULE__, fn count -> count + 1 end) end)
  end

  @doc """
  Returns the current counter value.
  """
  def get, do: Agent.get(__MODULE__, fn count -> count end)
end

6.2 异步缓存 #

elixir
defmodule AsyncCache do
  @moduledoc """
  A read-through cache: values are kept in an Agent registered under this
  module's name, and missing values are computed in a `Task`.
  """

  use Agent

  @doc """
  Starts the cache Agent with an empty map. The argument is ignored.
  """
  def start_link(_) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  @doc """
  Returns the cached value for `key`, computing and storing it on a miss.

  On a miss, the zero-arity `fetch_fn` runs in a task, the caller waits for it
  with `Task.await/1`, and the result is stored under `key` before being
  returned.

  A key counts as cached whenever it is present in the map, even if its value
  is `nil`, so a `fetch_fn` that returns `nil` is not re-run on later calls.

  The lookup and the store are separate Agent calls, so concurrent callers
  that miss on the same key may each run their own `fetch_fn`; the last store
  wins.
  """
  def get(key, fetch_fn) do
    # Map.fetch/2 separates "key absent" (:error) from "key present with a nil
    # value" ({:ok, nil}), which Map.get/2 cannot do.
    case Agent.get(__MODULE__, &Map.fetch(&1, key)) do
      {:ok, cached} ->
        cached

      :error ->
        task = Task.async(fn -> fetch_fn.() end)
        value = Task.await(task)
        Agent.update(__MODULE__, &Map.put(&1, key, value))
        value
    end
  end
end

七、最佳实践 #

7.1 Agent命名 #

elixir
defmodule MyApp.Counter do
  @moduledoc """
  A counter Agent registered under its own module name, so callers can refer
  to it by name instead of by pid.
  """

  use Agent

  @doc """
  Starts the Agent with an initial state of `0`. The argument is ignored.
  """
  def start_link(_opts) do
    Agent.start_link(fn -> 0 end, name: __MODULE__)
  end
end

7.2 Task超时 #

elixir
# Run slow_operation/0 in a task so the caller can wait for it with a timeout.
task = Task.async(fn -> slow_operation() end)
# Pass an explicit timeout of 30_000 ms instead of relying on Task.await's default.
result = Task.await(task, 30_000)

7.3 使用监督器 #

elixir
# Child specifications intended for a supervisor: the MyApp.Counter Agent
# (started with [] as its argument) and a Task.Supervisor registered under
# the name MyApp.TaskSupervisor.
children = [
  {MyApp.Counter, []},
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

八、总结 #

Agent #

函数 用途
Agent.start_link/2 启动Agent
Agent.get/2 获取状态
Agent.update/2 更新状态
Agent.get_and_update/2 获取并更新

Task #

函数 用途
Task.async/1 启动异步任务
Task.await/2 等待任务结果
Task.start/1 启动后台任务
Task.start_link/1 启动链接任务
Task.yield/2 限时等待结果(超时返回nil,不会退出)

准备好学习GenServer了吗?让我们进入下一章。

最后更新:2026-03-27