快速开始 #
第一个 Ray 程序 #
初始化 Ray #
python
# Import Ray and start (or connect to) a local cluster.
import ray
# With no arguments, ray.init() auto-detects the machine's CPUs/GPUs.
ray.init()
输出示例:
text
2024-01-01 10:00:00,000 INFO worker.py:1234 -- Connecting to existing Ray cluster at address: ...
Ray runtime started.
Dashboard: http://127.0.0.1:8265
Hello Ray #
python
import ray

ray.init()


@ray.remote
def hello(name):
    """Remote task: return a greeting for *name*."""
    return f"Hello, {name}!"


# .remote() schedules the task and immediately returns an ObjectRef.
future = hello.remote("Ray")
# ray.get blocks until the task finishes and returns its result.
result = ray.get(future)
print(result)

ray.shutdown()
理解 Ray 编程模型 #
核心概念 #
text
┌─────────────────────────────────────────────────────────────┐
│ Ray 编程模型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 普通函数 ──► @ray.remote ──► Task(远程函数) │
│ │
│ 2. 普通类 ──► @ray.remote ──► Actor(远程服务) │
│ │
│ 3. 返回值 ──► ObjectRef(对象引用) │
│ │
│ 4. 获取结果 ──► ray.get(ref) │
│ │
└─────────────────────────────────────────────────────────────┘
远程调用流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ 远程调用流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Driver 进程 │
│ │ │
│ │ 1. task.remote(args) │
│ ▼ │
│ ┌─────────┐ │
│ │ 创建 Task │ │
│ └─────────┘ │
│ │ │
│ │ 2. 返回 ObjectRef │
│ ▼ │
│ ┌─────────┐ │
│ │ ObjectRef│ ──► 立即返回,不等待执行 │
│ └─────────┘ │
│ │ │
│ │ 3. ray.get(ref) │
│ ▼ │
│ ┌─────────┐ │
│ │ 等待结果 │ ──► 阻塞直到结果就绪 │
│ └─────────┘ │
│ │ │
│ ▼ │
│ 实际结果 │
│ │
└─────────────────────────────────────────────────────────────┘
Tasks 基础 #
定义远程函数 #
python
import ray

ray.init()


@ray.remote
def add(a, b):
    """Remote task: return the sum of two values."""
    return a + b


result_ref = add.remote(1, 2)  # returns an ObjectRef immediately
result = ray.get(result_ref)   # blocks until the result is ready
print(result)  # 3

ray.shutdown()
并行执行多个任务 #
python
import ray
import time

ray.init()


@ray.remote
def slow_task(x):
    """Simulated slow work: sleep 1s, then return x * 2."""
    time.sleep(1)
    return x * 2


start = time.time()
# All 10 tasks are submitted immediately and run in parallel,
# so total wall time is ~1s instead of ~10s.
refs = [slow_task.remote(i) for i in range(10)]
results = ray.get(refs)
end = time.time()

print(f"Results: {results}")
print(f"Time: {end - start:.2f}s")

ray.shutdown()
任务依赖 #
python
import ray

ray.init()


@ray.remote
def fetch_data(url):
    return f"data from {url}"


@ray.remote
def process_data(data):
    return f"processed: {data}"


@ray.remote
def save_result(result):
    return f"saved: {result}"


# ObjectRefs can be passed directly to downstream tasks: Ray resolves them
# and schedules each task only after its inputs are ready, forming a
# fetch -> process -> save pipeline with no blocking ray.get in between.
data_ref = fetch_data.remote("http://example.com")
processed_ref = process_data.remote(data_ref)
result_ref = save_result.remote(processed_ref)

print(ray.get(result_ref))

ray.shutdown()
指定资源需求 #
python
import ray

ray.init(num_cpus=4, num_gpus=1)


@ray.remote(num_cpus=2)
def cpu_intensive():
    return "CPU task done"


@ray.remote(num_gpus=1)
def gpu_task():
    return "GPU task done"


@ray.remote(num_cpus=1, num_gpus=0.5)
def mixed_task():
    # Fractional GPU requests let several tasks share one physical device.
    return "Mixed task done"


# Only the CPU task is executed here; the GPU examples need a real GPU.
ray.get(cpu_intensive.remote())

ray.shutdown()
Actors 基础 #
定义 Actor #
python
import ray

ray.init()


@ray.remote
class Counter:
    """Stateful actor: `value` persists across method calls."""

    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_value(self):
        return self.value


# .remote() on the class starts a dedicated actor process.
counter = Counter.remote()
print(ray.get(counter.increment.remote()))  # 1
print(ray.get(counter.increment.remote()))  # 2
print(ray.get(counter.get_value.remote()))  # 2

