Objects 对象 #
什么是 Objects? #
Objects 是 Ray 中的分布式数据单元。每个 Object 存储在 Ray 的共享内存对象存储中,通过 ObjectRef 进行引用和访问。
text
┌─────────────────────────────────────────────────────────────┐
│ Object 概念图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 创建 Object │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ray.put(data) ──► ObjectRef ──► Object Store │ │
│ │ │ │
│ │ Task 返回值 ──► ObjectRef ──► Object Store │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 获取 Object │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ray.get(ObjectRef) ──► 从 Object Store 获取数据 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
ObjectRef #
基本概念 #
ObjectRef 是对 Object 的引用,类似于 Future 或 Promise。它是一个轻量级的句柄,可以传递给其他 Tasks 或 Actors。
python
import ray
ray.init()
@ray.remote
def create_object():
return [1, 2, 3, 4, 5]
ref = create_object.remote()
print(f"ObjectRef: {ref}")
data = ray.get(ref)
print(f"Data: {data}")
ray.shutdown()
ObjectRef 特性 #
text
┌─────────────────────────────────────────────────────────────┐
│ ObjectRef 特性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 轻量级 │
│ ├── 仅包含对象 ID │
│ ├── 可以高效传递 │
│ └── 不包含实际数据 │
│ │
│ 2. 不可变 │
│ ├── Object 创建后不能修改 │
│ ├── 保证数据一致性 │
│ └── 简化并发控制 │
│ │
│ 3. 可序列化 │
│ ├── 可以传递给其他 Tasks │
│ ├── 可以传递给 Actors │
│ └── 可以存储和传输 │
│ │
│ 4. 引用计数 │
│ ├── 自动内存管理 │
│ ├── 引用归零时释放 │
│ └── 防止内存泄漏 │
│ │
└─────────────────────────────────────────────────────────────┘
ray.put #
基本用法 #
python
import ray
ray.init()
data = {"key": "value", "numbers": [1, 2, 3]}
ref = ray.put(data)
retrieved = ray.get(ref)
print(retrieved)
ray.shutdown()
使用场景 #
python
import ray
ray.init()
large_data = list(range(1000000))
data_ref = ray.put(large_data)
@ray.remote
def process(data_ref, start, end):
data = ray.get(data_ref)
return sum(data[start:end])
refs = [process.remote(data_ref, i*100000, (i+1)*100000) for i in range(10)]
results = ray.get(refs)
print(f"Partial sums: {results}")
ray.shutdown()
何时使用 ray.put #
text
┌─────────────────────────────────────────────────────────────┐
│ ray.put 使用场景 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 推荐使用: │
│ ├── 大数据被多个 Tasks 使用 │
│ ├── 避免重复序列化 │
│ ├── 减少网络传输 │
│ └── 共享只读数据 │
│ │
│ 不推荐使用: │
│ ├── 小数据 │
│ ├── 仅被单个 Task 使用 │
│ ├── 数据频繁变化 │
│ └── 内存紧张时 │
│ │
└─────────────────────────────────────────────────────────────┘
ray.get #
基本用法 #
python
import ray
ray.init()
@ray.remote
def task():
return "result"
ref = task.remote()
result = ray.get(ref)
print(result)
ray.shutdown()
批量获取 #
python
import ray
ray.init()
@ray.remote
def task(x):
return x * 2
refs = [task.remote(i) for i in range(10)]
results = ray.get(refs)
print(results)
results_dict = ray.get({f"task_{i}": refs[i] for i in range(10)})
print(results_dict)
ray.shutdown()
超时控制 #
python
import ray
import time
ray.init()
@ray.remote
def slow_task():
time.sleep(10)
return "done"
ref = slow_task.remote()
try:
result = ray.get(ref, timeout=2)
print(result)
except ray.exceptions.GetTimeoutError:
print("Timeout: Task not completed in 2 seconds")
ray.shutdown()
异常处理 #
python
import ray
ray.init()
@ray.remote
def failing_task():
raise ValueError("Intentional error")
ref = failing_task.remote()
try:
ray.get(ref)
except ray.exceptions.RayTaskError as e:
print(f"Task failed: {e}")
ray.shutdown()
对象存储 #
架构 #
text
┌─────────────────────────────────────────────────────────────┐
│ Object Store 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Node 1 │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Local Object Store │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ Object1 │ │ Object2 │ │ Object3 │ │ │ │
│ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ 网络传输 │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Node 2 │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Local Object Store │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ Object4 │ │ Object5 │ │ │ │
│ │ │ └─────────┘ └─────────┘ │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
内存管理 #
python
import ray
ray.init(object_store_memory=4 * 1024 * 1024 * 1024)
print(f"Object store memory: {ray.cluster_resources()['object_store_memory'] / 1e9:.2f} GB")
ray.shutdown()
内存监控 #
python
import ray
ray.init()
@ray.remote
def create_data(size):
return [0] * size
refs = []
for i in range(10):
ref = create_data.remote(1000000)
refs.append(ref)
print(f"Memory usage: {ray._private.internal_api.memory_summary()}")
ray.shutdown()
序列化 #
支持的类型 #
text
┌─────────────────────────────────────────────────────────────┐
│ 序列化支持类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 基本类型: │
│ ├── int, float, bool, str │
│ ├── bytes, bytearray │
│ └── None │
│ │
│ 容器类型: │
│ ├── list, tuple, dict │
│ ├── set, frozenset │
│ └── collections.deque, etc. │
│ │
│ NumPy: │
│ ├── numpy.ndarray │
│ ├── numpy scalar types │
│ └── 零拷贝传输 │
│ │
│ 其他: │
│ ├── pandas DataFrame │
│ ├── PIL Image │
│ └── 自定义序列化器 │
│ │
└─────────────────────────────────────────────────────────────┘
零拷贝序列化 #
python
import ray
import numpy as np
ray.init()
large_array = np.random.rand(10000, 10000)
ref = ray.put(large_array)
retrieved = ray.get(ref)
print(f"Same data: {np.array_equal(large_array, retrieved)}")
ray.shutdown()
自定义序列化 #
python
import ray
import pickle
ray.init()
class CustomClass:
def __init__(self, value):
self.value = value
def __reduce__(self):
return (self.__class__, (self.value,))
obj = CustomClass(42)
ref = ray.put(obj)
retrieved = ray.get(ref)
print(f"Value: {retrieved.value}")
ray.shutdown()
高级操作 #
ray.wait #
python
import ray
import time
ray.init()
@ray.remote
def task(duration):
time.sleep(duration)
return f"Task completed in {duration}s"
refs = [task.remote(i) for i in [3, 1, 2, 5, 4]]
ready, not_ready = ray.wait(refs, num_returns=1, timeout=None)
print(f"First ready: {ray.get(ready)}")
print(f"Not ready: {len(not_ready)} tasks")
ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=2)
print(f"Ready within 2s: {len(ready)} tasks")
ray.shutdown()
异步获取 #
python
import ray
import asyncio
ray.init()
@ray.remote
async def async_task(x):
await asyncio.sleep(1)
return x * 2
async def main():
ref = async_task.remote(10)
result = await ref
print(f"Result: {result}")
asyncio.run(main())
ray.shutdown()
对象位置 #
python
import ray
ray.init()
@ray.remote
def create_object():
return [1, 2, 3]
ref = create_object.remote()
locations = ray.experimental.get_object_locations([ref])
print(f"Object locations: {locations}")
ray.shutdown()
内存优化 #
避免内存溢出 #
python
import ray
ray.init(object_store_memory=1 * 1024 * 1024 * 1024)
@ray.remote
def process_batch(batch):
return sum(batch)
data = list(range(10000000))
batch_size = 100000
results = []
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
ref = process_batch.remote(batch)
results.append(ref)
if len(results) >= 10:
ray.get(results)
results = []
ray.get(results)
ray.shutdown()
及时释放引用 #
python
import ray
ray.init()
@ray.remote
def task():
return [0] * 1000000
refs = [task.remote() for _ in range(100)]
results = ray.get(refs[:10])
refs = refs[10:]
del results
ray.shutdown()
使用生成器 #
python
import ray
ray.init()
@ray.remote(num_returns=3)
def generator_task():
yield 1
yield 2
yield 3
ref1, ref2, ref3 = generator_task.remote()
print(ray.get([ref1, ref2, ref3]))
ray.shutdown()
最佳实践 #
1. 批量操作 #
python
import ray
ray.init()
@ray.remote
def task(x):
return x * 2
refs = [task.remote(i) for i in range(100)]
results = ray.get(refs)
for ref in refs:
result = ray.get(ref)
ray.shutdown()
2. 共享大数据 #
python
import ray
import numpy as np
ray.init()
large_data = np.random.rand(10000, 10000)
data_ref = ray.put(large_data)
@ray.remote
def process(data_ref, idx):
data = ray.get(data_ref)
return data[idx, 0]
refs = [process.remote(data_ref, i) for i in range(100)]
ray.shutdown()
3. 超时保护 #
python
import ray
ray.init()
@ray.remote
def potentially_slow():
import time
time.sleep(10)
return "done"
ref = potentially_slow.remote()
try:
result = ray.get(ref, timeout=5)
except ray.exceptions.GetTimeoutError:
print("Task timed out, continuing...")
ray.shutdown()
4. 错误处理 #
python
import ray
ray.init()
@ray.remote
def might_fail():
import random
if random.random() < 0.5:
raise ValueError("Random failure")
return "success"
refs = [might_fail.remote() for _ in range(10)]
results = []
for ref in refs:
try:
results.append(ray.get(ref))
except ray.exceptions.RayTaskError:
results.append(None)
print(f"Results: {results}")
ray.shutdown()
下一步 #
掌握了 Objects 之后,继续学习 资源管理,了解 Ray 的资源调度和分配机制!
最后更新:2026-04-05