资源管理 #
资源概述 #
Ray 提供了灵活的资源管理系统,支持 CPU、GPU、内存等标准资源,以及自定义资源。通过资源管理,可以高效地调度和执行分布式任务。
text
┌─────────────────────────────────────────────────────────────┐
│ Ray 资源类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 标准资源: │
│ ├── CPU:CPU 核心数 │
│ ├── GPU:GPU 设备数 │
│ └── Memory:内存大小 │
│ │
│ 自定义资源: │
│ ├── 特定硬件(TPU、FPGA) │
│ ├── 软件许可证 │
│ ├── 网络带宽 │
│ └── 任何用户定义资源 │
│ │
└─────────────────────────────────────────────────────────────┘
查看资源 #
集群资源 #
python
import ray

# Start Ray, then compare the cluster's total resources against what is
# currently unclaimed by running tasks/actors.
ray.init()

total = ray.cluster_resources()
free = ray.available_resources()

print("Cluster resources:")
print(total)
print("\nAvailable resources:")
print(free)

ray.shutdown()
资源信息 #
python
import ray

# Start a local Ray instance with an explicit logical resource budget.
ray.init(num_cpus=8, num_gpus=2)

# Summarize the registered resources; 'memory' is reported in bytes.
resources = ray.cluster_resources()
cpu_total = resources.get('CPU', 0)
gpu_total = resources.get('GPU', 0)
mem_bytes = resources.get('memory', 0)

print(f"Total CPUs: {cpu_total}")
print(f"Total GPUs: {gpu_total}")
print(f"Total Memory: {mem_bytes / 1e9:.2f} GB")

ray.shutdown()
CPU 资源 #
指定 CPU 需求 #
python
import ray

ray.init(num_cpus=4)

@ray.remote(num_cpus=2)
def cpu_intensive_task():
    """Simulate CPU-bound work; each call reserves 2 of the 4 CPUs,
    so at most two instances run concurrently."""
    import time
    time.sleep(1)
    return "CPU task done"

# Submit four tasks; the scheduler queues the overflow until CPUs free up.
futures = [cpu_intensive_task.remote() for _ in range(4)]
print(ray.get(futures))

ray.shutdown()
CPU 资源分配 #
text
┌─────────────────────────────────────────────────────────────┐
│ CPU 资源分配示例 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 集群:4 CPU │
│ │
│ Task 1 (num_cpus=2) ──► 占用 2 CPU │
│ Task 2 (num_cpus=1) ──► 占用 1 CPU │
│ Task 3 (num_cpus=2) ──► 等待(剩余 1 CPU 不足) │
│ │
│ Task 1 完成后 ──► 释放 2 CPU │
│ Task 3 开始执行 │
│ │
└─────────────────────────────────────────────────────────────┘
动态 CPU 分配 #
python
import ray

ray.init()

@ray.remote
def base_task(x):
    """Double the input; CPU needs are supplied per call via .options()."""
    return x * 2

# Override the CPU requirement at submission time instead of at decoration
# time — fractional values (0.5) allow CPU sharing between tasks.
futures = [
    base_task.options(num_cpus=cpus).remote(arg)
    for cpus, arg in ((1, 10), (2, 20), (0.5, 30))
]
print(ray.get(futures))

ray.shutdown()
GPU 资源 #
指定 GPU 需求 #
python
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
def gpu_task():
    """Reserve one whole GPU; Ray sets CUDA_VISIBLE_DEVICES for the worker."""
    import torch
    return torch.cuda.is_available()

@ray.remote(num_gpus=0.5)
def half_gpu_task():
    """Fractional GPUs let two such tasks share one physical device."""
    return "Using 0.5 GPU"

for future in (gpu_task.remote(), half_gpu_task.remote()):
    print(ray.get(future))

ray.shutdown()
多 GPU 任务 #
python
import ray

ray.init(num_gpus=4)

@ray.remote(num_gpus=2)
def multi_gpu_task():
    """Reserve two GPUs; the worker sees exactly those two devices."""
    import torch
    return f"Available GPUs: {torch.cuda.device_count()}"