ray.shutdown()
Actor 生命周期 #
text
┌─────────────────────────────────────────────────────────────┐
│ Actor 生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 创建 │
│ actor = ActorClass.remote() │
│ └── 分配资源,创建 Actor 进程 │
│ │
│ 2. 调用方法 │
│ ref = actor.method.remote() │
│ └── 方法在 Actor 进程中执行 │
│ │
│ 3. 获取结果 │
│ result = ray.get(ref) │
│ └── 等待方法执行完成 │
│ │
│ 4. 销毁 │
│ ray.kill(actor) │
│ └── 强制终止 Actor │
│ │
└─────────────────────────────────────────────────────────────┘
Actor 资源配置 #
python
import ray

ray.init(num_cpus=4, num_gpus=2)


@ray.remote(num_cpus=2)
class CPUActor:
    def work(self):
        return "CPU actor working"


@ray.remote(num_gpus=1)
class GPUActor:
    def work(self):
        # Ray sets CUDA_VISIBLE_DEVICES for the actor's process.
        import torch
        return torch.cuda.is_available()


cpu_actor = CPUActor.remote()
gpu_actor = GPUActor.remote()
print(ray.get(cpu_actor.work.remote()))
print(ray.get(gpu_actor.work.remote()))

ray.shutdown()
Actor 池 #
python
import ray

ray.init(num_cpus=4)


@ray.remote
class Worker:
    """Simple worker actor identified by an id."""

    def __init__(self, worker_id):
        self.worker_id = worker_id

    def process(self, data):
        return f"Worker {self.worker_id} processed: {data}"


# Create a pool of four actors and dispatch one task to each in parallel.
workers = [Worker.remote(i) for i in range(4)]
results = ray.get([w.process.remote(f"task_{i}") for i, w in enumerate(workers)])
print(results)

ray.shutdown()
对象存储 #
ray.put 和 ray.get #
python
import ray

ray.init()

# Place an object in the shared object store; ray.put returns an ObjectRef.
data = [1, 2, 3, 4, 5]
ref = ray.put(data)

# ray.get fetches the object back from the store.
retrieved = ray.get(ref)
print(retrieved)


@ray.remote
def process(data):
    # NOTE: top-level ObjectRef arguments are automatically resolved by Ray,
    # so `data` is already the underlying list here. Calling ray.get on it
    # again (as the original example did) would raise a ValueError because
    # the argument is no longer an ObjectRef.
    return sum(data)


result = ray.get(process.remote(ref))
print(result)  # 15

ray.shutdown()
批量获取 #
python
import ray

ray.init()


@ray.remote
def task(x):
    return x * 2


refs = [task.remote(i) for i in range(10)]

# Fetch all results in a single batch call (preserves submission order).
results = ray.get(refs)
print(results)

# ray.get only accepts an ObjectRef or a list of ObjectRefs — it does NOT
# accept a dict (the original example would raise). Build the dict from
# the fetched list instead.
results_dict = {f"task_{i}": result for i, result in enumerate(ray.get(refs))}
print(results_dict)

ray.shutdown()
超时控制 #
python
import ray
import time

ray.init()


@ray.remote
def slow_task():
    time.sleep(10)
    return "done"


ref = slow_task.remote()
try:
    # Wait at most 2 seconds; note the task itself keeps running in the
    # background after the timeout — only this get() call gives up.
    result = ray.get(ref, timeout=2)
except ray.exceptions.GetTimeoutError:
    print("Task timed out")

ray.shutdown()
异步编程 #
使用 async/await #
python
import ray
import asyncio

ray.init()


@ray.remote
async def async_task(x):
    """Async remote task: the worker runs it on an asyncio event loop."""
    await asyncio.sleep(1)
    return x * 2


async def main():
    ref = async_task.remote(10)
    # ObjectRefs are awaitable: `await ref` is equivalent to ray.get(ref)
    # but does not block the event loop.
    result = await ref
    print(result)  # 20


asyncio.run(main())

ray.shutdown()
异步 Actor #
python
import ray
import asyncio

ray.init()


@ray.remote
class AsyncActor:
    """Actor whose async methods run concurrently on its own event loop."""

    async def slow_method(self):
        await asyncio.sleep(1)
        return "async result"

    async def concurrent_methods(self):
        # Inside the actor, invoke coroutine methods directly. `.remote()`
        # only exists on an actor *handle*, not on `self`, so the original
        # `self.slow_method.remote()` would raise AttributeError.
        results = await asyncio.gather(*(self.slow_method() for _ in range(5)))
        return results


actor = AsyncActor.remote()
print(ray.get(actor.slow_method.remote()))

ray.shutdown()
实用示例 #
并行数据处理 #
python
import ray
import time

ray.init()


