Actors 角色 #
什么是 Actors? #
Actors 是 Ray 中的有状态分布式服务。与无状态的 Tasks 不同,Actors 可以在方法调用之间保持状态,适用于需要状态管理的场景。
text
┌─────────────────────────────────────────────────────────────┐
│ Actor 概念图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Task(无状态) Actor(有状态) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Task 1 │ │ │ │
│ │ 独立执行 │ │ Actor │ │
│ ├─────────────┤ │ ┌───────┐ │ │
│ │ Task 2 │ │ │ state │ │ │
│ │ 独立执行 │ │ └───────┘ │ │
│ ├─────────────┤ │ │ │
│ │ Task 3 │ │ method()───┼──► 修改状态 │
│ │ 独立执行 │ │ │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 每次调用独立 状态在调用间保持 │
│ │
└─────────────────────────────────────────────────────────────┘
定义 Actors #
基本语法 #
python
import ray

ray.init()


@ray.remote
class Counter:
    """Stateful actor: the count survives across method invocations."""

    def __init__(self):
        self.value = 0  # lives in the actor process between calls

    def increment(self):
        self.value += 1
        return self.value

    def get_value(self):
        return self.value


counter = Counter.remote()
print(ray.get(counter.increment.remote()))
print(ray.get(counter.increment.remote()))
print(ray.get(counter.get_value.remote()))
ray.shutdown()
Actor 配置选项 #
python
@ray.remote(
    num_cpus=2,                      # reserve two CPU cores for the actor
    num_gpus=1,                      # reserve one GPU
    memory=1024 * 1024 * 1024,       # 1 GiB memory requirement
    max_concurrency=10,              # allow up to ten in-flight method calls
    max_restarts=3,                  # restart the actor up to three times on failure
    max_task_retries=3,              # retry a failed method call up to three times
    runtime_env={"pip": ["numpy"]}   # per-actor runtime environment
)
class ComplexActor:
    """Actor demonstrating resource and fault-tolerance configuration."""

    def __init__(self):
        self.state = {}

    def process(self, key, value):
        # Store the pair and hand back the full state snapshot.
        self.state[key] = value
        return self.state
配置参数说明 #
| 参数 | 说明 | 默认值 |
|---|---|---|
| num_cpus | CPU 核心数 | 1 |
| num_gpus | GPU 数量 | 0 |
| memory | 内存需求 | 无限制 |
| max_concurrency | 最大并发方法数 | 1 |
| max_restarts | 最大重启次数 | 0 |
| max_task_retries | 任务最大重试次数 | 0 |
| runtime_env | 运行时环境 | None |
| scheduling_strategy | 调度策略 | DEFAULT |
Actor 生命周期 #
创建 Actor #
text
┌─────────────────────────────────────────────────────────────┐
│ Actor 创建流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 定义 Actor 类 │
│ @ray.remote │
│ class MyActor: │
│ def __init__(self): ... │
│ │
│ 2. 创建 Actor 实例 │
│ actor = MyActor.remote() │
│ └── 分配资源 │
│ └── 启动 Actor 进程 │
│ └── 执行 __init__ │
│ └── 返回 ActorHandle │
│ │
│ 3. 调用 Actor 方法 │
│ ref = actor.method.remote() │
│ └── 方法在 Actor 进程中执行 │
│ │
└─────────────────────────────────────────────────────────────┘
python
import ray

ray.init()


@ray.remote
class Database:
    """Minimal key-value store actor."""

    def __init__(self, name):
        self.name = name
        self.data = {}
        # __init__ executes inside the actor process once it has started.
        print(f"Database {name} initialized")

    def insert(self, key, value):
        self.data[key] = value
        return True

    def get(self, key):
        return self.data.get(key)


db = Database.remote("my_db")
ray.get(db.insert.remote("key1", "value1"))
print(ray.get(db.get.remote("key1")))
ray.shutdown()
销毁 Actor #
python
import ray

ray.init()


@ray.remote
class ResourceActor:
    """Actor used to demonstrate explicit termination with ray.kill."""

    def __init__(self):
        print("Actor created")

    def work(self):
        return "working"


actor = ResourceActor.remote()
print(ray.get(actor.work.remote()))
# Forcefully terminate the actor process and release its resources.
ray.kill(actor)
ray.shutdown()
Actor 生命周期钩子 #
python
import ray

ray.init()


