Actors 角色 #

什么是 Actors? #

Actors 是 Ray 中的有状态分布式服务。与无状态的 Tasks 不同,Actors 可以在方法调用之间保持状态,适用于需要状态管理的场景。

text
┌─────────────────────────────────────────────────────────────┐
│                    Actor 概念图                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Task(无状态)              Actor(有状态)                  │
│  ┌─────────────┐            ┌─────────────┐                │
│  │   Task 1    │            │             │                │
│  │  独立执行    │            │   Actor     │                │
│  ├─────────────┤            │  ┌───────┐  │                │
│  │   Task 2    │            │  │ state │  │                │
│  │  独立执行    │            │  └───────┘  │                │
│  ├─────────────┤            │             │                │
│  │   Task 3    │            │  method()───┼──► 修改状态    │
│  │  独立执行    │            │             │                │
│  └─────────────┘            └─────────────┘                │
│                                                             │
│  每次调用独立                 状态在调用间保持                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

定义 Actors #

基本语法 #

python
import ray

ray.init()

@ray.remote
class Counter:
    """Minimal stateful actor: an integer count kept between method calls."""

    def __init__(self):
        # Every new actor instance starts counting from zero.
        self.value = 0

    def increment(self):
        """Advance the count by one and return the updated total."""
        updated = self.value + 1
        self.value = updated
        return updated

    def get_value(self):
        """Return the current count without modifying it."""
        return self.value

# Create the actor; the returned handle is used for every method call.
counter = Counter.remote()

# Each call is awaited before the next one is submitted, so this prints 1, 2, 2.
for method in (counter.increment, counter.increment, counter.get_value):
    print(ray.get(method.remote()))

ray.shutdown()

Actor 配置选项 #

python
@ray.remote(
    num_cpus=2,                      # CPU cores requested
    num_gpus=1,                      # GPUs requested
    memory=1024 ** 3,                # memory requirement in bytes (1 GiB)
    max_concurrency=10,              # maximum number of concurrent method calls
    max_restarts=3,                  # maximum number of actor restarts
    max_task_retries=3,              # maximum retries per method call
    runtime_env={"pip": ["numpy"]},  # runtime environment for the actor
)
class ComplexActor:
    """Actor declared with explicit resource and fault-tolerance options."""

    def __init__(self):
        # Key/value store that persists across method calls.
        self.state = {}

    def process(self, key, value):
        """Store ``value`` under ``key`` and return the whole state dict."""
        self.state.update({key: value})
        return self.state

配置参数说明 #

参数 说明 默认值
num_cpus CPU 核心数 1(仅用于调度;Actor 运行期间默认占用 0)
num_gpus GPU 数量 0
memory 内存需求 无限制
max_concurrency 最大并发方法数 1(异步 Actor 默认为 1000)
max_restarts 最大重启次数 0
max_task_retries 任务最大重试次数 0
runtime_env 运行时环境 None
scheduling_strategy 调度策略 DEFAULT

Actor 生命周期 #

创建 Actor #

text
┌─────────────────────────────────────────────────────────────┐
│                    Actor 创建流程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 定义 Actor 类                                           │
│     @ray.remote                                             │
│     class MyActor:                                          │
│         def __init__(self): ...                             │
│                                                             │
│  2. 创建 Actor 实例                                          │
│     actor = MyActor.remote()                                │
│     └── 分配资源                                             │
│     └── 启动 Actor 进程                                      │
│     └── 执行 __init__                                       │
│     └── 返回 ActorHandle                                    │
│                                                             │
│  3. 调用 Actor 方法                                          │
│     ref = actor.method.remote()                             │
│     └── 方法在 Actor 进程中执行                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘
python
import ray

ray.init()

@ray.remote
class Database:
    """In-memory key/value store hosted inside a single actor."""

    def __init__(self, name):
        self.data = {}
        self.name = name
        # Printed from __init__, i.e. when the actor instance is constructed.
        print(f"Database {name} initialized")

    def insert(self, key, value):
        """Store ``value`` under ``key``; always returns True."""
        self.data.update({key: value})
        return True

    def get(self, key):
        """Return the value stored under ``key``, or None when it is absent."""
        return self.data.get(key)

# Constructor arguments are passed through .remote() to Database.__init__.
db = Database.remote("my_db")

# Wait for the insert to finish before reading the value back.
insert_ref = db.insert.remote("key1", "value1")
ray.get(insert_ref)

stored = ray.get(db.get.remote("key1"))
print(stored)  # value1

ray.shutdown()

销毁 Actor #

python
import ray

ray.init()

@ray.remote
class ResourceActor:
    """Trivial actor used to demonstrate explicit actor termination."""

    def __init__(self):
        # Emitted when the actor instance is constructed.
        print("Actor created")

    def work(self):
        """Return the fixed status string ``"working"``."""
        status = "working"
        return status

actor = ResourceActor.remote()

reply = ray.get(actor.work.remote())
print(reply)  # working

# Terminate the actor explicitly instead of waiting for the handle to go away.
ray.kill(actor)

ray.shutdown()

Actor 生命周期钩子 #

python
import ray

ray.init()

@ray.remote
class LifecycleActor:
    """Actor that records which phase of its life it is in via ``self.state``."""

    def __init__(self):
        # Construction is the first lifecycle step; announce it and set the state.
        print("__init__: Actor starting")
        self.state = "initialized"

    def __repr__(self):
        """Return a short description containing the current state."""
        return f"Actor state: {self.state}"

    def work(self):
        """Switch the state to ``"working"`` and return the new state."""
        new_state = "working"
        self.state = new_state
        return new_state

actor = LifecycleActor.remote()

# work() flips the state and returns it, so this prints "working".
state_after_work = ray.get(actor.work.remote())
print(state_after_work)

ray.shutdown()

并发控制 #

单线程 Actor(默认) #

python
import ray

ray.init()

@ray.remote
class SingleThreadActor:
    """Default actor: no max_concurrency is set, so calls are not overlapped."""

    def __init__(self):
        self.counter = 0

    def increment(self):
        """Increment the counter with a deliberate pause between read and write.

        The read / sleep / write sequence would lose updates if two calls
        interleaved; it is used here to show that they do not.
        """
        import time

        snapshot = self.counter
        time.sleep(0.1)
        self.counter = snapshot + 1
        return self.counter

actor = SingleThreadActor.remote()

# Submit all five calls up front; the actor works through them one at a time,
# so no update is lost and the output is [1, 2, 3, 4, 5].
pending = [actor.increment.remote() for _ in range(5)]
totals = ray.get(pending)
print(totals)

ray.shutdown()

多线程 Actor #

python
import ray

ray.init()

@ray.remote(max_concurrency=10)
class ConcurrentActor:
    """Threaded actor whose counter is protected by a lock.

    With ``max_concurrency=10`` up to ten method calls may run at once, so
    every access to ``self.counter`` must happen while holding ``self.lock``.
    """

    def __init__(self):
        import threading

        self.counter = 0
        # Create the lock here, not lazily inside increment(): a lazy
        # "if self.lock is None" check is itself a race, because two threads
        # can both see None and end up guarding the counter with different locks.
        self.lock = threading.Lock()

    def increment(self):
        """Atomically add one to the counter and return the new value."""
        with self.lock:
            self.counter += 1
            return self.counter

actor = ConcurrentActor.remote()

# Fire 100 increments; several of them may run concurrently inside the actor.
pending = [actor.increment.remote() for _ in range(100)]
results = ray.get(pending)
# Every increment returns a distinct value, so the largest one is the total.
print(f"Final counter: {max(results)}")

ray.shutdown()

异步 Actor #

python
import ray
import asyncio

ray.init()

