Skip to content

Commit

Permalink
[ISSUE #6633] Not clear uninitialized files and fix metadata recover (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Sep 12, 2023
1 parent 1a8e7cb commit fd32dae
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.tieredstore.file;

import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -25,13 +24,13 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
Expand All @@ -43,7 +42,6 @@
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.apache.rocketmq.common.BoundaryType;

public class TieredFlatFile {

Expand Down Expand Up @@ -177,7 +175,10 @@ protected void recoverMetadata() {
}
}

private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
/**
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
*/
public void updateFileSegment(TieredFileSegment fileSegment) {

FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
Expand All @@ -186,45 +187,24 @@ private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fil
if (metadata == null) {
metadata = new FileSegmentMetadata(
this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
if (fileSegment.isClosed()) {
metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
}
this.tieredMetadataStore.updateFileSegment(metadata);
metadata.setCreateTimestamp(System.currentTimeMillis());
}
return metadata;
}

/**
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
*/
public void updateFileSegment(TieredFileSegment fileSegment) {
FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);

if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
&& fileSegment.isFull()
&& !fileSegment.needCommit()) {
metadata.setSize(fileSegment.getCommitPosition());
metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
metadata.setEndTimestamp(fileSegment.getMaxTimestamp());

segmentMetadata.markSealed();
if (fileSegment.isFull() && !fileSegment.needCommit()) {
if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
metadata.markSealed();
}
}

if (fileSegment.isClosed()) {
segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
}

segmentMetadata.setSize(fileSegment.getCommitPosition());
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());

FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());

if (!Objects.equals(metadata, segmentMetadata)) {
this.tieredMetadataStore.updateFileSegment(segmentMetadata);
logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
}
this.tieredMetadataStore.updateFileSegment(metadata);
}

private void checkAndFixFileSize() {
Expand Down Expand Up @@ -598,6 +578,9 @@ public void destroy() {
logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
}
fileSegment.destroyFile();
if (!fileSegment.exists()) {
tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
}
}
fileSegmentList.clear();
needCommitFileSegmentList.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,13 @@ public void doCleanExpiredFile() {
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
flatFile.getCompositeFlatFileLock().lock();
try {
flatFile.getCompositeFlatFileLock().lock();
flatFile.cleanExpiredFile(expiredTimeStamp);
flatFile.destroyExpiredFile();
if (flatFile.getConsumeQueueBaseOffset() == -1) {
logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
destroyCompositeFile(flatFile.getMessageQueue());
}
} catch (Throwable t) {
logger.error("Do Clean expired file error, topic={}, queueId={}",
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
} finally {
flatFile.getCompositeFlatFileLock().unlock();
}
Expand Down

0 comments on commit fd32dae

Please sign in to comment.