From 8ab082126cb702fb55ad08d327926d5b868feba4 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 4 Jan 2024 16:50:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96GC=E7=9B=B8=E5=85=B3=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20(#1615)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 优化系统GC功能 #1600 * feat: 归档服务支持异步任务 #1600 * feat: 支持任务可取消 #1600 * feat: 优化压缩文件引用逻辑 #1600 * feat: 优化GC相关job #1600 * feat: merge master #1600 * feat: 节点聚类分析时,增加采样调查 #1600 * feat: 优化job db查询 #1600 --- .../bkrepo/archive/api/ArchiveClient.kt | 3 + .../archive/config/ArchiveConfiguration.kt | 12 ++- .../config/ArchiveShutdownConfiguration.kt | 25 +++++ .../controller/service/ArchiveController.kt | 11 +++ .../controller/user/SystemAdminController.kt | 21 ++++ .../archive/job/AsyncBaseJobSubscriber.kt | 47 +++++++++ .../bkrepo/archive/job/BaseJobSubscriber.kt | 4 + .../tencent/bkrepo/archive/job/Cancellable.kt | 11 +++ .../bkrepo/archive/job/archive/ArchiveJob.kt | 11 ++- .../bkrepo/archive/job/archive/RestoreJob.kt | 11 ++- .../archive/job/compress/CompressJob.kt | 25 +++-- .../job/compress/CompressSubscriber.kt | 25 +++-- .../archive/job/compress/UncompressJob.kt | 22 +++-- .../job/compress/UncompressSubscriber.kt | 15 ++- .../archive/service/CompressServiceImpl.kt | 51 ++++++---- .../archive/service/SystemAdminService.kt | 5 + .../archive/service/SystemAdminServiceImpl.kt | 13 +++ .../bkrepo/archive/job/JobSubscriberTest.kt | 90 ++++++++++++++++++ .../storage/StorageAutoConfiguration.kt | 3 + .../common/storage/core/CompressSupport.kt | 19 ++-- .../common/storage/util/StorageUtils.kt | 47 +++++++++ .../job/batch/ArchivedNodeCompleteJob.kt | 5 +- .../job/batch/ArchivedNodeRestoreJob.kt | 5 +- .../bkrepo/job/batch/ArtifactCleanupJob.kt | 5 +- .../bkrepo/job/batch/ArtifactPushJob.kt | 5 +- .../job/batch/DeletedBlockNodeCleanupJob.kt | 5 +- .../bkrepo/job/batch/DeletedNodeCleanupJob.kt | 5 +- .../batch/DistributedDockerImageCleanupJob.kt | 5 +- .../bkrepo/job/batch/EmptyFolderCleanupJob.kt | 5 +- .../bkrepo/job/batch/ExpiredNodeMarkupJob.kt | 5 +- .../job/batch/FileReferenceCleanupJob.kt | 10 +- .../bkrepo/job/batch/IdleNodeArchiveJob.kt | 5 +- .../bkrepo/job/batch/NodeCompressedJob.kt | 5 +- .../tencent/bkrepo/job/batch/NodeCopyJob.kt | 3 +- .../bkrepo/job/batch/NodeReport2BkbaseJob.kt | 5 +- .../bkrepo/job/batch/NodeUncompressedJob.kt | 5 +- .../bkrepo/job/batch/OciBlobNodeRefreshJob.kt | 5 +- .../job/batch/PipelineArtifactCleanupJob.kt | 5 +- .../job/batch/ProjectRepoMetricsStatJob.kt | 5 +- .../bkrepo/job/batch/RunOnceTaskCleanupJob.kt | 5 +- .../bkrepo/job/batch/SignFileCleanupJob.kt | 5 +- .../tencent/bkrepo/job/batch/SystemGcJob.kt | 95 ++++++++++++++++--- .../batch/ThirdPartyImageReplicationJob.kt | 5 +- .../batch/base/CompositeMongoDbBatchJob.kt | 2 +- .../batch/base/DefaultContextMongoDbJob.kt | 2 +- .../bkrepo/job/batch/base/DefaultRepoJob.kt | 5 +- .../bkrepo/job/batch/base/MongoDbBatchJob.kt | 12 ++- .../bkrepo/job/batch/ddc/DdcBlobCleanupJob.kt | 2 +- .../job/batch/ddc/ExpiredDdcRefCleanupJob.kt | 3 +- .../node/NodeStatCompositeMongoDbBatchJob.kt | 3 +- .../project/BkciProjectMetadataSyncJob.kt | 2 +- 51 files changed, 580 insertions(+), 125 deletions(-) create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveShutdownConfiguration.kt create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/AsyncBaseJobSubscriber.kt create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/Cancellable.kt create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminService.kt create mode 100644 src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminServiceImpl.kt create mode 100644 src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/JobSubscriberTest.kt create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt diff --git a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/api/ArchiveClient.kt b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/api/ArchiveClient.kt index fcb790384f..c119f065b9 100644 --- a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/api/ArchiveClient.kt +++ b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/api/ArchiveClient.kt @@ -53,4 +53,7 @@ interface ArchiveClient { @PutMapping("/compress/complete") fun completeCompress(@RequestBody request: CompleteCompressRequest): Response + + @DeleteMapping("/deleteAll") + fun deleteAll(@RequestBody request: ArchiveFileRequest): Response } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveConfiguration.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveConfiguration.kt index 1971a0e448..cce265d192 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveConfiguration.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveConfiguration.kt @@ -1,8 +1,18 @@ package com.tencent.bkrepo.archive.config +import com.tencent.bkrepo.common.security.http.core.HttpAuthSecurity import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import @EnableConfigurationProperties(ArchiveProperties::class) +@Import(ArchiveShutdownConfiguration::class) @Configuration -class ArchiveConfiguration +class ArchiveConfiguration { + + @Bean + fun httpAuthSecurity(): HttpAuthSecurity { + return HttpAuthSecurity().withPrefix("/archive") + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveShutdownConfiguration.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveShutdownConfiguration.kt new file mode 100644 index 0000000000..e9f640f629 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveShutdownConfiguration.kt @@ -0,0 +1,25 @@ +package com.tencent.bkrepo.archive.config + +import com.tencent.bkrepo.archive.job.Cancellable +import com.tencent.bkrepo.common.service.shutdown.ServiceShutdownHook +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.ObjectProvider + +/** + * 归档服务关机相关配置 + * */ +class ArchiveShutdownConfiguration(cancellable: ObjectProvider) { + + init { + ServiceShutdownHook.add { + cancellable.stream().forEach { + it.cancel() + logger.info("Shutdown job[${it.javaClass.simpleName}].") + } + } + } + + companion object { + private val logger = LoggerFactory.getLogger(ArchiveShutdownConfiguration::class.java) + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/service/ArchiveController.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/service/ArchiveController.kt index 9018cf1310..cbc070c56a 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/service/ArchiveController.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/service/ArchiveController.kt @@ -65,4 +65,15 @@ class ArchiveController( compressService.complete(request) return ResponseBuilder.success() } + + override fun deleteAll(request: ArchiveFileRequest): Response { + archiveService.delete(request) + val deleteCompressRequest = DeleteCompressRequest( + sha256 = request.sha256, + storageCredentialsKey = request.storageCredentialsKey, + operator = request.operator, + ) + compressService.delete(deleteCompressRequest) + return ResponseBuilder.success() + } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt new file mode 100644 index 0000000000..d0c90d4b28 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt @@ -0,0 +1,21 @@ +package com.tencent.bkrepo.archive.controller.user + +import com.tencent.bkrepo.archive.service.SystemAdminService +import com.tencent.bkrepo.common.security.permission.Principal +import com.tencent.bkrepo.common.security.permission.PrincipalType +import org.springframework.web.bind.annotation.PutMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam +import org.springframework.web.bind.annotation.RestController + +@Principal(PrincipalType.ADMIN) +@RestController +@RequestMapping("/api/archive/admin") +class SystemAdminController( + private val systemAdminService: SystemAdminService, +) { + @PutMapping("/stop") + fun stop(@RequestParam jobName: String) { + systemAdminService.stop(jobName) + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/AsyncBaseJobSubscriber.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/AsyncBaseJobSubscriber.kt new file mode 100644 index 0000000000..eae813eda7 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/AsyncBaseJobSubscriber.kt @@ -0,0 +1,47 @@ +package com.tencent.bkrepo.archive.job + +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock + +open class AsyncBaseJobSubscriber(val executor: ThreadPoolExecutor) : BaseJobSubscriber() { + + private val lock: ReentrantLock = ReentrantLock() + private val executeComplete: Condition = lock.newCondition() + private val committedTask: AtomicInteger = AtomicInteger(0) + private val commitComplete: AtomicBoolean = AtomicBoolean(false) + + override fun hookOnNext(value: T) { + committedTask.incrementAndGet() + executor.execute { + try { + super.hookOnNext(value) + } finally { + val committed = committedTask.decrementAndGet() + if (commitComplete.get() && committed == 0) { + lock.lock() + try { + executeComplete.signal() + } finally { + lock.unlock() + } + } + } + } + } + + override fun hookOnComplete() { + commitComplete.set(true) + if (committedTask.get() != 0) { + lock.lock() + try { + executeComplete.await() + } finally { + lock.unlock() + } + } + super.hookOnComplete() + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/BaseJobSubscriber.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/BaseJobSubscriber.kt index 502ae6569b..94da232866 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/BaseJobSubscriber.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/BaseJobSubscriber.kt @@ -81,6 +81,10 @@ open class BaseJobSubscriber : BaseSubscriber() { countDownLatch.countDown() } + override fun hookOnCancel() { + LoggerHolder.jobLogger.info("Job[${getJobName()}] cancelled.") + } + override fun currentContext(): Context { return Context.of(JOB_NAME, getJobName(), JOB_CTX, jobContext) } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/Cancellable.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/Cancellable.kt new file mode 100644 index 0000000000..64f3909da0 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/Cancellable.kt @@ -0,0 +1,11 @@ +package com.tencent.bkrepo.archive.job + +/** + * 可取消的操作 + * */ +interface Cancellable { + /** + * 取消 + * */ + fun cancel() +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/ArchiveJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/ArchiveJob.kt index da8cf6c198..5bab3101c2 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/ArchiveJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/ArchiveJob.kt @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.archive.ArchiveStatus import com.tencent.bkrepo.archive.config.ArchiveProperties import com.tencent.bkrepo.archive.extensions.key +import com.tencent.bkrepo.archive.job.Cancellable import com.tencent.bkrepo.archive.model.TArchiveFile import com.tencent.bkrepo.archive.repository.ArchiveFileRepository import com.tencent.bkrepo.archive.utils.ArchiveUtils.Companion.newFixedAndCachedThreadPool @@ -38,7 +39,7 @@ class ArchiveJob( private val archiveProperties: ArchiveProperties, private val storageService: StorageService, private val archiveFileRepository: ArchiveFileRepository, -) { +) : Cancellable { /** * 归档cos实例 @@ -89,6 +90,8 @@ class ArchiveJob( ThreadFactoryBuilder().setNameFormat("archive-upload-%d").build(), ) + private var subscriber: ArchiveSubscriber? = null + init { /* * 磁盘容量监控 @@ -155,7 +158,13 @@ class ArchiveJob( .runOn(Schedulers.fromExecutor(httpUploadPool), prefetch) .flatMap(uploader::onArchiveFileWrapper) // 上传 .subscribe(subscriber) + this.subscriber = subscriber subscriber.blockLast() + this.subscriber = null + } + + override fun cancel() { + subscriber?.dispose() } companion object { diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/RestoreJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/RestoreJob.kt index 13735a506a..8439ebb820 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/RestoreJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/archive/RestoreJob.kt @@ -2,6 +2,7 @@ package com.tencent.bkrepo.archive.job.archive import com.tencent.bkrepo.archive.ArchiveStatus import com.tencent.bkrepo.archive.config.ArchiveProperties +import com.tencent.bkrepo.archive.job.Cancellable import com.tencent.bkrepo.archive.model.TArchiveFile import com.tencent.bkrepo.archive.repository.ArchiveFileDao import com.tencent.bkrepo.archive.repository.ArchiveFileRepository @@ -25,9 +26,11 @@ class RestoreJob( private val storageService: StorageService, private val archiveProperties: ArchiveProperties, private val archiveFileDao: ArchiveFileDao, -) { +) : Cancellable { private val cosClient = CosClient(archiveProperties.cos) + private var subscriber: RestoreSubscriber? = null + /** * 获取待归档文件列表 * */ @@ -48,6 +51,12 @@ class RestoreJob( archiveProperties.workDir, ) listFiles().subscribe(subscriber) + this.subscriber = subscriber subscriber.blockLast() + this.subscriber = null + } + + override fun cancel() { + subscriber?.dispose() } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt index b88c7d752d..66434767a0 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt @@ -3,12 +3,14 @@ package com.tencent.bkrepo.archive.job.compress import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.config.ArchiveProperties +import com.tencent.bkrepo.archive.job.Cancellable import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.repository.CompressFileDao import com.tencent.bkrepo.archive.repository.CompressFileRepository import com.tencent.bkrepo.archive.utils.ArchiveUtils.Companion.newFixedAndCachedThreadPool import com.tencent.bkrepo.archive.utils.ReactiveDaoUtils import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.repository.api.FileReferenceClient import java.util.concurrent.TimeUnit import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo @@ -16,7 +18,6 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import reactor.core.publisher.Flux -import reactor.core.scheduler.Schedulers @Component class CompressJob( @@ -24,12 +25,14 @@ class CompressJob( private val compressFileDao: CompressFileDao, private val storageService: StorageService, private val compressFileRepository: CompressFileRepository, -) { + private val fileReferenceClient: FileReferenceClient, +) : Cancellable { private val compressThreadPool = newFixedAndCachedThreadPool( archiveProperties.ioThreads, ThreadFactoryBuilder().setNameFormat("storage-compress-%d").build(), ) + private var subscriber: CompressSubscriber? = null fun listFiles(): Flux { val criteria = where(TCompressFile::status).isEqualTo(CompressStatus.CREATED) @@ -40,10 +43,20 @@ class CompressJob( @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.HOURS) fun compress() { - val subscriber = CompressSubscriber(compressFileDao, compressFileRepository, storageService) - listFiles().parallel() - .runOn(Schedulers.fromExecutor(compressThreadPool)) - .subscribe(subscriber) + val subscriber = CompressSubscriber( + compressFileDao, + compressFileRepository, + storageService, + fileReferenceClient, + compressThreadPool, + ) + listFiles().subscribe(subscriber) + this.subscriber = subscriber subscriber.blockLast() + this.subscriber = null + } + + override fun cancel() { + subscriber?.dispose() } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressSubscriber.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressSubscriber.kt index a144945d37..09bd45d196 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressSubscriber.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressSubscriber.kt @@ -2,7 +2,7 @@ package com.tencent.bkrepo.archive.job.compress import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.event.StorageFileCompressedEvent -import com.tencent.bkrepo.archive.job.BaseJobSubscriber +import com.tencent.bkrepo.archive.job.AsyncBaseJobSubscriber import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.repository.CompressFileDao import com.tencent.bkrepo.archive.repository.CompressFileRepository @@ -13,14 +13,18 @@ import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.innercos.retry import com.tencent.bkrepo.common.storage.monitor.measureThroughput +import com.tencent.bkrepo.repository.api.FileReferenceClient import org.slf4j.LoggerFactory import java.time.LocalDateTime +import java.util.concurrent.ThreadPoolExecutor class CompressSubscriber( private val compressFileDao: CompressFileDao, private val compressFileRepository: CompressFileRepository, private val storageService: StorageService, -) : BaseJobSubscriber() { + private val fileReferenceClient: FileReferenceClient, + executor: ThreadPoolExecutor, +) : AsyncBaseJobSubscriber(executor) { override fun doOnNext(value: TCompressFile) { with(value) { @@ -68,18 +72,23 @@ class CompressSubscriber( SpringContextUtils.publishEvent(event) } catch (e: TooLowerReuseRateException) { logger.info("Reuse rate is too lower.") - value.status = CompressStatus.COMPRESS_FAILED - value.lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(value) + compressFailed(value) } catch (e: Exception) { - value.status = CompressStatus.COMPRESS_FAILED - value.lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(value) + compressFailed(value) throw e } } } + private fun compressFailed(file: TCompressFile) { + with(file) { + status = CompressStatus.COMPRESS_FAILED + lastModifiedDate = LocalDateTime.now() + compressFileRepository.save(file) + fileReferenceClient.decrement(baseSha256, storageCredentialsKey) + } + } + override fun getSize(value: TCompressFile): Long { return value.uncompressedSize } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt index 1a4bd84172..0c6ca38fdf 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt @@ -3,6 +3,7 @@ package com.tencent.bkrepo.archive.job.compress import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.config.ArchiveProperties +import com.tencent.bkrepo.archive.job.Cancellable import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.repository.CompressFileDao import com.tencent.bkrepo.archive.repository.CompressFileRepository @@ -16,7 +17,6 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import reactor.core.publisher.Flux -import reactor.core.scheduler.Schedulers @Component class UncompressJob( @@ -24,12 +24,13 @@ class UncompressJob( private val compressFileDao: CompressFileDao, private val storageService: StorageService, private val compressFileRepository: CompressFileRepository, -) { +) : Cancellable { private val uncompressThreadPool = ArchiveUtils.newFixedAndCachedThreadPool( archiveProperties.ioThreads, ThreadFactoryBuilder().setNameFormat("storage-uncompress-%d").build(), ) + private var subscriber: UncompressSubscriber? = null fun listFiles(): Flux { val criteria = where(TCompressFile::status).isEqualTo(CompressStatus.WAIT_TO_UNCOMPRESS) @@ -40,10 +41,19 @@ class UncompressJob( @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) fun uncompress() { - val subscriber = UncompressSubscriber(compressFileDao, compressFileRepository, storageService) - listFiles().parallel() - .runOn(Schedulers.fromExecutor(uncompressThreadPool)) - .subscribe(subscriber) + val subscriber = UncompressSubscriber( + compressFileDao, + compressFileRepository, + storageService, + uncompressThreadPool, + ) + listFiles().subscribe(subscriber) + this.subscriber = subscriber subscriber.blockLast() + this.subscriber = null + } + + override fun cancel() { + subscriber?.dispose() } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressSubscriber.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressSubscriber.kt index dc30a03a6e..7127f43c54 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressSubscriber.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressSubscriber.kt @@ -2,7 +2,7 @@ package com.tencent.bkrepo.archive.job.compress import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.event.StorageFileUncompressedEvent -import com.tencent.bkrepo.archive.job.BaseJobSubscriber +import com.tencent.bkrepo.archive.job.AsyncBaseJobSubscriber import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.repository.CompressFileDao import com.tencent.bkrepo.archive.repository.CompressFileRepository @@ -13,12 +13,14 @@ import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.monitor.measureThroughput import java.time.LocalDateTime import org.slf4j.LoggerFactory +import java.util.concurrent.ThreadPoolExecutor class UncompressSubscriber( private val compressFileDao: CompressFileDao, private val compressFileRepository: CompressFileRepository, private val storageService: StorageService, -) : BaseJobSubscriber() { + executor: ThreadPoolExecutor, +) : AsyncBaseJobSubscriber(executor) { override fun doOnNext(value: TCompressFile) { with(value) { @@ -32,11 +34,18 @@ class UncompressSubscriber( ) if (!tryLock) { logger.info("File[$sha256] already start uncompress.") + return } // 解压 val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) try { - val throughput = measureThroughput(uncompressedSize) { storageService.uncompress(sha256, credentials) } + var ret = 0 + val throughput = measureThroughput(uncompressedSize) { + ret = storageService.uncompress(sha256, credentials) + } + if (ret == 0) { + return + } // 更新状态 value.status = CompressStatus.UNCOMPRESSED value.lastModifiedDate = LocalDateTime.now() diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt index b5f297529b..03a17d008f 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt @@ -1,5 +1,6 @@ package com.tencent.bkrepo.archive.service +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.archive.ArchiveFileNotFound import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.job.compress.CompressSubscriber @@ -28,8 +29,24 @@ class CompressServiceImpl( private val fileReferenceClient: FileReferenceClient, private val compressFileDao: CompressFileDao, ) : CompressService { - private val compressor = CompressSubscriber(compressFileDao, compressFileRepository, storageService) - private val uncompressor = UncompressSubscriber(compressFileDao, compressFileRepository, storageService) + private val executor = ArchiveUtils.newFixedAndCachedThreadPool( + 1, + ThreadFactoryBuilder().setNameFormat("compress-worker").build(), + ) + private val compressor = CompressSubscriber( + compressFileDao, + compressFileRepository, + storageService, + fileReferenceClient, + executor, + ) + private val uncompressor = UncompressSubscriber( + compressFileDao, + compressFileRepository, + storageService, + executor, + ) + override fun compress(request: CompressFileRequest) { with(request) { // 队头元素 @@ -37,15 +54,13 @@ class CompressServiceImpl( if (head != null && head.status != CompressStatus.NONE) { // 压缩任务已存在 logger.info("Compress file [$sha256] already exists,status: ${head.status}.") - head.lastModifiedBy = operator - head.lastModifiedDate = LocalDateTime.now() // 重新触发压缩后逻辑,删除原存储文件和更新node状态 - head.status = if (head.status == CompressStatus.COMPLETED) { - CompressStatus.COMPRESSED - } else { - CompressStatus.CREATED + if (head.status == CompressStatus.COMPLETED) { + head.lastModifiedBy = operator + head.lastModifiedDate = LocalDateTime.now() + head.status = CompressStatus.COMPRESSED + compressFileRepository.save(head) } - compressFileRepository.save(head) return } var currentChainLength = 0 @@ -125,19 +140,23 @@ class CompressServiceImpl( with(request) { val file = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, storageCredentialsKey) ?: return - if (file.status != CompressStatus.NONE) { + if (file.status == CompressStatus.NONE) { + return + } + if (file.status != CompressStatus.COMPRESS_FAILED) { val storageCredentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) if (storageService.isCompressed(sha256, storageCredentials)) { storageService.deleteCompressed(sha256, storageCredentials) } + // 压缩失败的已经解除了base sha256的引用 fileReferenceClient.decrement(file.baseSha256, storageCredentialsKey) - /* - * 解压是小概率事件,所以这里链长度我们就不减少,这样带来的问题是, - * 压缩链更容易达到最大长度。但是这个影响并不重要。 - * */ - compressFileRepository.delete(file) - logger.info("Delete compress file [$sha256].") } + /* + * 解压是小概率事件,所以这里链长度我们就不减少,这样带来的问题是, + * 压缩链更容易达到最大长度。但是这个影响并不重要。 + * */ + compressFileRepository.delete(file) + logger.info("Delete compress file [$sha256].") } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminService.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminService.kt new file mode 100644 index 0000000000..bd03014323 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminService.kt @@ -0,0 +1,5 @@ +package com.tencent.bkrepo.archive.service + +interface SystemAdminService { + fun stop(jobName: String) +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminServiceImpl.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminServiceImpl.kt new file mode 100644 index 0000000000..05fea81416 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/SystemAdminServiceImpl.kt @@ -0,0 +1,13 @@ +package com.tencent.bkrepo.archive.service + +import com.tencent.bkrepo.archive.job.Cancellable +import com.tencent.bkrepo.common.service.util.SpringContextUtils +import org.springframework.stereotype.Service + +@Service +class SystemAdminServiceImpl : SystemAdminService { + override fun stop(jobName: String) { + val cancellable = SpringContextUtils.getBean(Cancellable::class.java, jobName) + cancellable.cancel() + } +} diff --git a/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/JobSubscriberTest.kt b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/JobSubscriberTest.kt new file mode 100644 index 0000000000..17e52454bc --- /dev/null +++ b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/JobSubscriberTest.kt @@ -0,0 +1,90 @@ +package com.tencent.bkrepo.archive.job + +import com.tencent.bkrepo.archive.job.BaseJobSubscriber.Companion.JOB_CTX +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import reactor.core.publisher.Flux +import java.time.Duration +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread + +class JobSubscriberTest { + @Test + fun commonJobSubscriberTest() { + val job = object : BaseJobSubscriber() { + override fun doOnNext(value: String) { + Thread.sleep(1000) + println(value) + } + } + thread { + Flux.range(0, 3) + .map { it.toString() } + .subscribe(job) + } + val context: JobContext = job.currentContext().get(JOB_CTX) + Assertions.assertEquals(0L, context.success.get()) + job.blockLast() + Assertions.assertEquals(3L, context.success.get()) + Assertions.assertEquals(3L, context.total.get()) + Assertions.assertEquals(0L, context.failed.get()) + } + + @Test + fun asyncJobSubscriberTest() { + val threadPoolExecutor = ThreadPoolExecutor( + 3, + 3, + 0, + TimeUnit.SECONDS, + ArrayBlockingQueue(3), + ) + val job = object : AsyncBaseJobSubscriber(threadPoolExecutor) { + override fun doOnNext(value: String) { + Thread.sleep(1000) + println("${Thread.currentThread().name} $value") + } + } + val startAt = System.currentTimeMillis() + thread { + Flux.range(0, 3) + .map { it.toString() } + .subscribe(job) + } + val context: JobContext = job.currentContext().get(JOB_CTX) + Assertions.assertEquals(0L, context.success.get()) + job.blockLast() + Assertions.assertEquals(3L, context.success.get()) + Assertions.assertEquals(3L, context.total.get()) + Assertions.assertEquals(0L, context.failed.get()) + Assertions.assertTrue(System.currentTimeMillis() - startAt < 2000) + } + + @Test + fun cancelTest() { + val count = AtomicInteger() + val job = object : BaseJobSubscriber() { + override fun doOnNext(value: String) { + println("${Thread.currentThread().name}: $value") + count.incrementAndGet() + } + } + Flux.range(0, 10) + .map { it.toString() } + .delayElements(Duration.ofSeconds(1)) + .subscribe(job) + thread { + job.blockLast() + println("Job finish") + } + Thread.sleep(5000) + job.dispose() + if (job.isDisposed) { + println("Job Disposed") + } + Assertions.assertEquals(4, count.get()) + } +} diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/StorageAutoConfiguration.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/StorageAutoConfiguration.kt index 81f1c5ad88..bb3387cca6 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/StorageAutoConfiguration.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/StorageAutoConfiguration.kt @@ -48,11 +48,13 @@ import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitorHelper import com.tencent.bkrepo.common.storage.s3.S3Storage import com.tencent.bkrepo.common.storage.util.PolarisUtil +import com.tencent.bkrepo.common.storage.util.StorageUtils import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import import org.springframework.retry.annotation.EnableRetry import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import java.util.concurrent.ConcurrentHashMap @@ -62,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap */ @Configuration(proxyBeanMethods = false) @EnableRetry +@Import(StorageUtils::class) @EnableConfigurationProperties(StorageProperties::class) class StorageAutoConfiguration { diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt index f63135be92..efdbaa5b48 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt @@ -4,16 +4,15 @@ import com.google.common.cache.CacheBuilder import com.google.common.cache.CacheLoader import com.google.common.cache.LoadingCache import com.google.common.cache.RemovalListener -import com.tencent.bkrepo.common.api.util.StreamUtils import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.bksync.BkSync import com.tencent.bkrepo.common.bksync.file.BkSyncDeltaSource import com.tencent.bkrepo.common.bksync.file.BDUtils import com.tencent.bkrepo.common.bksync.transfer.exception.TooLowerReuseRateException import com.tencent.bkrepo.common.storage.credentials.StorageCredentials -import com.tencent.bkrepo.common.storage.filesystem.FileSystemClient import com.tencent.bkrepo.common.storage.message.StorageErrorException import com.tencent.bkrepo.common.storage.message.StorageMessageCode +import com.tencent.bkrepo.common.storage.util.StorageUtils import com.tencent.bkrepo.common.storage.util.createFile import java.io.File import java.nio.file.Files @@ -173,17 +172,13 @@ abstract class CompressSupport : OverlaySupport() { * 下载[digest]到指定目录[dir] * */ protected fun download(digest: String, credentials: StorageCredentials, dir: Path): File { - val tempFile = FileSystemClient(dir).touch("", "$digest.temp") - try { - val path = fileLocator.locate(digest) - val inputStream = fileStorage.load(path, digest, Range.FULL_RANGE, credentials) - ?: error("Miss data $digest on ${credentials.key}") - StreamUtils.useCopy(inputStream, tempFile.outputStream()) - return tempFile - } catch (e: Exception) { - tempFile.delete() - throw e + val filePath = dir.resolve("$digest.temp") + if (!Files.isDirectory(filePath.parent)) { + Files.createDirectories(filePath.parent) } + val path = fileLocator.locate(digest) + StorageUtils.download(path, digest, credentials, filePath) + return filePath.toFile() } /** diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt new file mode 100644 index 0000000000..39c2d50372 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt @@ -0,0 +1,47 @@ +package com.tencent.bkrepo.common.storage.util + +import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.api.util.StreamUtils +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.storage.core.FileStorage +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.filesystem.FileSystemClient +import org.springframework.stereotype.Component +import java.nio.file.Files +import java.nio.file.Path + +@Component +class StorageUtils( + private val fileStorage: FileStorage, +) { + init { + Companion.fileStorage = fileStorage + } + + companion object { + private lateinit var fileStorage: FileStorage + private const val DOWNLOAD_PREFIX = "downloading_" + private const val DOWNLOAD_SUFFIX = ".temp" + + /** + * 下载文件到指定路径 + * @param path 文件源路径 + * @param digest 文件名 + * @param credentials 存储实例 + * @param filePath 下载目标路径 + * */ + fun download(path: String, digest: String, credentials: StorageCredentials, filePath: Path) { + val dir = credentials.upload.localPath + val fileName = StringPool.randomStringByLongValue(DOWNLOAD_PREFIX, DOWNLOAD_SUFFIX) + val tempFile = FileSystemClient(dir).touch("", fileName) + try { + val inputStream = fileStorage.load(path, digest, Range.FULL_RANGE, credentials) + ?: error("Miss data $digest on ${credentials.key}") + StreamUtils.useCopy(inputStream, tempFile.outputStream()) + Files.move(tempFile.toPath(), filePath) + } finally { + tempFile.delete() + } + } + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeCompleteJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeCompleteJob.kt index 2ac4dc76d5..c2b9efe246 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeCompleteJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeCompleteJob.kt @@ -22,6 +22,7 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 归档节点完成任务 @@ -108,8 +109,8 @@ class ArchivedNodeCompleteJob( ) } - override fun entityClass(): Class { - return ArchivedNodeRestoreJob.ArchiveFile::class.java + override fun entityClass(): KClass { + return ArchivedNodeRestoreJob.ArchiveFile::class } private fun archiveNode( diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeRestoreJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeRestoreJob.kt index 1647198e7d..56be3b45c6 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeRestoreJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArchivedNodeRestoreJob.kt @@ -16,6 +16,7 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 归档节点恢复任务 @@ -80,8 +81,8 @@ class ArchivedNodeRestoreJob( ) } - override fun entityClass(): Class { - return ArchiveFile::class.java + override fun entityClass(): KClass { + return ArchiveFile::class } private fun listNode(sha256: String, storageCredentialsKey: String?): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactCleanupJob.kt index fda1df741c..968b460a8d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactCleanupJob.kt @@ -61,6 +61,7 @@ import org.springframework.web.client.RestTemplate import java.time.Duration import java.time.LocalDateTime import java.time.format.DateTimeFormatter +import kotlin.reflect.KClass /** * 根据仓库配置的清理策略清理对应仓库下的制品 @@ -79,8 +80,8 @@ class ArtifactCleanupJob( @Value("\${service.prefix:}") private val servicePrefix: String = "" - override fun entityClass(): Class { - return RepoData::class.java + override fun entityClass(): KClass { + return RepoData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactPushJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactPushJob.kt index d5fdc6ac33..e19428d0e5 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactPushJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ArtifactPushJob.kt @@ -44,6 +44,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 用于将新上传/更新的制品推送到远端仓库 @@ -61,8 +62,8 @@ class ArtifactPushJob( return super.start() } - override fun entityClass(): Class { - return PackageVersionData::class.java + override fun entityClass(): KClass { + return PackageVersionData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedBlockNodeCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedBlockNodeCleanupJob.kt index 769c88b662..9d49b5b35e 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedBlockNodeCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedBlockNodeCleanupJob.kt @@ -44,6 +44,7 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 清理被标记为删除的node,同时减少文件引用 @@ -87,8 +88,8 @@ class DeletedBlockNodeCleanupJob( ) } - override fun entityClass(): Class { - return BlockNode::class.java + override fun entityClass(): KClass { + return BlockNode::class } override fun run(row: BlockNode, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt index c1efd2dd38..77c888e295 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DeletedNodeCleanupJob.kt @@ -55,6 +55,7 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 清理被标记为删除的node,同时减少文件引用 @@ -117,8 +118,8 @@ class DeletedNodeCleanupJob( ) } - override fun entityClass(): Class { - return Repository::class.java + override fun entityClass(): KClass { + return Repository::class } override fun run(row: Repository, collectionName: String, context: DeletedNodeCleanupJobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DistributedDockerImageCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DistributedDockerImageCleanupJob.kt index b395cd61b9..8e5b070407 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DistributedDockerImageCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/DistributedDockerImageCleanupJob.kt @@ -44,6 +44,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 清理镜像仓库下存储的已经分发的镜像 @@ -56,8 +57,8 @@ class DistributedDockerImageCleanupJob( ) : DefaultContextMongoDbJob(properties) { - override fun entityClass(): Class { - return PackageData::class.java + override fun entityClass(): KClass { + return PackageData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/EmptyFolderCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/EmptyFolderCleanupJob.kt index c59eacee8f..24f5bd5aea 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/EmptyFolderCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/EmptyFolderCleanupJob.kt @@ -59,6 +59,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** @@ -81,8 +82,8 @@ class EmptyFolderCleanupJob( return Node(row) } - override fun entityClass(): Class { - return Node::class.java + override fun entityClass(): KClass { + return Node::class } override fun run(row: Node, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ExpiredNodeMarkupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ExpiredNodeMarkupJob.kt index f78e5a3e43..83c27501e8 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ExpiredNodeMarkupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ExpiredNodeMarkupJob.kt @@ -44,6 +44,7 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 标记已过期的节点为已删除 @@ -90,8 +91,8 @@ class ExpiredNodeMarkupJob( ) } - override fun entityClass(): Class { - return Node::class.java + override fun entityClass(): KClass { + return Node::class } override fun run(row: Node, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt index 08098e1348..59e25ccc14 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt @@ -29,7 +29,6 @@ package com.tencent.bkrepo.job.batch import com.tencent.bkrepo.archive.api.ArchiveClient import com.tencent.bkrepo.archive.request.ArchiveFileRequest -import com.tencent.bkrepo.archive.request.DeleteCompressRequest import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.service.log.LoggerHolder import com.tencent.bkrepo.common.storage.core.StorageService @@ -51,6 +50,7 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component import java.time.Duration import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass /** * 清理引用=0的文件 @@ -72,8 +72,8 @@ class FileReferenceCleanupJob( return FileJobContext() } - override fun entityClass(): Class { - return FileReferenceData::class.java + override fun entityClass(): KClass { + return FileReferenceData::class } override fun collectionNames(): List { @@ -133,9 +133,7 @@ class FileReferenceCleanupJob( private fun cleanupRelatedResources(sha256: String, credentialsKey: String?) { val deleteArchiveFileRequest = ArchiveFileRequest(sha256, credentialsKey, SYSTEM_USER) - archiveClient.delete(deleteArchiveFileRequest) - val deleteCompressRequest = DeleteCompressRequest(sha256, credentialsKey, SYSTEM_USER) - archiveClient.deleteCompress(deleteCompressRequest) + archiveClient.deleteAll(deleteArchiveFileRequest) } private val cacheMap: ConcurrentHashMap = ConcurrentHashMap() diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/IdleNodeArchiveJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/IdleNodeArchiveJob.kt index 0a4224570a..684d36c012 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/IdleNodeArchiveJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/IdleNodeArchiveJob.kt @@ -22,6 +22,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.inValues import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 空闲节点归档任务 @@ -184,8 +185,8 @@ class IdleNodeArchiveJob( ) } - override fun entityClass(): Class { - return Node::class.java + override fun entityClass(): KClass { + return Node::class } override fun createJobContext(): NodeContext { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt index 03c7de258e..7c3e0d6eb5 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt @@ -17,6 +17,7 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 节点压缩任务 @@ -80,8 +81,8 @@ class NodeCompressedJob( ) } - override fun entityClass(): Class { - return CompressFile::class.java + override fun entityClass(): KClass { + return CompressFile::class } data class CompressFile( diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCopyJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCopyJob.kt index a6e2d1d206..2446586891 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCopyJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCopyJob.kt @@ -19,6 +19,7 @@ import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component +import kotlin.reflect.KClass @Component @EnableConfigurationProperties(NodeCopyJobProperties::class) @@ -46,7 +47,7 @@ class NodeCopyJob( return NodeCopyData(row) } - override fun entityClass(): Class = NodeCopyData::class.java + override fun entityClass(): KClass = NodeCopyData::class override fun createJobContext(): NodeCopyJobContext = NodeCopyJobContext() diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeReport2BkbaseJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeReport2BkbaseJob.kt index 91bf852703..70b611191c 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeReport2BkbaseJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeReport2BkbaseJob.kt @@ -41,6 +41,7 @@ import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component import java.time.LocalDateTime import java.time.format.DateTimeFormatter +import kotlin.reflect.KClass /** * 导出node表至数据平台 @@ -88,8 +89,8 @@ class NodeReport2BkbaseJob( ) } - override fun entityClass(): Class { - return Node::class.java + override fun entityClass(): KClass { + return Node::class } override fun run(row: Node, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt index 204fa537e6..95d0e205be 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt @@ -15,6 +15,7 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 节点解压任务 @@ -72,8 +73,8 @@ class NodeUncompressedJob( ) } - override fun entityClass(): Class { - return CompressFile::class.java + override fun entityClass(): KClass { + return CompressFile::class } data class CompressFile( diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/OciBlobNodeRefreshJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/OciBlobNodeRefreshJob.kt index 221cb7d30c..5d2186f846 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/OciBlobNodeRefreshJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/OciBlobNodeRefreshJob.kt @@ -41,6 +41,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration +import kotlin.reflect.KClass /** * 用于将存储在blobs目录下的公共blob节点全部迁移到对应版本目录下, @@ -59,8 +60,8 @@ class OciBlobNodeRefreshJob( return super.start() } - override fun entityClass(): Class { - return PackageData::class.java + override fun entityClass(): KClass { + return PackageData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/PipelineArtifactCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/PipelineArtifactCleanupJob.kt index 0372569c45..9c86e2d9c0 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/PipelineArtifactCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/PipelineArtifactCleanupJob.kt @@ -48,6 +48,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 流水线构件清理任务 @@ -85,8 +86,8 @@ class PipelineArtifactCleanupJob( return Node(row) } - override fun entityClass(): Class { - return Node::class.java + override fun entityClass(): KClass { + return Node::class } override fun run(row: Node, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ProjectRepoMetricsStatJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ProjectRepoMetricsStatJob.kt index d774a71364..ec9ba5325a 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ProjectRepoMetricsStatJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ProjectRepoMetricsStatJob.kt @@ -52,6 +52,7 @@ import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDate import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 项目仓库指标统计任务 @@ -72,8 +73,8 @@ class ProjectRepoMetricsStatJob( return Repository(row) } - override fun entityClass(): Class { - return Repository::class.java + override fun entityClass(): KClass { + return Repository::class } override fun run(row: Repository, collectionName: String, context: JobContext) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/RunOnceTaskCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/RunOnceTaskCleanupJob.kt index 4be88580dc..0044700780 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/RunOnceTaskCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/RunOnceTaskCleanupJob.kt @@ -42,6 +42,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.LocalDateTime +import kotlin.reflect.KClass /** * 清除已经执行完成的一次性分发任务 @@ -56,8 +57,8 @@ class RunOnceTaskCleanupJob( return super.start() } - override fun entityClass(): Class { - return TaskData::class.java + override fun entityClass(): KClass { + return TaskData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SignFileCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SignFileCleanupJob.kt index 9e33905c16..e9f7451a93 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SignFileCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SignFileCleanupJob.kt @@ -42,6 +42,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass @Component @EnableConfigurationProperties(SignFileCleanupJobProperties::class) @@ -83,8 +84,8 @@ class SignFileCleanupJob( return SignFileData(row) } - override fun entityClass(): Class { - return SignFileData::class.java + override fun entityClass(): KClass { + return SignFileData::class } override fun getLockAtMostFor(): Duration { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt index be49305a9b..d985b025c4 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt @@ -3,10 +3,18 @@ package com.tencent.bkrepo.job.batch import com.tencent.bkrepo.archive.api.ArchiveClient import com.tencent.bkrepo.archive.request.CompressFileRequest import com.tencent.bkrepo.common.api.collection.groupBySimilar +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.api.util.HumanReadable +import com.tencent.bkrepo.common.api.util.StreamUtils +import com.tencent.bkrepo.common.bksync.BkSync +import com.tencent.bkrepo.common.bksync.DiffResult import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils +import com.tencent.bkrepo.common.storage.core.StorageProperties +import com.tencent.bkrepo.common.storage.core.locator.FileLocator +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.util.StorageUtils import com.tencent.bkrepo.fs.server.constant.FAKE_SHA256 import com.tencent.bkrepo.job.SHARDING_COUNT import com.tencent.bkrepo.job.batch.base.DefaultContextJob @@ -26,6 +34,10 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.Paths +import kotlin.reflect.full.declaredMemberProperties import kotlin.system.measureNanoTime /** @@ -38,11 +50,14 @@ class SystemGcJob( val properties: SystemGcJobProperties, private val mongoTemplate: MongoTemplate, private val archiveClient: ArchiveClient, + private val storageProperties: StorageProperties, + private val fileLocator: FileLocator, ) : DefaultContextJob(properties) { private var lastId = MIN_OBJECT_ID - private var lastCutoffTime = LocalDateTime.MIN + private var lastCutoffTimeMap = mutableMapOf() private var curCutoffTime = LocalDateTime.MIN + private var bksync = BkSync() override fun doStart0(jobContext: JobContext) { curCutoffTime = LocalDateTime.now().minus(Duration.ofDays(properties.idleDays.toLong())) properties.repos.forEach { @@ -52,8 +67,8 @@ class SystemGcJob( val repoName = splits[1] val nanos = measureNanoTime { count = repoGc(projectId, repoName) } logger.info("Finish gc repository [$projectId/$repoName]($count nodes), took ${HumanReadable.time(nanos)}.") + lastCutoffTimeMap[it] = curCutoffTime } - lastCutoffTime = curCutoffTime } private fun repoGc(projectId: String, repoName: String): Long { @@ -117,7 +132,14 @@ class SystemGcJob( Criteria.where("lastAccessDate").isEqualTo(null), Criteria.where("lastAccessDate").lt(curCutoffTime), ), - ).limit(properties.maxBatchSize).with(Sort.by(ID).ascending()) // 长时间未访问 + ).limit(properties.maxBatchSize) + .with(Sort.by(ID).ascending()) + .apply { + val fields = fields() + Node::class.declaredMemberProperties.forEach { + fields.include(it.name) + } + } } /** @@ -131,24 +153,70 @@ class SystemGcJob( } } .sortedBy { it.createdDate } + if (logger.isDebugEnabled) { + logger.debug("Group node: [${sortedNodes.joinToString(",") { it.name }}]") + } // 没有新的节点,表示节点已经gc过一轮了 - if (lastEndTime != null && sortedNodes.last().createdDate < lastCutoffTime) { + val repoKey = nodes.first().let { "${it.projectId}/${it.repoName}" } + val lastCutoffTime = lastCutoffTimeMap[repoKey] + if (lastCutoffTime != null && sortedNodes.last().createdDate < lastCutoffTime) { logger.info("There are no new nodes, gc is skipped.") return } val gcNodes = sortedNodes.subList(0, sortedNodes.size - properties.retain) - if (logger.isDebugEnabled) { - logger.debug("Group node: [${gcNodes.joinToString(",") { it.name }}]") - } // 保留最新的 val newest = sortedNodes.last() - val repo = RepositoryCommonUtils.getRepositoryDetail(newest.projectId, newest.repoName) - val credentials = repo.storageCredentials - gcNodes.forEach { - val compressedRequest = CompressFileRequest(it.sha256, it.size, newest.sha256, credentials?.key) - archiveClient.compress(compressedRequest) - logger.info("Compress node ${it.name} by node ${newest.name}.") + if (samplingSurvey(gcNodes, newest)) { + val repo = RepositoryCommonUtils.getRepositoryDetail(newest.projectId, newest.repoName) + val credentials = repo.storageCredentials + gcNodes.forEach { + val compressedRequest = CompressFileRequest(it.sha256, it.size, newest.sha256, credentials?.key) + archiveClient.compress(compressedRequest) + logger.info("Compress node ${it.name} by node ${newest.name}.") + } + } + } + + /** + * 抽样调查,快速判断一组节点的实际数据是否相似 + * */ + private fun samplingSurvey(nodes: List, base: Node): Boolean { + val src = nodes.first() + val repo = RepositoryCommonUtils.getRepositoryDetail(src.projectId, src.repoName) + val storageCredentials = repo.storageCredentials ?: storageProperties.defaultStorageCredentials() + val dir = Paths.get(storageCredentials.upload.localPath, GC_DIR, StringPool.randomStringByLongValue()) + val baseFilePath = createTempFile(dir) + val srcFilePath = createTempFile(dir) + val signFilePath = createTempFile(dir) + val deltaFilePath = createTempFile(dir) + try { + downloadFile(base.sha256, storageCredentials, baseFilePath) + downloadFile(src.sha256, storageCredentials, srcFilePath) + Files.newOutputStream(signFilePath).use { + bksync.checksum(baseFilePath.toFile(), it) + } + var actual = DiffResult(0, 1) + StreamUtils.use(Files.newInputStream(signFilePath), Files.newOutputStream(deltaFilePath)) { input, output -> + actual = bksync.diff(srcFilePath.toFile(), input, output, storageCredentials.compress.ratio) + } + logger.info("Sampling (${src.name},${base.name}),reuse: ${actual.hitRate}.") + return actual.hitRate >= storageCredentials.compress.ratio + } finally { + dir.toFile().deleteRecursively() + } + } + + private fun createTempFile(dir: Path): Path { + val fileName = StringPool.randomStringByLongValue(prefix = "gc", suffix = ".temp") + return dir.resolve(fileName) + } + + private fun downloadFile(sha256: String, storageCredentials: StorageCredentials, filePath: Path) { + val path = fileLocator.locate(sha256) + if (!Files.isDirectory(filePath.parent)) { + Files.createDirectories(filePath.parent) } + StorageUtils.download(path, sha256, storageCredentials, filePath) } data class Node( @@ -169,5 +237,6 @@ class SystemGcJob( companion object { private val logger = LoggerFactory.getLogger(SystemGcJob::class.java) private val HAMMING_DISTANCE_INSTANCE = HammingDistance() + private const val GC_DIR = "gc" } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ThirdPartyImageReplicationJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ThirdPartyImageReplicationJob.kt index cb0ab679e3..8d6732c3fe 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ThirdPartyImageReplicationJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ThirdPartyImageReplicationJob.kt @@ -37,6 +37,7 @@ import com.tencent.bkrepo.oci.pojo.third.OciReplicationRecordInfo import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.mongodb.core.query.Query import org.springframework.stereotype.Component +import kotlin.reflect.KClass /** * 生成 @@ -52,8 +53,8 @@ class ThirdPartyImageReplicationJob( return super.start() } - override fun entityClass(): Class { - return OciReplicationRecordInfoData::class.java + override fun entityClass(): KClass { + return OciReplicationRecordInfoData::class } override fun collectionNames(): List { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt index 4db1b45d6e..1143c5c41c 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt @@ -6,7 +6,7 @@ import com.tencent.bkrepo.job.config.properties.CompositeJobProperties /** * 由多个[ChildMongoDbBatchJob]组成的任务,主要目的是将类似遍历所有Node表这种耗时任务合并成一个,只需要遍历一次就可以完成所有数据的处理 */ -abstract class CompositeMongoDbBatchJob( +abstract class CompositeMongoDbBatchJob( private val properties: CompositeJobProperties ) : MongoDbBatchJob>(properties) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultContextMongoDbJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultContextMongoDbJob.kt index 8d0f278416..2276e6bd40 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultContextMongoDbJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultContextMongoDbJob.kt @@ -2,7 +2,7 @@ package com.tencent.bkrepo.job.batch.base import com.tencent.bkrepo.job.config.properties.MongodbJobProperties -abstract class DefaultContextMongoDbJob( +abstract class DefaultContextMongoDbJob( private val properties: MongodbJobProperties ) : MongoDbBatchJob(properties) { override fun createJobContext(): JobContext = JobContext() diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultRepoJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultRepoJob.kt index 13c006d39c..b82cb884cc 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultRepoJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/DefaultRepoJob.kt @@ -33,6 +33,7 @@ import com.tencent.bkrepo.common.artifact.pojo.configuration.remote.RemoteConfig import com.tencent.bkrepo.common.service.log.LoggerHolder import com.tencent.bkrepo.job.config.properties.RepoJobProperties import java.time.Duration +import kotlin.reflect.KClass abstract class DefaultRepoJob( properties: RepoJobProperties @@ -42,8 +43,8 @@ abstract class DefaultRepoJob( return listOf(COLLECTION_NAME) } - override fun entityClass(): Class { - return ProxyRepoData::class.java + override fun entityClass(): KClass { + return ProxyRepoData::class } override fun mapToEntity(row: Map): ProxyRepoData { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt index afc7030819..ae500e522e 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt @@ -49,12 +49,14 @@ import java.net.InetAddress import java.time.LocalDateTime import java.util.Collections import java.util.concurrent.CountDownLatch +import kotlin.reflect.KClass +import kotlin.reflect.full.declaredMemberProperties import kotlin.system.measureNanoTime /** * MongoDb抽象批处理作业Job * */ -abstract class MongoDbBatchJob( +abstract class MongoDbBatchJob( private val properties: MongodbJobProperties, ) : MongodbFailoverJob(properties) { /** @@ -80,7 +82,7 @@ abstract class MongoDbBatchJob( * */ abstract fun mapToEntity(row: Map): Entity - abstract fun entityClass(): Class + abstract fun entityClass(): KClass /** * 表执行结束回调 @@ -189,9 +191,9 @@ abstract class MongoDbBatchJob( .addCriteria(Criteria.where(ID).gt(lastId)) .limit(batchSize) .with(Sort.by(ID).ascending()) - entityClass().fields.forEach { - val filedName = if (it.name.equals(JAVA_ID)) ID else it.name - query.fields().include(filedName) + val fields = query.fields() + entityClass().declaredMemberProperties.forEach { + fields.include(it.name) } val data = mongoTemplate.find>( query, diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/DdcBlobCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/DdcBlobCleanupJob.kt index a05b41e8a2..e25abf6d0d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/DdcBlobCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/DdcBlobCleanupJob.kt @@ -74,7 +74,7 @@ class DdcBlobCleanupJob( ) } - override fun entityClass() = Blob::class.java + override fun entityClass() = Blob::class override fun run(row: Blob, collectionName: String, context: JobContext) { nodeClient.deleteNode( diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/ExpiredDdcRefCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/ExpiredDdcRefCleanupJob.kt index bd74755bb4..b3455aaacb 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/ExpiredDdcRefCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ddc/ExpiredDdcRefCleanupJob.kt @@ -43,6 +43,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component import java.time.Duration import java.time.LocalDateTime +import kotlin.reflect.KClass @Component @EnableConfigurationProperties(ExpiredDdcRefCleanupJobProperties::class) @@ -72,7 +73,7 @@ class ExpiredDdcRefCleanupJob( ) } - override fun entityClass(): Class = Ref::class.java + override fun entityClass(): KClass = Ref::class override fun run(row: Ref, collectionName: String, context: JobContext) { // 清理过期ref diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt index 96d22c862d..da3697ac22 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt @@ -11,6 +11,7 @@ import org.springframework.data.redis.core.RedisTemplate import org.springframework.stereotype.Component import java.time.Duration import java.util.Date +import kotlin.reflect.KClass @Component @EnableConfigurationProperties(NodeStatCompositeMongoDbBatchJobProperties::class) @@ -28,7 +29,7 @@ class NodeStatCompositeMongoDbBatchJob( override fun mapToEntity(row: Map): Node = Node(row) - override fun entityClass(): Class = Node::class.java + override fun entityClass(): KClass = Node::class override fun createChildJobs(): List> { return listOf( diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/project/BkciProjectMetadataSyncJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/project/BkciProjectMetadataSyncJob.kt index 5eaac08c80..626ed641f8 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/project/BkciProjectMetadataSyncJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/project/BkciProjectMetadataSyncJob.kt @@ -127,7 +127,7 @@ class BkciProjectMetadataSyncJob( return Project(row[Project::name.name]!! as String, metadata) } - override fun entityClass() = Project::class.java + override fun entityClass() = Project::class companion object { private val logger = LoggerFactory.getLogger(BkciProjectMetadataSyncJob::class.java)