@ray.remote
class LifecycleActor:
    """Actor illustrating lifecycle hooks (__init__ / __repr__)."""

    def __init__(self):
        print("__init__: Actor starting")
        self.state = "initialized"

    def __repr__(self):
        # Used by Ray in logs and error messages to identify the actor.
        return f"Actor state: {self.state}"

    def work(self):
        self.state = "working"
        return self.state


actor = LifecycleActor.remote()
print(ray.get(actor.work.remote()))
ray.shutdown()
并发控制 #
单线程 Actor(默认) #
python
import ray
import time

ray.init()


@ray.remote
class SingleThreadActor:
    """Default actor: method calls execute strictly one at a time, in order."""

    def __init__(self):
        self.counter = 0

    def increment(self):
        # The read-sleep-write sequence is safe here because a default actor
        # processes calls sequentially — no interleaving can occur.
        current = self.counter
        time.sleep(0.1)
        self.counter = current + 1
        return self.counter


actor = SingleThreadActor.remote()
refs = [actor.increment.remote() for _ in range(5)]
print(ray.get(refs))
ray.shutdown()
多线程 Actor #
python
import ray
import threading

ray.init()


@ray.remote(max_concurrency=10)
class ConcurrentActor:
    """Thread-safe counter actor allowing up to 10 concurrent method calls."""

    def __init__(self):
        self.counter = 0
        # Create the lock eagerly in __init__. The original lazy pattern
        # ("if self.lock is None: self.lock = threading.Lock()") was itself
        # a race under max_concurrency > 1: two threads could each observe
        # None and create two different locks, defeating the locking.
        self.lock = threading.Lock()

    def increment(self):
        # Guard the read-modify-write so concurrent calls never lose updates.
        with self.lock:
            self.counter += 1
            return self.counter


actor = ConcurrentActor.remote()
refs = [actor.increment.remote() for _ in range(100)]
results = ray.get(refs)
print(f"Final counter: {max(results)}")
ray.shutdown()
异步 Actor #
python
import ray
import asyncio

ray.init()


@ray.remote
class AsyncActor:
    """Async actor: awaiting inside a method lets other calls interleave."""

    async def slow_operation(self, duration):
        await asyncio.sleep(duration)
        return f"Slept for {duration}s"

    async def concurrent_ops(self):
        # Inside the actor, invoke the coroutine directly. The original used
        # self.slow_operation.remote(...), but ``.remote`` only exists on an
        # ActorHandle held by external callers — a plain bound method on
        # ``self`` has no ``.remote`` attribute, so that code would raise.
        coros = [self.slow_operation(i * 0.1) for i in range(5)]
        return await asyncio.gather(*coros)


actor = AsyncActor.remote()
print(ray.get(actor.concurrent_ops.remote()))
ray.shutdown()
Actor 模式 #
1. 服务模式 #
python
import ray

ray.init()


@ray.remote
class ModelService:
    """Service pattern: load a model once, then serve many predictions."""

    def __init__(self, model_path):
        # Expensive initialization happens exactly once, at actor creation.
        self.model = self._load_model(model_path)

    def _load_model(self, path):
        # Placeholder for real model loading.
        return f"Model from {path}"

    def predict(self, input_data):
        return f"Prediction for {input_data}"

    def batch_predict(self, inputs):
        return [self.predict(x) for x in inputs]


service = ModelService.remote("/models/my_model")
print(ray.get(service.predict.remote("test_input")))
print(ray.get(service.batch_predict.remote(["a", "b", "c"])))
ray.shutdown()
2. 资源池模式 #
python
import ray

ray.init()


@ray.remote
class ConnectionPool:
    """Resource-pool pattern: hand out and reclaim a fixed set of connections."""

    def __init__(self, pool_size=5):
        self.pool = [f"connection_{i}" for i in range(pool_size)]
        self.available = list(self.pool)

    def acquire(self):
        """Return a free connection, or None when the pool is exhausted."""
        if not self.available:
            return None
        return self.available.pop(0)

    def release(self, conn):
        """Return a connection to the pool; ignore unknown or duplicate releases.

        The original appended unconditionally for any known connection, so
        releasing the same connection twice duplicated it in ``available``,
        silently growing the pool beyond its size.
        """
        if conn in self.pool and conn not in self.available:
            self.available.append(conn)

    def status(self):
        return {
            "total": len(self.pool),
            "available": len(self.available)
        }


pool = ConnectionPool.remote()
conn1 = ray.get(pool.acquire.remote())
conn2 = ray.get(pool.acquire.remote())
print(ray.get(pool.status.remote()))
ray.get(pool.release.remote(conn1))
print(ray.get(pool.status.remote()))
ray.shutdown()
3. 缓存模式 #
python
import ray

