高级特性 #
运行时环境 #
概述 #
Ray 运行时环境允许你为每个 Task 或 Actor 定义独立的依赖环境。
text
┌─────────────────────────────────────────────────────────────┐
│ 运行时环境 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 支持的配置: │
│ ├── pip: Python 包依赖 │
│ ├── conda: Conda 环境配置 │
│ ├── container: Docker 容器 │
│ ├── working_dir: 工作目录 │
│ ├── py_modules: Python 模块 │
│ ├── env_vars: 环境变量 │
│ └── excludes: 排除文件 │
│ │
└─────────────────────────────────────────────────────────────┘
pip 依赖 #
python
import ray

ray.init()


# Each task runs inside an isolated environment with the pinned packages below.
@ray.remote(runtime_env={"pip": ["pandas==2.0.0", "numpy==1.24.0"]})
def task_with_deps():
    """Return the pandas version seen inside the task's runtime env."""
    # Imports happen inside the task so they resolve against the per-task env.
    import pandas as pd
    import numpy as np  # imported to prove the pinned numpy is installed

    return pd.__version__


print(ray.get(task_with_deps.remote()))
ray.shutdown()
Conda 环境 #
python
import ray

ray.init()

# Build the worker environment from a full conda spec instead of bare pip.
runtime_env = {
    "conda": {
        "dependencies": ["python=3.9", "pip", {"pip": ["ray"]}]
    }
}


@ray.remote(runtime_env=runtime_env)
def conda_task():
    """Return the Python version string of the conda env the task runs in."""
    import sys
    return sys.version


# Fix: the original snippet defined the task but never called ray.init() nor
# invoked it, so the example demonstrated nothing. Run it and show the result.
print(ray.get(conda_task.remote()))
ray.shutdown()
工作目录 #
python
import ray

# Ship local code to the cluster: working_dir is uploaded to every worker and
# each entry in py_modules becomes importable from tasks.
# NOTE(review): ./my_project and ./my_module must exist for this to run, and
# the snippet never invokes the task — it only shows the configuration.
runtime_env = {
    "working_dir": "./my_project",
    "py_modules": ["./my_module"],
}


@ray.remote(runtime_env=runtime_env)
def task_with_local_code():
    """Call a function from the locally-shipped module on a worker."""
    from my_module import my_function
    return my_function()


ray.shutdown()
环境变量 #
python
import os

import ray

# Variables listed under env_vars are injected into the environment of every
# worker process that uses this runtime_env.
runtime_env = {
    "env_vars": {
        "MY_VAR": "value",
        "CUDA_VISIBLE_DEVICES": "0",
    }
}


@ray.remote(runtime_env=runtime_env)
def task_with_env():
    """Read back an env var injected through the runtime environment."""
    return os.environ.get("MY_VAR")


print(ray.get(task_with_env.remote()))
ray.shutdown()
自定义序列化 #
自定义 __reduce__ 方法 #
python
import ray
import pickle
class CustomObject:
    """Object with an explicit pickle protocol, used to customize Ray serialization."""

    def __init__(self, value):
        # Payload carried through serialization.
        self.value = value

    def __reduce__(self):
        # Tell pickle (and therefore Ray) how to rebuild this object:
        # call the class again with the saved value.
        return (self.__class__, (self.value,))
ray.init()

original = CustomObject(42)
object_ref = ray.put(original)        # serialized via __reduce__
round_tripped = ray.get(object_ref)   # a reconstructed copy, not the same object
print(round_tripped.value)
ray.shutdown()
自定义序列化器 #
python
import ray
import json
class JSONSerializer:
    """Symmetric JSON (de)serialization helpers for plain attribute objects."""

    @staticmethod
    def serialize(obj):
        """Encode obj's attribute dict (``obj.__dict__``) as UTF-8 JSON bytes."""
        return json.dumps(obj.__dict__).encode()

    @staticmethod
    def deserialize(data, cls=None):
        """Decode JSON bytes back to a dict, or to an instance of ``cls``.

        Backward compatible: with the default ``cls=None`` the original
        behavior (returning a plain dict) is preserved.  Passing a class
        reconstructs a real object, making deserialize a true inverse of
        serialize instead of silently dropping the type.
        """
        attrs = json.loads(data.decode())
        if cls is None:
            return attrs
        # Bypass __init__ (its signature is unknown) and restore attributes.
        obj = cls.__new__(cls)
        obj.__dict__.update(attrs)
        return obj
ray.shutdown()
性能优化 #
对象存储优化 #
python
import ray
import numpy as np

# Reserve 8 GiB for the shared object store.
ray.init(object_store_memory=8 * 1024 * 1024 * 1024)