print(ray.get(multi_gpu_task.remote()))

ray.shutdown()
GPU 选择 #
python
import ray
import os

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
def gpu_info():
    """Show which device Ray assigned via the CUDA_VISIBLE_DEVICES env var."""
    devices = os.environ.get("CUDA_VISIBLE_DEVICES", "not set")
    return f"CUDA_VISIBLE_DEVICES: {devices}"

print(ray.get(gpu_info.remote()))

ray.shutdown()
GPU Actor #
python
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUModel:
    """Actor that holds one GPU for its entire lifetime."""

    def __init__(self):
        import torch
        # "cuda:0" is the GPU Ray assigned to this actor (visible devices
        # are remapped, so index 0 is always correct here).
        self.device = torch.device("cuda:0")

    def predict(self, data):
        """Move `data` to the actor's GPU and return the sum as a float."""
        import torch
        return torch.tensor(data).to(self.device).sum().item()

model = GPUModel.remote()
print(ray.get(model.predict.remote([1, 2, 3, 4, 5])))

ray.shutdown()
内存资源 #
指定内存需求 #
python
import ray

ray.init()

# Reserve 1 GiB of worker memory for scheduling purposes.
# FIX: the original allocated `[0] * (256 * 1024 * 1024)` — a list of 256M
# Python int references occupies roughly 2 GB, far exceeding the declared
# 1 GiB reservation. A bytearray of the same length is exactly 256 MiB and
# stays within budget; len() is unchanged (268435456).
@ray.remote(memory=1024 * 1024 * 1024)
def memory_intensive_task():
    data = bytearray(256 * 1024 * 1024)
    return len(data)

print(ray.get(memory_intensive_task.remote()))

ray.shutdown()
内存限制 #
python
import ray

# Cap the shared-memory object store at 2 GiB.
ray.init(object_store_memory=2 * 1024 * 1024 * 1024)

@ray.remote
def create_large_object(size):
    # FIX: the original returned `[0] * size`, a list of 500M ints needing
    # ~4 GB of worker heap per task (the machine would OOM before the object
    # store filled). bytes(size) is exactly `size` bytes, so five 500 MiB
    # objects genuinely exceed the 2 GiB store.
    return bytes(size)

try:
    refs = [create_large_object.remote(500 * 1024 * 1024) for _ in range(5)]
    ray.get(refs)
except ray.exceptions.ObjectStoreFullError:
    # NOTE(review): recent Ray versions spill objects to disk instead of
    # raising this error — confirm against the deployed Ray version.
    print("Object store full!")

ray.shutdown()
自定义资源 #
定义自定义资源 #
python
import ray

# Custom resources are arbitrary labels with capacities, registered at
# startup; the scheduler treats them exactly like CPU/GPU counters.
ray.init(resources={"TPU": 4, "SSD": 2})

@ray.remote(resources={"TPU": 1})
def tpu_task():
    return "Using TPU"

@ray.remote(resources={"SSD": 1})
def ssd_task():
    return "Using SSD"

for ref in (tpu_task.remote(), ssd_task.remote()):
    print(ray.get(ref))

ray.shutdown()
动态资源 #
python
import ray

# FIX: the original called ray.init() without registering "custom_resource",
# so the task below could never be scheduled and would stay pending forever.
# Register the resource at startup and actually fetch the result.
ray.init(resources={"custom_resource": 2})

@ray.remote
def task_with_resource():
    return "done"

# Attach the resource requirement per call via .options().
ref = task_with_resource.options(
    resources={"custom_resource": 1}
).remote()
print(ray.get(ref))

ray.shutdown()
资源标签 #
python
import ray

# Resource labels can encode node roles (here faked on one local node);
# a task then targets a role by requiring the matching label.
ray.init(resources={"node_type:gpu": 1, "node_type:cpu": 4})

# Only schedulable where a "node_type:gpu" unit is available.
@ray.remote(resources={"node_type:gpu": 1})
def gpu_node_task():
    return "Running on GPU node"

# Only schedulable where a "node_type:cpu" unit is available.
@ray.remote(resources={"node_type:cpu": 1})
def cpu_node_task():
    return "Running on CPU node"

# NOTE: the tasks are defined but deliberately never invoked in this example.
ray.shutdown()
资源调度 #
调度策略 #
text
┌─────────────────────────────────────────────────────────────┐
│ 调度策略类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ DEFAULT │
│ ├── 默认策略 │
│ ├── 考虑资源可用性 │
│ └── 考虑数据局部性 │
│ │
│ SPREAD │
│ ├── 尽量分散到不同节点 │
│ ├── 适用于高可用场景 │
│ └── 减少单点故障影响 │
│ │
│ NODE_AFFINITY │
│ ├── 绑定到特定节点 │
│ ├── 适用于数据局部性 │
│ └── 减少网络传输 │
│ │
│ PLACEMENT_GROUP │
│ ├── 使用 Placement Group │
│ ├── 保证资源同时可用 │
│ └── 适用于协同任务 │
│ │
└─────────────────────────────────────────────────────────────┘
SPREAD 策略 #
python
import ray

ray.init()

@ray.remote(scheduling_strategy="SPREAD")
def spread_task(x):
    """Report which host ran this task; SPREAD asks the scheduler to place
    the copies across as many nodes as possible."""
    import socket
    return f"Task {x} on {socket.gethostname()}"

futures = [spread_task.remote(i) for i in range(10)]
print(ray.get(futures))

ray.shutdown()
NODE_AFFINITY 策略 #
python
import ray

ray.init()

@ray.remote
def get_node_id():
    """Return the ID of whichever node this task landed on."""
    return ray.get_runtime_context().get_node_id()

node_id = ray.get(get_node_id.remote())

# Pin the follow-up task to the exact node discovered above.
# soft=False makes this a hard constraint: the task fails rather than
# run elsewhere if that node is unavailable.
strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
    node_id=node_id,
    soft=False
)

@ray.remote(scheduling_strategy=strategy)
def affinity_task():
    return "Running on specific node"

print(ray.get(affinity_task.remote()))

ray.shutdown()
Placement Group #
创建 Placement Group #
python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=4, num_gpus=2)

# Reserve two bundles up front; STRICT_PACK forces both onto the same node.
pg = ray.util.placement_group(
    bundles=[
        {"CPU": 2, "GPU": 1},
        {"CPU": 2, "GPU": 1}
    ],
    strategy="STRICT_PACK"
)
# Block until the reservation is actually granted.
ray.get(pg.ready())

# FIX: the legacy `placement_group=` option in @ray.remote is deprecated
# and removed in Ray 2.x; a PlacementGroupSchedulingStrategy is required.
@ray.remote(
    num_cpus=1,
    num_gpus=0.5,
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
)
def pg_task():
    return "Running in placement group"

refs = [pg_task.remote() for _ in range(4)]
print(ray.get(refs))

# Release the reserved resources once finished.
ray.util.remove_placement_group(pg)
ray.shutdown()
Placement Group 策略 #
text
┌─────────────────────────────────────────────────────────────┐
│ Placement Group 策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ PACK │
│ ├── 尽量打包到最少节点 │
│ └── 节省资源,适合小任务 │
│ │
│ SPREAD │
│ ├── 尽量分散到不同节点 │
│ └── 高可用,适合关键任务 │
│ │
│ STRICT_PACK │
│ ├── 必须打包到同一节点 │
│ └── 适合需要频繁通信的任务 │
│ │
│ STRICT_SPREAD │
│ ├── 必须分散到不同节点 │
│ └── 最高可用性 │
│ │
└─────────────────────────────────────────────────────────────┘
Placement Group Bundle #
python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=6)

# Three 2-CPU bundles, spread across distinct nodes where possible.
pg = ray.util.placement_group(
    bundles=[
        {"CPU": 2},
        {"CPU": 2},
        {"CPU": 2}
    ],
    strategy="SPREAD"
)
ray.get(pg.ready())

