Skip to content

Commit

Permalink
feat: Job任务支持故障转移 TencentBlueKing#1563
Browse files Browse the repository at this point in the history
  • Loading branch information
felixncheng committed Dec 26, 2023
1 parent 9659fdf commit 691aa8f
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import com.tencent.bkrepo.common.service.util.SpringContextUtils
import com.tencent.bkrepo.job.config.properties.BatchJobProperties
import com.tencent.bkrepo.job.listener.event.TaskExecutedEvent
import net.javacrumbs.shedlock.core.LockConfiguration
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.core.LockingTaskExecutor
import net.javacrumbs.shedlock.core.SimpleLock
import org.springframework.beans.factory.annotation.Autowired
import java.time.Duration
import java.time.LocalDateTime
Expand All @@ -42,7 +44,7 @@ import kotlin.system.measureNanoTime
/**
* 抽象批处理作业Job
* */
abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobProperties) {
abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobProperties) : FailoverJob {
/**
* 锁名称
*/
Expand All @@ -64,6 +66,12 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
@Volatile
private var stop = true

/**
* Whether the job body is currently executing.
* Set to true when doStart begins and reset to false in its finally block;
* stop() polls this flag while waiting for a running execution to finish.
* */
@Volatile
private var inProcess = false

/**
* 是否排他执行,如果是则会加分布式锁
* */
Expand Down Expand Up @@ -93,6 +101,10 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
@Autowired
private lateinit var lockingTaskExecutor: LockingTaskExecutor

// Provider used to acquire the lock described by getLockConfiguration() for exclusive runs.
@Autowired
private lateinit var lockProvider: LockProvider
// Lock held by the current exclusive run: assigned right after the lock is acquired and
// cleared once the run has returned. stop() releases it when a forced stop is requested.
private var lock: SimpleLock? = null

var lastBeginTime: LocalDateTime? = null
var lastEndTime: LocalDateTime? = null
var lastExecuteTime: Long? = null
Expand All @@ -105,9 +117,14 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
stop = false
val jobContext = createJobContext()
val wasExecuted = if (isExclusive) {
val task = LockingTaskExecutor.TaskWithResult { doStart(jobContext) }
val result = lockingTaskExecutor.executeWithLock(task, getLockConfiguration())
result.wasExecuted()
var wasExecuted = false
lockProvider.lock(getLockConfiguration()).ifPresent {
lock = it
it.use { doStart(jobContext) }
lock = null
wasExecuted = true
}
wasExecuted
} else {
doStart(jobContext)
true
Expand All @@ -118,9 +135,7 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
}
if (!wasExecuted) {
logger.info("Job[${getJobName()}] already execution.")
return wasExecuted
}
stop = true
return wasExecuted
}

Expand All @@ -129,7 +144,11 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
* */
private fun doStart(jobContext: C) {
try {
inProcess = true
lastBeginTime = LocalDateTime.now()
if (isFailover()) {
recover()
}
val elapseNano = measureNanoTime {
doStart0(jobContext)
}
Expand All @@ -140,11 +159,13 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
val event = TaskExecutedEvent(
name = getJobName(),
context = jobContext,
time = Duration.ofNanos(elapseNano)
time = Duration.ofNanos(elapseNano),
)
SpringContextUtils.publishEvent(event)
} catch (e: Exception) {
logger.info("Job[${getJobName()}] execution failed.", e)
} finally {
inProcess = false
}
}

Expand All @@ -153,10 +174,43 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
/**
* 停止任务
* */
fun stop() {
/**
 * Stops the job.
 *
 * If an execution is in progress, waits up to [timeout] milliseconds for it to finish.
 * When it is still running after the wait and [force] is true, [failover] is invoked and
 * the lock held by the running execution is released. The stop flag is always set before
 * this method returns, so [shouldRun] reports false afterwards.
 *
 * If the calling thread is interrupted while waiting, the wait ends early, the remaining
 * stop steps still run, and the interrupt status is restored before returning.
 *
 * @param timeout maximum time to wait for a running execution, in milliseconds
 * @param force whether to run failover and release the lock when the job is still running
 */
fun stop(timeout: Long = DEFAULT_STOP_TIMEOUT, force: Boolean = false) {
    if (stop && !inProcess) {
        logger.info("Job [${getJobName()}] already stopped.")
        return
    }
    logger.info("Stop job [${getJobName()}].")
    // Give the running execution a chance to finish on its own.
    var waitTime = 0L
    var interrupted = false
    while (inProcess && waitTime < timeout) {
        logger.info("Job [${getJobName()}] is still running, waiting for it to terminate.")
        // Never sleep past the requested timeout.
        val sleepTime = minOf(SLEEP_TIME_INTERVAL, timeout - waitTime)
        try {
            Thread.sleep(sleepTime)
        } catch (e: InterruptedException) {
            // Stop waiting, but keep going so the job is still marked as stopped.
            interrupted = true
            break
        }
        waitTime += sleepTime
    }
    if (interrupted) {
        logger.info("Interrupted while waiting for job [${getJobName()}] to terminate.")
    } else if (inProcess) {
        logger.info("Stop job timeout [$timeout] ms.")
    }
    try {
        // Failover is only needed when the lock is released.
        if (inProcess && force) {
            logger.info("Force stop job [${getJobName()}] and unlock.")
            failover()
            lock?.unlockQuietly()
        }
    } finally {
        stop = true
        if (interrupted) {
            // Restore the interrupt status only after failover, so its work is not disturbed.
            Thread.currentThread().interrupt()
        }
    }
}

/**
 * Default failover hook: does nothing. Subclasses that support failover override it.
 */
override fun failover() = Unit

/**
 * Default implementation: a plain batch job never reports a failover.
 */
override fun isFailover(): Boolean = false

/**
 * Default recovery hook: does nothing. Subclasses that support failover override it.
 */
override fun recover() = Unit

/**
* 启用
*/
Expand All @@ -172,12 +226,19 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
}