ray.init()


@ray.remote
class Cache:
    """LRU cache actor: evicts the least-recently-used key at capacity."""

    def __init__(self, max_size=100):
        self.cache = {}
        self.max_size = max_size
        self.access_order = []  # keys ordered oldest -> newest access

    def get(self, key):
        """Return the cached value (marking the key most recent), else None."""
        if key not in self.cache:
            return None
        self.access_order.remove(key)
        self.access_order.append(key)
        return self.cache[key]

    def set(self, key, value):
        """Insert or refresh a key, evicting the LRU entry when full."""
        if key in self.cache:
            self.access_order.remove(key)
        elif len(self.cache) >= self.max_size:
            oldest = self.access_order.pop(0)
            del self.cache[oldest]
        self.cache[key] = value
        self.access_order.append(key)

    def size(self):
        return len(self.cache)


cache = Cache.remote(max_size=3)
ray.get(cache.set.remote("a", 1))
ray.get(cache.set.remote("b", 2))
ray.get(cache.set.remote("c", 3))
print(f"Size: {ray.get(cache.size.remote())}")
ray.get(cache.set.remote("d", 4))  # evicts "a", the least recently used
print(f"Size after eviction: {ray.get(cache.size.remote())}")
print(f"Get 'a': {ray.get(cache.get.remote('a'))}")
ray.shutdown()
4. 计数器模式 #
python
import ray

ray.init()


@ray.remote
class DistributedCounter:
    """Shared counter: concurrent callers are serialized by the actor."""

    def __init__(self):
        self.value = 0

    def increment(self, delta=1):
        self.value += delta
        return self.value

    def decrement(self, delta=1):
        self.value -= delta
        return self.value

    def get(self):
        return self.value

    def reset(self):
        self.value = 0


counter = DistributedCounter.remote()
refs = [counter.increment.remote(i) for i in range(1, 6)]
ray.get(refs)  # wait for all increments: 1+2+3+4+5 = 15
print(f"Final value: {ray.get(counter.get.remote())}")
ray.shutdown()
Actor 通信 #
Actor 间调用 #
python
import ray

ray.init()


@ray.remote
class Worker:
    """Leaf worker identified by an integer id."""

    def __init__(self, worker_id):
        self.worker_id = worker_id

    def process(self, data):
        return f"Worker {self.worker_id} processed: {data}"


@ray.remote
class Coordinator:
    """Round-robin dispatcher over a fixed set of Worker actors."""

    def __init__(self, num_workers):
        self.workers = [Worker.remote(i) for i in range(num_workers)]
        self.current = 0

    def distribute(self, data):
        # Pick the next worker in rotation and return the ObjectRef of
        # its (still in-flight) result without blocking the coordinator.
        worker = self.workers[self.current]
        self.current = (self.current + 1) % len(self.workers)
        return worker.process.remote(data)


coordinator = Coordinator.remote(num_workers=3)
refs = [coordinator.distribute.remote(f"task_{i}") for i in range(6)]
# distribute() returns an ObjectRef, and Ray does not flatten nested refs:
# the first ray.get yields inner ObjectRefs, so a second ray.get is needed
# to resolve the actual strings (the original printed raw ObjectRefs).
results = ray.get(ray.get(refs))
print(results)
ray.shutdown()
Actor 引用传递 #
python
import ray

ray.init()


@ray.remote
class DataStore:
    """Shared key-value store."""

    def __init__(self):
        self.data = {}

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)


@ray.remote
class DataProcessor:
    """Processor that reads from a DataStore through its actor handle."""

    def __init__(self, store_handle):
        # Actor handles can be passed into other actors and used there.
        self.store = store_handle

    def process(self, key):
        data = ray.get(self.store.get.remote(key))
        return f"Processed: {data}"


store = DataStore.remote()
ray.get(store.put.remote("key1", "value1"))
processor = DataProcessor.remote(store)
print(ray.get(processor.process.remote("key1")))
ray.shutdown()
容错与恢复 #
Actor 重启 #
python
import ray
import os

ray.init()


@ray.remote(max_restarts=3, max_task_retries=3)
class ResilientActor:
    """Actor that Ray restarts automatically after a crash."""

    def __init__(self):
        # Runs again on every restart; in-memory state starts over.
        self.call_count = 0
        print("Actor initialized")

    def work(self):
        self.call_count += 1
        return f"Call #{self.call_count}"

    def crash(self):
        # Hard-exit the actor process to simulate a failure.
        os._exit(1)


