Quick Start #

Your First Ray Program #

Initializing Ray #

python
import ray

ray.init()

Sample output (the exact log lines vary by Ray version and environment):

text
2024-01-01 10:00:00,000 INFO worker.py:1234 -- Connecting to existing Ray cluster at address: ...
Ray runtime started.
Dashboard: http://127.0.0.1:8265

Hello Ray #

python
import ray

ray.init()

@ray.remote
def hello(name):
    return f"Hello, {name}!"

future = hello.remote("Ray")
result = ray.get(future)
print(result)

ray.shutdown()

Understanding the Ray Programming Model #

Core Concepts #

text
┌─────────────────────────────────────────────────────────────┐
│                    Ray Programming Model                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Function ──► @ray.remote ──► Task (remote function)     │
│                                                             │
│  2. Class ──► @ray.remote ──► Actor (remote service)        │
│                                                             │
│  3. Return value ──► ObjectRef (object reference)           │
│                                                             │
│  4. Fetch result ──► ray.get(ref)                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Remote Call Flow #

text
┌─────────────────────────────────────────────────────────────┐
│                    Remote Call Flow                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Driver process                                             │
│       │                                                     │
│       │  1. task.remote(args)                               │
│       ▼                                                     │
│  ┌─────────────────┐                                        │
│  │   Create Task   │                                        │
│  └─────────────────┘                                        │
│       │                                                     │
│       │  2. returns an ObjectRef                            │
│       ▼                                                     │
│  ┌─────────────────┐                                        │
│  │    ObjectRef    │  ──►  returns immediately, no waiting  │
│  └─────────────────┘                                        │
│       │                                                     │
│       │  3. ray.get(ref)                                    │
│       ▼                                                     │
│  ┌─────────────────┐                                        │
│  │ Wait for result │  ──►  blocks until the result is ready │
│  └─────────────────┘                                        │
│       │                                                     │
│       ▼                                                     │
│  Actual result                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Task Basics #

Defining a Remote Function #

python
import ray

ray.init()

@ray.remote
def add(a, b):
    return a + b

result_ref = add.remote(1, 2)
result = ray.get(result_ref)
print(result)

ray.shutdown()

Running Multiple Tasks in Parallel #

python
import ray
import time

ray.init()

@ray.remote
def slow_task(x):
    time.sleep(1)
    return x * 2

start = time.time()
refs = [slow_task.remote(i) for i in range(10)]
results = ray.get(refs)
end = time.time()

print(f"Results: {results}")
print(f"Time: {end - start:.2f}s")

ray.shutdown()

Task Dependencies #

python
import ray

ray.init()

@ray.remote
def fetch_data(url):
    return f"data from {url}"

@ray.remote
def process_data(data):
    return f"processed: {data}"

@ray.remote
def save_result(result):
    return f"saved: {result}"

data_ref = fetch_data.remote("http://example.com")
processed_ref = process_data.remote(data_ref)
result_ref = save_result.remote(processed_ref)

print(ray.get(result_ref))

ray.shutdown()

Specifying Resource Requirements #

python
import ray

ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_cpus=2)
def cpu_intensive():
    return "CPU task done"

@ray.remote(num_gpus=1)
def gpu_task():
    return "GPU task done"

@ray.remote(num_cpus=1, num_gpus=0.5)
def mixed_task():
    return "Mixed task done"

ray.get(cpu_intensive.remote())

ray.shutdown()

Actor Basics #

Defining an Actor #

python
import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1
        return self.value
    
    def get_value(self):
        return self.value

counter = Counter.remote()

print(ray.get(counter.increment.remote()))
print(ray.get(counter.increment.remote()))
print(ray.get(counter.get_value.remote()))

ray.shutdown()

Actor Lifecycle #

text
┌─────────────────────────────────────────────────────────────┐
│                    Actor Lifecycle                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Create                                                  │
│     actor = ActorClass.remote()                             │
│     └── Allocates resources and starts the Actor process    │
│                                                             │
│  2. Call a method                                           │
│     ref = actor.method.remote()                             │
│     └── The method runs inside the Actor process            │
│                                                             │
│  3. Fetch the result                                        │
│     result = ray.get(ref)                                   │
│     └── Waits for the method to finish                      │
│                                                             │
│  4. Destroy                                                 │
│     ray.kill(actor)                                         │
│     └── Forcibly terminates the Actor                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Actor Resource Configuration #

python
import ray

ray.init(num_cpus=4, num_gpus=2)

@ray.remote(num_cpus=2)
class CPUActor:
    def work(self):
        return "CPU actor working"

@ray.remote(num_gpus=1)
class GPUActor:
    def work(self):
        # Requires PyTorch; the result depends on whether a real GPU is present.
        import torch
        return torch.cuda.is_available()

cpu_actor = CPUActor.remote()
gpu_actor = GPUActor.remote()

print(ray.get(cpu_actor.work.remote()))
print(ray.get(gpu_actor.work.remote()))

ray.shutdown()

Actor Pool #

python
import ray

ray.init(num_cpus=4)

@ray.remote
class Worker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
    
    def process(self, data):
        return f"Worker {self.worker_id} processed: {data}"

workers = [Worker.remote(i) for i in range(4)]

results = ray.get([w.process.remote(f"task_{i}") for i, w in enumerate(workers)])
print(results)

ray.shutdown()

Object Store #

ray.put and ray.get #

python
import ray

ray.init()

data = [1, 2, 3, 4, 5]
ref = ray.put(data)

retrieved = ray.get(ref)
print(retrieved)

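@ray.remote
def process(data):
    # Ray resolves a top-level ObjectRef argument before the task runs,
    # so `data` is already the list here and needs no ray.get().
    return sum(data)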

result = ray.get(process.remote(ref))
print(result)

ray.shutdown()

Batch Retrieval #

python
import ray

ray.init()

@ray.remote
def task(x):
    return x * 2

refs = [task.remote(i) for i in range(10)]

results = ray.get(refs)
print(results)

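# ray.get accepts an ObjectRef or a list of them, not a dict,
# so build the mapping from the list fetched above.
results_dict = {f"task_{i}": value for i, value in enumerate(results)}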
print(results_dict)

ray.shutdown()

Timeout Control #

python
import ray
import time

ray.init()

@ray.remote
def slow_task():
    time.sleep(10)
    return "done"

ref = slow_task.remote()

try:
    result = ray.get(ref, timeout=2)
except ray.exceptions.GetTimeoutError:
    print("Task timed out")

ray.shutdown()

Asynchronous Programming #

Using async/await #

python
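import ray
import asyncio
import time

ray.init()

# Ray tasks cannot be declared with `async def`; define a regular
# remote function and await its ObjectRef from async driver code.
@ray.remote
def slow_double(x):
    time.sleep(1)
    return x * 2

async def main():
    # ObjectRefs are awaitable, so this does not block the event loop.
    ref = slow_double.remote(10)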
    result = await ref
    print(result)

asyncio.run(main())

ray.shutdown()

Async Actors #

python
import ray
import asyncio

ray.init()

@ray.remote
class AsyncActor:
    async def slow_method(self):
        await asyncio.sleep(1)
        return "async result"
    
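    async def concurrent_methods(self):
        # Inside the actor, `self` is the plain instance, not an actor handle,
        # so call the coroutine directly instead of using `.remote()`.
        tasks = [self.slow_method() for _ in range(5)]
        results = await asyncio.gather(*tasks)
        return results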

actor = AsyncActor.remote()
print(ray.get(actor.slow_method.remote()))

ray.shutdown()

Practical Examples #

Parallel Data Processing #

python
import ray
import time

ray.init()

@ray.remote
def process_batch(batch):
    time.sleep(0.1)
    return [x * 2 for x in batch]

data = list(range(100))
batch_size = 10
batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]

refs = [process_batch.remote(batch) for batch in batches]
results = ray.get(refs)
flat_results = [item for batch in results for item in batch]

print(f"Processed {len(flat_results)} items")

ray.shutdown()

Distributed Counter #

python
import ray

ray.init()

@ray.remote
class DistributedCounter:
    def __init__(self):
        self.value = 0
    
    def increment(self, amount=1):
        self.value += amount
    
    def get(self):
        return self.value

counter = DistributedCounter.remote()

refs = [counter.increment.remote(i) for i in range(100)]
ray.get(refs)

print(f"Final count: {ray.get(counter.get.remote())}")

ray.shutdown()

Parameter Server Pattern #

python
import ray
import numpy as np

ray.init()

@ray.remote
class ParameterServer:
    def __init__(self, dim):
        self.params = np.zeros(dim)
    
    def get_params(self):
        return self.params.copy()
    
    def update(self, grads):
        self.params += grads

@ray.remote
def worker(ps, worker_id, iterations=10):
    for i in range(iterations):
        params = ray.get(ps.get_params.remote())
        grads = np.random.randn(len(params)) * 0.01
        ps.update.remote(grads)
    return f"Worker {worker_id} done"

ps = ParameterServer.remote(dim=10)

refs = [worker.remote(ps, i) for i in range(4)]
print(ray.get(refs))
print(f"Final params: {ray.get(ps.get_params.remote())}")

ray.shutdown()

Dashboard #

Accessing the Dashboard #

Once Ray has started, the Dashboard is served at http://127.0.0.1:8265 by default.

text
┌─────────────────────────────────────────────────────────────┐
│                    Ray Dashboard                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Main features:                                             │
│  ├── Overview: cluster summary                              │
│  ├── Jobs: job management                                   │
│  ├── Actors: Actor status                                   │
│  ├── Objects: object store                                  │
│  ├── Nodes: node information                                │
│  ├── Metrics: performance metrics                           │
│  └── Logs: log viewer                                       │
│                                                             │
│  Uses:                                                      │
│  ├── Monitor cluster state                                  │
│  ├── Debug distributed applications                         │
│  ├── Analyze performance bottlenecks                        │
│  └── Inspect resource usage                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Disabling the Dashboard #

python
ray.init(include_dashboard=False)

Customizing the Host and Port #

python
ray.init(dashboard_host="0.0.0.0", dashboard_port=8266)

Best Practices #

1. Use ray.get Sensibly #

python
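refs = [task.remote(i) for i in range(100)]

# Recommended: fetch all results with a single call.
results = ray.get(refs)

# Not recommended: one ray.get call per reference.
results = []
for ref in refs:
    results.append(ray.get(ref))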

2. Avoid Tiny Tasks #

python
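# Recommended: each task handles a whole batch.
@ray.remote
def process_batch(items):
    return [process(item) for item in items]

refs = [process_batch.remote(batch) for batch in batches]

# Not recommended: one tiny task per item, where scheduling overhead dominates.
refs = [process_item.remote(item) for item in items]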

3. Share Large Data with ray.put #

python
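large_data = load_large_dataset()
# Recommended: store the data once and pass the reference.
data_ref = ray.put(large_data)

refs = [process.remote(data_ref, i) for i in range(100)]

# Not recommended: passing the object directly stores a new copy on every call.
refs = [process.remote(large_data, i) for i in range(100)]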

4. Clean Up Resources Promptly #

python
import ray

ray.init()

try:
    pass  # Run your Ray workload here.
finally:
    ray.shutdown()

Next Steps #

Now that you have covered the basics of Ray, continue with the Tasks chapter for a deeper look at remote functions.

Last updated: 2026-04-05