diff --git a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/api/proxy/ProxyRepositoryClient.kt b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/api/proxy/ProxyRepositoryClient.kt index 8a76c5aa14..724fc5387f 100644 --- a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/api/proxy/ProxyRepositoryClient.kt +++ b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/api/proxy/ProxyRepositoryClient.kt @@ -64,11 +64,4 @@ interface ProxyRepositoryClient { @ApiParam(value = "仓库名称", required = true) @PathVariable repoName: String ): Response - - @ApiOperation("查询仓库列表") - @GetMapping("/list/{projectId}") - fun listRepo( - @ApiParam(value = "所属项目", required = true) - @PathVariable projectId: String - ): Response> } diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/controller/proxy/ProxyRepositoryController.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/controller/proxy/ProxyRepositoryController.kt index 2306387577..6833fa0a58 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/controller/proxy/ProxyRepositoryController.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/controller/proxy/ProxyRepositoryController.kt @@ -46,8 +46,4 @@ class ProxyRepositoryController( override fun getRepoInfo(projectId: String, repoName: String): Response { return ResponseBuilder.success(repositoryService.getRepoInfo(projectId, repoName, null)) } - - override fun listRepo(projectId: String): Response> { - return ResponseBuilder.success(repositoryService.listRepo(projectId)) - } } diff --git a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyCleanFileVisitor.kt b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyCleanFileVisitor.kt new file mode 100644 index 0000000000..4e2d12bdb3 --- /dev/null +++ b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyCleanFileVisitor.kt @@ -0,0 +1,91 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.proxy.artifact.storage + +import com.tencent.bkrepo.common.service.exception.RemoteErrorCodeException +import com.tencent.bkrepo.common.storage.filesystem.ArtifactFileVisitor +import com.tencent.bkrepo.repository.api.proxy.ProxyFileReferenceClient +import org.slf4j.LoggerFactory +import java.io.File +import java.nio.file.FileVisitResult +import java.nio.file.Path +import java.nio.file.attribute.BasicFileAttributes + +/** + * 不同步时,清理本地存储的文件遍历器 + */ +class ProxyCleanFileVisitor( + private val proxyFileReferenceClient: ProxyFileReferenceClient, + private val cacheExpireDays: Int +) : ArtifactFileVisitor() { + override fun needWalk(): Boolean { + return true + } + + override fun visitFile(filePath: Path, attrs: BasicFileAttributes?): FileVisitResult { + try { + if (filePath.toString().endsWith(".sync")) { + val syncFile = filePath.toFile() + val dataFile = File(filePath.toString().removeSuffix(".sync")) + cleanFile(dataFile, syncFile) + } else { + ProxyStorageUtils.deleteCacheFile(filePath, cacheExpireDays) + } + } catch (e: Exception) { + logger.error("clean file error: ", e) + } + return FileVisitResult.CONTINUE + } + + /** + * 当文件索引为0时,删除本地存储 + */ + private fun cleanFile(dataFile: File, syncFile: File) { + val sha256 = dataFile.name + val credentialKeys = ProxyStorageUtils.readStorageCredentialKeys(syncFile) + try { + credentialKeys.forEach { + val fileReference = proxyFileReferenceClient.count(sha256, it).data!! + if (fileReference > 0) { + logger.info("skip clean file[$sha256] which file reference is $fileReference on storage[$it]") + return + } + } + } catch (e: RemoteErrorCodeException) { + logger.warn("get sha256[$sha256] file reference failed: ${e.errorMessage}") + return + } + dataFile.delete() + syncFile.delete() + logger.info("clean file[$sha256] success") + } + + companion object { + private val logger = LoggerFactory.getLogger(ProxyCleanFileVisitor::class.java) + } +} diff --git a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageService.kt b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageService.kt index 3809c2601f..952cd5154c 100644 --- a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageService.kt +++ b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageService.kt @@ -28,15 +28,16 @@ package com.tencent.bkrepo.proxy.artifact.storage import com.tencent.bkrepo.common.api.constant.ensureSuffix +import com.tencent.bkrepo.common.api.util.StreamUtils.readText import com.tencent.bkrepo.common.artifact.api.ArtifactFile import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream -import com.tencent.bkrepo.common.artifact.stream.EmptyInputStream import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.artifact.stream.artifactStream import com.tencent.bkrepo.common.storage.core.AbstractStorageService import com.tencent.bkrepo.common.storage.credentials.FileSystemCredentials import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import com.tencent.bkrepo.common.storage.filesystem.FileSystemClient +import com.tencent.bkrepo.repository.api.proxy.ProxyFileReferenceClient import org.apache.commons.io.IOUtils import java.nio.charset.Charset import java.util.concurrent.atomic.AtomicBoolean @@ -45,6 +46,7 @@ class ProxyStorageService : AbstractStorageService() { /** * 多存储一个sha256.sync文件,用来标记对应sha256的文件待同步至服务端 + * sha256.sync文件中记录credentialsKey */ override fun doStore( path: String, @@ -53,22 +55,24 @@ class ProxyStorageService : AbstractStorageService() { credentials: StorageCredentials, cancel: AtomicBoolean? ) { + val proxyCredentials = storageProperties.defaultStorageCredentials() when { artifactFile.isInMemory() -> { - fileStorage.store(path, filename, artifactFile.getInputStream(), artifactFile.getSize(), credentials) + fileStorage.store( + path, filename, artifactFile.getInputStream(), artifactFile.getSize(), proxyCredentials + ) } artifactFile.isFallback() -> { - fileStorage.store(path, filename, artifactFile.flushToFile(), credentials) + fileStorage.store(path, filename, artifactFile.flushToFile(), proxyCredentials) } else -> { - fileStorage.store(path, filename, artifactFile.flushToFile(), credentials) + fileStorage.store(path, filename, artifactFile.flushToFile(), proxyCredentials) } } val syncFilename = filename.ensureSuffix(".sync") - val inputStream = - credentials.key?.let { IOUtils.toInputStream(it, Charset.defaultCharset()) } ?: EmptyInputStream() - val size = credentials.key?.length?.toLong() ?: 0L - fileStorage.store(path, syncFilename, inputStream, size, credentials) + val inputStream = IOUtils.toInputStream(credentials.key.toString(), Charset.defaultCharset()) + val size = credentials.key.toString().length.toLong() + fileStorage.store(path, syncFilename, inputStream, size, storageProperties.defaultStorageCredentials()) } override fun doLoad( @@ -77,25 +81,62 @@ class ProxyStorageService : AbstractStorageService() { range: Range, credentials: StorageCredentials ): ArtifactInputStream? { - return fileStorage.load(path, filename, range, credentials)?.artifactStream(range) + return fileStorage.load(path, filename, range, storageProperties.defaultStorageCredentials()) + ?.artifactStream(range) } override fun doDelete(path: String, filename: String, credentials: StorageCredentials) { - return fileStorage.delete(path, filename, credentials) + return fileStorage.delete(path, filename, storageProperties.defaultStorageCredentials()) } override fun doExist(path: String, filename: String, credentials: StorageCredentials): Boolean { - return fileStorage.exist(path, filename, credentials) + val proxyCredentials = storageProperties.defaultStorageCredentials() + val dataFileExist = fileStorage.exist(path, filename, proxyCredentials) + if (!dataFileExist) { + return false + } + + // 数据文件存在,确认对应credentialsKey已记录 + val syncFileName = filename.plus(".sync") + val syncFileExist = fileStorage.exist(path, syncFileName, proxyCredentials) + if (syncFileExist) { + val keys = fileStorage.load(path, syncFileName, Range.FULL_RANGE, proxyCredentials)?.use { + it.readText().lines().toMutableSet() + } + keys?.add(credentials.key.toString()) + val content = keys.orEmpty().joinToString(System.lineSeparator()) + val inputStream = IOUtils.toInputStream(content, Charset.defaultCharset()) + val size = content.length.toLong() + fileStorage.delete(path, syncFileName, proxyCredentials) + fileStorage.store(path, syncFileName, inputStream, size, proxyCredentials) + } else { + val inputStream = IOUtils.toInputStream(credentials.key.toString(), Charset.defaultCharset()) + val size = credentials.key.toString().length.toLong() + fileStorage.store(path, syncFileName, inputStream, size, storageProperties.defaultStorageCredentials()) + } + return true } /** * 同步数据到服务端 */ fun sync(rate: Long, cacheExpireDays: Int) { - val credentials = getCredentialsOrDefault(null) + val credentials = storageProperties.defaultStorageCredentials() val visitor = ProxySyncFileVisitor(rate, cacheExpireDays) require(credentials is FileSystemCredentials) FileSystemClient(credentials.path).walk(visitor) - return + } + + /** + * 不同步时,清理已删除文件对应存储 + */ + fun clean( + proxyFileReferenceClient: ProxyFileReferenceClient, + cacheExpireDays: Int + ) { + val credentials = storageProperties.defaultStorageCredentials() + val visitor = ProxyCleanFileVisitor(proxyFileReferenceClient, cacheExpireDays) + require(credentials is FileSystemCredentials) + FileSystemClient(credentials.path).walk(visitor) } } diff --git a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageUtils.kt b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageUtils.kt new file mode 100644 index 0000000000..b77bc0e3f8 --- /dev/null +++ b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxyStorageUtils.kt @@ -0,0 +1,74 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.proxy.artifact.storage + +import com.tencent.bkrepo.common.storage.util.existReal +import java.io.File +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.Paths +import java.nio.file.attribute.BasicFileAttributeView +import java.time.Instant +import java.time.temporal.ChronoUnit + +object ProxyStorageUtils { + + fun deleteCacheFile(filePath: Path, cacheExpireDays: Int) { + if (filePath.toFile().isDirectory) { + return + } + val syncFilePath = Paths.get(filePath.toString().plus(".sync")) + // 还未同步到服务端 + if (syncFilePath.existReal()) { + return + } + val view = Files.getFileAttributeView(filePath, BasicFileAttributeView::class.java) + val attributes = view.readAttributes() + val aTime = attributes.lastAccessTime().toInstant() + val expireTime = Instant.now().minus(cacheExpireDays.toLong(), ChronoUnit.DAYS) + if (aTime.isBefore(expireTime)) { + filePath.toFile().delete() + } + } + + fun readStorageCredentialKeys(syncFile: File): List { + if (syncFile.length() == 0L) { + return listOf(null) + } + + val keys = mutableListOf() + syncFile.readLines().forEach { + if (it == "null") { + keys.add(null) + } else { + keys.add(it) + } + } + return keys + } +} diff --git a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxySyncFileVisitor.kt b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxySyncFileVisitor.kt index 5c2815d719..f88beb9962 100644 --- a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxySyncFileVisitor.kt +++ b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/artifact/storage/ProxySyncFileVisitor.kt @@ -30,11 +30,11 @@ package com.tencent.bkrepo.proxy.artifact.storage import com.tencent.bkrepo.common.artifact.stream.rateLimit import com.tencent.bkrepo.common.artifact.util.http.StreamRequestBody import com.tencent.bkrepo.common.service.proxy.ProxyEnv +import com.tencent.bkrepo.common.service.proxy.ProxyFeignClientFactory import com.tencent.bkrepo.common.service.proxy.ProxyRequestInterceptor import com.tencent.bkrepo.common.service.util.okhttp.HttpClientBuilderFactory import com.tencent.bkrepo.common.storage.filesystem.ArtifactFileVisitor -import com.tencent.bkrepo.common.storage.util.delete -import com.tencent.bkrepo.common.storage.util.existReal +import com.tencent.bkrepo.replication.api.proxy.ProxyBlobReplicaClient import com.tencent.bkrepo.replication.constant.FILE import com.tencent.bkrepo.replication.constant.SHA256 import com.tencent.bkrepo.replication.constant.SIZE @@ -44,13 +44,8 @@ import okhttp3.Request import org.slf4j.LoggerFactory import java.io.File import java.nio.file.FileVisitResult -import java.nio.file.Files import java.nio.file.Path -import java.nio.file.Paths -import java.nio.file.attribute.BasicFileAttributeView import java.nio.file.attribute.BasicFileAttributes -import java.time.Instant -import java.time.temporal.ChronoUnit class ProxySyncFileVisitor( private val rate: Long, @@ -58,6 +53,9 @@ class ProxySyncFileVisitor( ) : ArtifactFileVisitor() { private val httpClient = HttpClientBuilderFactory.create().addInterceptor(ProxyRequestInterceptor()).build() + private val proxyBlobReplicaClient: ProxyBlobReplicaClient by lazy { + ProxyFeignClientFactory.create("replication") + } override fun needWalk(): Boolean { return true @@ -69,7 +67,7 @@ class ProxySyncFileVisitor( val file = File(filePath.toString().removeSuffix(".sync")) syncFile(filePath, file) } else { - deleteCacheFile(filePath) + ProxyStorageUtils.deleteCacheFile(filePath, cacheExpireDays) } } catch (e: Exception) { logger.error("sync file error: ", e) @@ -79,11 +77,20 @@ class ProxySyncFileVisitor( private fun syncFile(filePath: Path, file: File) { val syncFile = filePath.toFile() - val storageKey = if (syncFile.length() == 0L) null else syncFile.readText() + val storageKeys = ProxyStorageUtils.readStorageCredentialKeys(syncFile) + storageKeys.forEach { storageKey -> + blobPush(file, storageKey) + } + syncFile.delete() + } + private fun blobPush(file: File, storageKey: String?) { val sha256 = file.canonicalPath.split(File.separator).last() + if (proxyBlobReplicaClient.check(sha256, storageKey).data == true) { + return + } val url = "$gateway/replication/proxy/replica/blob/push" - logger.info("start sync file[${file.canonicalPath}]") + logger.info("start sync file[${file.canonicalPath}] on storage credential[$storageKey]") val inputStream = if (rate > 0) { file.inputStream().rateLimit(rate) } else { @@ -99,32 +106,14 @@ class ProxySyncFileVisitor( val request = Request.Builder().url(url).post(requestBody).build() httpClient.newCall(request).execute().use { if (it.isSuccessful) { - logger.info("sync file[${file.canonicalPath}] success") - filePath.delete() + logger.info("sync file[${file.canonicalPath}] success on storage credential[$storageKey]") } else { - logger.error("sync file[${file.canonicalPath}] error: ${it.code}, ${it.body?.string()}") + logger.error("sync file[${file.canonicalPath}] error on storage credential[$storageKey]:" + + " ${it.code}, ${it.body?.string()}") } } } - private fun deleteCacheFile(filePath: Path) { - if (filePath.toFile().isDirectory) { - return - } - val syncFilePath = Paths.get(filePath.toString().plus(".sync")) - // 还未同步到服务端 - if (syncFilePath.existReal()) { - return - } - val view = Files.getFileAttributeView(filePath, BasicFileAttributeView::class.java) - val attributes = view.readAttributes() - val aTime = attributes.lastAccessTime().toInstant() - val expireTime = Instant.now().minus(cacheExpireDays.toLong(), ChronoUnit.DAYS) - if (aTime.isBefore(expireTime)) { - filePath.toFile().deleteOnExit() - } - } - companion object { private val logger = LoggerFactory.getLogger(ProxySyncFileVisitor::class.java) private val gateway = ProxyEnv.getGateway() diff --git a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/job/ProxySyncJob.kt b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/job/ProxySyncJob.kt index 9408f80e10..0b9d6f1814 100644 --- a/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/job/ProxySyncJob.kt +++ b/src/proxy/biz-proxy/src/main/kotlin/com/tencent/bkrepo/proxy/job/ProxySyncJob.kt @@ -32,6 +32,7 @@ import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.service.proxy.ProxyEnv import com.tencent.bkrepo.common.service.proxy.ProxyFeignClientFactory import com.tencent.bkrepo.proxy.artifact.storage.ProxyStorageService +import com.tencent.bkrepo.repository.api.proxy.ProxyFileReferenceClient import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import java.time.LocalDateTime @@ -48,16 +49,27 @@ class ProxySyncJob( ProxyFeignClientFactory.create("auth") } + private val proxyFileReferenceClient: ProxyFileReferenceClient by lazy { + ProxyFeignClientFactory.create("repository") + } + @Scheduled(initialDelay = 10000, fixedRate = 1800000) fun sync() { val projectId = ProxyEnv.getProjectId() val name = ProxyEnv.getName() val proxyInfo = proxyProxyClient.info(projectId, name).data!! val (startHour, endHour) = proxyInfo.syncTimeRange.split(StringPool.DASH).map { it.toInt() } + if (disabledSync(startHour, endHour)) { + return storageService.clean(proxyFileReferenceClient, proxyInfo.cacheExpireDays) + } val currentHour = LocalDateTime.now().hour if (currentHour < startHour || currentHour > endHour) { return } storageService.sync(proxyInfo.syncRateLimit, proxyInfo.cacheExpireDays) } + + private fun disabledSync(startHour: Int, endHour: Int): Boolean { + return startHour >= endHour + } } diff --git a/src/proxy/buildSrc/src/main/kotlin/Versions.kt b/src/proxy/buildSrc/src/main/kotlin/Versions.kt index 7b105322d6..b0438f3718 100644 --- a/src/proxy/buildSrc/src/main/kotlin/Versions.kt +++ b/src/proxy/buildSrc/src/main/kotlin/Versions.kt @@ -32,7 +32,7 @@ object Release { object Versions { const val DevopsBoot = "0.0.6" - const val BkRepo = "1.1.0-RELEASE" + const val BkRepo = "1.1.1-SNAPSHOT" const val OKhttp = "4.9.0" const val Polaris = "1.5.2" const val CommonsText = "1.9"