@ray.remote
def process_batch(batch):
    """Process one batch of items (the sleep simulates real work)."""
    time.sleep(0.1)
    return [x * 2 for x in batch]


data = list(range(100))
batch_size = 10

# Split the data into fixed-size batches and process them in parallel.
batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
refs = [process_batch.remote(batch) for batch in batches]
results = ray.get(refs)

# Flatten the per-batch results back into a single list.
flat_results = [item for batch in results for item in batch]
print(f"Processed {len(flat_results)} items")

ray.shutdown()
分布式计数器 #
python
import ray

ray.init()


@ray.remote
class DistributedCounter:
    """Actor serializing all increments, so concurrent updates never race."""

    def __init__(self):
        self.value = 0

    def increment(self, amount=1):
        self.value += amount

    def get(self):
        return self.value


counter = DistributedCounter.remote()

# Submit 100 concurrent increments; the actor applies them one at a time.
refs = [counter.increment.remote(i) for i in range(100)]
ray.get(refs)  # wait until all increments have been applied

print(f"Final count: {ray.get(counter.get.remote())}")  # sum(range(100)) = 4950

ray.shutdown()
参数服务器模式 #
python
import ray
import numpy as np

ray.init()


@ray.remote
class ParameterServer:
    """Central actor holding the shared parameter vector."""

    def __init__(self, dim):
        self.params = np.zeros(dim)

    def get_params(self):
        # Return a copy so callers cannot mutate the server's state.
        return self.params.copy()

    def update(self, grads):
        self.params += grads


@ray.remote
def worker(ps, worker_id, iterations=10):
    """Simulated training worker: pull params, push random gradients."""
    for i in range(iterations):
        params = ray.get(ps.get_params.remote())
        grads = np.random.randn(len(params)) * 0.01
        # Fire-and-forget update; ordering between workers is not guaranteed.
        ps.update.remote(grads)
    return f"Worker {worker_id} done"


ps = ParameterServer.remote(dim=10)
refs = [worker.remote(ps, i) for i in range(4)]
print(ray.get(refs))
print(f"Final params: {ray.get(ps.get_params.remote())}")

ray.shutdown()
Dashboard #
访问 Dashboard #
Ray 启动后,Dashboard 默认运行在 http://127.0.0.1:8265
text
┌─────────────────────────────────────────────────────────────┐
│ Ray Dashboard │
├─────────────────────────────────────────────────────────────┤
│ │
│ 主要功能: │
│ ├── Overview:集群概览 │
│ ├── Jobs:任务管理 │
│ ├── Actors:Actor 状态 │
│ ├── Objects:对象存储 │
│ ├── Nodes:节点信息 │
│ ├── Metrics:性能指标 │
│ └── Logs:日志查看 │
│ │
│ 用途: │
│ ├── 监控集群状态 │
│ ├── 调试分布式应用 │
│ ├── 分析性能瓶颈 │
│ └── 查看资源使用 │
│ │
└─────────────────────────────────────────────────────────────┘
禁用 Dashboard #
python
# Start Ray without the web dashboard (saves resources; no monitoring UI).
ray.init(include_dashboard=False)
自定义端口 #
python
# Bind the dashboard to all interfaces on a custom port (default is 8265).
ray.init(dashboard_host="0.0.0.0", dashboard_port=8266)
最佳实践 #
1. 合理使用 ray.get #
python
# Good: submit all tasks first, then fetch results in one batch call.
refs = [task.remote(i) for i in range(100)]
results = ray.get(refs)

# Bad: calling ray.get inside the loop serializes execution —
# each iteration blocks until that single task finishes.
results = []
for ref in refs:
    results.append(ray.get(ref))
2. 避免小任务 #
python
# Good: batch many small items into one task to amortize scheduling overhead.
@ray.remote
def process_batch(items):
    return [process(item) for item in items]


refs = [process_batch.remote(batch) for batch in batches]

# Bad: one tiny task per item — per-task overhead dominates the real work.
refs = [process_item.remote(item) for item in items]
3. 使用 ray.put 共享大数据 #
python
# Good: put the large object into the object store once and share the ref;
# every task reads the same copy via shared memory.
large_data = load_large_dataset()
data_ref = ray.put(large_data)
refs = [process.remote(data_ref, i) for i in range(100)]

# Bad: passing the raw object serializes and copies it for every task.
refs = [process.remote(large_data, i) for i in range(100)]
4. 及时清理资源 #
python
import ray

ray.init()
try:
    # ... application code goes here ...
    pass
finally:
    # Always release cluster resources, even if the code above raises.
    ray.shutdown()
下一步 #
现在你已经掌握了 Ray 的基本用法,继续学习 Tasks 任务 深入理解远程函数!
最后更新:2026-04-05