# Put the large array into the object store ONCE; all tasks then read the same
# shared copy instead of re-serializing ~800 MB per call.
large_data = np.random.rand(10000, 10000)
data_ref = ray.put(large_data)


@ray.remote
def process(data_ref, idx):
    """Return one element of the shared array.

    Fix: Ray automatically dereferences an ObjectRef passed as a top-level
    task argument, so `data_ref` is already the ndarray inside the task.
    The original body called ray.get() on it a second time, which raises.
    """
    return data_ref[idx, 0]


refs = [process.remote(data_ref, i) for i in range(100)]
ray.shutdown()
批处理优化 #
python
import ray

ray.init()


@ray.remote
def process_batch(items):
    """Double every element of one batch."""
    return [x * 2 for x in items]


data = list(range(10000))
batch_size = 100
# One remote call per chunk amortizes scheduling overhead, instead of
# launching 10 000 tiny tasks.
batches = [
    data[start:start + batch_size]
    for start in range(0, len(data), batch_size)
]
refs = [process_batch.remote(batch) for batch in batches]
# Flatten the per-batch results back into a single list.
results = [value for batch in ray.get(refs) for value in batch]
ray.shutdown()
内存优化 #
python
import ray

ray.init()


@ray.remote
def memory_efficient_task():
    """Run a large computation, forcing a GC pass before returning."""
    import gc

    # NOTE(review): compute_large_result is assumed to be defined elsewhere
    # in the guide — confirm.
    result = compute_large_result()
    gc.collect()  # release intermediates before the result is serialized
    return result


def streaming_process(data):
    """Yield results chunk by chunk so only one ObjectRef is live at a time."""
    for chunk in data:
        # NOTE(review): process_chunk is assumed to be a remote function
        # defined elsewhere — confirm.
        chunk_ref = process_chunk.remote(chunk)
        yield ray.get(chunk_ref)
        # Drop the reference promptly so the object store can evict the value.
        del chunk_ref


ray.shutdown()
并行度优化 #
python
import ray

# Cap the local scheduler at 8 CPUs.
ray.init(num_cpus=8)


@ray.remote(num_cpus=1)
def cpu_task():
    """A task that reserves one full CPU."""
    return "done"


# Exactly 8 of these can execute concurrently.
refs = [cpu_task.remote() for _ in range(8)]


@ray.remote(num_cpus=0.25)
def lightweight_task():
    """A task that reserves a quarter CPU, allowing 4x packing per core."""
    return "done"


# 32 of these fit into the same 8-CPU budget.
refs = [lightweight_task.remote() for _ in range(32)]
ray.shutdown()
调试技巧 #
日志配置 #
python
import logging

import ray

# Raise Ray's own log verbosity for debugging.
ray.init(logging_level=logging.DEBUG)


@ray.remote
def logged_task():
    """Bracket the work with start/finish log lines (captured in worker logs)."""
    logging.info("Task started")
    # NOTE(review): do_work is assumed to be defined elsewhere — confirm.
    result = do_work()
    logging.info("Task completed")
    return result


ray.shutdown()
错误追踪 #
python
import traceback

import ray

ray.init()


@ray.remote
def failing_task():
    """Always raise; print the worker-side traceback before re-raising."""
    try:
        raise ValueError("Intentional error")
    except Exception:
        traceback.print_exc()  # goes to the worker's stderr/log
        raise


try:
    ray.get(failing_task.remote())
except ray.exceptions.RayTaskError as e:
    # Exceptions raised in workers surface on the driver as RayTaskError.
    print(f"Task failed: {e}")
ray.shutdown()
调试模式 #
python
import ray

# local_mode runs tasks serially in the driver process, so an interactive
# debugger can attach.
ray.init(local_mode=True)


@ray.remote
def debug_task():
    """Drop into pdb inside the (local-mode) task."""
    import pdb
    pdb.set_trace()
    return "done"


ray.shutdown()
性能分析 #
python
import time

import ray

ray.init()


@ray.remote
def profiled_task():
    """Time the wrapped computation and print its wall-clock duration."""
    start = time.time()
    # NOTE(review): compute is assumed to be defined elsewhere — confirm.
    result = compute()
    duration = time.time() - start
    print(f"Task took {duration:.2f}s")
    return result


ray.shutdown()
高级模式 #
嵌套远程调用 #
python
import ray

ray.init()


@ray.remote
def outer_task():
    """Launch a nested remote task from inside a task and wait on its result."""

    @ray.remote
    def inner_task(x):
        return x * 2

    inner_ref = inner_task.remote(10)
    return ray.get(inner_ref)