/**
* 任务是否在运行
* 任务是否应该运行
* */
fun isRunning(): Boolean {
fun shouldRun(): Boolean {
// True until the stop flag is set; long-running loops poll this to decide whether to continue.
return !stop
}

/**
 * Whether the job is executing right now.
 */
fun inProcess(): Boolean = inProcess

/**
* 获取分布式锁需要的锁配置
* */
Expand All @@ -192,7 +253,31 @@ abstract class BatchJob<C : JobContext>(open val batchJobProperties: BatchJobPro
return batchJobProperties.enabled
}

/**
 * Runs [block] while this lock is held and releases the lock afterwards,
 * whether [block] returns normally or throws.
 */
private fun SimpleLock.use(block: () -> Unit) =
    try {
        block()
    } finally {
        unlockQuietly()
    }

/**
 * Releases this lock, swallowing any [Exception] thrown by the release.
 */
private fun SimpleLock.unlockQuietly() =
    try {
        unlock()
    } catch (ignored: Exception) {
        // Best-effort release: a failed unlock is deliberately not propagated.
    }

companion object {
    private val logger = LoggerHolder.jobLogger

    /** Default time, in milliseconds, that stop() waits for a running job to finish. */
    private const val DEFAULT_STOP_TIMEOUT = 30_000L

    /** Interval, in milliseconds, between checks while stop() waits for the job. */
    private const val SLEEP_TIME_INTERVAL = 1_000L
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.tencent.bkrepo.job.batch.base

/**
* A job that supports failover.
* */
interface FailoverJob {
/**
* Performs the failover.
* NOTE(review): presumably hands the unfinished work over so it can be resumed later;
* the exact contract is defined by implementations.
* */
fun failover()

/**
* Whether a failover has occurred.
* */
fun isFailover(): Boolean

/**
* Restores the state of the interrupted execution.
*/
fun recover()
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
package com.tencent.bkrepo.job.batch.base

import com.tencent.bkrepo.common.api.util.HumanReadable
import com.tencent.bkrepo.common.api.util.readJsonString
import com.tencent.bkrepo.common.api.util.toJsonString
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
import com.tencent.bkrepo.common.service.log.LoggerHolder
import com.tencent.bkrepo.job.config.properties.MongodbJobProperties
import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator
import com.tencent.bkrepo.job.executor.IdentityTask
import com.tencent.bkrepo.job.pojo.TJobFailover
import net.javacrumbs.shedlock.core.LockingTaskExecutor
import org.bson.types.ObjectId
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -42,6 +45,9 @@ import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.find
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import java.net.InetAddress
import java.time.LocalDateTime
import java.util.Collections
import java.util.concurrent.CountDownLatch
import kotlin.system.measureNanoTime

Expand All @@ -50,7 +56,7 @@ import kotlin.system.measureNanoTime
* */
abstract class MongoDbBatchJob<Entity, Context : JobContext>(
private val properties: MongodbJobProperties,
) : CenterNodeJob<Context>(properties) {
) : MongodbFailoverJob<Context>(properties) {
/**
* 需要操作的表名列表
* */
Expand Down Expand Up @@ -102,12 +108,31 @@ abstract class MongoDbBatchJob<Entity, Context : JobContext>(
@Autowired
private lateinit var executor: BlockThreadPoolTaskExecutorDecorator

/**
* Whether asynchronous tasks exist.
* */
private var hasAsyncTask = false

/**
* Names of the collections that have not been processed yet.
* Filled when the execution context is prepared; a name is removed once its collection has finished.
* Wrapped in a synchronized list.
* */
private var undoList = Collections.synchronizedList(mutableListOf<String>())

/**
* Job context used to restore progress after a failover.
* */
private var recoverableJobContext = RecoverableMongodbJobContext(mutableListOf())

/**
* Whether the next run resumes from a failover.
* Set by reply() and cleared again when the execution context is prepared.
* */
private var recover = false

override fun doStart0(jobContext: Context) {
try {
hasAsyncTask = false
val collectionNames = collectionNames()
prepareContext(jobContext)
val collectionNames = undoList.toList()
if (concurrentLevel == JobConcurrentLevel.COLLECTION) {
// 使用闭锁来保证表异步生产任务的结束
val countDownLatch = CountDownLatch(collectionNames.size)
Expand All @@ -128,12 +153,29 @@ abstract class MongoDbBatchJob<Entity, Context : JobContext>(
}
}

/**
 * Prepares the execution context for a run.
 *
 * The pending-collection list is rebuilt on every call. For a normal run it is filled with
 * all collection names and the recoverable context is initialised from [jobContext]. When
 * resuming after a failover, the counters and the pending collections are taken from the
 * recoverable context, and the recovery flag is cleared.
 */
private fun prepareContext(jobContext: Context) {
    undoList.clear()
    if (!recover) {
        // Fresh run: every collection is still pending.
        recoverableJobContext.init(jobContext)
        undoList.addAll(collectionNames())
        return
    }
    // Resumed run: continue from the restored counters and remaining collections.
    val restored = recoverableJobContext
    jobContext.success = restored.success
    jobContext.failed = restored.failed
    jobContext.total = restored.total
    undoList.addAll(restored.undoCollectionNames)
    recover = false
}

/**
* 处理单个表数据
* */
private fun runCollection(collectionName: String, context: Context) {
if (!isRunning()) {
logger.info("Job[${getJobName()}] already stop.")
if (!shouldRun()) {
logger.info("Job[${getJobName()}] already stopped.")
return
}
logger.info("Job[${getJobName()}]: Start collection $collectionName.")
Expand Down Expand Up @@ -167,10 +209,11 @@ abstract class MongoDbBatchJob<Entity, Context : JobContext>(
querySize = data.size
lastId = data.last()[ID] as ObjectId
report(context)
} while (querySize == pageSize && isRunning())
} while (querySize == pageSize && shouldRun())
}.apply {
val elapsedTime = HumanReadable.time(this)
onRunCollectionFinished(collectionName, context)
undoList.remove(collectionName)
logger.info("Job[${getJobName()}]: collection $collectionName run completed,sum [$sum] elapse $elapsedTime")
}
}
Expand Down Expand Up @@ -217,6 +260,35 @@ abstract class MongoDbBatchJob<Entity, Context : JobContext>(
}
}

/**
 * Captures the current progress as a [TJobFailover] snapshot.
 *
 * The snapshot holds the job name, the local host name, the current time, the counters of
 * the recoverable context, and the names of the still-pending collections serialized as JSON.
 */
override fun capture(): TJobFailover {
    // undoList is a synchronized list that may be modified while the snapshot is taken.
    // Iterating it (which serialization does) is only safe while holding the list's own
    // monitor, so copy it under that monitor first and serialize the copy.
    val pendingCollections = synchronized(undoList) { undoList.toList() }
    return with(recoverableJobContext) {
        TJobFailover(
            name = getJobName(),
            createdBy = hostName(),
            createdDate = LocalDateTime.now(),
            success = success.get(),
            failed = failed.get(),
            total = total.get(),
            data = pendingCollections.toJsonString(),
        )
    }
}

/**
 * Loads [snapshot] into the recoverable context and marks the next run as a recovery.
 *
 * The recoverable context is reset first. The snapshot's counters are then added to it
 * and, if the snapshot carries data, the collection names decoded from that JSON are
 * appended to its pending collections.
 */
override fun reply(snapshot: TJobFailover) {
    val restored = recoverableJobContext
    restored.reset()
    restored.success.addAndGet(snapshot.success)
    restored.failed.addAndGet(snapshot.failed)
    restored.total.addAndGet(snapshot.total)
    snapshot.data?.let { json -> restored.undoCollectionNames.addAll(json.readJsonString()) }
    recover = true
}

/**
 * Returns the host name of the local machine as reported by [InetAddress.getLocalHost].
 */
private fun hostName(): String = InetAddress.getLocalHost().hostName

companion object {
private val logger = LoggerHolder.jobLogger

Expand Down
Loading

0 comments on commit 691aa8f

Please sign in to comment.