@ray.remote
class AsyncActor:
    """Actor whose methods are coroutines."""

    async def slow_operation(self, duration):
        """Sleep for ``duration`` seconds without blocking the event loop.

        Returns:
            A message stating how long the coroutine slept.
        """
        await asyncio.sleep(duration)
        return f"Slept for {duration}s"

    async def concurrent_ops(self):
        """Run five ``slow_operation`` coroutines concurrently.

        Returns:
            The five result messages, in the order the coroutines were created.
        """
        # Inside the actor, ``self`` is the plain Python instance, not an actor
        # handle, so its methods have no ``.remote``. Call the coroutine method
        # directly and let asyncio.gather schedule the coroutines together.
        pending = [self.slow_operation(i * 0.1) for i in range(5)]
        return await asyncio.gather(*pending)

actor = AsyncActor.remote()

# From the driver, async actor methods are called like any other actor method.
messages = ray.get(actor.concurrent_ops.remote())
print(messages)

ray.shutdown()

Actor 模式 #

1. 服务模式 #

python
import ray

ray.init()

@ray.remote
class ModelService:
    """Service-style actor: load a model once, then answer prediction calls."""

    def __init__(self, model_path):
        # The model is loaded once at construction and kept on the instance.
        self.model = self._load_model(model_path)

    def _load_model(self, path):
        """Placeholder loader: returns a string describing the model at ``path``."""
        return f"Model from {path}"

    def predict(self, input_data):
        """Return a placeholder prediction string for one input."""
        return f"Prediction for {input_data}"

    def batch_predict(self, inputs):
        """Return one prediction per element of ``inputs``, in the same order."""
        return list(map(self.predict, inputs))

service = ModelService.remote("/models/my_model")

# One input per call ...
single = ray.get(service.predict.remote("test_input"))
print(single)

# ... or several inputs handled in a single actor call.
batch = ray.get(service.batch_predict.remote(["a", "b", "c"]))
print(batch)

ray.shutdown()

2. 资源池模式 #

python
import ray

ray.init()

@ray.remote
class ConnectionPool:
    """Fixed-size pool of connection tokens handed out through one actor."""

    def __init__(self, pool_size=5):
        # ``pool`` is the full set of connections; ``available`` holds the
        # ones not currently checked out, oldest first.
        self.pool = [f"connection_{i}" for i in range(pool_size)]
        self.available = list(self.pool)

    def acquire(self):
        """Check out a connection.

        Returns:
            The connection that has been available the longest, or None when
            the pool is exhausted.
        """
        if not self.available:
            return None
        return self.available.pop(0)

    def release(self, conn):
        """Return ``conn`` to the pool.

        Unknown connections are ignored, and so are connections that are
        already available: releasing the same connection twice must not put it
        into the pool twice, or it could be handed to two callers at once.
        """
        if conn in self.pool and conn not in self.available:
            self.available.append(conn)

    def status(self):
        """Return the total and currently available connection counts."""
        return {
            "total": len(self.pool),
            "available": len(self.available),
        }

pool = ConnectionPool.remote()

# Check out two of the five default connections, one after the other.
conn1, conn2 = [ray.get(pool.acquire.remote()) for _ in range(2)]
print(ray.get(pool.status.remote()))  # {'total': 5, 'available': 3}

# Hand the first connection back and confirm the pool grew by one.
release_ref = pool.release.remote(conn1)
ray.get(release_ref)
print(ray.get(pool.status.remote()))  # {'total': 5, 'available': 4}

ray.shutdown()

3. 缓存模式 #

python
import ray

ray.init()

@ray.remote
class Cache:
    """Least-recently-used cache hosted inside an actor.

    Entries are kept in an ``OrderedDict`` ordered from least to most recently
    used, so marking an entry as used and evicting the oldest entry are both
    constant-time operations.
    """

    def __init__(self, max_size=100):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries; must be at least 1.

        Raises:
            ValueError: If ``max_size`` is smaller than 1 (such a cache could
                never hold an entry).
        """
        from collections import OrderedDict

        if max_size < 1:
            raise ValueError(f"max_size must be >= 1, got {max_size}")
        self.cache = OrderedDict()
        self.max_size = max_size

    def get(self, key):
        """Return the value for ``key`` and mark it most recently used.

        Returns None when ``key`` is not cached.
        """
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        return self.cache[key]

    def set(self, key, value):
        """Insert or update ``key``, evicting the least recently used entry
        when a new key would exceed ``max_size``."""
        if key in self.cache:
            # Assigning to an existing key keeps its position, so move it to
            # the most-recently-used end explicitly.
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            # last=False pops from the least-recently-used end.
            self.cache.popitem(last=False)
        self.cache[key] = value

    def size(self):
        """Return the number of cached entries."""
        return len(self.cache)

cache = Cache.remote(max_size=3)

# Fill the cache exactly to capacity.
for key, value in (("a", 1), ("b", 2), ("c", 3)):
    ray.get(cache.set.remote(key, value))
print(f"Size: {ray.get(cache.size.remote())}")

# A fourth key evicts the least recently used entry ("a"), so the size stays
# at 3 and looking up "a" afterwards yields None.
ray.get(cache.set.remote("d", 4))
print(f"Size after eviction: {ray.get(cache.size.remote())}")
print(f"Get 'a': {ray.get(cache.get.remote('a'))}")

ray.shutdown()

4. 计数器模式 #

python
import ray

ray.init()

@ray.remote
class DistributedCounter:
    """Shared counter that many callers can update through one actor."""

    def __init__(self):
        self.value = 0

    def increment(self, delta=1):
        """Add ``delta`` (default 1) and return the new value."""
        self.value = self.value + delta
        return self.value

    def decrement(self, delta=1):
        """Subtract ``delta`` (default 1) and return the new value."""
        self.value = self.value - delta
        return self.value

    def get(self):
        """Return the current value."""
        return self.value

    def reset(self):
        """Set the counter back to zero."""
        self.value = 0

counter = DistributedCounter.remote()

# Submit increments of 1..5 without waiting in between, then wait for all.
pending = [counter.increment.remote(delta) for delta in range(1, 6)]
ray.get(pending)
# 1 + 2 + 3 + 4 + 5 = 15
print(f"Final value: {ray.get(counter.get.remote())}")

ray.shutdown()

Actor 通信 #

Actor 间调用 #

python
import ray

ray.init()

@ray.remote
class Worker:
    """Worker actor identified by a numeric id."""

    def __init__(self, worker_id):
        self.worker_id = worker_id

    def process(self, data):
        """Return a message naming this worker and the data it handled."""
        message = f"Worker {self.worker_id} processed: {data}"
        return message

@ray.remote
class Coordinator:
    """Actor that owns a group of Worker actors and deals work out round-robin."""

    def __init__(self, num_workers):
        # Index of the worker that receives the next item.
        self.current = 0
        self.workers = [Worker.remote(worker_id) for worker_id in range(num_workers)]

    def distribute(self, data):
        """Forward ``data`` to the next worker in rotation.

        Returns:
            The ObjectRef of the worker's ``process`` call, NOT the processed
            value: this method does not wait for the worker to finish.
        """
        idx = self.current
        target = self.workers[idx]
        # Advance the cursor, wrapping around after the last worker.
        self.current = (idx + 1) % len(self.workers)
        return target.process.remote(data)

coordinator = Coordinator.remote(num_workers=3)

# distribute() itself returns the ObjectRef of the worker call, so the result
# of each coordinator call is another ObjectRef. Resolve both levels:
# first the coordinator calls, then the worker calls they point to.
outer_refs = [coordinator.distribute.remote(f"task_{i}") for i in range(6)]
worker_refs = ray.get(outer_refs)
results = ray.get(worker_refs)
print(results)

ray.shutdown()

Actor 引用传递 #

python
import ray

ray.init()

@ray.remote
class DataStore:
    """Key/value store actor that other actors can share through its handle."""

    def __init__(self):
        self.data = {}

    def put(self, key, value):
        """Store ``value`` under ``key``."""
        self.data.update({key: value})

    def get(self, key):
        """Return the value stored under ``key``, or None when it is absent."""
        return self.data.get(key)

@ray.remote
class DataProcessor:
    """Actor that reads its input from a DataStore actor handle."""

    def __init__(self, store_handle):
        # Keep the handle passed in by the caller; it is used for every lookup.
        self.store = store_handle

    def process(self, key):
        """Fetch ``key`` from the store and return it wrapped in a message."""
        lookup_ref = self.store.get.remote(key)
        # Blocks this method until the store has answered.
        data = ray.get(lookup_ref)
        return f"Processed: {data}"

store = DataStore.remote()
# Wait for the write so the processor is guaranteed to see it.
put_ref = store.put.remote("key1", "value1")
ray.get(put_ref)

# Actor handles can be passed as arguments to other actors.
processor = DataProcessor.remote(store)
outcome = ray.get(processor.process.remote("key1"))
print(outcome)  # Processed: value1

ray.shutdown()

容错与恢复 #

Actor 重启 #

python
import ray

ray.init()

@ray.remote(max_restarts=3, max_task_retries=3)
class ResilientActor:
    """Actor allowed up to three restarts and three retries per method call."""

    def __init__(self):
        # In-memory only: this count is whatever __init__ last set it to.
        self.call_count = 0
        print("Actor initialized")

    def work(self):
        """Count this call and return a message carrying the running total."""
        self.call_count = self.call_count + 1
        return f"Call #{self.call_count}"

    def crash(self):
        """Terminate the hosting process immediately with exit status 1."""
        from os import _exit

        _exit(1)

actor = ResilientActor.remote()
before_kill = ray.get(actor.work.remote())
print(before_kill)

# no_restart=False: kill the actor process but allow it to be restarted
# (the class permits up to max_restarts=3).
ray.kill(actor, no_restart=False)

# NOTE(review): a restarted actor is presumably rebuilt via __init__, so
# call_count is expected to start over here -- confirm on a live cluster.
after_restart = ray.get(actor.work.remote())
print(after_restart)

ray.shutdown()

状态恢复 #

python
import ray

ray.init()

@ray.remote(max_restarts=3)
class StatefulActor:
    """Actor skeleton showing where checkpoint-based state recovery plugs in."""

    def __init__(self, checkpoint_file=None):
        self.state = {}
        if not checkpoint_file:
            # No checkpoint given: start with empty state.
            return
        self._load_checkpoint(checkpoint_file)

    def _load_checkpoint(self, path):
        """Placeholder: currently does nothing with ``path``.

        A real implementation would repopulate ``self.state`` from the file.
        """
        return None

    def set_state(self, key, value):
        """Store ``value`` under ``key`` in the in-memory state."""
        self.state.update({key: value})

    def get_state(self, key):
        """Return the value stored under ``key``, or None when it is absent."""
        return self.state.get(key)

# No checkpoint file is passed, so the actor starts with empty state.
actor = StatefulActor.remote()
write_ref = actor.set_state.remote("key1", "value1")
ray.get(write_ref)

ray.shutdown()

高级特性 #

动态选项 #

python
import ray

ray.init()

@ray.remote
class BaseActor:
    """Plain actor with no options set at definition time."""

    def work(self):
        """Return the fixed status string ``"working"``."""
        status = "working"
        return status

# Options supplied per instance at creation time, not in the decorator.
creation_options = {
    "num_cpus": 2,
    "name": "my_actor",
    "lifetime": "detached",
    "get_if_exists": True,
}
actor = BaseActor.options(**creation_options).remote()

reply = ray.get(actor.work.remote())
print(reply)  # working

ray.shutdown()

命名 Actor #

python
import ray

ray.init()

@ray.remote
class NamedActor:
    """Actor that greets callers using the name given at construction."""

    def __init__(self, name):
        self.name = name

    def greet(self):
        """Return a greeting that includes this actor's ``name`` attribute."""
        greeting = f"Hello from {self.name}"
        return greeting

