Skip to content

Commit

Permalink
impr: 优化清理引用任务中的节点判存逻辑 TencentBlueKing#1777
Browse files Browse the repository at this point in the history
* impr: 优化清理引用任务中的节点判存逻辑 TencentBlueKing#1777

* #impr: 补充单元测试 TencentBlueKing#1777
  • Loading branch information
felixncheng authored Feb 29, 2024
1 parent 93c548f commit 573e5fc
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

package com.tencent.bkrepo.job.batch

import com.google.common.hash.BloomFilter
import com.google.common.hash.Funnels
import com.tencent.bkrepo.archive.api.ArchiveClient
import com.tencent.bkrepo.archive.request.ArchiveFileRequest
import com.tencent.bkrepo.archive.request.DeleteCompressRequest
Expand All @@ -36,10 +38,12 @@ import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.job.COUNT
import com.tencent.bkrepo.job.CREDENTIALS
import com.tencent.bkrepo.job.FOLDER
import com.tencent.bkrepo.job.SHA256
import com.tencent.bkrepo.job.SHARDING_COUNT
import com.tencent.bkrepo.job.batch.base.MongoDbBatchJob
import com.tencent.bkrepo.job.batch.context.FileJobContext
import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties
import com.tencent.bkrepo.job.exception.JobExecuteException
import com.tencent.bkrepo.repository.api.StorageCredentialsClient
Expand All @@ -50,6 +54,7 @@ import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.stereotype.Component
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass
Expand All @@ -59,14 +64,23 @@ import kotlin.reflect.KClass
*/
@Component
@EnableConfigurationProperties(FileReferenceCleanupJobProperties::class)
@Suppress("UnstableApiUsage")
class FileReferenceCleanupJob(
private val storageService: StorageService,
private val storageCredentialsClient: StorageCredentialsClient,
properties: FileReferenceCleanupJobProperties,
private val properties: FileReferenceCleanupJobProperties,
private val archiveClient: ArchiveClient,
) : MongoDbBatchJob<FileReferenceCleanupJob.FileReferenceData, FileJobContext>(properties) {

/**
* 节点的布隆过滤器,用于快速判断sha256的节点是否存在
* 因跟踪节点的删除不方便,且布隆过滤器没有重置功能,所以这里每次任务开始前都会新建一个布隆过滤器,
* 如果设置的预期节点很多,可能会导致较多的gc甚至oom。
* */
private lateinit var bf: BloomFilter<CharSequence>

override fun start(): Boolean {
bf = buildBloomFilter()
return super.start()
}

Expand Down Expand Up @@ -95,7 +109,8 @@ class FileReferenceCleanupJob(
val storageCredentials = credentialsKey?.let { getCredentials(credentialsKey) }
try {
if (sha256.isNotBlank() && storageService.exist(sha256, storageCredentials)) {
if (existNode(sha256)) {
if (existNode(sha256, credentialsKey)) {
logger.warn("Dirty sha256[$sha256] exists on $credentialsKey")
return
}
storageService.delete(sha256, storageCredentials)
Expand All @@ -114,17 +129,34 @@ class FileReferenceCleanupJob(

/**
* 检查Node表中是否还存在对应sha256的node
* @return true表示存在节点,false表示不存在
*/
private fun existNode(sha256: String): Boolean {
(0 until SHARDING_COUNT).forEach {
val query = Query(where(Node::sha256).isEqualTo(sha256))
val exist = mongoTemplate.findOne(query, Node::class.java, COLLECTION_NODE_PREFIX + it) != null
if (exist) {
logger.info("sha256[$sha256] still has existed node in collection[$it]")
return true
private fun existNode(sha256: String, key: String?): Boolean {
/*
* 1. 通过布隆过滤器,快速判断节点是否存在。(大部分判断应该在这里终止,即大部分引用是正确的)
* 2. 真实判断存储实例的节点是否存在。(引用不正确的情况或者布隆过滤器的误报)
* */
val query = Query(where(Node::sha256).isEqualTo(sha256))
return bf.mightContain(sha256) && NodeCommonUtils.findNodes(query, key).isNotEmpty()
}

private fun buildBloomFilter(): BloomFilter<CharSequence> {
logger.info("Start build bloom filter.")
val bf = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
properties.expectedNodes,
properties.fpp,
)
val query = Query(Criteria.where(FOLDER).isEqualTo(false))
query.fields().include(SHA256)
NodeCommonUtils.forEachNode(query) {
val sha256 = it[SHA256]?.toString()
if (sha256 != null) {
bf.put(sha256)
}
}
return false
logger.info("Build bloom filter successful,count: ${bf.approximateElementCount()},fpp: ${bf.expectedFpp()}")
return bf
}

private fun getCredentials(key: String): StorageCredentials? {
Expand Down Expand Up @@ -152,7 +184,6 @@ class FileReferenceCleanupJob(
companion object {
private val logger = LoggerHolder.jobLogger
private const val COLLECTION_NAME_PREFIX = "file_reference_"
private const val COLLECTION_NODE_PREFIX = "node_"
private const val COMPRESS_FILE_COLLECTION = "compress_file"
private const val ARCHIVE_FILE_COLLECTION = "archive_file"
private const val STORAGE_CREDENTIALS = "storageCredentialsKey"
Expand All @@ -166,7 +197,7 @@ class FileReferenceCleanupJob(

data class Node(
val id: String,
val sha256: String?,
val sha256: String,
)

override fun mapToEntity(row: Map<String, Any?>): FileReferenceData {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package com.tencent.bkrepo.job.batch.utils

import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils
import com.tencent.bkrepo.job.BATCH_SIZE
import com.tencent.bkrepo.job.SHARDING_COUNT
import org.bson.types.ObjectId
import org.springframework.data.domain.Sort
import java.time.LocalDateTime
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.find
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class NodeCommonUtils(
Expand Down Expand Up @@ -41,6 +49,29 @@ class NodeCommonUtils(
return nodes
}

fun forEachNode(query: Query, batchSize: Int = BATCH_SIZE, consumer: Consumer<Map<String, Any?>>) {
var querySize: Int
var lastId = ObjectId(MIN_OBJECT_ID)
(0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection ->
do {
val newQuery = Query.of(query)
.addCriteria(Criteria.where(ID).gt(lastId))
.limit(batchSize)
.with(Sort.by(ID).ascending())
val data = mongoTemplate.find<Map<String, Any?>>(
newQuery,
collection,
)
if (data.isEmpty()) {
break
}
data.forEach { consumer.accept(it) }
querySize = data.size
lastId = data.last()[ID] as ObjectId
} while (querySize == batchSize)
}
}

fun collectionNames(projectIds: List<String>): List<String> {
val collectionNames = mutableListOf<String>()
if (projectIds.isNotEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,14 @@ import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties("job.file-reference-cleanup")
class FileReferenceCleanupJobProperties(
override var cron: String = "0 0 4/6 * * ?"
override var cron: String = "0 0 4/6 * * ?",
/**
* 预期系统节点数
* */
var expectedNodes: Long = 100_000_000, // 1e
/**
* 布隆过滤器的误报率。
* 误报率较高,会导致更多的数据库查询,但不影响节点清理的正确性,误报率越低,消耗的内存越大。
* */
var fpp: Double = 0.0001,
) : MongodbJobProperties()
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,21 @@
package com.tencent.bkrepo.job.batch

import com.tencent.bkrepo.archive.api.ArchiveClient
import com.tencent.bkrepo.common.artifact.pojo.RepositoryCategory
import com.tencent.bkrepo.common.artifact.pojo.RepositoryType
import com.tencent.bkrepo.common.artifact.pojo.configuration.local.LocalConfiguration
import com.tencent.bkrepo.common.service.util.ResponseBuilder
import com.tencent.bkrepo.common.service.util.SpringContextUtils
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.credentials.InnerCosCredentials
import com.tencent.bkrepo.job.SHARDING_COUNT
import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
import com.tencent.bkrepo.job.batch.utils.RepositoryCommonUtils
import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties
import com.tencent.bkrepo.job.repository.JobSnapshotRepository
import com.tencent.bkrepo.repository.api.RepositoryClient
import com.tencent.bkrepo.repository.api.StorageCredentialsClient
import com.tencent.bkrepo.repository.pojo.repo.RepositoryDetail
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkObject
Expand Down Expand Up @@ -68,6 +76,10 @@ class FileReferenceCleanupJobTest : JobBaseTest() {

@MockBean
lateinit var archiveClient: ArchiveClient

@MockBean
lateinit var repositoryClient: RepositoryClient

@MockBean
lateinit var jobSnapshotRepository: JobSnapshotRepository

Expand All @@ -77,26 +89,58 @@ class FileReferenceCleanupJobTest : JobBaseTest() {
@Autowired
lateinit var fileReferenceCleanupJob: FileReferenceCleanupJob

@Autowired
lateinit var nodeCommonUtils: NodeCommonUtils

@Autowired
lateinit var repositoryCommonUtils: RepositoryCommonUtils

@Autowired
lateinit var fileReferenceCleanupJobProperties: FileReferenceCleanupJobProperties

@BeforeEach
fun beforeEach() {
val tracer = mockk<OtelTracer>()
mockkObject(SpringContextUtils.Companion)
every { SpringContextUtils.getBean<Tracer>() } returns tracer
every { tracer.currentSpan() } returns null
every { SpringContextUtils.publishEvent(any()) } returns Unit
Mockito.`when`(storageService.exist(anyString(), any())).thenReturn(true)
val credentials = InnerCosCredentials()
Mockito.`when`(storageCredentialsClient.findByKey(anyString())).thenReturn(
ResponseBuilder.success(credentials),
)
mockkObject(SpringContextUtils)
every { SpringContextUtils.publishEvent(any()) } returns Unit
Mockito.`when`(repositoryClient.getRepoDetail(anyString(), anyString(), anyString())).thenReturn(
ResponseBuilder.success(
RepositoryDetail(
projectId = "ut-project",
name = "ut-repo",
storageCredentials = InnerCosCredentials(key = "0"),
type = RepositoryType.NONE,
category = RepositoryCategory.LOCAL,
public = false,
description = "",
configuration = LocalConfiguration(),
createdBy = "",
createdDate = "",
lastModifiedBy = "",
lastModifiedDate = "",
oldCredentialsKey = null,
quota = 0,
used = 0,
),
),
)
fileReferenceCleanupJobProperties.expectedNodes = 50_000
}

@AfterEach
fun afterEach() {
fileReferenceCleanupJob.collectionNames().forEach {
mongoTemplate.remove(Query(), it)
}
mongoTemplate.remove(Query(), "node_0")
mongoTemplate.remove(Query(), "shed_lock")
}

@DisplayName("测试正常运行")
Expand Down Expand Up @@ -132,6 +176,34 @@ class FileReferenceCleanupJobTest : JobBaseTest() {
Assertions.assertEquals(num, deleted.get())
}

@DisplayName("测试错误引用数据清理")
@Test
fun errorRefTest() {
val num = 1000
insertMany(num, fileReferenceCleanupJob.collectionNames().first())
// 新增一个节点,制造引用错误
val doc = Document(
mutableMapOf(
"sha256" to "0",
"folder" to false,
"projectId" to "ut-project",
"repoName" to "ut-repo",
"size" to 1L,
"fullPath" to "/test",
) as Map<String, Any>?,
)
mongoTemplate.insert(
doc,
"node_0",
)
val deleted = AtomicInteger()
Mockito.`when`(storageService.delete(anyString(), any())).then {
deleted.incrementAndGet()
}
fileReferenceCleanupJob.start()
Assertions.assertEquals(num - 1, deleted.get())
}

private fun insertMany(num: Int, collectionName: String) {
(0 until num).forEach {
val doc = Document(
Expand Down

0 comments on commit 573e5fc

Please sign in to comment.