Skip to content

Commit

Permalink
impr: 优化引用清理任务 TencentBlueKing#1877
Browse files Browse the repository at this point in the history
  • Loading branch information
felixncheng authored Mar 14, 2024
1 parent 2bb415e commit a9c45d2
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import com.tencent.bkrepo.job.batch.context.FileJobContext
import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties
import com.tencent.bkrepo.job.exception.JobExecuteException
import com.tencent.bkrepo.repository.api.FileReferenceClient
import com.tencent.bkrepo.repository.api.StorageCredentialsClient
import com.tencent.bkrepo.repository.constant.SYSTEM_USER
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand All @@ -73,7 +72,6 @@ class FileReferenceCleanupJob(
private val storageCredentialsClient: StorageCredentialsClient,
private val properties: FileReferenceCleanupJobProperties,
private val archiveClient: ArchiveClient,
private val fileReferenceClient: FileReferenceClient,
) : MongoDbBatchJob<FileReferenceCleanupJob.FileReferenceData, FileJobContext>(properties) {

/**
Expand Down Expand Up @@ -166,7 +164,11 @@ class FileReferenceCleanupJob(
* 2. 真实判断存储实例的节点是否存在。(引用不正确的情况或者布隆过滤器的误报)
* */
val query = Query(where(Node::sha256).isEqualTo(sha256))
return bf.mightContain(sha256) && NodeCommonUtils.findNodes(query, key).isNotEmpty()
val mightContain = bf.mightContain(sha256)
if (mightContain) {
logger.info("Bloom filter might contain $sha256.")
}
return mightContain && NodeCommonUtils.exist(query, key)
}

private fun buildBloomFilter(): BloomFilter<CharSequence> {
Expand All @@ -178,7 +180,7 @@ class FileReferenceCleanupJob(
)
val query = Query(Criteria.where(FOLDER).isEqualTo(false))
query.fields().include(SHA256)
NodeCommonUtils.forEachNode(query) {
NodeCommonUtils.forEachNodeByCollectionParallel(query) {
val sha256 = it[SHA256]?.toString()
if (sha256 != null) {
bf.put(sha256)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.bkrepo.job.batch.utils

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils
Expand All @@ -13,6 +14,10 @@ import org.springframework.data.mongodb.core.find
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.stereotype.Component
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Consumer

@Component
Expand All @@ -37,6 +42,15 @@ class NodeCommonUtils(
companion object {
lateinit var mongoTemplate: MongoTemplate
private const val COLLECTION_NAME_PREFIX = "node_"
private val workPool = ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1L,
TimeUnit.MINUTES,
ArrayBlockingQueue(DEFAULT_BUFFER_SIZE),
ThreadFactoryBuilder().setNameFormat("node-utils-%d").build(),
)

fun findNodes(query: Query, storageCredentialsKey: String?): List<Node> {
val nodes = mutableListOf<Node>()
(0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection ->
Expand All @@ -49,27 +63,65 @@ class NodeCommonUtils(
return nodes
}

fun forEachNode(query: Query, batchSize: Int = BATCH_SIZE, consumer: Consumer<Map<String, Any?>>) {
var querySize: Int
(0 until SHARDING_COUNT).map { "$COLLECTION_NAME_PREFIX$it" }.forEach { collection ->
var lastId = ObjectId(MIN_OBJECT_ID)
do {
val newQuery = Query.of(query)
.addCriteria(Criteria.where(ID).gt(lastId))
.limit(batchSize)
.with(Sort.by(ID).ascending())
val data = mongoTemplate.find<Map<String, Any?>>(
newQuery,
collection,
)
if (data.isEmpty()) {
break
fun exist(query: Query, storageCredentialsKey: String?): Boolean {
for (i in 0 until SHARDING_COUNT) {
val collection = COLLECTION_NAME_PREFIX.plus(i)
val find = mongoTemplate.find(query, Node::class.java, collection)
.distinctBy { it.projectId + it.repoName }
.any {
val repo = RepositoryCommonUtils.getRepositoryDetail(it.projectId, it.repoName)
repo.storageCredentials?.key == storageCredentialsKey
}
data.forEach { consumer.accept(it) }
querySize = data.size
lastId = data.last()[ID] as ObjectId
} while (querySize == batchSize)
if (find) {
return true
}
}
return false
}

fun forEachNodeByCollectionParallel(
query: Query,
batchSize: Int = BATCH_SIZE,
consumer: Consumer<Map<String, Any?>>,
) {
val countDownLatch = CountDownLatch(SHARDING_COUNT)
for (i in 0 until SHARDING_COUNT) {
val collection = COLLECTION_NAME_PREFIX.plus(i)
workPool.execute {
try {
findByCollection(query, batchSize, collection, consumer)
} finally {
countDownLatch.countDown()
}
}
}
countDownLatch.await()
}

private fun findByCollection(
query: Query,
batchSize: Int,
collection: String,
consumer: Consumer<Map<String, Any?>>,
) {
var querySize: Int
var lastId = ObjectId(MIN_OBJECT_ID)
do {
val newQuery = Query.of(query)
.addCriteria(Criteria.where(ID).gt(lastId))
.limit(batchSize)
.with(Sort.by(ID).ascending())
val data = mongoTemplate.find<Map<String, Any?>>(
newQuery,
collection,
)
if (data.isEmpty()) {
break
}
data.forEach { consumer.accept(it) }
querySize = data.size
lastId = data.last()[ID] as ObjectId
} while (querySize == batchSize)
}

fun collectionNames(projectIds: List<String>): List<String> {
Expand Down

0 comments on commit a9c45d2

Please sign in to comment.