高级特性 #

运行时环境 #

概述 #

Ray 运行时环境允许你为每个 Task 或 Actor 定义独立的依赖环境。

text
┌─────────────────────────────────────────────────────────────┐
│                    运行时环境                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  支持的配置:                                                │
│  ├── pip: Python 包依赖                                     │
│  ├── conda: Conda 环境配置                                  │
│  ├── container: Docker 容器                                 │
│  ├── working_dir: 工作目录                                  │
│  ├── py_modules: Python 模块                                │
│  ├── env_vars: 环境变量                                     │
│  └── excludes: 排除文件                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

pip 依赖 #

python
import ray

ray.init()

# Packages installed via pip into the environment this task runs in.
PIP_PACKAGES = ["pandas==2.0.0", "numpy==1.24.0"]


@ray.remote(runtime_env={"pip": PIP_PACKAGES})
def task_with_deps():
    """Import the pinned packages inside the task and return the pandas version."""
    import numpy as np
    import pandas as pd

    return pd.__version__


version_ref = task_with_deps.remote()
print(ray.get(version_ref))

ray.shutdown()

Conda 环境 #

python
import ray

# Conda environment spec for the task: a list of conda packages plus a nested
# pip package list.
# NOTE(review): Ray's documentation advises against pinning python/ray here
# because both must match the cluster's versions — confirm before reuse.
runtime_env = {
    "conda": {
        "dependencies": ["python=3.9", "pip", {"pip": ["ray"]}]
    }
}

@ray.remote(runtime_env=runtime_env)
def conda_task():
    """Return the interpreter version string seen inside the task."""
    import sys
    return sys.version

# NOTE(review): this snippet never calls ray.init() or conda_task.remote(),
# so the task is only defined, not executed.
ray.shutdown()

工作目录 #

python
import ray

# Local paths handed to Ray through the runtime environment.
# NOTE(review): ./my_project and ./my_module must exist relative to the
# driver's current directory — presumably placeholders; verify.
runtime_env = {
    "working_dir": "./my_project",
    "py_modules": ["./my_module"]
}

@ray.remote(runtime_env=runtime_env)
def task_with_local_code():
    """Import my_function from my_module inside the task and return its result."""
    from my_module import my_function
    return my_function()

# NOTE(review): the task is defined but never invoked in this snippet.
ray.shutdown()

环境变量 #

python
import ray
import os

# Start Ray explicitly, as the other snippets do, instead of relying on the
# implicit initialization triggered by the first .remote() call.
ray.init()

# Environment variables set for the task through the runtime environment.
# NOTE(review): Ray normally manages CUDA_VISIBLE_DEVICES itself for GPU
# tasks — confirm that overriding it here is intended.
runtime_env = {
    "env_vars": {
        "MY_VAR": "value",
        "CUDA_VISIBLE_DEVICES": "0",
    }
}


@ray.remote(runtime_env=runtime_env)
def task_with_env():
    """Return the value of MY_VAR as seen inside the task (None if unset)."""
    return os.environ.get("MY_VAR")


print(ray.get(task_with_env.remote()))

ray.shutdown()

自定义序列化 #

使用 __reduce__ 自定义序列化 #

python
import ray
import pickle


class CustomObject:
    """Value holder that tells pickle how to rebuild it."""

    def __init__(self, value):
        self.value = value

    def __reduce__(self):
        # Pickle protocol: (callable, args) used to recreate the instance.
        constructor = self.__class__
        ctor_args = (self.value,)
        return constructor, ctor_args


ray.init()

# Round-trip an instance through the object store and read its value back.
stored_ref = ray.put(CustomObject(42))
print(ray.get(stored_ref).value)

ray.shutdown()

自定义序列化器 #

python
import ray
import json


class JSONSerializer:
    """Encode an object's attribute dict as JSON bytes and decode it back."""

    @staticmethod
    def serialize(obj):
        """Return ``obj.__dict__`` encoded as JSON bytes."""
        return json.dumps(obj.__dict__).encode()

    @staticmethod
    def deserialize(data):
        """Return the dict decoded from bytes produced by ``serialize``."""
        return json.loads(data.decode())


class Settings:
    """Example type whose instances go through the custom serializer."""

    def __init__(self, name, retries):
        self.name = name
        self.retries = retries


ray.init()

# Without this registration Ray never calls JSONSerializer.
# deserialize() returns a plain dict, so rebuild the instance from it.
ray.util.register_serializer(
    Settings,
    serializer=JSONSerializer.serialize,
    deserializer=lambda data: Settings(**JSONSerializer.deserialize(data)),
)

# Round-trip through the object store to exercise both directions.
restored = ray.get(ray.put(Settings("demo", 3)))
print(restored.name, restored.retries)

ray.shutdown()

性能优化 #

对象存储优化 #

python
import ray
import numpy as np

# Give the object store 8 GiB.
ray.init(object_store_memory=8 * 1024 * 1024 * 1024)

# Put the array into the object store once; every task shares that copy.
large_data = np.random.rand(10000, 10000)
data_ref = ray.put(large_data)


@ray.remote
def process(data_ref, idx):
    """Return element ``[idx, 0]`` of the shared array.

    Ray resolves ObjectRefs passed as top-level task arguments before the
    task body runs, so ``data_ref`` is already the ndarray here. Calling
    ``ray.get()`` on it again would raise because it is not an ObjectRef.
    """
    data = data_ref
    return data[idx, 0]


refs = [process.remote(data_ref, i) for i in range(100)]
# Collect the results so task failures surface instead of being dropped.
results = ray.get(refs)

ray.shutdown()

批处理优化 #

python
import ray

ray.init()


@ray.remote
def process_batch(items):
    """Return a list with every element of ``items`` doubled."""
    doubled = []
    for value in items:
        doubled.append(value * 2)
    return doubled


data = list(range(10000))
batch_size = 100

# Slice the input into consecutive chunks of batch_size elements.
batches = []
for start in range(0, len(data), batch_size):
    batches.append(data[start:start + batch_size])

# One remote call per batch rather than one per element.
refs = [process_batch.remote(batch) for batch in batches]

# Flatten the per-batch results back into a single list, preserving order.
results = []
for batch_result in ray.get(refs):
    results.extend(batch_result)

ray.shutdown()

内存优化 #

python
import ray

ray.init()

@ray.remote
def memory_efficient_task():
    """Compute a result, run a garbage-collection pass, then return the result."""
    import gc
    # NOTE(review): compute_large_result is not defined in this snippet.
    result = compute_large_result()
    # `result` is still referenced here, so this pass cannot free it; it only
    # collects other unreachable objects.
    gc.collect()
    return result

def streaming_process(data):
    """Yield the processed value of each chunk, one chunk at a time."""
    for chunk in data:
        # NOTE(review): process_chunk is not defined in this snippet;
        # presumably a @ray.remote function — verify.
        ref = process_chunk.remote(chunk)
        yield ray.get(ref)
        # Drop the reference once the consumer asks for the next item.
        del ref

ray.shutdown()

并行度优化 #

python
import ray

ray.init(num_cpus=8)


@ray.remote(num_cpus=1)
def cpu_task():
    """Task that requests one whole CPU."""
    return "done"


# Eight one-CPU tasks against the eight CPUs given to ray.init().
refs = []
for _ in range(8):
    refs.append(cpu_task.remote())


@ray.remote(num_cpus=0.25)
def lightweight_task():
    """Task that requests a quarter of a CPU."""
    return "done"


# Thirty-two quarter-CPU tasks against the same eight CPUs.
refs = []
for _ in range(32):
    refs.append(lightweight_task.remote())

ray.shutdown()

调试技巧 #

日志配置 #

python
import ray
import logging

ray.init(logging_level=logging.DEBUG)


@ray.remote
def logged_task():
    """Run do_work() and log its start and completion from inside the worker."""
    # The task body runs in a separate worker process. Logging has to be
    # configured there, otherwise INFO records are dropped by the default
    # WARNING threshold of the root logger.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info("Task started")
    # NOTE(review): do_work is not defined in this snippet.
    result = do_work()
    logger.info("Task completed")
    return result


ray.shutdown()

错误追踪 #

python
import ray
import traceback

ray.init()


@ray.remote
def failing_task():
    """Raise a ValueError, print its traceback in the worker, and re-raise it."""
    try:
        raise ValueError("Intentional error")
    except Exception:
        traceback.print_exc()
        raise


failed_ref = failing_task.remote()
try:
    ray.get(failed_ref)
except ray.exceptions.RayTaskError as err:
    # The worker-side exception reaches the driver wrapped in RayTaskError.
    print(f"Task failed: {err}")

ray.shutdown()

调试模式 #

python
import ray

# NOTE(review): local_mode is marked deprecated in recent Ray releases —
# confirm against the Ray version in use.
ray.init(local_mode=True)

@ray.remote
def debug_task():
    """Enter the pdb debugger when the task body runs, then return "done"."""
    import pdb; pdb.set_trace()
    return "done"

# NOTE(review): debug_task is defined but never invoked in this snippet.
ray.shutdown()

性能分析 #

python
import ray
import time

ray.init()


@ray.remote
def profiled_task():
    """Run compute(), print how long it took, and return its result."""
    # perf_counter() is monotonic, so the measured interval is not skewed by
    # system clock adjustments the way a time.time() difference can be.
    start = time.perf_counter()
    # NOTE(review): compute is not defined in this snippet.
    result = compute()
    duration = time.perf_counter() - start
    print(f"Task took {duration:.2f}s")
    return result


ray.shutdown()

高级模式 #

嵌套远程调用 #

python
import ray

ray.init()


@ray.remote
def outer_task():
    """Launch a nested remote task from inside this task and return its result."""

    @ray.remote
    def inner_task(x):
        doubled = x * 2
        return doubled

    # Block on the nested task's result before returning.
    return ray.get(inner_task.remote(10))


outer_ref = outer_task.remote()
print(ray.get(outer_ref))

ray.shutdown()

动态任务图 #

python
import ray

ray.init()


@ray.remote
def dynamic_task(x):
    """Recursively spawn tasks until ``x`` reaches 0 and return the final value.

    Returning the nested ObjectRef directly would hand the caller an
    unresolved reference, because Ray does not resolve refs nested inside
    return values. Each level therefore resolves its child before returning.
    """
    if x > 0:
        return ray.get(dynamic_task.remote(x - 1))
    return x


print(ray.get(dynamic_task.remote(5)))

ray.shutdown()

流式处理 #

python
import ray

ray.init()


@ray.remote
class StreamProcessor:
    """Actor that buffers items and emits the sum of every 10 of them."""

    def __init__(self):
        self.buffer = []

    def add(self, item):
        """Buffer ``item``; return the batch sum once 10 items are held, else None."""
        self.buffer.append(item)
        if len(self.buffer) >= 10:
            result = self.process_batch()
            self.buffer.clear()
            return result
        return None

    def process_batch(self):
        """Return the sum of the currently buffered items."""
        return sum(self.buffer)


processor = StreamProcessor.remote()

for i in range(100):
    result = ray.get(processor.add.remote(i))
    # Compare against None explicitly: a batch that sums to 0 is a valid
    # result and must not be treated as "no batch yet".
    if result is not None:
        print(f"Batch result: {result}")

ray.shutdown()

分布式锁 #

python
import asyncio

import ray

ray.init()


@ray.remote
class DistributedLock:
    """Mutex shared between tasks, implemented as an async actor.

    The methods are coroutines on purpose. A default actor executes one
    method call at a time, so a blocking wait loop in ``acquire`` would
    prevent ``release`` from ever running and every waiter would hang.
    """

    def __init__(self):
        self.locked = False

    async def acquire(self):
        """Wait until the lock is free, then take it."""
        while self.locked:
            # Yield to the actor's event loop so release() can run.
            await asyncio.sleep(0.01)
        # There is no await between the check above and this write, so two
        # waiters cannot both observe the lock as free.
        self.locked = True

    async def release(self):
        """Mark the lock as free."""
        self.locked = False


lock = DistributedLock.remote()


@ray.remote
def protected_task(lock):
    """Run do_critical_work() while holding the lock and return its result."""
    ray.get(lock.acquire.remote())
    try:
        # NOTE(review): do_critical_work is not defined in this snippet.
        result = do_critical_work()
    finally:
        # Always release, even if the critical section raised.
        ray.get(lock.release.remote())
    return result


ray.shutdown()

最佳实践 #

1. 避免过度并行 #

python
import ray

ray.init(num_cpus=4)


@ray.remote
def task():
    """Trivial task used to compare task counts."""
    return "done"


# Task count matches the 4 CPUs given to ray.init().
refs = [task.remote() for _ in range(4)]

# 1000 tiny tasks on the same 4 CPUs — presumably the over-parallel case this
# section warns against; TODO confirm intent.
refs = [task.remote() for _ in range(1000)]

# Shut Ray down like the other snippets do, so the runtime is not left running.
ray.shutdown()

2. 使用 ray.put 共享数据 #

python
import ray

ray.init()

# NOTE(review): load_large_dataset and process are not defined in this snippet.
large_data = load_large_dataset()
# Place the dataset in the object store once and keep a reference to it.
data_ref = ray.put(large_data)

# Every task receives the shared reference.
refs = [process.remote(data_ref, i) for i in range(100)]

# Every task receives the raw object instead — presumably the pattern this
# section advises against; TODO confirm intent.
refs = [process.remote(large_data, i) for i in range(100)]

ray.shutdown()

3. 合理设置超时 #

python
import ray

ray.init()


@ray.remote
def slow_task():
    """Sleep for 100 seconds, then return "done"."""
    from time import sleep

    sleep(100)
    return "done"


ref = slow_task.remote()

# Wait at most 10 seconds for the result instead of blocking indefinitely.
try:
    result = ray.get(ref, timeout=10)
except ray.exceptions.GetTimeoutError:
    print("Task timed out")

ray.shutdown()

4. 及时清理资源 #

python
import ray

ray.init()

# NOTE(review): MyActor is not defined in this snippet.
try:
    actor = MyActor.remote()
    result = ray.get(actor.work.remote())
finally:
    # Runs whether or not the actor call raised.
    ray.shutdown()

下一步 #

掌握了高级特性之后,继续学习 实战案例,将所学知识应用到实际项目中!

最后更新:2026-04-05