Objects 对象 #

什么是 Objects? #

Objects 是 Ray 中的分布式数据单元。每个 Object 存储在 Ray 的共享内存对象存储中,通过 ObjectRef 进行引用和访问。

text
┌─────────────────────────────────────────────────────────────┐
│                    Object 概念图                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  创建 Object                                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                     │   │
│  │  ray.put(data)  ──►  ObjectRef  ──►  Object Store  │   │
│  │                                                     │   │
│  │  Task 返回值  ──►  ObjectRef  ──►  Object Store    │   │
│  │                                                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  获取 Object                                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                     │   │
│  │  ray.get(ObjectRef)  ──►  从 Object Store 获取数据  │   │
│  │                                                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

ObjectRef #

基本概念 #

ObjectRef 是对 Object 的引用,类似于 Future 或 Promise。它是一个轻量级的句柄,可以传递给其他 Tasks 或 Actors。

python
import ray

ray.init()

@ray.remote
def create_object():
    return [1, 2, 3, 4, 5]

ref = create_object.remote()
print(f"ObjectRef: {ref}")

data = ray.get(ref)
print(f"Data: {data}")

ray.shutdown()

ObjectRef 特性 #

text
┌─────────────────────────────────────────────────────────────┐
│                    ObjectRef 特性                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 轻量级                                                   │
│     ├── 仅包含对象 ID                                        │
│     ├── 可以高效传递                                         │
│     └── 不包含实际数据                                       │
│                                                             │
│  2. 不可变                                                   │
│     ├── Object 创建后不能修改                                │
│     ├── 保证数据一致性                                       │
│     └── 简化并发控制                                         │
│                                                             │
│  3. 可序列化                                                 │
│     ├── 可以传递给其他 Tasks                                 │
│     ├── 可以传递给 Actors                                    │
│     └── 可以存储和传输                                       │
│                                                             │
│  4. 引用计数                                                 │
│     ├── 自动内存管理                                         │
│     ├── 引用归零时释放                                       │
│     └── 防止内存泄漏                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

ray.put #

基本用法 #

python
import ray

ray.init()

data = {"key": "value", "numbers": [1, 2, 3]}
ref = ray.put(data)

retrieved = ray.get(ref)
print(retrieved)

ray.shutdown()

使用场景 #

python
import ray

ray.init()

large_data = list(range(1000000))
data_ref = ray.put(large_data)

@ray.remote
def process(data_ref, start, end):
    data = ray.get(data_ref)
    return sum(data[start:end])

refs = [process.remote(data_ref, i*100000, (i+1)*100000) for i in range(10)]
results = ray.get(refs)
print(f"Partial sums: {results}")

ray.shutdown()

何时使用 ray.put #

text
┌─────────────────────────────────────────────────────────────┐
│                    ray.put 使用场景                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  推荐使用:                                                  │
│  ├── 大数据被多个 Tasks 使用                                 │
│  ├── 避免重复序列化                                          │
│  ├── 减少网络传输                                            │
│  └── 共享只读数据                                            │
│                                                             │
│  不推荐使用:                                                │
│  ├── 小数据                                                  │
│  ├── 仅被单个 Task 使用                                      │
│  ├── 数据频繁变化                                            │
│  └── 内存紧张时                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

ray.get #

基本用法 #

python
import ray

ray.init()

@ray.remote
def task():
    return "result"

ref = task.remote()
result = ray.get(ref)
print(result)

ray.shutdown()

批量获取 #

python
import ray

ray.init()

@ray.remote
def task(x):
    return x * 2

refs = [task.remote(i) for i in range(10)]

results = ray.get(refs)
print(results)

results_dict = ray.get({f"task_{i}": refs[i] for i in range(10)})
print(results_dict)

ray.shutdown()

超时控制 #

python
import ray
import time

ray.init()

@ray.remote
def slow_task():
    time.sleep(10)
    return "done"

ref = slow_task.remote()

try:
    result = ray.get(ref, timeout=2)
    print(result)
except ray.exceptions.GetTimeoutError:
    print("Timeout: Task not completed in 2 seconds")

ray.shutdown()

异常处理 #

python
import ray

ray.init()

@ray.remote
def failing_task():
    raise ValueError("Intentional error")

ref = failing_task.remote()

try:
    ray.get(ref)
except ray.exceptions.RayTaskError as e:
    print(f"Task failed: {e}")

ray.shutdown()

对象存储 #

架构 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Object Store 架构                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Node 1                             │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │            Local Object Store                │   │   │
│  │  │  ┌─────────┐ ┌─────────┐ ┌─────────┐       │   │   │
│  │  │  │ Object1 │ │ Object2 │ │ Object3 │       │   │   │
│  │  │  └─────────┘ └─────────┘ └─────────┘       │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          │ 网络传输                         │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Node 2                             │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │            Local Object Store                │   │   │
│  │  │  ┌─────────┐ ┌─────────┐                    │   │   │
│  │  │  │ Object4 │ │ Object5 │                    │   │   │
│  │  │  └─────────┘ └─────────┘                    │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

内存管理 #

python
import ray

ray.init(object_store_memory=4 * 1024 * 1024 * 1024)

print(f"Object store memory: {ray.cluster_resources()['object_store_memory'] / 1e9:.2f} GB")

ray.shutdown()

内存监控 #

python
import ray

ray.init()

