diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt index 430b4df57a..b7594b54b4 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJob.kt @@ -27,6 +27,8 @@ package com.tencent.bkrepo.job.batch +import com.google.common.hash.BloomFilter +import com.google.common.hash.Funnels import com.tencent.bkrepo.archive.api.ArchiveClient import com.tencent.bkrepo.archive.request.ArchiveFileRequest import com.tencent.bkrepo.archive.request.DeleteCompressRequest @@ -36,10 +38,12 @@ import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import com.tencent.bkrepo.job.COUNT import com.tencent.bkrepo.job.CREDENTIALS +import com.tencent.bkrepo.job.FOLDER import com.tencent.bkrepo.job.SHA256 import com.tencent.bkrepo.job.SHARDING_COUNT import com.tencent.bkrepo.job.batch.base.MongoDbBatchJob import com.tencent.bkrepo.job.batch.context.FileJobContext +import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties import com.tencent.bkrepo.job.exception.JobExecuteException import com.tencent.bkrepo.repository.api.StorageCredentialsClient @@ -50,6 +54,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.data.mongodb.core.query.where import org.springframework.stereotype.Component +import java.nio.charset.StandardCharsets import java.time.Duration import java.util.concurrent.ConcurrentHashMap import kotlin.reflect.KClass @@ -59,14 +64,23 @@ import kotlin.reflect.KClass */ @Component @EnableConfigurationProperties(FileReferenceCleanupJobProperties::class) +@Suppress("UnstableApiUsage") class FileReferenceCleanupJob( private val storageService: StorageService, private val storageCredentialsClient: StorageCredentialsClient, - properties: FileReferenceCleanupJobProperties, + private val properties: FileReferenceCleanupJobProperties, private val archiveClient: ArchiveClient, ) : MongoDbBatchJob(properties) { + /** + * 节点的布隆过滤器,用于快速判断sha256的节点是否存在 + * 因跟踪节点的删除不方便,且布隆过滤器没有重置功能,所以这里每次任务开始前都会新建一个布隆过滤器, + * 如果设置的预期节点很多,可能会导致较多的gc甚至oom。 + * */ + private lateinit var bf: BloomFilter + override fun start(): Boolean { + bf = buildBloomFilter() return super.start() } @@ -95,7 +109,8 @@ class FileReferenceCleanupJob( val storageCredentials = credentialsKey?.let { getCredentials(credentialsKey) } try { if (sha256.isNotBlank() && storageService.exist(sha256, storageCredentials)) { - if (existNode(sha256)) { + if (existNode(sha256, credentialsKey)) { + logger.warn("Dirty sha256[$sha256] exists on $credentialsKey") return } storageService.delete(sha256, storageCredentials) @@ -114,17 +129,34 @@ class FileReferenceCleanupJob( /** * 检查Node表中是否还存在对应sha256的node + * @return true表示存在节点,false表示不存在 */ - private fun existNode(sha256: String): Boolean { - (0 until SHARDING_COUNT).forEach { - val query = Query(where(Node::sha256).isEqualTo(sha256)) - val exist = mongoTemplate.findOne(query, Node::class.java, COLLECTION_NODE_PREFIX + it) != null - if (exist) { - logger.info("sha256[$sha256] still has existed node in collection[$it]") - return true + private fun existNode(sha256: String, key: String?): Boolean { + /* + * 1. 通过布隆过滤器,快速判断节点是否存在。(大部分判断应该在这里终止,即大部分引用是正确的) + * 2. 真实判断存储实例的节点是否存在。(引用不正确的情况或者布隆过滤器的误报) + * */ + val query = Query(where(Node::sha256).isEqualTo(sha256)) + return bf.mightContain(sha256) && NodeCommonUtils.findNodes(query, key).isNotEmpty() + } + + private fun buildBloomFilter(): BloomFilter { + logger.info("Start build bloom filter.") + val bf = BloomFilter.create( + Funnels.stringFunnel(StandardCharsets.UTF_8), + properties.expectedNodes, + properties.fpp, + ) + val query = Query(Criteria.where(FOLDER).isEqualTo(false)) + query.fields().include(SHA256) + NodeCommonUtils.forEachNode(query) { + val sha256 = it[SHA256]?.toString() + if (sha256 != null) { + bf.put(sha256) } } - return false + logger.info("Build bloom filter successful,count: ${bf.approximateElementCount()},fpp: ${bf.expectedFpp()}") + return bf } private fun getCredentials(key: String): StorageCredentials? { @@ -152,7 +184,6 @@ class FileReferenceCleanupJob( companion object { private val logger = LoggerHolder.jobLogger private const val COLLECTION_NAME_PREFIX = "file_reference_" - private const val COLLECTION_NODE_PREFIX = "node_" private const val COMPRESS_FILE_COLLECTION = "compress_file" private const val ARCHIVE_FILE_COLLECTION = "archive_file" private const val STORAGE_CREDENTIALS = "storageCredentialsKey" @@ -166,7 +197,7 @@ class FileReferenceCleanupJob( data class Node( val id: String, - val sha256: String?, + val sha256: String, ) override fun mapToEntity(row: Map): FileReferenceData { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt index bb2cccafaa..88d5275b2f 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt @@ -1,11 +1,19 @@ package com.tencent.bkrepo.job.batch.utils +import com.tencent.bkrepo.common.mongo.constant.ID +import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils +import com.tencent.bkrepo.job.BATCH_SIZE import com.tencent.bkrepo.job.SHARDING_COUNT +import org.bson.types.ObjectId +import org.springframework.data.domain.Sort import java.time.LocalDateTime import org.springframework.data.mongodb.core.MongoTemplate +import org.springframework.data.mongodb.core.find +import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.stereotype.Component +import java.util.function.Consumer @Component class NodeCommonUtils( @@ -41,6 +49,29 @@ class NodeCommonUtils( return nodes } + fun forEachNode(query: Query, batchSize: Int = BATCH_SIZE, consumer: Consumer>) { + var querySize: Int + var lastId = ObjectId(MIN_OBJECT_ID) + (0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection -> + do { + val newQuery = Query.of(query) + .addCriteria(Criteria.where(ID).gt(lastId)) + .limit(batchSize) + .with(Sort.by(ID).ascending()) + val data = mongoTemplate.find>( + newQuery, + collection, + ) + if (data.isEmpty()) { + break + } + data.forEach { consumer.accept(it) } + querySize = data.size + lastId = data.last()[ID] as ObjectId + } while (querySize == batchSize) + } + } + fun collectionNames(projectIds: List): List { val collectionNames = mutableListOf() if (projectIds.isNotEmpty()) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/FileReferenceCleanupJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/FileReferenceCleanupJobProperties.kt index 265bf89f63..2ca8281ce8 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/FileReferenceCleanupJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/FileReferenceCleanupJobProperties.kt @@ -31,5 +31,14 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties("job.file-reference-cleanup") class FileReferenceCleanupJobProperties( - override var cron: String = "0 0 4/6 * * ?" + override var cron: String = "0 0 4/6 * * ?", + /** + * 预期系统节点数 + * */ + var expectedNodes: Long = 100_000_000, // 1e + /** + * 布隆过滤器的误报率。 + * 误报率较高,会导致更多的数据库查询,但不影响节点清理的正确性,误报率越低,消耗的内存越大。 + * */ + var fpp: Double = 0.0001, ) : MongodbJobProperties() diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt index f6b2fc9b4e..17b619251a 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/FileReferenceCleanupJobTest.kt @@ -28,13 +28,21 @@ package com.tencent.bkrepo.job.batch import com.tencent.bkrepo.archive.api.ArchiveClient +import com.tencent.bkrepo.common.artifact.pojo.RepositoryCategory +import com.tencent.bkrepo.common.artifact.pojo.RepositoryType +import com.tencent.bkrepo.common.artifact.pojo.configuration.local.LocalConfiguration import com.tencent.bkrepo.common.service.util.ResponseBuilder import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.credentials.InnerCosCredentials import com.tencent.bkrepo.job.SHARDING_COUNT +import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils +import com.tencent.bkrepo.job.batch.utils.RepositoryCommonUtils +import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties import com.tencent.bkrepo.job.repository.JobSnapshotRepository +import com.tencent.bkrepo.repository.api.RepositoryClient import com.tencent.bkrepo.repository.api.StorageCredentialsClient +import com.tencent.bkrepo.repository.pojo.repo.RepositoryDetail import io.mockk.every import io.mockk.mockk import io.mockk.mockkObject @@ -68,6 +76,10 @@ class FileReferenceCleanupJobTest : JobBaseTest() { @MockBean lateinit var archiveClient: ArchiveClient + + @MockBean + lateinit var repositoryClient: RepositoryClient + @MockBean lateinit var jobSnapshotRepository: JobSnapshotRepository @@ -77,19 +89,49 @@ class FileReferenceCleanupJobTest : JobBaseTest() { @Autowired lateinit var fileReferenceCleanupJob: FileReferenceCleanupJob + @Autowired + lateinit var nodeCommonUtils: NodeCommonUtils + + @Autowired + lateinit var repositoryCommonUtils: RepositoryCommonUtils + + @Autowired + lateinit var fileReferenceCleanupJobProperties: FileReferenceCleanupJobProperties + @BeforeEach fun beforeEach() { val tracer = mockk() mockkObject(SpringContextUtils.Companion) every { SpringContextUtils.getBean() } returns tracer every { tracer.currentSpan() } returns null + every { SpringContextUtils.publishEvent(any()) } returns Unit Mockito.`when`(storageService.exist(anyString(), any())).thenReturn(true) val credentials = InnerCosCredentials() Mockito.`when`(storageCredentialsClient.findByKey(anyString())).thenReturn( ResponseBuilder.success(credentials), ) - mockkObject(SpringContextUtils) - every { SpringContextUtils.publishEvent(any()) } returns Unit + Mockito.`when`(repositoryClient.getRepoDetail(anyString(), anyString(), anyString())).thenReturn( + ResponseBuilder.success( + RepositoryDetail( + projectId = "ut-project", + name = "ut-repo", + storageCredentials = InnerCosCredentials(key = "0"), + type = RepositoryType.NONE, + category = RepositoryCategory.LOCAL, + public = false, + description = "", + configuration = LocalConfiguration(), + createdBy = "", + createdDate = "", + lastModifiedBy = "", + lastModifiedDate = "", + oldCredentialsKey = null, + quota = 0, + used = 0, + ), + ), + ) + fileReferenceCleanupJobProperties.expectedNodes = 50_000 } @AfterEach @@ -97,6 +139,8 @@ class FileReferenceCleanupJobTest : JobBaseTest() { fileReferenceCleanupJob.collectionNames().forEach { mongoTemplate.remove(Query(), it) } + mongoTemplate.remove(Query(), "node_0") + mongoTemplate.remove(Query(), "shed_lock") } @DisplayName("测试正常运行") @@ -132,6 +176,34 @@ class FileReferenceCleanupJobTest : JobBaseTest() { Assertions.assertEquals(num, deleted.get()) } + @DisplayName("测试错误引用数据清理") + @Test + fun errorRefTest() { + val num = 1000 + insertMany(num, fileReferenceCleanupJob.collectionNames().first()) + // 新增一个节点,制造引用错误 + val doc = Document( + mutableMapOf( + "sha256" to "0", + "folder" to false, + "projectId" to "ut-project", + "repoName" to "ut-repo", + "size" to 1L, + "fullPath" to "/test", + ) as Map?, + ) + mongoTemplate.insert( + doc, + "node_0", + ) + val deleted = AtomicInteger() + Mockito.`when`(storageService.delete(anyString(), any())).then { + deleted.incrementAndGet() + } + fileReferenceCleanupJob.start() + Assertions.assertEquals(num - 1, deleted.get()) + } + private fun insertMany(num: Int, collectionName: String) { (0 until num).forEach { val doc = Document(