Resource Management #

Resource Overview #

Ray provides a flexible resource management system that supports standard resources such as CPU, GPU, and memory, as well as custom resources. Resource management lets you schedule and execute distributed tasks efficiently.

text
┌─────────────────────────────────────────────────────────────┐
│                      Ray Resource Types                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Standard resources:                                        │
│  ├── CPU: number of CPU cores                               │
│  ├── GPU: number of GPU devices                             │
│  └── Memory: amount of memory                               │
│                                                             │
│  Custom resources:                                          │
│  ├── Special hardware (TPU, FPGA)                           │
│  ├── Software licenses                                      │
│  ├── Network bandwidth                                      │
│  └── Any user-defined resource                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘
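
As a quick orientation, here is a minimal sketch that starts a local cluster advertising standard and custom resources, then runs a task that consumes a mix of them. The "FPGA" resource name is purely illustrative (Ray resources are logical labels):

python
import ray

# Advertise standard resources plus a hypothetical custom "FPGA" resource.
ray.init(num_cpus=4, num_gpus=1, resources={"FPGA": 2})

@ray.remote(num_cpus=1, num_gpus=0.5, resources={"FPGA": 1})
def mixed_resource_task():
    return "scheduled with CPU + GPU + FPGA"

print(ray.get(mixed_resource_task.remote()))

ray.shutdown()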

Viewing Resources #

Cluster Resources #

python
import ray

ray.init()

print("Cluster resources:")
print(ray.cluster_resources())

print("\nAvailable resources:")
print(ray.available_resources())

ray.shutdown()

Resource Details #

python
import ray

ray.init(num_cpus=8, num_gpus=2)

resources = ray.cluster_resources()
print(f"Total CPUs: {resources.get('CPU', 0)}")
print(f"Total GPUs: {resources.get('GPU', 0)}")
# The 'memory' entry is reported in bytes.
print(f"Total Memory: {resources.get('memory', 0) / 1e9:.2f} GB")

ray.shutdown()

CPU Resources #

Specifying CPU Requirements #

python
import ray

ray.init(num_cpus=4)

# Each invocation reserves 2 of the 4 CPUs, so at most
# two of these tasks run concurrently.
@ray.remote(num_cpus=2)
def cpu_intensive_task():
    import time
    time.sleep(1)
    return "CPU task done"

refs = [cpu_intensive_task.remote() for _ in range(4)]
results = ray.get(refs)
print(results)

ray.shutdown()

CPU Resource Allocation #

text
┌─────────────────────────────────────────────────────────────┐
│                    CPU Allocation Example                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Cluster: 4 CPUs                                            │
│                                                             │
│  Task 1 (num_cpus=2) ──► claims 2 CPUs                      │
│  Task 2 (num_cpus=1) ──► claims 1 CPU                       │
│  Task 3 (num_cpus=2) ──► waits (only 1 CPU free)            │
│                                                             │
│  Task 1 finishes ──► releases 2 CPUs                        │
│  Task 3 starts running                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
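
The timeline above can be reproduced directly. A minimal sketch: on a 4-CPU cluster, the third task cannot start until the first releases its CPUs, which shows up as a staggered finish order:

python
import ray
import time

ray.init(num_cpus=4)

start = time.time()

@ray.remote
def timed_task(name, seconds, start):
    time.sleep(seconds)
    return f"{name} finished at t={time.time() - start:.1f}s"

# Task 1 and Task 2 fit within the 4 CPUs; Task 3 needs 2 CPUs but
# only 1 is free, so it waits until Task 1 releases its CPUs.
t1 = timed_task.options(num_cpus=2).remote("Task 1", 2, start)
t2 = timed_task.options(num_cpus=1).remote("Task 2", 2, start)
t3 = timed_task.options(num_cpus=2).remote("Task 3", 2, start)

for line in ray.get([t1, t2, t3]):
    print(line)  # Task 3 finishes roughly 2 seconds after the others

ray.shutdown()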

Dynamic CPU Allocation #

python
import ray

ray.init()

@ray.remote
def base_task(x):
    return x * 2

# .options() overrides resource requirements per call;
# fractional CPUs are allowed.
ref1 = base_task.options(num_cpus=1).remote(10)
ref2 = base_task.options(num_cpus=2).remote(20)
ref3 = base_task.options(num_cpus=0.5).remote(30)

print(ray.get([ref1, ref2, ref3]))

ray.shutdown()

GPU Resources #

Specifying GPU Requirements #

python
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
def gpu_task():
    import torch
    return torch.cuda.is_available()

# Fractional GPUs let tasks share one device; Ray does not
# partition GPU memory, so the tasks must coordinate themselves.
@ray.remote(num_gpus=0.5)
def half_gpu_task():
    return "Using 0.5 GPU"

print(ray.get(gpu_task.remote()))
print(ray.get(half_gpu_task.remote()))

ray.shutdown()

Multi-GPU Tasks #

python
import ray

ray.init(num_gpus=4)

# Ray exposes 2 devices to this task via CUDA_VISIBLE_DEVICES.
@ray.remote(num_gpus=2)
def multi_gpu_task():
    import torch
    num_gpus = torch.cuda.device_count()
    return f"Available GPUs: {num_gpus}"

print(ray.get(multi_gpu_task.remote()))

ray.shutdown()

GPU Selection #

python
import ray
import os

ray.init(num_gpus=2)

# Ray sets CUDA_VISIBLE_DEVICES for each worker so the task
# only sees the GPUs assigned to it.
@ray.remote(num_gpus=1)
def gpu_info():
    visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "not set")
    return f"CUDA_VISIBLE_DEVICES: {visible_devices}"

print(ray.get(gpu_info.remote()))

ray.shutdown()
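
Inside a task you can also ask Ray directly which logical GPU IDs were assigned, rather than parsing the environment variable. A short sketch using ray.get_gpu_ids():

python
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
def assigned_gpus():
    # IDs of the GPUs Ray has reserved for this worker process.
    return ray.get_gpu_ids()

print(ray.get(assigned_gpus.remote()))

ray.shutdown()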

GPU Actor #

python
import ray

ray.init(num_gpus=2)

# The actor reserves its GPU for its entire lifetime, which is
# useful for keeping a model resident on the device.
@ray.remote(num_gpus=1)
class GPUModel:
    def __init__(self):
        import torch
        self.device = torch.device("cuda:0")
    
    def predict(self, data):
        import torch
        tensor = torch.tensor(data).to(self.device)
        return tensor.sum().item()

model = GPUModel.remote()
print(ray.get(model.predict.remote([1, 2, 3, 4, 5])))

ray.shutdown()

Memory Resources #

Specifying Memory Requirements #

python
import ray

ray.init()

# Request 1 GB of memory. Like all Ray resources, 'memory' is a
# logical scheduling constraint, not an enforced limit.
@ray.remote(memory=1024 * 1024 * 1024)
def memory_intensive_task():
    data = [0] * (256 * 1024 * 1024)  # a large in-memory list
    return len(data)

print(ray.get(memory_intensive_task.remote()))

ray.shutdown()

Memory Limits #

python
import ray
import numpy as np

ray.init(object_store_memory=2 * 1024 * 1024 * 1024)  # 2 GB object store

@ray.remote
def create_large_object(num_bytes):
    # A numpy array has a predictable size in the object store.
    return np.zeros(num_bytes, dtype=np.uint8)

try:
    # Five 500 MB results exceed the 2 GB store.
    refs = [create_large_object.remote(500 * 1024 * 1024) for _ in range(5)]
    ray.get(refs)
except ray.exceptions.ObjectStoreFullError:
    # Note: with object spilling enabled (the default), Ray may spill
    # objects to disk instead of raising this error.
    print("Object store full!")

ray.shutdown()

Custom Resources #

Defining Custom Resources #

python
import ray

# Custom resources are arbitrary name -> quantity pairs.
ray.init(resources={"TPU": 4, "SSD": 2})

@ray.remote(resources={"TPU": 1})
def tpu_task():
    return "Using TPU"

@ray.remote(resources={"SSD": 1})
def ssd_task():
    return "Using SSD"

print(ray.get(tpu_task.remote()))
print(ray.get(ssd_task.remote()))

ray.shutdown()

Dynamic Resources #

python
import ray

ray.init()

@ray.remote
def task_with_resource():
    return "done"

# This cluster does not define "custom_resource", so the task is
# infeasible and stays pending; ray.get(ref) would block forever.
ref = task_with_resource.options(
    resources={"custom_resource": 1}
).remote()

ray.shutdown()

Resource Labels #

python
import ray

# On a real cluster you would attach these labels to different nodes,
# e.g. ray start --resources='{"node_type:gpu": 1}'.
ray.init(resources={"node_type:gpu": 1, "node_type:cpu": 4})

@ray.remote(resources={"node_type:gpu": 1})
def gpu_node_task():
    return "Running on GPU node"

@ray.remote(resources={"node_type:cpu": 1})
def cpu_node_task():
    return "Running on CPU node"

ray.shutdown()

Resource Scheduling #

Scheduling Strategies #

text
┌─────────────────────────────────────────────────────────────┐
│                  Scheduling Strategy Types                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  DEFAULT                                                    │
│  ├── The default strategy                                   │
│  ├── Considers resource availability                        │
│  └── Considers data locality                                │
│                                                             │
│  SPREAD                                                     │
│  ├── Spreads work across nodes where possible               │
│  ├── Suits high-availability workloads                      │
│  └── Limits the impact of a single node failure             │
│                                                             │
│  NODE_AFFINITY                                              │
│  ├── Pins work to a specific node                           │
│  ├── Suits data-locality requirements                       │
│  └── Reduces network transfer                               │
│                                                             │
│  PLACEMENT_GROUP                                            │
│  ├── Schedules into a Placement Group                       │
│  ├── Guarantees resources are reserved together             │
│  └── Suits cooperating (gang-scheduled) tasks               │
│                                                             │
└─────────────────────────────────────────────────────────────┘
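
Each strategy is selected per task or actor through the scheduling_strategy option. A minimal sketch switching one task between DEFAULT and SPREAD:

python
import ray

ray.init()

@ray.remote
def where_am_i():
    import socket
    return socket.gethostname()

# DEFAULT favors locality and packing; SPREAD pushes replicas onto
# distinct nodes (best effort). On a single-node cluster both
# naturally report the same host.
default_ref = where_am_i.options(scheduling_strategy="DEFAULT").remote()
spread_ref = where_am_i.options(scheduling_strategy="SPREAD").remote()

print(ray.get([default_ref, spread_ref]))

ray.shutdown()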

SPREAD Strategy #

python
import ray

ray.init()

# SPREAD asks the scheduler to place tasks on distinct nodes
# where possible (best effort, not a hard guarantee).
@ray.remote(scheduling_strategy="SPREAD")
def spread_task(x):
    import socket
    return f"Task {x} on {socket.gethostname()}"

refs = [spread_task.remote(i) for i in range(10)]
print(ray.get(refs))

ray.shutdown()

NODE_AFFINITY Strategy #

python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote
def get_node_id():
    return ray.get_runtime_context().get_node_id()

# Discover the ID of some node in the cluster.
node_id = ray.get(get_node_id.remote())

@ray.remote(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        node_id=node_id,
        soft=False  # hard constraint: fail rather than run elsewhere
    )
)
def affinity_task():
    return "Running on specific node"

print(ray.get(affinity_task.remote()))

ray.shutdown()

Placement Group #

Creating a Placement Group #

python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=4, num_gpus=2)

# Reserve two bundles; STRICT_PACK forces both onto one node.
pg = ray.util.placement_group(
    bundles=[
        {"CPU": 2, "GPU": 1},
        {"CPU": 2, "GPU": 1}
    ],
    strategy="STRICT_PACK"
)

# Block until the reservation has been fulfilled.
ray.get(pg.ready())

@ray.remote(
    num_cpus=1,
    num_gpus=0.5,
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
)
def pg_task():
    return "Running in placement group"

refs = [pg_task.remote() for _ in range(4)]
print(ray.get(refs))

# Release the reserved resources.
ray.util.remove_placement_group(pg)

ray.shutdown()

Placement Group Strategies #

text
┌─────────────────────────────────────────────────────────────┐
│                 Placement Group Strategies                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  PACK                                                       │
│  ├── Packs bundles onto as few nodes as possible            │
│  └── Saves resources; good for small jobs                   │
│                                                             │
│  SPREAD                                                     │
│  ├── Spreads bundles across nodes where possible            │
│  └── High availability; good for critical jobs              │
│                                                             │
│  STRICT_PACK                                                │
│  ├── All bundles must land on a single node                 │
│  └── Good for tasks that communicate heavily                │
│                                                             │
│  STRICT_SPREAD                                              │
│  ├── Every bundle must land on a different node             │
│  └── Maximum availability                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
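
The strict variants can be infeasible on a small cluster. A minimal sketch contrasting PACK with STRICT_SPREAD on a single node:

python
import ray
from ray.util.placement_group import placement_group

ray.init(num_cpus=8)

bundles = [{"CPU": 2}, {"CPU": 2}]

# PACK keeps bundles together; trivially feasible on one node.
pack_pg = placement_group(bundles, strategy="PACK")
ray.get(pack_pg.ready())
ray.util.remove_placement_group(pack_pg)

# STRICT_SPREAD demands two distinct nodes, so on a single-node
# cluster the group never becomes ready.
strict_pg = placement_group(bundles, strategy="STRICT_SPREAD")
print(strict_pg.wait(timeout_seconds=5))  # prints False on one node
ray.util.remove_placement_group(strict_pg)

ray.shutdown()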

Placement Group Bundle #

python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=6)

pg = ray.util.placement_group(
    bundles=[
        {"CPU": 2},
        {"CPU": 2},
        {"CPU": 2}
    ],
    strategy="SPREAD"
)

ray.get(pg.ready())

# placement_group_bundle_index pins a task to one specific bundle.
@ray.remote(
    num_cpus=1,
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=0
    )
)
def task_on_bundle_0():
    return "Bundle 0"

@ray.remote(
    num_cpus=1,
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=1
    )
)
def task_on_bundle_1():
    return "Bundle 1"

print(ray.get([task_on_bundle_0.remote(), task_on_bundle_1.remote()]))

ray.util.remove_placement_group(pg)

ray.shutdown()

Resource Monitoring #

Live Monitoring #

python
import ray

ray.init()

@ray.remote(num_cpus=1)
def monitor_task():
    import time
    time.sleep(5)
    return "done"

refs = [monitor_task.remote() for _ in range(10)]

# Poll once per second until every task has finished.
while True:
    ready, remaining = ray.wait(refs, timeout=1, num_returns=len(refs))
    if not remaining:
        break
    print(f"Completed: {len(ready)}, Remaining: {len(remaining)}")

ray.shutdown()

Resource Usage Statistics #

python
import ray
import time

ray.init()

print("Initial resources:", ray.available_resources())

@ray.remote(num_cpus=2)
def task():
    time.sleep(2)
    return "done"

refs = [task.remote() for _ in range(2)]

# Give the scheduler a moment so the CPU claims become visible.
time.sleep(0.5)
print("After scheduling:", ray.available_resources())

ray.get(refs)

print("After completion:", ray.available_resources())

ray.shutdown()

Best Practices #

1. Set Resource Requirements Realistically #

python
import ray

ray.init()

# A modest request that most machines can satisfy.
@ray.remote(num_cpus=2)
def balanced_task():
    return "balanced"

# Requests more CPUs than most nodes have; the task stays
# pending forever and ray.get on it would hang.
@ray.remote(num_cpus=100)
def over_requested():
    return "will wait forever"

ray.shutdown()

2. Coordinate Resources with Placement Groups #

python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=4)

pg = ray.util.placement_group(
    bundles=[{"CPU": 2}, {"CPU": 2}],
    strategy="PACK"
)
ray.get(pg.ready())

@ray.remote(
    num_cpus=2,
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
)
def coordinated_task():
    return "coordinated"

print(ray.get(coordinated_task.remote()))

ray.util.remove_placement_group(pg)

ray.shutdown()

3. Monitor Resource Usage #

python
import ray

ray.init()

def check_resources():
    # Usage = total cluster resources minus what is still available.
    available = ray.available_resources()
    cluster = ray.cluster_resources()
    used = {k: cluster[k] - available.get(k, 0) for k in cluster}
    return used

print("Resource usage:", check_resources())

ray.shutdown()

4. Handle Resource Shortages #

python
import ray
import time

ray.init(num_cpus=2)

@ray.remote(num_cpus=1)
def task_with_timeout():
    time.sleep(1)
    return "done"

refs = [task_with_timeout.remote() for _ in range(10)]

# A bounded wait surfaces stragglers instead of blocking forever.
ready, remaining = ray.wait(refs, timeout=5, num_returns=len(refs))
print(f"Completed: {len(ready)}, Timed out: {len(remaining)}")

ray.shutdown()

Next Steps #

With resource management under your belt, continue on to Ray Data to learn how to process data at scale!

Last updated: 2026-04-05