diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 52a686f6857..def5c8f2d06 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -457,11 +457,9 @@ public void shutdown() { this.fileStatus.set(IndexStatusEnum.SHUTDOWN); if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); - this.mappedFile = null; } if (this.compactMappedFile != null) { this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); - this.compactMappedFile = null; } } catch (Exception e) { log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 14608aa58d5..e99ea0de182 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.logfile.DefaultMappedFile; @@ -101,6 +102,10 @@ private void doConvertOldFormatFile(String filePath) { private void recover() { Stopwatch stopwatch = Stopwatch.createStarted(); + // delete compact file directory + UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(), + FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString())); + // recover local File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString()); this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString()); @@ -141,6 +146,10 @@ private void recover() { for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) { IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment); + IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp()); + if (localFile != null) { + localFile.destroy(); + } timeStoreTable.put(indexFile.getTimestamp(), indexFile); log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp()); } @@ -248,6 +257,7 @@ public void doCompactThenUploadFile(IndexFile indexFile) { if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) { log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}", indexFile.getTimestamp(), indexFile.getFileStatus()); + indexFile.destroy(); return; }