# Register the actor under a name so it can be looked up without the handle.
named_class = NamedActor.options(name="my_named_actor", lifetime="detached")
actor = named_class.remote("Alice")

# Fetch a second handle to the same actor by its registered name.
retrieved = ray.get_actor("my_named_actor")
greeting = ray.get(retrieved.greet.remote())
print(greeting)  # Hello from Alice

# The actor was created with lifetime="detached", so kill it explicitly.
ray.kill(actor)

ray.shutdown()

分离 Actor #

python
import ray

ray.init()

@ray.remote
class DetachedActor:
    """Counter actor used to demonstrate the detached lifetime."""

    def __init__(self):
        self.counter = 0

    def increment(self):
        """Add one to the counter and return the new value."""
        updated = self.counter + 1
        self.counter = updated
        return updated

# A detached actor can only be retrieved later if it was given a name.
# The explicit namespace matters as well: without one, every driver session
# gets its own anonymous namespace and could not see the other's named actors.
actor = DetachedActor.options(
    name="DetachedActor",
    namespace="actors_demo",
    lifetime="detached",
).remote()

# Disconnect this driver. NOTE: the actor outlives the driver only while the
# Ray cluster itself keeps running (e.g. a cluster started separately with
# `ray start --head`); a local cluster launched by ray.init() is torn down
# together with the driver.
ray.shutdown()