actor = ResilientActor.remote()
print(ray.get(actor.work.remote()))
# no_restart=False lets Ray restart the actor (state is re-initialized).
ray.kill(actor, no_restart=False)
print(ray.get(actor.work.remote()))
ray.shutdown()
状态恢复 #
python
import ray

ray.init()


@ray.remote(max_restarts=3)
class StatefulActor:
    """Actor whose state could be restored from a checkpoint on restart."""

    def __init__(self, checkpoint_file=None):
        self.state = {}
        if checkpoint_file:
            self._load_checkpoint(checkpoint_file)

    def _load_checkpoint(self, path):
        # Placeholder: a real implementation would repopulate self.state here.
        pass

    def set_state(self, key, value):
        self.state[key] = value

    def get_state(self, key):
        return self.state.get(key)


actor = StatefulActor.remote()
ray.get(actor.set_state.remote("key1", "value1"))
ray.shutdown()
高级特性 #
动态选项 #
python
import ray

ray.init()


@ray.remote
class BaseActor:
    """Plain actor whose runtime options are supplied at creation time."""

    def work(self):
        return "working"


# .options() overrides resources and lifetime per instance without
# touching the class definition.
actor = BaseActor.options(
    num_cpus=2,
    name="my_actor",
    lifetime="detached",   # actor survives the creating driver
    get_if_exists=True     # reuse the named actor if it already exists
).remote()
print(ray.get(actor.work.remote()))
ray.shutdown()
命名 Actor #
python
import ray

ray.init()


@ray.remote
class NamedActor:
    """Actor retrievable by name from anywhere in the cluster."""

    def __init__(self, name):
        self.name = name

    def greet(self):
        return f"Hello from {self.name}"


actor = NamedActor.options(name="my_named_actor", lifetime="detached").remote("Alice")
# Any process connected to the cluster can look the actor up by name.
retrieved = ray.get_actor("my_named_actor")
print(ray.get(retrieved.greet.remote()))
ray.kill(actor)  # detached actors must be terminated explicitly
ray.shutdown()
分离 Actor #
python
import ray

ray.init()


@ray.remote
class DetachedActor:
    """Detached actor: outlives the driver that created it."""

    def __init__(self):
        self.counter = 0

    def increment(self):
        self.counter += 1
        return self.counter


# A detached actor must be registered under a name; the original omitted
# name=..., so the ray.get_actor("DetachedActor") lookup below could never
# find it (ray.get_actor resolves names, not class names).
actor = DetachedActor.options(name="DetachedActor", lifetime="detached").remote()
ray.shutdown()

# Reconnect to the cluster and retrieve the still-running actor by name.
ray.init()
actor = ray.get_actor("DetachedActor")
性能优化 #
减少 Actor 调用 #
python
import ray

ray.init()


@ray.remote
class BatchActor:
    """Buffers items so callers can batch many values into one remote call."""

    def __init__(self):
        self.buffer = []

    def add(self, items):
        self.buffer.extend(items)

    def flush(self):
        """Return and clear everything buffered so far."""
        result = self.buffer.copy()
        self.buffer.clear()
        return result


actor = BatchActor.remote()
# Submit every add without blocking: the original called ray.get() inside
# the loop, forcing a full round trip per iteration — the exact overhead
# this "reduce actor calls" example is meant to avoid.
refs = [actor.add.remote([i, i + 1]) for i in range(100)]
ray.get(refs)  # wait once for all 100 calls to complete
print(f"Buffered items: {len(ray.get(actor.flush.remote()))}")
ray.shutdown()
使用异步方法 #
python
import ray
import asyncio

ray.init()


@ray.remote
class AsyncBatchActor:
    """Async actor that queues items and processes them in one pass."""

    def __init__(self):
        self.queue = []

    async def add(self, item):
        self.queue.append(item)

    async def process_all(self):
        # Drain the queue in FIFO order, doubling each item.
        results = []
        while self.queue:
            results.append(self.queue.pop(0) * 2)
        return results


actor = AsyncBatchActor.remote()
refs = [actor.add.remote(i) for i in range(10)]
ray.get(refs)  # make sure every item is enqueued before processing
print(ray.get(actor.process_all.remote()))
ray.shutdown()
下一步 #
掌握了 Actors 之后,继续学习 Objects 对象,深入理解 Ray 的对象存储和序列化机制!
最后更新:2026-04-05