diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedLeakyRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedLeakyRateLimiter.kt new file mode 100644 index 0000000000..20ff13e4c1 --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedLeakyRateLimiter.kt @@ -0,0 +1,73 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.ratelimiter.algorithm + +import com.tencent.bkrepo.common.ratelimiter.exception.AcquireLockFailedException +import com.tencent.bkrepo.common.ratelimiter.redis.LuaScript +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.core.script.DefaultRedisScript +import kotlin.system.measureTimeMillis + +/** + * 分布式漏桶算法实现 + */ +class DistributedLeakyRateLimiter( + private val key: String, + private val permitsPerSecond: Double, + private val capacity: Long, + private val redisTemplate: RedisTemplate, +) : RateLimiter { + override fun tryAcquire(permits: Long): Boolean { + try { + var acquireResult = false + val elapsedTime = measureTimeMillis { + val redisScript = DefaultRedisScript(LuaScript.leakyRateLimiterScript, List::class.java) + val results = redisTemplate.execute( + redisScript, getKeys(key), permitsPerSecond.toString(), capacity.toString(), permits.toString() + ) + acquireResult = results[0] == 1L + } + logger.info("acquire distributed leaky rateLimiter elapsed time: $elapsedTime") + return acquireResult + } catch (e: Exception) { + e.printStackTrace() + throw AcquireLockFailedException("distributed lock acquire failed: $e") + } + } + + private fun getKeys(key: String): List { + return listOf(key, "$key.timestamp") + } + + + companion object { + private val logger: Logger = LoggerFactory.getLogger(DistributedLeakyRateLimiter::class.java) + } +} diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedSlidingWindowRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedSlidingWindowRateLimiter.kt new file mode 100644 index 0000000000..30c771ab90 --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedSlidingWindowRateLimiter.kt @@ -0,0 +1,75 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.ratelimiter.algorithm + +import com.tencent.bkrepo.common.ratelimiter.exception.AcquireLockFailedException +import com.tencent.bkrepo.common.ratelimiter.redis.LuaScript +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.core.script.DefaultRedisScript +import java.util.concurrent.TimeUnit +import kotlin.system.measureTimeMillis + +/** + * 分布式滑动窗口算法实现 + */ +class DistributedSlidingWindowRateLimiter( + private val key: String, + private val limit: Long, + private val interval: Long, + private val limitUnit: TimeUnit, + private val redisTemplate: RedisTemplate, +) : RateLimiter { + override fun tryAcquire(permits: Long): Boolean { + try { + var acquireResult = false + val elapsedTime = measureTimeMillis { + val redisScript = DefaultRedisScript(LuaScript.slidingWindowRateLimiterScript, List::class.java) + val results = redisTemplate.execute( + redisScript, getKeys(key), limit.toString(), (interval * limitUnit.toSeconds(1)).toString(), permits.toString() + ) + acquireResult = results[0] == 1L + } + logger.info("acquire distributed sliding window rateLimiter elapsed time: $elapsedTime") + return acquireResult + } catch (e: Exception) { + e.printStackTrace() + throw AcquireLockFailedException("distributed lock acquire failed: $e") + } + } + + private fun getKeys(key: String): List { + return listOf(key) + } + + + companion object { + private val logger: Logger = LoggerFactory.getLogger(DistributedSlidingWindowRateLimiter::class.java) + } +} diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedTokenBucketRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedTokenBucketRateLimiter.kt index 3f30b89344..5d5968b5e3 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedTokenBucketRateLimiter.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/DistributedTokenBucketRateLimiter.kt @@ -45,9 +45,7 @@ class DistributedTokenBucketRateLimiter( private val redisTemplate: RedisTemplate, ) : RateLimiter { override fun tryAcquire(permits: Long): Boolean { - try { - var acquireResult = false val elapsedTime = measureTimeMillis { val redisScript = DefaultRedisScript(LuaScript.tokenBucketRateLimiterScript, List::class.java) @@ -62,7 +60,6 @@ class DistributedTokenBucketRateLimiter( e.printStackTrace() throw AcquireLockFailedException("distributed lock acquire failed: $e") } - } private fun getKeys(key: String): List { diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/FixedWindowRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/FixedWindowRateLimiter.kt index ccc9c2ea90..df5c1c1dfa 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/FixedWindowRateLimiter.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/FixedWindowRateLimiter.kt @@ -31,7 +31,6 @@ import com.google.common.base.Stopwatch import com.tencent.bkrepo.common.ratelimiter.constant.TRY_LOCK_TIMEOUT import com.tencent.bkrepo.common.ratelimiter.exception.AcquireLockFailedException import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock @@ -44,32 +43,28 @@ class FixedWindowRateLimiter( private val stopWatch: Stopwatch = Stopwatch.createStarted() ) : RateLimiter { - private val currentValue: AtomicLong = AtomicLong(0) - + private var currentValue: Long = 0 private val lock: Lock = ReentrantLock() override fun tryAcquire(permits: Long): Boolean { // TODO 当剩余容量少于permit时,会导致一直获取不到 - var updateValue = currentValue.incrementAndGet() - if (updateValue <= limit) { - return true - } try { if (!lock.tryLock(TRY_LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) { - throw AcquireLockFailedException("tryLock wait too long: $TRY_LOCK_TIMEOUT ms") + throw AcquireLockFailedException("fix window tryLock wait too long: $TRY_LOCK_TIMEOUT ms") } try { if (stopWatch.elapsed(TimeUnit.MILLISECONDS) > unit.toMillis(1)) { - currentValue.set(0) + currentValue = 0 stopWatch.reset() } - updateValue = currentValue.addAndGet(permits) - return updateValue <= limit + if (currentValue + permits > limit) return false + currentValue += permits + return true } finally { lock.unlock() } } catch (e: InterruptedException) { - throw AcquireLockFailedException("tryLock is interrupted by lock timeout: $e") + throw AcquireLockFailedException("fix window tryLock is interrupted by lock timeout: $e") } } } diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/LeakyRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/LeakyRateLimiter.kt new file mode 100644 index 0000000000..b5594b1870 --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/LeakyRateLimiter.kt @@ -0,0 +1,77 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.ratelimiter.algorithm + +import com.tencent.bkrepo.common.ratelimiter.constant.TRY_LOCK_TIMEOUT +import com.tencent.bkrepo.common.ratelimiter.exception.AcquireLockFailedException +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock + + +/** + * 单机令牌桶算法实现 + */ +class LeakyRateLimiter( + private val permitsPerSecond: Double, + private val capacity: Long, +) : RateLimiter { + + // 计算的起始时间 + private var lastLeakTime = System.currentTimeMillis() + private var water: Long = 0 + private val lock: Lock = ReentrantLock() + + + override fun tryAcquire(permits: Long): Boolean { + try { + if (!lock.tryLock(TRY_LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) { + throw AcquireLockFailedException("leaky tryLock wait too long: $TRY_LOCK_TIMEOUT ms") + } + try { + return allow(permits) + } finally { + lock.unlock() + } + } catch (e: InterruptedException) { + throw AcquireLockFailedException("leaky tryLock is interrupted by lock timeout: $e") + } + } + + fun allow(permits: Long): Boolean { + val now = System.currentTimeMillis() + val timeElapsed = now - lastLeakTime + water = Math.max(0, (water - timeElapsed * permitsPerSecond / 1000).toLong()) // 漏水 + lastLeakTime = now + if (water + permits <= capacity) { + water += permits + return true + } + return false + } +} diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/SlidingWindowRateLimiter.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/SlidingWindowRateLimiter.kt new file mode 100644 index 0000000000..9789b51cea --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/algorithm/SlidingWindowRateLimiter.kt @@ -0,0 +1,93 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.ratelimiter.algorithm + +import com.tencent.bkrepo.common.ratelimiter.constant.TRY_LOCK_TIMEOUT +import com.tencent.bkrepo.common.ratelimiter.exception.AcquireLockFailedException +import java.util.LinkedList +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock + +/** + * 单机令牌桶算法实现 + */ +class SlidingWindowRateLimiter( + private val limit: Long, + private val interval: Long, + private val limitUnit: TimeUnit, +) : RateLimiter { + + private val queue = LinkedList() + private var counter: Long = 0 + private var lastUpdate = System.currentTimeMillis() + private val lock: Lock = ReentrantLock() + + override fun tryAcquire(permits: Long): Boolean { + try { + if (!lock.tryLock(TRY_LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) { + throw AcquireLockFailedException("sliding window tryLock wait too long: $TRY_LOCK_TIMEOUT ms") + } + try { + return allow(permits) + } finally { + lock.unlock() + } + } catch (e: InterruptedException) { + throw AcquireLockFailedException("sliding window tryLock is interrupted by lock timeout: $e") + } + } + + fun allow(permits: Long): Boolean { + val now = System.currentTimeMillis() + val slots = ((now - lastUpdate) / (interval / limitUnit.toMillis(1))).toInt() + + // 移除过期的时间片 + while (queue.size >= slots) { + val removed = queue.removeFirst() + counter -= removed + } + + // 添加新的时间片 + for (i in queue.size until slots) { + queue.addLast(0) + } + + // 如果当前时间片的请求数量已经达到窗口大小,则拒绝请求 + if (counter + permits > limit) { + return false + } + + // 更新当前时间片的请求数量,并增加计数器 + queue.addLast(queue.removeLast() + permits) + counter += permits + lastUpdate = now + return true + } + +} diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt index de9ef56872..01dd3d72cb 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt @@ -27,15 +27,12 @@ package com.tencent.bkrepo.common.ratelimiter.config -import com.tencent.bkrepo.common.ratelimiter.enums.WorkScope import com.tencent.bkrepo.common.ratelimiter.rule.ResourceLimit import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(prefix = "rate.limiter") data class RateLimiterProperties( var enabled: Boolean = false, - // 生效范围 - var scope: WorkScope = WorkScope.LOCAL, var dryRun: Boolean = false, // 配置规则刷新频率 单位为秒 var refreshDuration: Long = 10L, diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/redis/LuaScript.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/redis/LuaScript.kt index f0b464bc8a..d147b7526b 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/redis/LuaScript.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/redis/LuaScript.kt @@ -39,19 +39,28 @@ object LuaScript { private val logger = LoggerFactory.getLogger(LuaScript::class.java) private const val FIX_WINDOW_RATE_LIMITER_FILE_PATH = "META-INF/fix-window-rate-limiter.lua" private const val TOKEN_BUCKET_RATE_LIMITER_FILE_PATH = "META-INF/token-bucket-rate-limiter.lua" + private const val SLIDING_WINDOW_RATE_LIMITER_FILE_PATH = "META-INF/sliding-window-rate-limiter.lua" + private const val LEAKY_RATE_LIMITER_FILE_PATH = "META-INF/leaky-rate-limiter.lua" lateinit var fixWindowRateLimiterScript: String lateinit var tokenBucketRateLimiterScript: String + lateinit var slidingWindowRateLimiterScript: String + lateinit var leakyRateLimiterScript: String init { val fixWindowInput = Thread.currentThread().contextClassLoader .getResourceAsStream(FIX_WINDOW_RATE_LIMITER_FILE_PATH) val tokenBucketInput = Thread.currentThread().contextClassLoader .getResourceAsStream(TOKEN_BUCKET_RATE_LIMITER_FILE_PATH) - + val slidingWindowInput = Thread.currentThread().contextClassLoader + .getResourceAsStream(SLIDING_WINDOW_RATE_LIMITER_FILE_PATH) + val leakyInput = Thread.currentThread().contextClassLoader + .getResourceAsStream(LEAKY_RATE_LIMITER_FILE_PATH) try { fixWindowRateLimiterScript = StreamUtils.copyToString(fixWindowInput, StandardCharsets.UTF_8) tokenBucketRateLimiterScript = StreamUtils.copyToString(tokenBucketInput, StandardCharsets.UTF_8) + slidingWindowRateLimiterScript = StreamUtils.copyToString(slidingWindowInput, StandardCharsets.UTF_8) + leakyRateLimiterScript = StreamUtils.copyToString(leakyInput, StandardCharsets.UTF_8) } catch (e: IOException) { logger.error("lua script Initialization failed, $e") } diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/rule/ResourceLimit.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/rule/ResourceLimit.kt index 79d68c7c8c..eccdbc3a45 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/rule/ResourceLimit.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/rule/ResourceLimit.kt @@ -29,6 +29,7 @@ package com.tencent.bkrepo.common.ratelimiter.rule import com.tencent.bkrepo.common.ratelimiter.enums.Algorithms import com.tencent.bkrepo.common.ratelimiter.enums.LimitDimension +import com.tencent.bkrepo.common.ratelimiter.enums.WorkScope import java.util.concurrent.TimeUnit /** @@ -43,8 +44,12 @@ data class ResourceLimit( var limitDimension: LimitDimension = LimitDimension.URL, // 限流值 var limit: Long = -1, - // 桶容量(令牌桶使用) - var bucketCapacity: Long? = null, // 限流时间单位 var unit: TimeUnit = TimeUnit.SECONDS, + // 桶容量(令牌桶和漏桶使用) + var capacity: Long? = null, + // 事件间隔(滑动窗口使用) + var interval: Long? = null, + // 生效范围 + var scope: WorkScope = WorkScope.LOCAL, ) diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt index d939cfed81..925450d9c5 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt @@ -84,7 +84,6 @@ abstract class AbstractBandwidthRateLimiterService( return null } val resource = buildResource(request) - val resourceLimit = rateLimitRule?.getRateLimitRule(resource) ?: rateLimitRule?.getRateLimitRule( resource, diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt index 52082b78b5..61bd28dfd9 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt @@ -29,9 +29,13 @@ package com.tencent.bkrepo.common.ratelimiter.service import com.tencent.bkrepo.common.ratelimiter.algorithm.DistributedFixedWindowRateLimiter +import com.tencent.bkrepo.common.ratelimiter.algorithm.DistributedLeakyRateLimiter +import com.tencent.bkrepo.common.ratelimiter.algorithm.DistributedSlidingWindowRateLimiter import com.tencent.bkrepo.common.ratelimiter.algorithm.DistributedTokenBucketRateLimiter import com.tencent.bkrepo.common.ratelimiter.algorithm.FixedWindowRateLimiter +import com.tencent.bkrepo.common.ratelimiter.algorithm.LeakyRateLimiter import com.tencent.bkrepo.common.ratelimiter.algorithm.RateLimiter +import com.tencent.bkrepo.common.ratelimiter.algorithm.SlidingWindowRateLimiter import com.tencent.bkrepo.common.ratelimiter.algorithm.TokenBucketRateLimiter import com.tencent.bkrepo.common.ratelimiter.config.RateLimiterProperties import com.tencent.bkrepo.common.ratelimiter.enums.Algorithms @@ -195,7 +199,7 @@ abstract class AbstractRateLimiterService( } return when (resourceLimit.algo) { Algorithms.FIXED_WINDOW -> { - if (rateLimiterProperties.scope == WorkScope.LOCAL) { + if (resourceLimit.scope == WorkScope.LOCAL) { FixedWindowRateLimiter(resourceLimit.limit, resourceLimit.unit) } else { DistributedFixedWindowRateLimiter( @@ -204,16 +208,40 @@ abstract class AbstractRateLimiterService( } } Algorithms.TOKEN_BUCKET -> { - if (rateLimiterProperties.scope == WorkScope.LOCAL) { + if (resourceLimit.scope == WorkScope.LOCAL) { val permitsPerSecond = resourceLimit.limit / resourceLimit.unit.toSeconds(1) TokenBucketRateLimiter(permitsPerSecond) } else { - if (resourceLimit.bucketCapacity == null || resourceLimit.bucketCapacity!! <= 0) { + if (resourceLimit.capacity == null || resourceLimit.capacity!! <= 0) { throw AcquireLockFailedException("Resource limit config $resourceLimit is illegal") } val permitsPerSecond = resourceLimit.limit / resourceLimit.unit.toSeconds(1).toDouble() DistributedTokenBucketRateLimiter( - resource, permitsPerSecond, resourceLimit.bucketCapacity!!, redisTemplate!! + resource, permitsPerSecond, resourceLimit.capacity!!, redisTemplate!! + ) + } + } + Algorithms.SLIDING_WINDOW -> { + val interval = resourceLimit.interval ?: 1 + if (resourceLimit.scope == WorkScope.LOCAL) { + SlidingWindowRateLimiter(resourceLimit.limit, interval, resourceLimit.unit) + } else { + DistributedSlidingWindowRateLimiter( + resource, resourceLimit.limit, interval, resourceLimit.unit, redisTemplate!! + ) + } + } + Algorithms.LEAKY_BUCKET -> { + if (resourceLimit.capacity == null || resourceLimit.capacity!! <= 0) { + throw AcquireLockFailedException("Resource limit config $resourceLimit is illegal") + } + val permitsPerSecond = resourceLimit.limit / resourceLimit.unit.toSeconds(1).toDouble() + if (resourceLimit.scope == WorkScope.LOCAL) { + LeakyRateLimiter(permitsPerSecond, resourceLimit.capacity!!) + } else { + + DistributedLeakyRateLimiter( + resource, permitsPerSecond, resourceLimit.capacity!!, redisTemplate!! ) } } diff --git a/src/backend/common/common-ratelimiter/src/main/resources/META-INF/leaky-rate-limiter.lua b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/leaky-rate-limiter.lua new file mode 100644 index 0000000000..ec643f17c3 --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/leaky-rate-limiter.lua @@ -0,0 +1,57 @@ +local leaky_bucket_key = KEYS[1] +-- last update key +local last_bucket_key = KEYS[2] +-- the rate of leak water +local rate = tonumber(ARGV[1]) +-- capacity +local capacity = tonumber(ARGV[2]) +-- request count +local requested = tonumber(ARGV[3]) +-- the key life time +local key_lifetime = math.ceil((capacity / rate) + 1) +-- current timestamp +local now = redis.call('TIME')[1] + +-- the yield of water in the bucket default 0 +local key_bucket_count = tonumber(redis.call("GET", leaky_bucket_key)) or 0 + +-- the last update time default now +local last_time = tonumber(redis.call("GET", last_bucket_key)) or now + +-- the time difference +local millis_since_last_leak = now - last_time + +-- the yield of water had lasted +local leaks = millis_since_last_leak * rate + +if leaks > 0 then + -- clean up the bucket + if leaks >= key_bucket_count then + key_bucket_count = 0 + else + -- compute the yield of water in the bucket + key_bucket_count = key_bucket_count - leaks + end + last_time = now +end + +-- is allowed pass default not allow +local is_allow = 0 + +local new_bucket_count = key_bucket_count + requested +-- allow +if new_bucket_count <= capacity then + is_allow = 1 +else + -- not allow + return {is_allow, new_bucket_count} +end + +-- update the key bucket water yield +redis.call("SETEX", leaky_bucket_key, key_lifetime, new_bucket_count) + +-- update last update time +redis.call("SETEX", last_bucket_key, key_lifetime, now) + +-- return +return {is_allow, new_bucket_count} diff --git a/src/backend/common/common-ratelimiter/src/main/resources/META-INF/sliding-window-rate-limiter.lua b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/sliding-window-rate-limiter.lua new file mode 100644 index 0000000000..733c991b9f --- /dev/null +++ b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/sliding-window-rate-limiter.lua @@ -0,0 +1,39 @@ +-- key: 限流器的键名 +-- limit: 限流器的容量 +-- interval: 时间窗口的长度(单位为秒) +-- count: 一次获取的令牌数量 +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local interval = tonumber(ARGV[2]) +local count = tonumber(ARGV[3]) + +-- 计算当前时间戳 +local now = tonumber(redis.call('time')[1]) + +-- 删除时间窗口之外的令牌 +redis.call('zremrangebyscore', key, 0, now - interval) + +-- 获取当前时间窗口内的令牌数量 +local current = tonumber(redis.call('zcard', key)) + +-- 如果当前令牌数量已经达到限流器的容量,则不再生成新的令牌 +if current >= limit then + return 0 +end + +-- 计算需要生成的令牌数量 +local remaining = limit - current +local allowed_num = 0 +if (remaining < count) then + return { allowed_num, remaining } +end +remaining = count +allowed_num = 1 +-- 生成令牌,并返回生成的令牌数量 +local tokens = {} +for i = 1, remaining do + table.insert(tokens, now) +end +redis.call('zadd', key, unpack(tokens)) +redis.call('expire', key, interval) +return { allowed_num, remaining } diff --git a/src/backend/common/common-ratelimiter/src/main/resources/META-INF/token-bucket-rate-limiter.lua b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/token-bucket-rate-limiter.lua index e210096370..373dcc2a38 100644 --- a/src/backend/common/common-ratelimiter/src/main/resources/META-INF/token-bucket-rate-limiter.lua +++ b/src/backend/common/common-ratelimiter/src/main/resources/META-INF/token-bucket-rate-limiter.lua @@ -2,38 +2,51 @@ redis.replicate_commands() local tokens_key = KEYS[1] local timestamp_key = KEYS[2] - +-- 每秒填充速率 local rate = tonumber(ARGV[1]) +-- 令牌桶最大数量 local capacity = tonumber(ARGV[2]) +-- 消耗令牌数量 local requested = tonumber(ARGV[3]) local now = redis.call('TIME')[1] +-- 计算令牌桶填充满需要多久 local fill_time = capacity/rate +-- *2保证时间充足 local ttl = math.floor(fill_time*2) +-- 防止小于0 +if ttl < 1 then + ttl = 10 +end +-- 获取令牌桶内剩余数量 local last_tokens = tonumber(redis.call("get", tokens_key)) +-- 第一次没有数值,设置桶为满的 if last_tokens == nil then last_tokens = capacity end - +-- 获取上次更新时间 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end - +-- 本次校验和上次更新时间的间隔 local delta = math.max(0, now-last_refreshed) +-- 填充令牌,计算新的令牌桶剩余令牌数,填充不超过令牌桶上限 local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) + +-- 判断令牌数量是否足够 local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then + -- 如成功,令牌桶剩余令牌数减消耗令牌数 new_tokens = filled_tokens - requested allowed_num = 1 end -if ttl > 0 then - redis.call("setex", tokens_key, ttl, new_tokens) - redis.call("setex", timestamp_key, ttl, now) -end +-- 设置令牌桶剩余令牌数,令牌桶最后填充时间now, ttl超时时间 +redis.call("setex", tokens_key, ttl, new_tokens) +redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }