Skip to content

Commit

Permalink
feat: 支持异步刷新devx ip缓存 TencentBlueKing#2282
Browse files Browse the repository at this point in the history
* feat: 支持异步刷新devx ip缓存 TencentBlueKing#2282

* feat: 支持异步刷新devx ip缓存 TencentBlueKing#2282
  • Loading branch information
cnlkl authored Jul 10, 2024
1 parent 58ce5f0 commit ccf97a0
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.constant.ANONYMOUS_USER
import com.tencent.bkrepo.common.api.exception.SystemErrorException
import com.tencent.bkrepo.common.api.message.CommonMessageCode
Expand All @@ -48,6 +51,8 @@ import okhttp3.Request
import org.slf4j.LoggerFactory
import org.springframework.web.servlet.HandlerInterceptor
import org.springframework.web.servlet.HandlerMapping
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse

Expand All @@ -56,10 +61,24 @@ import javax.servlet.http.HttpServletResponse
* */
open class DevXAccessInterceptor(private val devXProperties: DevXProperties) : HandlerInterceptor {
private val httpClient = OkHttpClient.Builder().build()
private val executor by lazy {
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
ThreadFactoryBuilder().setNameFormat("devx-access-%d").build(),
)
}
private val projectIpsCache: LoadingCache<String, Set<String>> = CacheBuilder.newBuilder()
.maximumSize(devXProperties.cacheSize)
.expireAfterWrite(devXProperties.cacheExpireTime)
.build(CacheLoader.from { key -> listIpFromProject(key) + listCvmIpFromProject(key) + listIpFromProps(key) })
.refreshAfterWrite(devXProperties.cacheExpireTime)
.build(object : CacheLoader<String, Set<String>>() {
override fun load(key: String): Set<String> {
return listIpFromProject(key) + listCvmIpFromProject(key) + listIpFromProps(key)
}

override fun reload(key: String, oldValue: Set<String>): ListenableFuture<Set<String>> {
return Futures.submit(Callable { load(key) }, executor)
}
})

override fun preHandle(request: HttpServletRequest, response: HttpServletResponse, handler: Any): Boolean {
val user = SecurityUtils.getUserId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,14 @@ class DevXAccessFilter(
}

private suspend fun checkIpBelongToProject(projectId: String, srcIp: String, isRetry: Boolean = false) {
val projectIps = DevxWorkspaceUtils.getIpList(projectId).awaitSingle()
val projectIps = if(isRetry){
DevxWorkspaceUtils.getLatestIpList(projectId).awaitSingle()
} else {
DevxWorkspaceUtils.getIpList(projectId).awaitSingle()
}
val notBelong = srcIp !in projectIps &&
!projectIps.any { it.contains('/') && IpUtils.isInRange(srcIp, it) }
if (notBelong && !isRetry && !DevxWorkspaceUtils.knownIllegalIp(srcIp, projectId)) {
DevxWorkspaceUtils.refreshIpListCache(projectId)
return checkIpBelongToProject(projectId, srcIp, true)
}
if (notBelong) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

package com.tencent.bkrepo.fs.server.utils

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.util.toJsonString
import com.tencent.bkrepo.common.security.interceptor.devx.ApiAuth
import com.tencent.bkrepo.common.security.interceptor.devx.DevXCvmWorkspace
Expand All @@ -38,8 +40,6 @@ import com.tencent.bkrepo.common.security.interceptor.devx.QueryResponse
import com.tencent.bkrepo.fs.server.context.ReactiveRequestContextHolder
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.slf4j.LoggerFactory
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.HttpStatus
Expand All @@ -48,13 +48,13 @@ import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import reactor.core.publisher.Mono
import reactor.core.publisher.toMono
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.PrematureCloseException
import reactor.netty.resources.ConnectionProvider
import reactor.util.retry.RetryBackoffSpec
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.Executors

class DevxWorkspaceUtils(
devXProperties: DevXProperties
Expand All @@ -74,38 +74,29 @@ class DevxWorkspaceUtils(
WebClient.builder().clientConnector(connector).build()
}

private val mutex = Mutex()
private val illegalIp by lazy { CacheBuilder.newBuilder()
.expireAfterWrite(devXProperties.cacheExpireTime)
.maximumSize(devXProperties.cacheSize)
.build<String, String>() }
private val projectIpsCache: ConcurrentHashMap<String, Mono<Set<String>>> by lazy {
ConcurrentHashMap(devXProperties.cacheSize.toInt())
private val illegalIp by lazy {
Caffeine.newBuilder()
.expireAfterWrite(devXProperties.cacheExpireTime)
.maximumSize(devXProperties.cacheSize)
.build<String, String>()
}

suspend fun getIpList(projectId: String): Mono<Set<String>> {
return projectIpsCache[projectId] ?: requestIpList(projectId)
private val executor by lazy {
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
ThreadFactoryBuilder().setNameFormat("devx-access-%d").build(),
)
}

/**
* 获取项目ip列表, 5s内获取不到ip列表则返回空
* 为了避免接口异常,阻塞大量请求, 获取锁的超时比请求读超时短
*/
private suspend fun requestIpList(projectId: String): Mono<Set<String>> {
val start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < TimeUnit.SECONDS.toMillis(5)) {
if (mutex.tryLock()) {
val ipList = try {
projectIpsCache.getOrPut(projectId) { listIp(projectId) }
} finally {
mutex.unlock()
}
return ipList
}
}
return Mono.empty()
private val projectIpsCache: AsyncLoadingCache<String, Set<String>> by lazy {
Caffeine.newBuilder()
.refreshAfterWrite(devXProperties.cacheExpireTime)
.executor(executor)
.maximumSize(devXProperties.cacheSize)
.buildAsync { k -> listIp(k).block() }
}

fun getIpList(projectId: String): Mono<Set<String>> {
return projectIpsCache[projectId].toMono()
}

/**
* 是否为已知非法ip
Expand All @@ -125,10 +116,8 @@ class DevxWorkspaceUtils(
illegalIp.put(ip, projectId)
}

suspend fun refreshIpListCache(projectId: String) {
mutex.withLock {
projectIpsCache[projectId] = listIp(projectId)
}
fun getLatestIpList(projectId: String): Mono<Set<String>> {
return listIp(projectId)
}

suspend fun getWorkspace(): Mono<DevXWorkSpace?> {
Expand All @@ -142,7 +131,6 @@ class DevxWorkspaceUtils(
private fun listIp(projectId: String): Mono<Set<String>> {
return Mono.zip(listIpFromProject(projectId), listIpFromProps(projectId), listCvmIpFromProject(projectId))
.map { it.t1 + it.t2 + it.t3 }
.cache(devXProperties.cacheExpireTime)
}

private fun listIpFromProject(projectId: String): Mono<Set<String>> {
Expand Down

0 comments on commit ccf97a0

Please sign in to comment.