print(ray.get(outer_task.remote()))
ray.shutdown()
动态任务图 #
python
import ray

ray.init()


@ray.remote
def dynamic_task(x):
    """Recursively spawn tasks until x reaches 0, building the graph at runtime."""
    if x > 0:
        # Returns an ObjectRef to the child task rather than a plain value.
        # NOTE(review): this relies on Ray resolving the nested ObjectRef
        # chain when the driver fetches the result — confirm the printed
        # value is 0 and not an ObjectRef.
        return dynamic_task.remote(x - 1)
    return x


print(ray.get(dynamic_task.remote(5)))
ray.shutdown()
流式处理 #
python
import ray

ray.init()


@ray.remote
class StreamProcessor:
    """Actor that buffers incoming items and emits an aggregate every 10 items."""

    def __init__(self):
        self.buffer = []

    def add(self, item):
        """Buffer one item; return the batch sum when a batch completes, else None."""
        self.buffer.append(item)
        if len(self.buffer) >= 10:
            result = self.process_batch()
            self.buffer.clear()
            return result
        return None

    def process_batch(self):
        # Aggregate one full batch.
        return sum(self.buffer)


processor = StreamProcessor.remote()
for i in range(100):
    result = ray.get(processor.add.remote(i))
    # Fix: compare against None instead of truthiness — a batch legitimately
    # summing to 0 would otherwise be silently dropped.
    if result is not None:
        print(f"Batch result: {result}")
ray.shutdown()
分布式锁 #
python
import asyncio

import ray

ray.init()


@ray.remote
class DistributedLock:
    """A simple lock actor.

    Fix: the original ``acquire`` busy-waited with ``time.sleep`` inside a
    regular actor.  Regular actors execute one method at a time, so a blocked
    ``acquire`` prevented ``release`` from ever running — every contended
    acquire deadlocked.  Declaring the methods ``async`` makes this an async
    actor, whose event loop can interleave a pending ``acquire`` with
    incoming ``release`` calls.
    """

    def __init__(self):
        self.locked = False

    async def acquire(self):
        # Yield to the actor's event loop while waiting so release() can run.
        while self.locked:
            await asyncio.sleep(0.01)
        # No await between the check above and this assignment, so the
        # claim is atomic within the event loop.
        self.locked = True

    async def release(self):
        self.locked = False


lock = DistributedLock.remote()


@ray.remote
def protected_task(lock):
    """Run the critical section while holding the distributed lock."""
    ray.get(lock.acquire.remote())
    try:
        # NOTE(review): do_critical_work is assumed to be defined elsewhere.
        result = do_critical_work()
    finally:
        ray.get(lock.release.remote())
    return result


ray.shutdown()
最佳实践 #
1. 避免过度并行 #
python
import ray

ray.init(num_cpus=4)


@ray.remote
def task():
    return "done"


# Good: parallelism matches the 4 available CPUs.
refs = [task.remote() for _ in range(4)]
# Anti-pattern being illustrated: 1000 tiny tasks on 4 CPUs — per-task
# scheduling overhead dominates the useful work.
refs = [task.remote() for _ in range(1000)]
2. 使用 ray.put 共享数据 #
python
import ray

ray.init()

# NOTE(review): load_large_dataset and process are assumed to be defined
# elsewhere in the guide — confirm.
large_data = load_large_dataset()

# Good: store the dataset once and hand every task the same ObjectRef.
data_ref = ray.put(large_data)
refs = [process.remote(data_ref, i) for i in range(100)]

# Anti-pattern being illustrated: passing the raw object re-serializes the
# entire dataset for every single task.
refs = [process.remote(large_data, i) for i in range(100)]
ray.shutdown()
3. 合理设置超时 #
python
import ray

ray.init()


@ray.remote
def slow_task():
    """Sleep far longer than the caller is willing to wait."""
    import time
    time.sleep(100)
    return "done"


ref = slow_task.remote()
try:
    # Bound the wait: give up after 10 seconds instead of blocking forever.
    result = ray.get(ref, timeout=10)
except ray.exceptions.GetTimeoutError:
    print("Task timed out")
ray.shutdown()
4. 及时清理资源 #
python
import ray

ray.init()
try:
    # NOTE(review): MyActor is assumed to be defined elsewhere in the guide.
    actor = MyActor.remote()
    result = ray.get(actor.work.remote())
finally:
    # Always tear down the Ray session, even if the work above raised.
    ray.shutdown()
下一步 #
掌握了高级特性之后,继续学习 实战案例,将所学知识应用到实际项目中!
最后更新:2026-04-05