Rate Limiting and Circuit Breaking
1. Rate Limiting Overview
1.1 Why Rate Limiting Is Needed
text
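Without rate limiting:

  clients ──► ──► ──► ──► ──► service
        (burst of requests)
                 │
                 ▼
          service crashes

With rate limiting:

  clients ──► ──► ──► ──► ──► gateway ──► service
        (burst of requests)      │
                                 ▼
                  excess requests are rejected at the gateway;
                  the service keeps responding normally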
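1.2 Rate Limiting Algorithms
| Algorithm | Description | Characteristics |
|---|---|---|
| Fixed-window counter | Counts requests within fixed time windows | Simple, but a burst that straddles a window boundary can exceed the limit |
| Sliding window | Counts requests within a window that slides with time | Smoother, but uses more memory |
| Token bucket | Generates tokens at a fixed rate; each request consumes tokens | Allows bursts up to the bucket capacity |
| Leaky bucket | Processes requests at a constant rate | Smooths traffic |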
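The boundary problem of the fixed-window counter is easier to see in code. The following is a minimal plain-Java sketch, not Gateway code: the class names (`FixedWindowLimiter`, `SlidingWindowLimiter`, `WindowBoundaryDemo`) are hypothetical, and timestamps are passed in explicitly so the behaviour is deterministic. It assumes a limit of 10 requests per 1000 ms window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongPredicate;

/** Fixed-window counter: at most `limit` requests per aligned window of `windowMillis`. */
class FixedWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private long windowStart = -1; // assumes non-negative timestamps
    private int count;

    FixedWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        long start = nowMillis - (nowMillis % windowMillis);
        if (start != windowStart) { // a new window begins: reset the counter
            windowStart = start;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}

/** Sliding-window log: keeps the timestamps of requests accepted in the last `windowMillis`. */
class SlidingWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private final Deque<Long> accepted = new ArrayDeque<>();

    SlidingWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Drop timestamps that have fallen out of the window.
        while (!accepted.isEmpty() && accepted.peekFirst() <= nowMillis - windowMillis) {
            accepted.pollFirst();
        }
        if (accepted.size() < limit) {
            accepted.addLast(nowMillis);
            return true;
        }
        return false;
    }
}

class WindowBoundaryDemo {
    /** Sends `n` requests at time `t` and returns how many were accepted. */
    static int burst(LongPredicate limiter, long t, int n) {
        int ok = 0;
        for (int i = 0; i < n; i++) {
            if (limiter.test(t)) ok++;
        }
        return ok;
    }

    public static void main(String[] args) {
        FixedWindowLimiter fixed = new FixedWindowLimiter(10, 1000);
        SlidingWindowLimiter sliding = new SlidingWindowLimiter(10, 1000);
        // Two bursts of 10 requests only 20 ms apart, on either side of the 1000 ms boundary.
        int fixedOk = burst(fixed::tryAcquire, 990, 10) + burst(fixed::tryAcquire, 1010, 10);
        int slidingOk = burst(sliding::tryAcquire, 990, 10) + burst(sliding::tryAcquire, 1010, 10);
        System.out.println("fixed window accepted:   " + fixedOk);   // 20
        System.out.println("sliding window accepted: " + slidingOk); // 10
    }
}
```

The fixed window lets through 20 requests within 20 ms because the counter resets at the boundary, while the sliding window still sees the first burst. The price is one stored timestamp per accepted request, which is the memory cost noted in the table.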
2. Built-in Rate Limiter
2.1 Add the Dependency
xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
2.2 Configure Redis
yaml
# Spring Boot 2.x property names; Spring Boot 3.x uses spring.data.redis.* instead
spring:
  redis:
    host: localhost
    port: 6379
2.3 Configure Rate Limiting
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: rate-limit-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                redis-rate-limiter.requestedTokens: 1
                key-resolver: "#{@userKeyResolver}"
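2.4 Configure a KeyResolver
java
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import reactor.core.publisher.Mono;

import java.net.InetSocketAddress;

@Configuration
public class RateLimitConfig {

    // Limit per user, identified by the X-User-Id request header.
    // @Primary: with several KeyResolver beans defined, the auto-configured
    // filter factory needs one unambiguous default to inject.
    @Bean
    @Primary
    public KeyResolver userKeyResolver() {
        return exchange -> {
            String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
            return Mono.just(userId != null ? userId : "anonymous");
        };
    }

    // Limit per client IP. getRemoteAddress() may return null, so guard against it.
    @Bean
    public KeyResolver ipKeyResolver() {
        return exchange -> {
            InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
            return Mono.just(remote != null && remote.getAddress() != null
                    ? remote.getAddress().getHostAddress() : "unknown");
        };
    }

    // Limit per API, using the request path as the key.
    @Bean
    public KeyResolver apiKeyResolver() {
        return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }
}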
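The three resolvers above each pick a single dimension. As an illustration of how the dimensions can be combined, here is a small plain-Java sketch of a fallback chain: user ID first, then client address, then a shared anonymous key. The class `RateLimitKeys` and its `resolve` method are hypothetical helpers, and the header map stands in for the real `ServerWebExchange`; it is a sketch of the idea rather than Gateway API.

```java
import java.util.Map;

/** Hypothetical helper that picks a rate-limit key from request data. */
class RateLimitKeys {

    /**
     * Prefers the X-User-Id header, falls back to the remote address,
     * and finally to a shared "anonymous" key. The prefixes keep user IDs
     * and IP addresses from colliding in the same key space.
     */
    static String resolve(Map<String, String> headers, String remoteAddress) {
        String userId = headers.get("X-User-Id");
        if (userId != null && !userId.trim().isEmpty()) {
            return "user:" + userId.trim();
        }
        if (remoteAddress != null && !remoteAddress.isEmpty()) {
            return "ip:" + remoteAddress;
        }
        return "anonymous";
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("X-User-Id", "42"), "10.0.0.1")); // user:42
        System.out.println(resolve(Map.of(), "10.0.0.1"));                  // ip:10.0.0.1
        System.out.println(resolve(Map.of(), null));                        // anonymous
    }
}
```

Inside a real `KeyResolver`, the same logic would read the header and remote address from the exchange and wrap the result in `Mono.just(...)`.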
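2.5 Parameters
| Parameter | Description |
|---|---|
| replenishRate | Number of tokens added to the bucket per second, i.e. the sustained request rate |
| burstCapacity | Capacity of the token bucket, i.e. the largest burst that can be served at once |
| requestedTokens | Number of tokens each request consumes |
| key-resolver | Bean that resolves the key by which requests are limited |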
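To make the interaction of these parameters concrete, here is a minimal in-memory token bucket in plain Java. It is a sketch under stated assumptions, not the Redis-backed implementation the gateway uses: the classes `TokenBucket` and `TokenBucketDemo` are hypothetical, refill is computed with millisecond precision, and time is passed in explicitly. The values mirror the configuration above (`replenishRate` 10, `burstCapacity` 20, `requestedTokens` 1).

```java
/** In-memory token bucket sketch; parameter names follow the table above. */
class TokenBucket {
    private final long replenishRate; // tokens added per second
    private final long burstCapacity; // maximum tokens the bucket can hold
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(long replenishRate, long burstCapacity, long nowMillis) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity; // the bucket starts full
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis, int requestedTokens) {
        // Add the tokens earned since the last call, capped at the capacity.
        double refill = (nowMillis - lastRefillMillis) / 1000.0 * replenishRate;
        tokens = Math.min(burstCapacity, tokens + refill);
        lastRefillMillis = nowMillis;
        if (tokens >= requestedTokens) {
            tokens -= requestedTokens;
            return true;
        }
        return false;
    }
}

class TokenBucketDemo {
    /** Makes `attempts` requests at time `now` and returns how many were allowed. */
    static int acceptedAt(TokenBucket bucket, long now, int attempts, int requestedTokens) {
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            if (bucket.tryAcquire(now, requestedTokens)) ok++;
        }
        return ok;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(10, 20, 0);
        System.out.println(acceptedAt(bucket, 0, 30, 1));     // 20: the full burst capacity
        System.out.println(acceptedAt(bucket, 500, 30, 1));   // 5: half a second of refill
        System.out.println(acceptedAt(bucket, 5_000, 30, 1)); // 20: refill is capped at capacity
    }
}
```

Setting `requestedTokens` above 1 makes each request more expensive: with `requestedTokens` 2, the same full bucket would serve only 10 requests in a burst.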
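3. Custom Rate Limiter
3.1 Implement a Custom RateLimiter
java
import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter;
import org.springframework.cloud.gateway.support.ConfigurationService;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory token bucket limiter. State lives in one gateway instance and is not shared
// across a cluster. Not annotated with @Component: the bean is declared in section 3.2,
// and registering it twice under the same name would conflict.
public class CustomRateLimiter extends AbstractRateLimiter<CustomRateLimiter.Config> {

    private final Map<String, TokenBucket> buckets = new ConcurrentHashMap<>();
    private final Config defaultConfig = new Config();

    public CustomRateLimiter() {
        // The cast picks the ConfigurationService constructor where an overload exists.
        // With null, per-route args are presumably not bound and the Config defaults apply.
        super(Config.class, "custom-rate-limiter", (ConfigurationService) null);
    }

    @Override
    public Mono<Response> isAllowed(String routeId, String id) {
        // getConfig() returns a map of per-route configurations keyed by route id.
        Config config = getConfig().getOrDefault(routeId, defaultConfig);
        // Key by route and id so that different routes do not share a bucket.
        TokenBucket bucket = buckets.computeIfAbsent(routeId + ":" + id,
                key -> new TokenBucket(config.getReplenishRate(), config.getBurstCapacity()));
        boolean allowed = bucket.tryAcquire();
        return Mono.just(new Response(allowed,
                Collections.singletonMap("X-RateLimit-Remaining",
                        String.valueOf(bucket.getAvailableTokens()))));
    }

    public static class Config {
        private int replenishRate = 10; // tokens added per second
        private int burstCapacity = 20; // maximum tokens in the bucket

        public int getReplenishRate() {
            return replenishRate;
        }

        public void setReplenishRate(int replenishRate) {
            this.replenishRate = replenishRate;
        }

        public int getBurstCapacity() {
            return burstCapacity;
        }

        public void setBurstCapacity(int burstCapacity) {
            this.burstCapacity = burstCapacity;
        }
    }

    private static class TokenBucket {
        private final int replenishRate;
        private final int burstCapacity;
        private int tokens;
        private long lastRefillTime;

        TokenBucket(int replenishRate, int burstCapacity) {
            this.replenishRate = replenishRate;
            this.burstCapacity = burstCapacity;
            this.tokens = burstCapacity;
            this.lastRefillTime = System.currentTimeMillis();
        }

        synchronized boolean tryAcquire() {
            refill();
            // Only decrement when a token is available, so the count never goes negative.
            if (tokens > 0) {
                tokens--;
                return true;
            }
            return false;
        }

        private void refill() {
            long now = System.currentTimeMillis();
            // Computed as long to avoid int overflow after a long idle period.
            long tokensToAdd = (now - lastRefillTime) * replenishRate / 1000;
            if (tokensToAdd > 0) {
                tokens = (int) Math.min(burstCapacity, tokens + tokensToAdd);
                lastRefillTime = now;
            }
        }

        synchronized int getAvailableTokens() {
            return tokens;
        }
    }
}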
3.2 Use the Custom Rate Limiter
java
@Configuration
public class RateLimitConfig {

    // Exposed as "customRateLimiter" so the route can reference it through SpEL.
    @Bean
    public CustomRateLimiter customRateLimiter() {
        return new CustomRateLimiter();
    }
}
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: custom-rate-limit-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                rate-limiter: "#{@customRateLimiter}"
                key-resolver: "#{@ipKeyResolver}"
4. Circuit Breaker
4.1 Add the Dependency
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>
4.2 Configure the Circuit Breaker
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuit-breaker-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback
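4.3 Configure Resilience4j
yaml
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 10                     # number of recent calls used to compute the failure rate
        failureRateThreshold: 50                  # percentage of failures that opens the circuit
        waitDurationInOpenState: 10s              # time to stay open before moving to half-open
        permittedNumberOfCallsInHalfOpenState: 3  # trial calls allowed while half-open
    instances:
      userServiceCircuitBreaker:
        baseConfig: default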
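4.4 Fallback Handling
java
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@RestController
public class FallbackController {

    // @RequestMapping without a method matches every HTTP method, so forwarded
    // POST or PUT requests reach the fallback too; @GetMapping would only match GET.
    @RequestMapping("/fallback")
    public Mono<Map<String, Object>> fallback() {
        Map<String, Object> result = new HashMap<>();
        result.put("code", 503);
        result.put("message", "Service temporarily unavailable, please try again later");
        result.put("timestamp", System.currentTimeMillis());
        return Mono.just(result);
    }

    @RequestMapping("/fallback/user")
    public Mono<Map<String, Object>> userFallback() {
        Map<String, Object> result = new HashMap<>();
        result.put("code", 503);
        result.put("message", "User service temporarily unavailable");
        return Mono.just(result);
    }
}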
4.5 Circuit Breaker States
text
Circuit breaker state machine

  Closed    ──(failure rate ≥ threshold)─────────────────► Open
  Open      ──(waitDurationInOpenState elapsed)──────────► Half-Open
  Half-Open ──(trial calls: failure rate < threshold)────► Closed
  Half-Open ──(trial calls: failure rate ≥ threshold)────► Open
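The state machine can be sketched in a few dozen lines of plain Java. This is a simplified, count-based illustration and not Resilience4j's implementation: the classes `SimpleCircuitBreaker` and `CircuitBreakerDemo` are hypothetical, the failure rate is only evaluated once the sliding window is full, and time is passed in explicitly. The constructor arguments correspond to `slidingWindowSize`, `failureRateThreshold`, `waitDurationInOpenState`, and `permittedNumberOfCallsInHalfOpenState`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Count-based circuit breaker sketch (a simplification, not Resilience4j itself). */
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int slidingWindowSize;
    private final double failureRateThreshold; // percent, e.g. 50
    private final long waitMillisInOpenState;
    private final int permittedCallsInHalfOpen;

    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAtMillis;
    private int halfOpenPermits;  // trial calls handed out
    private int halfOpenResults;  // trial calls completed
    private int halfOpenFailures; // trial calls that failed

    SimpleCircuitBreaker(int slidingWindowSize, double failureRateThreshold,
                         long waitMillisInOpenState, int permittedCallsInHalfOpen) {
        this.slidingWindowSize = slidingWindowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.waitMillisInOpenState = waitMillisInOpenState;
        this.permittedCallsInHalfOpen = permittedCallsInHalfOpen;
    }

    synchronized State state() {
        return state;
    }

    /** Asks permission for one call; OPEN becomes HALF_OPEN once the wait has elapsed. */
    synchronized boolean tryAcquirePermission(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= waitMillisInOpenState) {
            state = State.HALF_OPEN;
            halfOpenPermits = halfOpenResults = halfOpenFailures = 0;
        }
        if (state == State.CLOSED) return true;
        if (state == State.HALF_OPEN && halfOpenPermits < permittedCallsInHalfOpen) {
            halfOpenPermits++;
            return true;
        }
        return false;
    }

    /** Records the outcome of a permitted call and applies the state transitions. */
    synchronized void record(boolean failed, long nowMillis) {
        if (state == State.HALF_OPEN) {
            halfOpenResults++;
            if (failed) halfOpenFailures++;
            if (halfOpenResults == permittedCallsInHalfOpen) {
                if (rate(halfOpenFailures, halfOpenResults) >= failureRateThreshold) open(nowMillis);
                else close();
            }
        } else if (state == State.CLOSED) {
            window.addLast(failed);
            if (window.size() > slidingWindowSize) window.removeFirst();
            long failures = window.stream().filter(f -> f).count();
            if (window.size() == slidingWindowSize
                    && rate(failures, slidingWindowSize) >= failureRateThreshold) {
                open(nowMillis);
            }
        }
        // Results that arrive while OPEN are ignored.
    }

    private static double rate(long failures, long total) {
        return 100.0 * failures / total;
    }

    private void open(long nowMillis) {
        state = State.OPEN;
        openedAtMillis = nowMillis;
        window.clear();
    }

    private void close() {
        state = State.CLOSED;
        window.clear();
    }
}

class CircuitBreakerDemo {
    public static void main(String[] args) {
        SimpleCircuitBreaker cb = new SimpleCircuitBreaker(10, 50, 10_000, 3);
        for (int i = 0; i < 10; i++) cb.record(i >= 5, 0);  // 5 successes, then 5 failures
        System.out.println(cb.state());                      // OPEN
        System.out.println(cb.tryAcquirePermission(5_000));  // false: still waiting
        for (int i = 0; i < 3; i++) cb.tryAcquirePermission(10_000); // three trial calls
        System.out.println(cb.state());                      // HALF_OPEN
        for (int i = 0; i < 3; i++) cb.record(false, 10_100);
        System.out.println(cb.state());                      // CLOSED
    }
}
```

While the breaker is open, the gateway short-circuits calls to the downstream service and serves the configured fallback instead.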
5. Retry
5.1 Configure Retry
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: retry-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET
                exceptions: java.io.IOException
                backoff:
                  firstBackoff: 100ms
                  maxBackoff: 500ms
                  factor: 2
                  basedOnPreviousValue: false
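5.2 Parameters
| Parameter | Description |
|---|---|
| retries | Maximum number of retry attempts |
| statuses | HTTP status codes that trigger a retry |
| methods | HTTP methods that may be retried |
| exceptions | Exceptions that trigger a retry |
| backoff | Exponential backoff policy applied between retries |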
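To see what the backoff settings produce, the delay before each retry can be computed by hand. The sketch below assumes the delay is `firstBackoff * factor^n` capped at `maxBackoff`, with `n` starting at 0 for the first retry and `basedOnPreviousValue` set to false. The starting index is an assumption made for this illustration, and `BackoffSchedule` is a hypothetical helper, not part of Gateway.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the delay before each retry attempt. */
class BackoffSchedule {

    /** Delay n (0-based) is firstBackoff * factor^n, capped at maxBackoff. */
    static List<Duration> delays(int retries, Duration firstBackoff, Duration maxBackoff, int factor) {
        List<Duration> result = new ArrayList<>();
        for (int n = 0; n < retries; n++) {
            Duration next = firstBackoff.multipliedBy((long) Math.pow(factor, n));
            if (next.compareTo(maxBackoff) > 0) {
                next = maxBackoff; // never wait longer than maxBackoff
            }
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Duration> schedule =
                delays(3, Duration.ofMillis(100), Duration.ofMillis(500), 2);
        for (int i = 0; i < schedule.size(); i++) {
            // Prints 100 ms, 200 ms, 400 ms for the configuration above.
            System.out.println("retry " + (i + 1) + " after " + schedule.get(i).toMillis() + " ms");
        }
    }
}
```

With `retries: 3` the cap of 500 ms is never reached under this assumption; a fourth retry would be capped at 500 ms instead of waiting 800 ms.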
6. Combining the Filters
6.1 Rate Limiting + Circuit Breaker + Retry
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: protected-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                key-resolver: "#{@ipKeyResolver}"
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET
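7. Handling Rate Limit Responses
7.1 Custom Rate Limit Response
java
import com.alibaba.fastjson.JSON; // requires the fastjson dependency
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

// RateLimitConfig with the ipKeyResolver bean is unchanged from section 2.4.
// Caveat: the built-in RequestRateLimiter filter sets the 429 status and completes the
// response itself, so this handler only covers 429 errors raised as ResponseStatusException.
@Component
@Order(-2) // run before Spring Boot's default error handler, which is ordered at -1
public class RateLimitResponseHandler implements WebExceptionHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        // getStatus() is the Spring 5 API; Spring 6 replaces it with getStatusCode().
        if (ex instanceof ResponseStatusException
                && ((ResponseStatusException) ex).getStatus() == HttpStatus.TOO_MANY_REQUESTS) {
            exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
            exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
            Map<String, Object> result = new HashMap<>();
            result.put("code", 429);
            result.put("message", "Too many requests, please try again later");
            result.put("timestamp", System.currentTimeMillis());
            DataBuffer buffer = exchange.getResponse().bufferFactory()
                    .wrap(JSON.toJSONBytes(result));
            return exchange.getResponse().writeWith(Mono.just(buffer));
        }
        // Pass every other error on to the next handler.
        return Mono.error(ex);
    }
}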
8. Monitoring Metrics
8.1 Add Actuator
xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
8.2 Configure Endpoints
yaml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,gateway
  endpoint:
    health:
      show-details: always
    gateway:
      enabled: true
8.3 View Metrics
bash
# Check the health status
curl http://localhost:8080/actuator/health
# List the Gateway routes
curl http://localhost:8080/actuator/gateway/routes
# List the available metrics
curl http://localhost:8080/actuator/metrics
9. Best Practices
9.1 Production Configuration
yaml
spring:
  cloud:
    gateway:
      httpclient:
        connect-timeout: 5000     # milliseconds
        response-timeout: 30000   # milliseconds
      routes:
        - id: user-service
          uri: lb://user-service
          predicates:
            - Path=/api/user/**
          filters:
            - StripPrefix=2
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                key-resolver: "#{@ipKeyResolver}"
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback/user
            - name: Retry
              args:
                retries: 2
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 20
        failureRateThreshold: 50
        waitDurationInOpenState: 30s
        permittedNumberOfCallsInHalfOpenState: 5
    instances:
      userServiceCircuitBreaker:
        baseConfig: default
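9.2 Notes
| Consideration | Guidance |
|---|---|
| Rate limit key | Choose the limiting dimension (user, IP, or API) that matches what you need to protect |
| Circuit breaker settings | Tune thresholds and wait durations to each service's characteristics |
| Fallback response | Return a clear, user-friendly fallback message |
| Monitoring and alerting | Configure alerts for rate limiting and circuit breaker events |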
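10. Summary
10.1 Key Points
| Item | Description |
|---|---|
| RequestRateLimiter | Built-in rate limiting filter |
| CircuitBreaker | Circuit breaker filter |
| Retry | Retry filter |
| KeyResolver | Resolver for the rate limit key |
10.2 Next Steps
Now that you have covered rate limiting and circuit breaking in Gateway, the next topic is service fault tolerance, which takes a closer look at using Resilience4j.
Last updated: 2026-03-28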