代理类型Agent #

一、Agent简介 #

Agent是Clojure的异步引用类型,用于独立状态的异步更新。Agent通过消息传递实现并发,适合后台任务和事件处理。

1.1 特点 #

  • 异步状态更新
  • 消息队列处理
  • 线程池执行
  • 错误处理机制

1.2 创建Agent #

clojure
(def counter (agent 0))

(def state (agent {:status :idle :data []}))

@counter

(deref state)

二、基本操作 #

2.1 send #

使用计算线程池发送任务:

clojure
(def counter (agent 0))

(send counter inc)

(Thread/sleep 10)

@counter

(send counter + 10)

(Thread/sleep 10)

@counter

(send counter (fn [state]
                (* state 2)))

(Thread/sleep 10)

@counter

2.2 send-off #

使用I/O线程池发送任务:

clojure
(def log (agent []))

(send-off log conj "Log entry 1")

(send-off log conj "Log entry 2")

(Thread/sleep 10)

@log

2.3 send-via #

使用自定义执行器:

clojure
(def counter (agent 0))

(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool 2))

(send-via counter my-executor inc)

(Thread/sleep 10)

@counter

三、等待操作 #

3.1 await #

阻塞等待所有发送的任务完成:

clojure
(def counter (agent 0))

(dotimes [_ 100]
  (send counter inc))

(await counter)

@counter

3.2 await-for #

带超时的等待:

clojure
(def counter (agent 0))

(dotimes [_ 1000]
  (send counter inc))

(await-for 1000 counter)

@counter

3.3 await1 #

等待单个agent:

clojure
(def state (agent 0))

(send state inc)

(await1 state)

@state

四、错误处理 #

4.1 错误模式 #

clojure
(def counter (agent 0))

(send counter (fn [_] (throw (ex-info "Error!" {}))))

(Thread/sleep 10)

@counter

(agent-error counter)

4.2 设置错误处理器 #

clojure
(def counter
  (agent 0
         :error-handler
         (fn [agent err]
           (println "Error:" (.getMessage err)))))

(send counter (fn [_] (throw (ex-info "Test error" {}))))

(Thread/sleep 10)

4.3 设置错误模式 #

clojure
(def counter
  (agent 0 :error-mode :continue))

(send counter (fn [_] (throw (ex-info "Error" {}))))

(Thread/sleep 10)

@counter

(def strict-counter
  (agent 0 :error-mode :fail))

(send strict-counter (fn [_] (throw (ex-info "Error" {}))))

(Thread/sleep 10)

(agent-error strict-counter)

4.4 重启Agent #

clojure
(def counter (agent 0))

(send counter (fn [_] (throw (ex-info "Error" {}))))

(Thread/sleep 10)

(restart-agent counter 0)

@counter

五、实践示例 #

5.1 日志系统 #

clojure
(defn make-logger []
  (agent []))

(defn log [logger message]
  (send-off logger
            (fn [logs]
              (let [entry {:time (System/currentTimeMillis)
                           :message message}]
                (conj logs entry)))))

(defn get-logs [logger]
  @logger)

(def logger (make-logger))
(log logger "Application started")
(log logger "Processing request")
(log logger "Request completed")

(Thread/sleep 100)
(get-logs logger)

5.2 缓存系统 #

clojure
(defn make-cache []
  (agent {}))

(defn cache-get [cache key]
  (@cache key))

(defn cache-put [cache key value]
  (send cache assoc key value))

(defn cache-remove [cache key]
  (send cache dissoc key))

(def cache (make-cache))
(cache-put cache :user {:name "Alice"})
(cache-put cache :settings {:theme :dark})

(Thread/sleep 10)
(cache-get cache :user)

5.3 统计收集 #

clojure
(defn make-stats []
  (agent {:count 0 :sum 0 :min nil :max nil}))

(defn record [stats value]
  (send stats
        (fn [{:keys [count sum min max] :as s}]
          (-> s
              (update :count inc)
              (update :sum + value)
              (assoc :min (if min (min min value) value))
              (assoc :max (if max (max max value) value))))))

(defn get-stats [stats]
  (let [{:keys [count sum min max]} @stats]
    {:count count
     :average (when (pos? count) (/ sum count))
     :min min
     :max max}))

(def stats (make-stats))
(doseq [v [5 3 8 1 9 4 6 2 7]]
  (record stats v))

(await stats)
(get-stats stats)

5.4 事件处理 #

clojure
(defn make-event-processor [handler]
  (agent {:queue []
          :processing false}))

(defn enqueue [processor event]
  (send processor
        (fn [{:keys [queue] :as state}]
          (assoc state :queue (conj queue event)))))

(defn process-next [processor handler]
  (send-off processor
            (fn [{:keys [queue processing] :as state}]
              (if (and (seq queue) (not processing))
                (do
                  (handler (first queue))
                  (-> state
                      (update :queue rest)
                      (assoc :processing false)))
                state))))

(def processor
  (make-event-processor
    (fn [event]
      (println "Processing:" event)
      (Thread/sleep 100))))

(enqueue processor {:type :click :x 10 :y 20})
(enqueue processor {:type :key :value "a"})

(Thread/sleep 10)
(process-next processor println)

六、send vs send-off #

6.1 线程池区别 #

函数 线程池 用途
send 计算线程池 CPU密集型
send-off I/O线程池 I/O操作

6.2 选择建议 #

clojure
(def cpu-agent (agent 0))
(def io-agent (agent []))

(send cpu-agent inc)

(send-off io-agent conj "data")

(send-off io-agent
          (fn [state]
            (spit "log.txt" (str state "\n") :append true)
            state))

七、Agent与其他引用类型 #

类型 同步/异步 协调/独立 适用场景
Atom 同步 独立 简单状态
Ref 同步 协调 多变量事务
Agent 异步 独立 后台任务

八、最佳实践 #

8.1 避免阻塞操作 #

clojure
(defn bad-agent-usage [agent]
  (send agent
        (fn [state]
          (Thread/sleep 10000)
          state)))

阻塞操作应使用 send-off

8.2 合理使用await #

clojure
(defn process-all [agent]
  (send agent process)
  (await agent))

只在需要时使用 await

8.3 错误处理 #

clojure
(defn safe-agent [initial]
  (agent initial
         :error-mode :continue
         :error-handler (fn [a e]
                          (println "Agent error:" e))))

九、总结 #

Agent操作速查:

操作 说明
agent 创建Agent
send 发送计算任务
send-off 发送I/O任务
await 等待完成
await-for 带超时等待
agent-error 获取错误
restart-agent 重启Agent

关键点:

  1. Agent用于异步状态更新
  2. send 用于计算,send-off 用于I/O
  3. 消息按顺序处理
  4. 设置错误处理器处理异常
  5. 使用 await 等待完成

下一步,让我们学习软件事务内存STM!

最后更新:2026-03-27