代理类型Agent #
一、Agent简介 #
Agent是Clojure的异步引用类型,用于独立状态的异步更新。Agent通过消息传递实现并发,适合后台任务和事件处理。
1.1 特点 #
- 异步状态更新
- 消息队列处理
- 线程池执行
- 错误处理机制
1.2 创建Agent #
clojure
(def counter (agent 0))
(def state (agent {:status :idle :data []}))
@counter
(deref state)
二、基本操作 #
2.1 send #
使用计算线程池发送任务:
clojure
(def counter (agent 0))
(send counter inc)
(Thread/sleep 10)
@counter
(send counter + 10)
(Thread/sleep 10)
@counter
(send counter (fn [state]
(* state 2)))
(Thread/sleep 10)
@counter
2.2 send-off #
使用I/O线程池发送任务:
clojure
(def log (agent []))
(send-off log conj "Log entry 1")
(send-off log conj "Log entry 2")
(Thread/sleep 10)
@log
2.3 send-via #
使用自定义执行器:
clojure
(def counter (agent 0))
(def my-executor
(java.util.concurrent.Executors/newFixedThreadPool 2))
(send-via counter my-executor inc)
(Thread/sleep 10)
@counter
三、等待操作 #
3.1 await #
阻塞等待所有发送的任务完成:
clojure
(def counter (agent 0))
(dotimes [_ 100]
(send counter inc))
(await counter)
@counter
3.2 await-for #
带超时的等待:
clojure
(def counter (agent 0))
(dotimes [_ 1000]
(send counter inc))
(await-for 1000 counter)
@counter
3.3 await1 #
等待单个agent:
clojure
(def state (agent 0))
(send state inc)
(await1 state)
@state
四、错误处理 #
4.1 错误模式 #
clojure
(def counter (agent 0))
(send counter (fn [_] (throw (ex-info "Error!" {}))))
(Thread/sleep 10)
@counter
(agent-error counter)
4.2 设置错误处理器 #
clojure
(def counter
(agent 0
:error-handler
(fn [agent err]
(println "Error:" (.getMessage err)))))
(send counter (fn [_] (throw (ex-info "Test error" {}))))
(Thread/sleep 10)
4.3 设置错误模式 #
clojure
(def counter
(agent 0 :error-mode :continue))
(send counter (fn [_] (throw (ex-info "Error" {}))))
(Thread/sleep 10)
@counter
(def strict-counter
(agent 0 :error-mode :fail))
(send strict-counter (fn [_] (throw (ex-info "Error" {}))))
(Thread/sleep 10)
(agent-error strict-counter)
4.4 重启Agent #
clojure
(def counter (agent 0))
(send counter (fn [_] (throw (ex-info "Error" {}))))
(Thread/sleep 10)
(restart-agent counter 0)
@counter
五、实践示例 #
5.1 日志系统 #
clojure
(defn make-logger []
(agent []))
(defn log [logger message]
(send-off logger
(fn [logs]
(let [entry {:time (System/currentTimeMillis)
:message message}]
(conj logs entry)))))
(defn get-logs [logger]
@logger)
(def logger (make-logger))
(log logger "Application started")
(log logger "Processing request")
(log logger "Request completed")
(Thread/sleep 100)
(get-logs logger)
5.2 缓存系统 #
clojure
(defn make-cache []
(agent {}))
(defn cache-get [cache key]
(@cache key))
(defn cache-put [cache key value]
(send cache assoc key value))
(defn cache-remove [cache key]
(send cache dissoc key))
(def cache (make-cache))
(cache-put cache :user {:name "Alice"})
(cache-put cache :settings {:theme :dark})
(Thread/sleep 10)
(cache-get cache :user)
5.3 统计收集 #
clojure
(defn make-stats []
(agent {:count 0 :sum 0 :min nil :max nil}))
(defn record [stats value]
(send stats
(fn [{:keys [count sum min max] :as s}]
(-> s
(update :count inc)
(update :sum + value)
(assoc :min (if min (min min value) value))
(assoc :max (if max (max max value) value))))))
(defn get-stats [stats]
(let [{:keys [count sum min max]} @stats]
{:count count
:average (when (pos? count) (/ sum count))
:min min
:max max}))
(def stats (make-stats))
(doseq [v [5 3 8 1 9 4 6 2 7]]
(record stats v))
(await stats)
(get-stats stats)
5.4 事件处理 #
clojure
(defn make-event-processor [handler]
(agent {:queue []
:processing false}))
(defn enqueue [processor event]
(send processor
(fn [{:keys [queue] :as state}]
(assoc state :queue (conj queue event)))))
(defn process-next [processor handler]
(send-off processor
(fn [{:keys [queue processing] :as state}]
(if (and (seq queue) (not processing))
(do
(handler (first queue))
(-> state
(update :queue rest)
(assoc :processing false)))
state))))
(def processor
(make-event-processor
(fn [event]
(println "Processing:" event)
(Thread/sleep 100))))
(enqueue processor {:type :click :x 10 :y 20})
(enqueue processor {:type :key :value "a"})
(Thread/sleep 10)
(process-next processor println)
六、send vs send-off #
6.1 线程池区别 #
| 函数 | 线程池 | 用途 |
|---|---|---|
send |
计算线程池 | CPU密集型 |
send-off |
I/O线程池 | I/O操作 |
6.2 选择建议 #
clojure
(def cpu-agent (agent 0))
(def io-agent (agent []))
(send cpu-agent inc)
(send-off io-agent conj "data")
(send-off io-agent
(fn [state]
(spit "log.txt" (str state "\n") :append true)
state))
七、Agent与其他引用类型 #
| 类型 | 同步/异步 | 协调/独立 | 适用场景 |
|---|---|---|---|
| Atom | 同步 | 独立 | 简单状态 |
| Ref | 同步 | 协调 | 多变量事务 |
| Agent | 异步 | 独立 | 后台任务 |
八、最佳实践 #
8.1 避免阻塞操作 #
clojure
(defn bad-agent-usage [agent]
(send agent
(fn [state]
(Thread/sleep 10000)
state)))
阻塞操作应使用 send-off。
8.2 合理使用await #
clojure
(defn process-all [agent]
(send agent process)
(await agent))
只在需要时使用 await。
8.3 错误处理 #
clojure
(defn safe-agent [initial]
(agent initial
:error-mode :continue
:error-handler (fn [a e]
(println "Agent error:" e))))
九、总结 #
Agent操作速查:
| 操作 | 说明 |
|---|---|
agent |
创建Agent |
send |
发送计算任务 |
send-off |
发送I/O任务 |
await |
等待完成 |
await-for |
带超时等待 |
agent-error |
获取错误 |
restart-agent |
重启Agent |
关键点:
- Agent用于异步状态更新
send用于计算,send-off用于I/O- 消息按顺序处理
- 设置错误处理器处理异常
- 使用
await等待完成
下一步,让我们学习软件事务内存STM!
最后更新:2026-03-27