Skip to content

Commit

Permalink
fix: fs服务缓存存储降级时异常 #2663
Browse files Browse the repository at this point in the history
  • Loading branch information
yaoxuwan authored Oct 31, 2024
1 parent 4be78be commit eb08c3f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ package com.tencent.bkrepo.fs.server.storage
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener
import com.tencent.bkrepo.common.artifact.stream.closeQuietly
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.createFile
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
Expand All @@ -45,6 +49,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.io.ByteArrayInputStream
import java.io.File
import java.io.IOException
import java.io.InputStream
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.Files
Expand All @@ -56,10 +61,22 @@ import java.nio.file.StandardOpenOption
* */
class CoArtifactDataReceiver(
receiveProperties: ReceiveProperties,
monitorProperties: MonitorProperties,
private var path: Path,
private val fileName: String = generateRandomName(),
) : StorageHealthMonitor.Observer {

/**
* 传输过程中发生存储降级时,是否将数据转移到本地磁盘
*/
private val enableTransfer = monitorProperties.enableTransfer

/**
* 数据传输buffer大小
*/
private val bufferSize = receiveProperties.bufferSize.toBytes().toInt()


/**
* 文件内存接收阈值
* */
Expand Down Expand Up @@ -93,13 +110,7 @@ class CoArtifactDataReceiver(
/**
* 文件异步接收通道
* */
private val channel: AsynchronousFileChannel by lazy {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
private var channel: AsynchronousFileChannel? = null

/**
* 是否降级
Expand All @@ -111,6 +122,11 @@ class CoArtifactDataReceiver(
* */
var finished: Boolean = false

/**
* 数据是否转移到本地磁盘
*/
private var hasTransferred: Boolean = false

/**
* 降级路径
* */
Expand Down Expand Up @@ -157,12 +173,14 @@ class CoArtifactDataReceiver(
checkFallback()
val len = buffer.readableByteCount()
if (pos + len > fileSizeThreshold && inMemory) {
initChannel()
flushToFile()
DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle()
DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle()
} else if (inMemory) {
buffer.read(cacheData!!, pos.toInt(), len)
} else {
DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle()
initChannel()
DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle()
}
buffer.readPosition(0)
digest(buffer)
Expand All @@ -173,13 +191,30 @@ class CoArtifactDataReceiver(
if (inMemory) {
val cacheData = cacheData!!.copyOfRange(0, pos.toInt())
val buf = DefaultDataBufferFactory.sharedInstance.wrap(cacheData)
DataBufferUtils.write(Mono.just(buf), channel).awaitSingle()
val filePath = this.filePath.apply { this.createFile() }
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(filePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
}
DataBufferUtils.write(Mono.just(buf), channel!!).awaitSingle()
inMemory = false
// help gc
this.cacheData = null
}
}

private suspend fun initChannel() {
if (channel == null) {
channel = withContext(Dispatchers.IO) {
Files.createDirectories(filePath.parent)
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
}
}

private fun digest(buffer: DataBuffer) {
val len = buffer.readableByteCount()
val digestArray = ByteArray(len)
Expand All @@ -188,23 +223,71 @@ class CoArtifactDataReceiver(
}

override fun unhealthy(fallbackPath: Path?, reason: String?) {
if (!finished && !fallback) {
if (!finished && !fallback && !hasTransferred) {
fallBackPath = fallbackPath
fallback = true
logger.warn("Path[$path] is unhealthy, fallback to use [$fallBackPath], reason: $reason")
}
}

private fun checkFallback() {
if (!fallback) {
/**
* 检查是否需要fall back操作
*/
private suspend fun checkFallback() {
if (!fallback || hasTransferred) {
return
}
if (fallBackPath == null || fallBackPath == path) {
logger.info("Fallback path is null or equals to primary path,skip")
logger.info("Fallback path is null or equals to primary path, skip transfer data")
hasTransferred = true
return
}
if (inMemory) {
path = fallBackPath!!
// originalPath表示NFS位置, fallBackPath表示本地磁盘位置
val originalPath = path
// 更新当前path为本地磁盘
path = fallBackPath!!
// transfer date
if (!inMemory) {
// 当文件已经落到NFS
if (enableTransfer) {
// 开Transfer功能时,从NFS转移到本地盘
cleanOriginalChannel()
val originalFile = originalPath.resolve(fileName)
val filePath = this.filePath.apply { this.createFile() }
val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize)
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
DataBufferUtils.write(dataBuffer, channel!!, 0).awaitSingle()
withContext(Dispatchers.IO) {
Files.deleteIfExists(originalFile)
}
logger.info("Success to transfer data from [$originalPath] to [$path]")
} else {
// 禁用Transfer功能时,忽略操作,继续使用NFS
path = originalPath
fallback = false
}
}
hasTransferred = true
}

/**
* 关闭原始输出流
*/
private fun cleanOriginalChannel() {
try {
channel?.force(true)
} catch (ignored: IOException) {
}

try {
channel?.close()
} catch (ignored: IOException) {
}
}

Expand All @@ -217,14 +300,14 @@ class CoArtifactDataReceiver(
return Throughput(pos, endTime - startTime)
} finally {
if (!inMemory) {
channel.closeQuietly()
channel?.closeQuietly()
}
}
}

fun close() {
if (!inMemory) {
channel.closeQuietly()
channel?.closeQuietly()
Files.deleteIfExists(filePath)
logger.info("Delete path $filePath")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CoArtifactFile(
val path = storageCredentials.upload.location.toPath()
receiver = CoArtifactDataReceiver(
storageProperties.receive,
storageProperties.monitor,
path
)
monitor.add(receiver)
Expand Down Expand Up @@ -108,7 +109,8 @@ class CoArtifactFile(
}

override fun isFallback(): Boolean {
return false
runBlocking { finish() }
return receiver.fallback
}

override fun isInLocalDisk(): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.com.bkrepo.fs.storage

import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.util.toPath
import com.tencent.bkrepo.fs.server.storage.CoArtifactDataReceiver
Expand Down Expand Up @@ -140,6 +141,7 @@ class CoArtifactDataReceiverTest {
fileSizeThreshold = DataSize.ofBytes(fileSizeThreshold),
rateLimit = DataSize.ofBytes(-1)
)
return CoArtifactDataReceiver(receive, primaryPath, filename)
val monitorProperties = MonitorProperties(enabled = true, enableTransfer = true)
return CoArtifactDataReceiver(receive, monitorProperties, primaryPath, filename)
}
}

0 comments on commit eb08c3f

Please sign in to comment.