SpringCloud Gateway——限流原理分析

SpringCloud Gateway——限流原理分析

1. SpringCloud Gateway中限流使用

这里以路由ID为限流维度,Redis做限流逻辑,即每个路由ID在1秒内可以接收多少个请求。

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>

    <!-- redis限流 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

application.yml

spring:
  application:
    name: nacos-gateway
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1
        namespace: 405e877e-56ab-4755-8dd8-6541e1ee4845
        group: nacos-dev
        username: nacos
        password: nacos
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: app
          uri: lb://nacos-app
          predicates:
            - Path=/app/**
          filters:
            - StripPrefix=1
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 10
                key-resolver: "#{@routeIdKeyResolver}"

  redis:
    host: 127.0.0.1
    port: 6379

RateLimitConfig: 实例化RouteIdKeyResolver路由ID维度解析器

/**
 * @author Tarzan写bug
 * @since 2022/10/13
 */

@Configuration
public class RateLimitConfig {

    @Bean(value = "routeIdKeyResolver")
    public KeyResolver routeIdKeyResolver() {
        return exchange -> {
            Route route = (Route) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
            return Mono.just(route.getId());
        };
    }
}

2. 源码分析

1. 主流程

  1. 请求到达RequestRateLimiterGatewayFilterFactory
  2. 通过KeyResolver解析出限流维度ID;
  3. 通过redis lua脚本判断是否允许访问。

2. 过滤器RequestRateLimiterGatewayFilterFactory

启动Gateway时会初始化RequestRateLimiterGatewayFilterFactory

GatewayAutoConfiguration

@Bean
@ConditionalOnBean({ RateLimiter.classKeyResolver.class })
public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(
    RateLimiter rateLimiterKeyResolver resolver
{
    return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
}

@ConditionalOnBean中可以看出初始化这个过滤器需要RateLimiterKeyResolver两个关键类,后面会分析这两个类。

根据XxxGatewayFilterFactory结构,核心逻辑在apply()

RequestRateLimiterGatewayFilterFactory

@SuppressWarnings("unchecked")
@Override
public GatewayFilter apply(Config config) {
    KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
    RateLimiter<Object> limiter = getOrDefault(config.rateLimiter,
                                               defaultRateLimiter);
    boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
    HttpStatusHolder emptyKeyStatus = HttpStatusHolder
        .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

    // 调用KeyResolver.resolve()解析出限流的维度
    return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)
        .flatMap(key -> {
            // 没找到则通过过滤器
            if (EMPTY_KEY.equals(key)) {
                if (denyEmpty) {
                    setResponseStatus(exchange, emptyKeyStatus);
                    return exchange.getResponse().setComplete();
                }
                return chain.filter(exchange);
            }
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange
                    .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                routeId = route.getId();
            }
            // 调用RateLimiter.isAllowed()判断是否允许访问
            return limiter.isAllowed(routeId, key).flatMap(response -> {

                // 返回的response中的header加入到exchange header中
                for (Map.Entry<String, String> header : response.getHeaders()
                     .entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(),
                                                            header.getValue());
                }

                // 允许访问则通过过滤器
                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }

                // 被限流,返回HTTP状态码429
                setResponseStatus(exchange, config.getStatusCode());
                return exchange.getResponse().setComplete();
            });
        });
}

首先通过KeyResolver.resolve()解析出限流的维度ID,然后调用RateLimiter.isAllowed()判断是否被限制访问。

3. KeyResolver

接口是用来解析限流的维度的,这些维度包括路由ID、请求IP、请求路径等。如果没有自定义维度,Gateway提供了默认的实现PrincipalNameKeyResolver,这里自定义实现了以路由ID为维度的解析器。主要就是从Route获取ID.

@Bean(value = "routeIdKeyResolver")
public KeyResolver routeIdKeyResolver() {
    return exchange -> {
        Route route = (Route) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        return Mono.just(route.getId());
    };
}

4. RedisRateLimiter

RedisRateLimiterRateLimiter的一种实现,通过redis lua脚本来实现限流算法。首先看看何时初始化的?

GatewayRedisAutoConfiguration

@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass(
{ RedisTemplate.classDispatcherHandler.class })
class GatewayRedisAutoConfiguration 
{

    @Bean
    @SuppressWarnings("unchecked")
    public RedisScript redisRequestRateLimiterScript() {
        DefaultRedisScript redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(
            new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
        redisScript.setResultType(List.class);
        return redisScript;
    }

    @Bean
    @ConditionalOnMissingBean
    public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
                                             @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME)
 RedisScript<List<Long>> redisScript,
                                             ConfigurationService configurationService) 
{
        return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
    }

}

