Skip to content

Commit

Permalink
优化GC相关功能 (TencentBlueKing#1615)
Browse files Browse the repository at this point in the history
* feat: 优化系统GC功能 TencentBlueKing#1600

* feat: 归档服务支持异步任务 TencentBlueKing#1600

* feat: 支持任务可取消 TencentBlueKing#1600

* feat: 优化压缩文件引用逻辑 TencentBlueKing#1600

* feat: 优化GC相关job TencentBlueKing#1600

* feat: merge master TencentBlueKing#1600

* feat: 节点聚类分析时,增加采样调查 TencentBlueKing#1600

* feat: 优化job db查询 TencentBlueKing#1600
  • Loading branch information
felixncheng authored Jan 4, 2024
1 parent 7476d57 commit 8ab0821
Show file tree
Hide file tree
Showing 51 changed files with 580 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ interface ArchiveClient {

@PutMapping("/compress/complete")
fun completeCompress(@RequestBody request: CompleteCompressRequest): Response<Void>

@DeleteMapping("/deleteAll")
fun deleteAll(@RequestBody request: ArchiveFileRequest): Response<Void>
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package com.tencent.bkrepo.archive.config

import com.tencent.bkrepo.common.security.http.core.HttpAuthSecurity
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import

@EnableConfigurationProperties(ArchiveProperties::class)
@Import(ArchiveShutdownConfiguration::class)
@Configuration
class ArchiveConfiguration
class ArchiveConfiguration {

@Bean
fun httpAuthSecurity(): HttpAuthSecurity {
return HttpAuthSecurity().withPrefix("/archive")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.tencent.bkrepo.archive.config

import com.tencent.bkrepo.archive.job.Cancellable
import com.tencent.bkrepo.common.service.shutdown.ServiceShutdownHook
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.ObjectProvider

/**
* 归档服务关机相关配置
* */
class ArchiveShutdownConfiguration(cancellable: ObjectProvider<Cancellable>) {

init {
ServiceShutdownHook.add {
cancellable.stream().forEach {
it.cancel()
logger.info("Shutdown job[${it.javaClass.simpleName}].")
}
}
}

companion object {
private val logger = LoggerFactory.getLogger(ArchiveShutdownConfiguration::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,15 @@ class ArchiveController(
compressService.complete(request)
return ResponseBuilder.success()
}

override fun deleteAll(request: ArchiveFileRequest): Response<Void> {
archiveService.delete(request)
val deleteCompressRequest = DeleteCompressRequest(
sha256 = request.sha256,
storageCredentialsKey = request.storageCredentialsKey,
operator = request.operator,
)
compressService.delete(deleteCompressRequest)
return ResponseBuilder.success()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.tencent.bkrepo.archive.controller.user

import com.tencent.bkrepo.archive.service.SystemAdminService
import com.tencent.bkrepo.common.security.permission.Principal
import com.tencent.bkrepo.common.security.permission.PrincipalType
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

@Principal(PrincipalType.ADMIN)
@RestController
@RequestMapping("/api/archive/admin")
class SystemAdminController(
private val systemAdminService: SystemAdminService,
) {
@PutMapping("/stop")
fun stop(@RequestParam jobName: String) {
systemAdminService.stop(jobName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.tencent.bkrepo.archive.job

import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock

open class AsyncBaseJobSubscriber<T>(val executor: ThreadPoolExecutor) : BaseJobSubscriber<T>() {

private val lock: ReentrantLock = ReentrantLock()
private val executeComplete: Condition = lock.newCondition()
private val committedTask: AtomicInteger = AtomicInteger(0)
private val commitComplete: AtomicBoolean = AtomicBoolean(false)

override fun hookOnNext(value: T) {
committedTask.incrementAndGet()
executor.execute {
try {
super.hookOnNext(value)
} finally {
val committed = committedTask.decrementAndGet()
if (commitComplete.get() && committed == 0) {
lock.lock()
try {
executeComplete.signal()
} finally {
lock.unlock()
}
}
}
}
}

override fun hookOnComplete() {
commitComplete.set(true)
if (committedTask.get() != 0) {
lock.lock()
try {
executeComplete.await()
} finally {
lock.unlock()
}
}
super.hookOnComplete()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ open class BaseJobSubscriber<T> : BaseSubscriber<T>() {
countDownLatch.countDown()
}

override fun hookOnCancel() {
LoggerHolder.jobLogger.info("Job[${getJobName()}] cancelled.")
}

override fun currentContext(): Context {
return Context.of(JOB_NAME, getJobName(), JOB_CTX, jobContext)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.tencent.bkrepo.archive.job

/**
* 可取消的操作
* */
interface Cancellable {
/**
* 取消
* */
fun cancel()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.archive.ArchiveStatus
import com.tencent.bkrepo.archive.config.ArchiveProperties
import com.tencent.bkrepo.archive.extensions.key
import com.tencent.bkrepo.archive.job.Cancellable
import com.tencent.bkrepo.archive.model.TArchiveFile
import com.tencent.bkrepo.archive.repository.ArchiveFileRepository
import com.tencent.bkrepo.archive.utils.ArchiveUtils.Companion.newFixedAndCachedThreadPool
Expand Down Expand Up @@ -38,7 +39,7 @@ class ArchiveJob(
private val archiveProperties: ArchiveProperties,
private val storageService: StorageService,
private val archiveFileRepository: ArchiveFileRepository,
) {
) : Cancellable {

/**
* 归档cos实例
Expand Down Expand Up @@ -89,6 +90,8 @@ class ArchiveJob(
ThreadFactoryBuilder().setNameFormat("archive-upload-%d").build(),
)

private var subscriber: ArchiveSubscriber? = null

init {
/*
* 磁盘容量监控
Expand Down Expand Up @@ -155,7 +158,13 @@ class ArchiveJob(
.runOn(Schedulers.fromExecutor(httpUploadPool), prefetch)
.flatMap(uploader::onArchiveFileWrapper) // 上传
.subscribe(subscriber)
this.subscriber = subscriber
subscriber.blockLast()
this.subscriber = null
}

override fun cancel() {
subscriber?.dispose()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.tencent.bkrepo.archive.job.archive

import com.tencent.bkrepo.archive.ArchiveStatus
import com.tencent.bkrepo.archive.config.ArchiveProperties
import com.tencent.bkrepo.archive.job.Cancellable
import com.tencent.bkrepo.archive.model.TArchiveFile
import com.tencent.bkrepo.archive.repository.ArchiveFileDao
import com.tencent.bkrepo.archive.repository.ArchiveFileRepository
Expand All @@ -25,9 +26,11 @@ class RestoreJob(
private val storageService: StorageService,
private val archiveProperties: ArchiveProperties,
private val archiveFileDao: ArchiveFileDao,
) {
) : Cancellable {
private val cosClient = CosClient(archiveProperties.cos)

private var subscriber: RestoreSubscriber? = null

/**
* 获取待归档文件列表
* */
Expand All @@ -48,6 +51,12 @@ class RestoreJob(
archiveProperties.workDir,
)
listFiles().subscribe(subscriber)
this.subscriber = subscriber
subscriber.blockLast()
this.subscriber = null
}

override fun cancel() {
subscriber?.dispose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,36 @@ package com.tencent.bkrepo.archive.job.compress
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.archive.CompressStatus
import com.tencent.bkrepo.archive.config.ArchiveProperties
import com.tencent.bkrepo.archive.job.Cancellable
import com.tencent.bkrepo.archive.model.TCompressFile
import com.tencent.bkrepo.archive.repository.CompressFileDao
import com.tencent.bkrepo.archive.repository.CompressFileRepository
import com.tencent.bkrepo.archive.utils.ArchiveUtils.Companion.newFixedAndCachedThreadPool
import com.tencent.bkrepo.archive.utils.ReactiveDaoUtils
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.repository.api.FileReferenceClient
import java.util.concurrent.TimeUnit
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

@Component
class CompressJob(
private val archiveProperties: ArchiveProperties,
private val compressFileDao: CompressFileDao,
private val storageService: StorageService,
private val compressFileRepository: CompressFileRepository,
) {
private val fileReferenceClient: FileReferenceClient,
) : Cancellable {

private val compressThreadPool = newFixedAndCachedThreadPool(
archiveProperties.ioThreads,
ThreadFactoryBuilder().setNameFormat("storage-compress-%d").build(),
)
private var subscriber: CompressSubscriber? = null

fun listFiles(): Flux<TCompressFile> {
val criteria = where(TCompressFile::status).isEqualTo(CompressStatus.CREATED)
Expand All @@ -40,10 +43,20 @@ class CompressJob(

@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.HOURS)
fun compress() {
val subscriber = CompressSubscriber(compressFileDao, compressFileRepository, storageService)
listFiles().parallel()
.runOn(Schedulers.fromExecutor(compressThreadPool))
.subscribe(subscriber)
val subscriber = CompressSubscriber(
compressFileDao,
compressFileRepository,
storageService,
fileReferenceClient,
compressThreadPool,
)
listFiles().subscribe(subscriber)
this.subscriber = subscriber
subscriber.blockLast()
this.subscriber = null
}

override fun cancel() {
subscriber?.dispose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.tencent.bkrepo.archive.job.compress

import com.tencent.bkrepo.archive.CompressStatus
import com.tencent.bkrepo.archive.event.StorageFileCompressedEvent
import com.tencent.bkrepo.archive.job.BaseJobSubscriber
import com.tencent.bkrepo.archive.job.AsyncBaseJobSubscriber
import com.tencent.bkrepo.archive.model.TCompressFile
import com.tencent.bkrepo.archive.repository.CompressFileDao
import com.tencent.bkrepo.archive.repository.CompressFileRepository
Expand All @@ -13,14 +13,18 @@ import com.tencent.bkrepo.common.service.util.SpringContextUtils
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.innercos.retry
import com.tencent.bkrepo.common.storage.monitor.measureThroughput
import com.tencent.bkrepo.repository.api.FileReferenceClient
import org.slf4j.LoggerFactory
import java.time.LocalDateTime
import java.util.concurrent.ThreadPoolExecutor

class CompressSubscriber(
private val compressFileDao: CompressFileDao,
private val compressFileRepository: CompressFileRepository,
private val storageService: StorageService,
) : BaseJobSubscriber<TCompressFile>() {
private val fileReferenceClient: FileReferenceClient,
executor: ThreadPoolExecutor,
) : AsyncBaseJobSubscriber<TCompressFile>(executor) {

override fun doOnNext(value: TCompressFile) {
with(value) {
Expand Down Expand Up @@ -68,18 +72,23 @@ class CompressSubscriber(
SpringContextUtils.publishEvent(event)
} catch (e: TooLowerReuseRateException) {
logger.info("Reuse rate is too lower.")
value.status = CompressStatus.COMPRESS_FAILED
value.lastModifiedDate = LocalDateTime.now()
compressFileRepository.save(value)
compressFailed(value)
} catch (e: Exception) {
value.status = CompressStatus.COMPRESS_FAILED
value.lastModifiedDate = LocalDateTime.now()
compressFileRepository.save(value)
compressFailed(value)
throw e
}
}
}

private fun compressFailed(file: TCompressFile) {
with(file) {
status = CompressStatus.COMPRESS_FAILED
lastModifiedDate = LocalDateTime.now()
compressFileRepository.save(file)
fileReferenceClient.decrement(baseSha256, storageCredentialsKey)
}
}

override fun getSize(value: TCompressFile): Long {
return value.uncompressedSize
}
Expand Down
Loading

0 comments on commit 8ab0821

Please sign in to comment.