# Reconnect as a new driver and look the actor up by name and namespace.
ray.init()
actor = ray.get_actor("DetachedActor", namespace="actors_demo")
print(ray.get(actor.increment.remote()))

# Detached actors are not cleaned up automatically; remove it when done.
ray.kill(actor)

ray.shutdown()

性能优化 #

减少 Actor 调用 #

python
import ray

ray.init()

@ray.remote
class BatchActor:
    """Actor that buffers items until they are flushed in one go."""

    def __init__(self):
        self.buffer = []

    def add(self, items):
        """Append every element of ``items`` to the buffer."""
        self.buffer.extend(items)

    def flush(self):
        """Return all buffered items and leave the buffer empty."""
        drained = list(self.buffer)
        self.buffer.clear()
        return drained

actor = BatchActor.remote()

# Build the whole payload locally and ship it in ONE actor call. Calling
# ray.get(actor.add.remote(...)) inside a loop would pay a blocking round trip
# per iteration, which is exactly the overhead batching is meant to remove.
items = [value for i in range(100) for value in (i, i + 1)]
ray.get(actor.add.remote(items))

# 100 pairs -> 200 buffered items.
print(f"Buffered items: {len(ray.get(actor.flush.remote()))}")

ray.shutdown()

使用异步方法 #

python
import ray
import asyncio

ray.init()

@ray.remote
class AsyncBatchActor:
    """Async actor that queues items and later processes them in one call."""

    def __init__(self):
        self.queue = []

    async def add(self, item):
        """Append ``item`` to the pending queue."""
        self.queue.append(item)

    async def process_all(self):
        """Drain the queue front to back and return each item multiplied by 2."""
        doubled = []
        while len(self.queue) > 0:
            doubled.append(self.queue.pop(0) * 2)
        return doubled

actor = AsyncBatchActor.remote()

# Queue the numbers 0..9 without waiting between submissions, then wait once.
pending = [actor.add.remote(number) for number in range(10)]
ray.get(pending)

# Prints the doubled value of every queued item.
doubled = ray.get(actor.process_all.remote())
print(doubled)

ray.shutdown()

下一步 #

掌握了 Actors 之后,继续学习 Objects 对象,深入理解 Ray 的对象存储和序列化机制!

最后更新:2026-04-05