只有引入redis依赖才会初始化这个配置类,在GatewayRedisAutoConfiguration中出初始化两个Bean,一个是读取META-INF/scripts/request_rate_limiter.lua路径下的lua脚本,另外一个就是初始化RedisRateLimiter.

RedisRateLimiter

@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }

    // 获取限流配置
    Config routeConfig = loadConfiguration(routeId);

    // 生产速率,每秒生产多少个令牌
    int replenishRate = routeConfig.getReplenishRate();

    // 最大访问数量
    int burstCapacity = routeConfig.getBurstCapacity();

    // 每个请求消耗多个个令牌
    int requestedTokens = routeConfig.getRequestedTokens();

    try {
        // 根据限流维度得出redis key
        List<String> keys = getKeys(id);

        // lua脚本参数
        List<String> scriptArgs = Arrays.asList(replenishRate + "",
                                                burstCapacity + "", Instant.now().getEpochSecond() + "",
                                                requestedTokens + "");
        // 运行lua脚本
        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
                                                           scriptArgs);
        return flux.onErrorResume(throwable -> {
            if (log.isDebugEnabled()) {
                log.debug("Error calling rate limiter lua", throwable);
            }
            return Flux.just(Arrays.asList(1L, -1L));
        }).reduce(new ArrayList<Long>(), (longs, l) -> {
            // 从lua返回两个返回值,是否允许访问和剩余令牌数量
            // 将结果存入List中
            longs.addAll(l);
            return longs;
        }).map(results -> {
            boolean allowed = results.get(0) == 1L;
            Long tokensLeft = results.get(1);

            // 构造Response返回
            Response response = new Response(allowed,
                                             getHeaders(routeConfig, tokensLeft));

            if (log.isDebugEnabled()) {
                log.debug("response: " + response);
            }
            return response;
        });
    }
    catch (Exception e) {
        log.error("Error determining if user allowed from redis", e);
    }
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

该方法调用redis lua脚本实现限流逻辑,那我们来看看lua脚本的逻辑

META-INF/scripts/request_rate_limiter.lua

--redis key名,用于保存限流维度下剩余令牌数量,request_rate_limiter.{id}.tokens
local tokens_key = KEYS[1]
--redis key名,用于保存限流维度下最近获取令牌时间,request_rate_limiter.{id}.timestamp
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

--生产速率,每秒生产多少个令牌
local rate = tonumber(ARGV[1])
--容量
local capacity = tonumber(ARGV[2])
--当前时间(秒级时间戳)
local now = tonumber(ARGV[3])
--每个请求消耗的令牌个数
local requested = tonumber(ARGV[4])

--填充时间=容量/生产速率
local fill_time = capacity/rate
--key过期时间设置为填充时间的2倍
local ttl = math.floor(fill_time*2)

--剩余令牌数量
local last_tokens = tonumber(redis.call("get", tokens_key))
--不存在key,则初始化令牌数量为最大容量
if last_tokens == nil then
  last_tokens = capacity
end

--最近获取令牌秒级时间戳
local last_refreshed = tonumber(redis.call("get", timestamp_key))
--不存在key,则last_refreshed = 0
if last_refreshed == nil then
  last_refreshed = 0
end

--距离上次获取令牌时间相差多少秒
local delta = math.max(0, now-last_refreshed)
--计算当前令牌数量(考虑delta时间内生成的令牌个数=delta*速率)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
--当前令牌数量是否大于1
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens

local allowed_num = 0
--允许访问,新令牌数量-1,allowed_num=1
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end


--保存令牌个数和最近获取令牌时间
if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

return { allowed_num, new_tokens }

流程图:

SpringCloud Gateway——限流原理分析

SpringCloud Gateway——限流原理分析

限流逻辑的关键点在计算当前令牌的个数,要考虑间隔时间内生产出的令牌个数:

local filled_tokens = math.min(capacity, last_tokens+(delta*rate))

这里为什么要取最小值呢?因为有一种场景是上一秒的令牌没有用完,这时会导致last_tokens+(delta*rate)>最大容量,导致限流不准确。这里这样做的目的是,不管上一秒令牌数量是否用完,在下一秒的令牌个数都是初始容量。

这里可能会考虑一秒内的滑动窗口问题,其实这里不会考虑毫秒级,因为RedisRateLimiter传参到lua脚本时用到是Instant.now().getEpochSecond()秒级时间戳,所以并发的时候只有同一秒或差几秒的说法,这里还要提醒的就是秒级时间戳相减后的结果就是等于相差了几秒。

世界那么大,感谢遇见,未来可期…

欢迎同频共振的那一部分人

作者公众号:Tarzan写bug


原文始发于微信公众号(Tarzan写bug):SpringCloud Gateway——限流原理分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/45891.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!