Rate Limiting and Circuit Breaking

1. Rate Limiting Overview

1.1 Why Rate Limiting Is Needed

text
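                    Without rate limiting:

Clients ──► ──► ──► ──► ──► Service
  │                           │
  └── flood of requests ──────┘
                │
                ▼
         Service crashes

                    With rate limiting:

Clients ──► ──► ──► ──► ──► Gateway ──► Service
  │                           │
  └── flood of requests ──────┘
                │
                ▼
     Excess requests rejected
     Service responds normally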

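1.2 Rate Limiting Algorithms

| Algorithm | How it works | Characteristics |
| --- | --- | --- |
| Fixed-window counter | Counts requests within fixed time windows | Simple, but has a boundary problem: up to twice the limit can pass around a window boundary |
| Sliding window | Counts requests within a window that slides with time | Smoother, at the cost of higher memory use |
| Token bucket | Generates tokens at a fixed rate; each request consumes tokens | Allows bursts up to the bucket capacity |
| Leaky bucket | Processes requests at a constant rate | Smooths traffic |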
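
To make the boundary problem of the fixed-window counter concrete, here is a minimal sketch in plain Java. It is not Gateway code: the class names `FixedWindowLimiter` and `SlidingWindowLimiter` are illustrative, and the current time is passed in as a parameter (an assumption made so the behavior is deterministic).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Fixed-window counter: at most `limit` requests per aligned window of `windowMillis`. */
class FixedWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private long windowStart = 0;
    private int count = 0;

    FixedWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        long currentWindow = nowMillis - (nowMillis % windowMillis);
        if (currentWindow != windowStart) {
            // A new window began: the counter starts again from zero.
            windowStart = currentWindow;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}

/** Sliding-window log: at most `limit` requests within any trailing `windowMillis`. */
class SlidingWindowLimiter {
    private final int limit;
    private final long windowMillis;
    // One timestamp per admitted request; this is the extra memory the table refers to.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Drop timestamps that have left the trailing window.
        while (!timestamps.isEmpty() && nowMillis - timestamps.peekFirst() >= windowMillis) {
            timestamps.pollFirst();
        }
        if (timestamps.size() < limit) {
            timestamps.addLast(nowMillis);
            return true;
        }
        return false;
    }
}
```

With a limit of 5 per 1000 ms, five calls at t=900 and five more at t=1100 all pass the fixed-window limiter because they fall into different windows, so ten requests get through within 200 ms. The sliding-window limiter rejects the second group.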

2. Built-in Rate Limiter

2.1 Add the Dependency

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

2.2 Configure Redis

yaml
# Spring Boot 2.x property names; on Spring Boot 3.x the prefix is spring.data.redis
spring:
  redis:
    host: localhost
    port: 6379

2.3 Configure Rate Limiting

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: rate-limit-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                redis-rate-limiter.requestedTokens: 1
                key-resolver: "#{@userKeyResolver}"

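2.4 Configure a KeyResolver

java
import java.net.InetSocketAddress;

import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import reactor.core.publisher.Mono;

@Configuration
public class RateLimitConfig {

    // Limit per user, taken from the X-User-Id request header.
    // With several KeyResolver beans in the context, one is marked @Primary so the
    // filter factory's default resolver can still be injected unambiguously.
    @Bean
    @Primary
    public KeyResolver userKeyResolver() {
        return exchange -> {
            String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
            return Mono.just(userId != null ? userId : "anonymous");
        };
    }

    // Limit per client IP. getRemoteAddress() can return null, and an unresolved
    // address has no InetAddress, so fall back to a fixed key in both cases.
    @Bean
    public KeyResolver ipKeyResolver() {
        return exchange -> {
            InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
            return Mono.just(remote != null && remote.getAddress() != null
                    ? remote.getAddress().getHostAddress()
                    : "unknown");
        };
    }

    // Limit per request path, so all callers of one API share a bucket.
    @Bean
    public KeyResolver apiKeyResolver() {
        return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }
}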
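
When the gateway runs behind a load balancer or reverse proxy, the remote address is the proxy's address, so an IP-based key can put every client into the same bucket. The helper below is a hedged sketch of one way to pick the client IP. `ClientIpResolver` is a hypothetical name, and trusting the left-most `X-Forwarded-For` entry is an assumption that only holds when a trusted proxy sets or overwrites that header.

```java
import java.net.InetSocketAddress;

/** Hypothetical helper: chooses the key for IP-based rate limiting. */
final class ClientIpResolver {

    private ClientIpResolver() {
    }

    /**
     * @param forwardedFor  value of the X-Forwarded-For header, may be null
     * @param remoteAddress socket address of the direct peer, may be null
     */
    static String resolve(String forwardedFor, InetSocketAddress remoteAddress) {
        if (forwardedFor != null) {
            // Format: "client, proxy1, proxy2"; the left-most entry is the original client.
            int comma = forwardedFor.indexOf(',');
            String first = (comma >= 0 ? forwardedFor.substring(0, comma) : forwardedFor).trim();
            if (!first.isEmpty()) {
                return first;
            }
        }
        // No usable header: fall back to the direct peer, if it is known and resolved.
        if (remoteAddress != null && remoteAddress.getAddress() != null) {
            return remoteAddress.getAddress().getHostAddress();
        }
        return "unknown";
    }
}
```

Inside a `KeyResolver`, this could be called as `ClientIpResolver.resolve(exchange.getRequest().getHeaders().getFirst("X-Forwarded-For"), exchange.getRequest().getRemoteAddress())`. Spring Cloud Gateway also provides `XForwardedRemoteAddressResolver`, which may be a better fit than a hand-written helper.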

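2.5 Parameters

| Parameter | Description |
| --- | --- |
| replenishRate | Tokens added to the bucket per second, i.e. the sustained request rate |
| burstCapacity | Maximum number of tokens the bucket can hold, i.e. the largest burst allowed at once |
| requestedTokens | Tokens consumed by each request (default 1) |
| key-resolver | Bean that resolves the key by which requests are grouped for limiting |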
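
The interplay of `replenishRate`, `burstCapacity`, and `requestedTokens` can be sketched with a small in-memory token bucket. This is an illustration only, not the Redis-backed implementation, which evaluates its bucket inside Redis. The class name `TokenBucket` and the explicit `nowMillis` parameter are assumptions made for this example.

```java
/** Illustrative in-memory token bucket using the same three parameters. */
final class TokenBucket {
    private final int replenishRate;  // tokens added per second
    private final int burstCapacity;  // maximum number of tokens the bucket holds
    private double tokens;            // current fill level
    private long lastRefillMillis;

    TokenBucket(int replenishRate, int burstCapacity, long nowMillis) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity;  // start full, so an initial burst is allowed
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean tryAcquire(int requestedTokens, long nowMillis) {
        // Add the tokens earned since the last call, capped at the bucket capacity.
        double earned = (nowMillis - lastRefillMillis) / 1000.0 * replenishRate;
        tokens = Math.min(burstCapacity, tokens + earned);
        lastRefillMillis = nowMillis;

        if (tokens >= requestedTokens) {
            tokens -= requestedTokens;
            return true;
        }
        return false;  // not enough tokens: the request is rejected, nothing is consumed
    }
}
```

With `replenishRate` 10 and `burstCapacity` 20, a full bucket admits 20 single-token requests at once and rejects the 21st. Half a second later 5 tokens have been earned, so 5 more requests pass. Raising `requestedTokens` makes each request more expensive: a request that costs 5 tokens needs another half second of refill.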

3. Custom Rate Limiter

3.1 Custom RateLimiter

java
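import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter;
import org.springframework.cloud.gateway.support.ConfigurationService;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

// In-memory token bucket limiter. State is local to one gateway instance, so the
// effective limit grows with the number of instances.
// @Primary lets the RequestRateLimiter factory pick this bean as its default when a
// RedisRateLimiter bean is also present.
@Component
@Primary
public class CustomRateLimiter extends AbstractRateLimiter<CustomRateLimiter.Config> {

    private static final Config DEFAULT_CONFIG = new Config();

    // One bucket per route and key. Entries are never evicted; add expiry for
    // high-cardinality keys such as client IPs.
    private final Map<String, TokenBucket> buckets = new ConcurrentHashMap<>();

    // Assumes a Gateway version whose AbstractRateLimiter constructor accepts a
    // ConfigurationService, which binds route args prefixed with
    // "custom-rate-limiter." to a per-route Config.
    public CustomRateLimiter(ConfigurationService configurationService) {
        super(Config.class, "custom-rate-limiter", configurationService);
    }

    @Override
    public Mono<Response> isAllowed(String routeId, String id) {
        // getConfig() is a map keyed by route id; routes without args use the defaults.
        Config config = getConfig().getOrDefault(routeId, DEFAULT_CONFIG);
        TokenBucket bucket = buckets.computeIfAbsent(routeId + ":" + id,
            key -> new TokenBucket(config.getReplenishRate(), config.getBurstCapacity()));

        boolean allowed = bucket.tryAcquire();

        return Mono.just(new Response(allowed,
            Collections.singletonMap("X-RateLimit-Remaining",
                String.valueOf(bucket.getAvailableTokens()))));
    }

    public static class Config {
        private int replenishRate = 10;
        private int burstCapacity = 20;

        public int getReplenishRate() {
            return replenishRate;
        }

        public void setReplenishRate(int replenishRate) {
            this.replenishRate = replenishRate;
        }

        public int getBurstCapacity() {
            return burstCapacity;
        }

        public void setBurstCapacity(int burstCapacity) {
            this.burstCapacity = burstCapacity;
        }
    }

    // Named TokenBucket to avoid confusion with Gateway's RateLimiter interface.
    private static class TokenBucket {
        private final int replenishRate;
        private final int burstCapacity;
        private int tokens;
        private long lastRefillTime;

        TokenBucket(int replenishRate, int burstCapacity) {
            this.replenishRate = replenishRate;
            this.burstCapacity = burstCapacity;
            this.tokens = burstCapacity;
            this.lastRefillTime = System.currentTimeMillis();
        }

        synchronized boolean tryAcquire() {
            refill();
            if (tokens > 0) {
                // Decrement only when a token is available, so the count never goes negative.
                tokens--;
                return true;
            }
            return false;
        }

        private void refill() {
            long now = System.currentTimeMillis();
            long elapsed = now - lastRefillTime;
            long tokensToAdd = elapsed * replenishRate / 1000;

            if (tokensToAdd > 0) {
                tokens = (int) Math.min(burstCapacity, tokens + tokensToAdd);
                // Advance only by the time the added tokens account for, so the
                // remainder still counts toward the next token.
                lastRefillTime += tokensToAdd * 1000 / replenishRate;
            }
        }

        synchronized int getAvailableTokens() {
            return tokens;
        }
    }
}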

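3.2 Use the Custom Rate Limiter

CustomRateLimiter is annotated with @Component, so Spring already registers it under the bean name customRateLimiter. No separate @Bean method is needed; declaring one would define a second bean with the same name, which recent Spring Boot versions reject at startup unless bean definition overriding is enabled. Reference the bean from the route configuration:
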
yaml
spring:
  cloud:
    gateway:
      routes:
        - id: custom-rate-limit-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                rate-limiter: "#{@customRateLimiter}"
                key-resolver: "#{@ipKeyResolver}"

4. Circuit Breaker

4.1 Add the Dependency

xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>

4.2 Configure the Circuit Breaker

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuit-breaker-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback

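4.3 Configure Resilience4j

yaml
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 10                      # number of recent calls the failure rate is computed over
        failureRateThreshold: 50                   # percentage of failed calls at which the breaker opens
        waitDurationInOpenState: 10s               # time spent open before trial calls are allowed
        permittedNumberOfCallsInHalfOpenState: 3   # trial calls allowed while half-open
    instances:
      userServiceCircuitBreaker:
        baseConfig: default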

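4.4 Fallback Handling

java
import java.util.HashMap;
import java.util.Map;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class FallbackController {

    // @RequestMapping without a method matches every HTTP method. The forwarded
    // request keeps its original method, so @GetMapping would miss POST, PUT, etc.
    // The HTTP status of this response stays 200; the 503 is carried in the body.
    @RequestMapping("/fallback")
    public Mono<Map<String, Object>> fallback() {
        Map<String, Object> result = new HashMap<>();
        result.put("code", 503);
        result.put("message", "Service temporarily unavailable, please try again later");
        result.put("timestamp", System.currentTimeMillis());
        return Mono.just(result);
    }

    // Dedicated fallback for routes that forward to /fallback/user.
    @RequestMapping("/fallback/user")
    public Mono<Map<String, Object>> userFallback() {
        Map<String, Object> result = new HashMap<>();
        result.put("code", 503);
        result.put("message", "User service temporarily unavailable");
        return Mono.just(result);
    }
}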

4.5 Circuit Breaker States

text
        ┌─────────────────────────────────────────────────────┐
        │            Circuit breaker state machine            │
        └─────────────────────────────────────────────────────┘

        ┌───────────┐  failure rate < threshold   ┌───────────┐
        │  Closed   │ ◄────────────────────────── │ Half-Open │
        └─────┬─────┘                             └──┬─────▲──┘
              │                                      │     │
              │ failure rate ≥ threshold             │     │
              ▼                                      │     │
        ┌───────────┐                                │     │
        │   Open    │ ◄── failure rate ≥ threshold ──┘     │
        │           │ ─── wait duration elapsed ───────────┘
        └───────────┘
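
The transitions between the three states can be sketched as a small state machine. This is a simplified illustration, not Resilience4j's implementation: `SimpleCircuitBreaker` is a hypothetical class, it evaluates the failure rate in batches of `slidingWindowSize` calls instead of over a true sliding window, and it assumes calls are recorded one at a time. The parameter names mirror the configuration keys from section 4.3.

```java
/** Simplified circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED or OPEN. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int slidingWindowSize;
    private final double failureRateThreshold;        // percent, e.g. 50
    private final long waitDurationInOpenStateMillis;
    private final int permittedCallsInHalfOpenState;

    private State state = State.CLOSED;
    private int calls;
    private int failures;
    private long openedAtMillis;

    SimpleCircuitBreaker(int slidingWindowSize, double failureRateThreshold,
                         long waitDurationInOpenStateMillis, int permittedCallsInHalfOpenState) {
        this.slidingWindowSize = slidingWindowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.waitDurationInOpenStateMillis = waitDurationInOpenStateMillis;
        this.permittedCallsInHalfOpenState = permittedCallsInHalfOpenState;
    }

    synchronized State state() {
        return state;
    }

    /** Returns true if a call may be sent to the downstream service. */
    synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= waitDurationInOpenStateMillis) {
            transitionTo(State.HALF_OPEN, nowMillis);  // wait is over: allow trial calls
        }
        if (state == State.OPEN) {
            return false;                               // fail fast, use the fallback
        }
        if (state == State.HALF_OPEN) {
            return calls < permittedCallsInHalfOpenState;
        }
        return true;
    }

    /** Records the outcome of a call that was allowed. */
    synchronized void record(boolean success, long nowMillis) {
        if (state == State.OPEN) {
            return;
        }
        calls++;
        if (!success) {
            failures++;
        }
        int window = state == State.HALF_OPEN ? permittedCallsInHalfOpenState : slidingWindowSize;
        if (calls < window) {
            return;                                     // not enough samples yet
        }
        double failureRate = 100.0 * failures / calls;
        transitionTo(failureRate >= failureRateThreshold ? State.OPEN : State.CLOSED, nowMillis);
    }

    private void transitionTo(State next, long nowMillis) {
        state = next;
        calls = 0;
        failures = 0;
        if (next == State.OPEN) {
            openedAtMillis = nowMillis;
        }
    }
}
```

With a window of 10 and a 50% threshold, five failures out of ten calls open the breaker. After the wait duration, three trial calls decide whether it closes again or reopens.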

5. Retry

5.1 Configure Retry

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: retry-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET
                exceptions: java.io.IOException
                backoff:
                  firstBackoff: 100ms
                  maxBackoff: 500ms
                  factor: 2
                  basedOnPreviousValue: false

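5.2 Parameters

| Parameter | Description |
| --- | --- |
| retries | Maximum number of retry attempts |
| statuses | HTTP statuses that trigger a retry |
| methods | HTTP methods that may be retried |
| exceptions | Exception types that trigger a retry |
| backoff | Backoff policy between attempts (firstBackoff, maxBackoff, factor, basedOnPreviousValue) |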
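
To see what the backoff settings mean in practice, the sketch below computes the delay before each retry as `firstBackoff * factor^n`, capped at `maxBackoff`. It is an illustration of the arithmetic under that assumed formula (the `basedOnPreviousValue: false` case), not the Gateway's own code. `BackoffSchedule` is a hypothetical helper.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the delay before each retry attempt. */
final class BackoffSchedule {

    private BackoffSchedule() {
    }

    static List<Duration> delays(int retries, Duration firstBackoff, Duration maxBackoff, int factor) {
        List<Duration> result = new ArrayList<>();
        long max = maxBackoff.toMillis();
        long delay = firstBackoff.toMillis();
        for (int attempt = 0; attempt < retries; attempt++) {
            // Never wait longer than maxBackoff.
            result.add(Duration.ofMillis(Math.min(delay, max)));
            // Grow exponentially until the cap is reached.
            if (delay < max) {
                delay *= factor;
            }
        }
        return result;
    }
}
```

For `firstBackoff: 100ms`, `maxBackoff: 500ms`, and `factor: 2`, three retries wait 100 ms, 200 ms, and 400 ms. A fourth retry would be capped at 500 ms.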

6. Combining the Filters

6.1 Rate Limiting + Circuit Breaking + Retry

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: protected-route
          uri: lb://user-service
          predicates:
            - Path=/user/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                key-resolver: "#{@ipKeyResolver}"
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET

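7. Handling Rate-Limit Responses

7.1 Custom Rate-Limit Response

java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;

// Turns a 429 ResponseStatusException into a JSON body. The ipKeyResolver bean from
// section 2.4 is reused, so it is not declared again here.
// Note: the built-in RequestRateLimiter filter sets the 429 status and completes the
// response itself instead of raising an exception, so this handler only sees 429
// errors that another component signals as a ResponseStatusException.
// @Order(-2) places it ahead of Spring Boot's default error handler (order -1).
@Component
@Order(-2)
public class RateLimitResponseHandler implements WebExceptionHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        // getStatus() is the Spring Framework 5 accessor; Spring Framework 6 uses getStatusCode().
        if (ex instanceof ResponseStatusException
                && ((ResponseStatusException) ex).getStatus() == HttpStatus.TOO_MANY_REQUESTS) {
            exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
            exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);

            Map<String, Object> result = new HashMap<>();
            result.put("code", 429);
            result.put("message", "Too many requests, please try again later");
            result.put("timestamp", System.currentTimeMillis());

            DataBuffer buffer = exchange.getResponse().bufferFactory()
                    .wrap(JSON.toJSONBytes(result));

            return exchange.getResponse().writeWith(Mono.just(buffer));
        }

        // Not a 429: pass the error on to the next handler.
        return Mono.error(ex);
    }
}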

8. Monitoring Metrics

8.1 Add Actuator

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

8.2 Configure Endpoints

yaml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,gateway
  endpoint:
    health:
      show-details: always
    gateway:
      enabled: true

8.3 View Metrics

bash
# Check health status
curl http://localhost:8080/actuator/health

# List Gateway routes
curl http://localhost:8080/actuator/gateway/routes

# List available metrics
curl http://localhost:8080/actuator/metrics

9. Best Practices

9.1 Production Configuration

yaml
spring:
  cloud:
    gateway:
      httpclient:
        connect-timeout: 5000
        response-timeout: 30000
      routes:
        - id: user-service
          uri: lb://user-service
          predicates:
            - Path=/api/user/**
          filters:
            # StripPrefix=2 forwards /api/user/42 as /42; use StripPrefix=1 if user-service expects /user/42
            - StripPrefix=2
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 100
                redis-rate-limiter.burstCapacity: 200
                key-resolver: "#{@ipKeyResolver}"
            - name: CircuitBreaker
              args:
                name: userServiceCircuitBreaker
                fallbackUri: forward:/fallback/user
            - name: Retry
              args:
                retries: 2
                statuses: BAD_GATEWAY,SERVICE_UNAVAILABLE
                methods: GET

resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 20
        failureRateThreshold: 50
        waitDurationInOpenState: 30s
        permittedNumberOfCallsInHalfOpenState: 5
    instances:
      userServiceCircuitBreaker:
        baseConfig: default

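9.2 Notes

| Item | Guidance |
| --- | --- |
| Rate-limit key | Choose the limiting dimension (user, IP, or API path) to match what needs protecting |
| Circuit breaker settings | Tune the settings to each service's characteristics |
| Fallback response | Return a clear, user-friendly fallback message |
| Monitoring and alerting | Set up alerts for rate limiting and circuit breaking |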

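10. Summary

10.1 Key Points

| Item | Description |
| --- | --- |
| RequestRateLimiter | Built-in rate limiting filter |
| CircuitBreaker | Circuit breaker filter |
| Retry | Retry filter |
| KeyResolver | Resolves the rate-limit key |

10.2 Next Steps

Now that you are familiar with rate limiting and circuit breaking in Gateway, the next topic is Service Fault Tolerance, which takes a closer look at using Resilience4j.

Last updated: 2026-03-28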