@ray.remote
def create_data(size):
    return [0] * size

refs = []
for i in range(10):
    ref = create_data.remote(1000000)
    refs.append(ref)

print(f"Memory usage: {ray._private.internal_api.memory_summary()}")

ray.shutdown()

序列化 #

支持的类型 #

text
┌─────────────────────────────────────────────────────────────┐
│                    序列化支持类型                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  基本类型:                                                  │
│  ├── int, float, bool, str                                 │
│  ├── bytes, bytearray                                      │
│  └── None                                                  │
│                                                             │
│  容器类型:                                                  │
│  ├── list, tuple, dict                                     │
│  ├── set, frozenset                                        │
│  └── collections.deque, etc.                               │
│                                                             │
│  NumPy:                                                     │
│  ├── numpy.ndarray                                         │
│  ├── numpy scalar types                                    │
│  └── 零拷贝传输                                             │
│                                                             │
│  其他:                                                      │
│  ├── pandas DataFrame                                      │
│  ├── PIL Image                                             │
│  └── 自定义序列化器                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

零拷贝序列化 #

python
import ray
import numpy as np

ray.init()

large_array = np.random.rand(10000, 10000)
ref = ray.put(large_array)

retrieved = ray.get(ref)
print(f"Same data: {np.array_equal(large_array, retrieved)}")

ray.shutdown()

自定义序列化 #

python
import ray
import pickle

ray.init()

class CustomClass:
    def __init__(self, value):
        self.value = value
    
    def __reduce__(self):
        return (self.__class__, (self.value,))

obj = CustomClass(42)
ref = ray.put(obj)
retrieved = ray.get(ref)
print(f"Value: {retrieved.value}")

ray.shutdown()

高级操作 #

ray.wait #

python
import ray
import time

ray.init()

@ray.remote
def task(duration):
    time.sleep(duration)
    return f"Task completed in {duration}s"

refs = [task.remote(i) for i in [3, 1, 2, 5, 4]]

ready, not_ready = ray.wait(refs, num_returns=1, timeout=None)
print(f"First ready: {ray.get(ready)}")
print(f"Not ready: {len(not_ready)} tasks")

ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=2)
print(f"Ready within 2s: {len(ready)} tasks")

ray.shutdown()

异步获取 #

python
import ray
import asyncio

ray.init()

@ray.remote
async def async_task(x):
    await asyncio.sleep(1)
    return x * 2

async def main():
    ref = async_task.remote(10)
    result = await ref
    print(f"Result: {result}")

asyncio.run(main())

ray.shutdown()

对象位置 #

python
import ray

ray.init()

@ray.remote
def create_object():
    return [1, 2, 3]

ref = create_object.remote()

locations = ray.experimental.get_object_locations([ref])
print(f"Object locations: {locations}")

ray.shutdown()

内存优化 #

避免内存溢出 #

python
import ray

ray.init(object_store_memory=1 * 1024 * 1024 * 1024)

@ray.remote
def process_batch(batch):
    return sum(batch)

data = list(range(10000000))
batch_size = 100000

results = []
for i in range(0, len(data), batch_size):
    batch = data[i:i+batch_size]
    ref = process_batch.remote(batch)
    results.append(ref)
    
    if len(results) >= 10:
        ray.get(results)
        results = []

ray.get(results)

ray.shutdown()

及时释放引用 #

python
import ray

ray.init()

@ray.remote
def task():
    return [0] * 1000000

refs = [task.remote() for _ in range(100)]

results = ray.get(refs[:10])
refs = refs[10:]

del results

ray.shutdown()

使用生成器 #

python
import ray

ray.init()

@ray.remote(num_returns=3)
def generator_task():
    yield 1
    yield 2
    yield 3

ref1, ref2, ref3 = generator_task.remote()
print(ray.get([ref1, ref2, ref3]))

ray.shutdown()

最佳实践 #

1. 批量操作 #

python
import ray

ray.init()

@ray.remote
def task(x):
    return x * 2

refs = [task.remote(i) for i in range(100)]
results = ray.get(refs)

for ref in refs:
    result = ray.get(ref)

ray.shutdown()

2. 共享大数据 #

python
import ray
import numpy as np

ray.init()

large_data = np.random.rand(10000, 10000)
data_ref = ray.put(large_data)

@ray.remote
def process(data_ref, idx):
    data = ray.get(data_ref)
    return data[idx, 0]

refs = [process.remote(data_ref, i) for i in range(100)]

ray.shutdown()

3. 超时保护 #

python
import ray

ray.init()

@ray.remote
def potentially_slow():
    import time
    time.sleep(10)
    return "done"

ref = potentially_slow.remote()

try:
    result = ray.get(ref, timeout=5)
except ray.exceptions.GetTimeoutError:
    print("Task timed out, continuing...")

ray.shutdown()

4. 错误处理 #

python
import ray

ray.init()

@ray.remote
def might_fail():
    import random
    if random.random() < 0.5:
        raise ValueError("Random failure")
    return "success"

refs = [might_fail.remote() for _ in range(10)]

results = []
for ref in refs:
    try:
        results.append(ray.get(ref))
    except ray.exceptions.RayTaskError:
        results.append(None)

print(f"Results: {results}")

ray.shutdown()

下一步 #

掌握了 Objects 之后,继续学习 资源管理,了解 Ray 的资源调度和分配机制!

最后更新:2026-04-05