# FIX: the legacy `placement_group=` / `placement_group_bundle_index=`
# options in @ray.remote are deprecated and removed in Ray 2.x; target
# individual bundles through PlacementGroupSchedulingStrategy instead.
@ray.remote(
    num_cpus=1,
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=0
    ),
)
def task_on_bundle_0():
    return "Bundle 0"

@ray.remote(
    num_cpus=1,
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=1
    ),
)
def task_on_bundle_1():
    return "Bundle 1"

print(ray.get([task_on_bundle_0.remote(), task_on_bundle_1.remote()]))

ray.util.remove_placement_group(pg)
ray.shutdown()
资源监控 #
实时监控 #
python
import ray

ray.init()

@ray.remote(num_cpus=1)
def monitor_task():
    """Busy placeholder: hold one CPU for five seconds."""
    import time
    time.sleep(5)
    return "done"

futures = [monitor_task.remote() for _ in range(10)]

# Poll once per second; ray.wait with num_returns=len(futures) reports the
# cumulative set of finished tasks each pass.
while True:
    done, pending = ray.wait(futures, timeout=1, num_returns=len(futures))
    if not pending:
        break
    print(f"Completed: {len(done)}, Remaining: {len(pending)}")

ray.shutdown()
资源使用统计 #
python
import ray

ray.init()

print("Initial resources:", ray.available_resources())

@ray.remote(num_cpus=2)
def task():
    """Hold 2 CPUs for two seconds so the usage change is observable."""
    import time
    time.sleep(2)
    return "done"

futures = [task.remote() for _ in range(2)]
# While the tasks hold their CPUs, the available pool shrinks accordingly.
print("After scheduling:", ray.available_resources())
ray.get(futures)
print("After completion:", ray.available_resources())

ray.shutdown()
最佳实践 #
1. 合理设置资源 #
python
import ray

ray.init()

# Good: request only what the task really needs so the scheduler can run
# many tasks concurrently.
@ray.remote(num_cpus=2)
def balanced_task():
    return "balanced"

# Anti-pattern (shown on purpose): requesting more CPUs than any node has
# means the task would never be scheduled — it is defined here but
# deliberately never called.
@ray.remote(num_cpus=100)
def over_requested():
    return "will wait forever"

ray.shutdown()
2. 使用 Placement Group 协调资源 #
python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=4)

# Reserve both 2-CPU bundles atomically so cooperating tasks cannot
# deadlock waiting on each other's resources.
pg = ray.util.placement_group(
    bundles=[{"CPU": 2}, {"CPU": 2}],
    strategy="PACK"
)
ray.get(pg.ready())

# FIX: the legacy `placement_group=` option in @ray.remote is deprecated
# and removed in Ray 2.x; use PlacementGroupSchedulingStrategy instead.
@ray.remote(
    num_cpus=2,
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg),
)
def coordinated_task():
    return "coordinated"

ray.util.remove_placement_group(pg)
ray.shutdown()
3. 监控资源使用 #
python
import ray

ray.init()

def check_resources():
    """Return per-resource usage: total registered minus currently free."""
    total = ray.cluster_resources()
    free = ray.available_resources()
    return {name: amount - free.get(name, 0) for name, amount in total.items()}

print("Resource usage:", check_resources())

ray.shutdown()
4. 处理资源不足 #
python
import ray
import time

ray.init(num_cpus=2)

@ray.remote(num_cpus=1)
def task_with_timeout():
    """One second of simulated work per 1-CPU slot."""
    time.sleep(1)
    return "done"

futures = [task_with_timeout.remote() for _ in range(10)]

# Bound the wait so the driver never blocks indefinitely when the cluster
# lacks the resources to finish everything in time.
done, pending = ray.wait(futures, timeout=5, num_returns=len(futures))
print(f"Completed: {len(done)}, Timed out: {len(pending)}")

ray.shutdown()
下一步 #
掌握了资源管理之后,继续学习 Ray Data 数据处理,了解大规模数据处理的方法!
最后更新:2026-04-05