Skip to content

Commit

Permalink
refactor handle commit exception
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Aug 29, 2023
1 parent a3f2e9b commit d128fe0
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQue
TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes);
}

// if no cached message found and there is currently an inflight request,
// wait for the request to end before continuing
// If there are no messages in the cache and there are currently requests being pulled.
// We need to wait for the request to return before continuing.
if (resultWrapperList.isEmpty() && waitInflightRequest) {
CompletableFuture<Long> future =
flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ private void releaseCommitLock() {
}
}

private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
dispatchCommitOffset =
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
}
}

/**
* @return false: commit, true: no commit operation
*/
Expand All @@ -353,31 +360,36 @@ public CompletableFuture<Boolean> commitAsync() {
return CompletableFuture.completedFuture(false);
}

if (fileSegmentInputStream != null) {
this.correctPosition(this.getSize(), null);
this.releaseCommitLock();
return CompletableFuture.completedFuture(false);
}

try {
List<ByteBuffer> bufferList = borrowBuffer();
int bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+ (codaBuffer != null ? codaBuffer.remaining() : 0);
if (bufferSize == 0) {
return CompletableFuture.completedFuture(true);
if (fileSegmentInputStream != null) {
if (correctPosition(this.getSize(), null)) {
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
fileSegmentInputStream = null;
} else {
fileSegmentInputStream.rewind();
}
}

this.fileSegmentInputStream = FileSegmentInputStreamFactory.build(
fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
int bufferSize;
if (fileSegmentInputStream != null) {
bufferSize = fileSegmentInputStream.available();
} else {
List<ByteBuffer> bufferList = borrowBuffer();
bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+ (codaBuffer != null ? codaBuffer.remaining() : 0);
if (bufferSize == 0) {
releaseCommitLock();
return CompletableFuture.completedFuture(true);
}
fileSegmentInputStream = FileSegmentInputStreamFactory.build(
fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
}

return flightCommitRequest = this
.commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
.thenApply(result -> {
if (result) {
if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
dispatchCommitOffset =
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
}
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
commitPosition += bufferSize;
fileSegmentInputStream = null;
return true;
Expand Down Expand Up @@ -413,43 +425,41 @@ private boolean handleCommitException(Throwable e) {
return this.correctPosition(fileSize, cause);
}

/**
* return true to clear buffer
*/
private boolean correctPosition(long fileSize, Throwable throwable) {

// Current we have three offsets here: commit offset, expect offset, file size.
// We guarantee that the commit offset is less than or equal to the expect offset.
// Max offset will increase because we can continuously put in new buffers
String handleInfo = throwable == null ? "before commit" : "after commit";
long expectPosition = commitPosition + fileSegmentInputStream.getContentLength();
String offsetInfo = String.format("Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, file: %s",
commitPosition, expectPosition, appendPosition, fileSize, this.getPath());
String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " +
"Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s",
handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath());

if (fileSize == -1L) {
logger.error("Handle CommitException, unable to get file size, {}", offsetInfo, throwable);
fileSegmentInputStream.rewind();
logger.error(offsetInfo, "UnknownError", throwable);
return false;
}

if (fileSize < commitPosition || fileSize > expectPosition) {
logger.error("Handle CommitException, file size incorrect, {}", offsetInfo, throwable);
// We are believing that the file size returned by the server is correct,
// can reset the commit offset to the file size reported by the storage system.
if (fileSize == expectPosition) {
logger.info(offsetInfo, "Success", throwable);
commitPosition = fileSize;
return false;
}

if (fileSize == commitPosition) {
logger.warn("Handle CommitException, file commit failed, {}", offsetInfo, throwable);
fileSegmentInputStream.rewind();
return true;
}

if (fileSize > commitPosition && fileSize < expectPosition) {
logger.warn("Handle CommitException, file commit partial success, {}", offsetInfo, throwable);
return false;
}

if (fileSize == expectPosition) {
logger.info("Handle CommitException, file commit success, {}", offsetInfo);
commitPosition = fileSize;
fileSegmentInputStream = null;
if (fileSize < commitPosition) {
logger.error(offsetInfo, "FileSizeIncorrect", throwable);
} else if (fileSize == commitPosition) {
logger.warn(offsetInfo, "CommitFailed", throwable);
} else if (fileSize > commitPosition) {
logger.warn(offsetInfo, "PartialSuccess", throwable);
}
return true;
commitPosition = fileSize;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class CommitLogInputStream extends FileSegmentInputStream {
/**
* commitLogOffset is the real physical offset of the commitLog buffer which is being read
*/
private final long startCommitLogOffset;

private long commitLogOffset;

private final ByteBuffer codaBuffer;
Expand All @@ -37,6 +39,7 @@ public class CommitLogInputStream extends FileSegmentInputStream {
public CommitLogInputStream(FileSegmentType fileType, long startOffset,
List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
super(fileType, uploadBufferList, contentLength);
this.startCommitLogOffset = startOffset;
this.commitLogOffset = startOffset;
this.codaBuffer = codaBuffer;
}
Expand All @@ -53,6 +56,15 @@ public synchronized void reset() throws IOException {
this.commitLogOffset = markCommitLogOffset;
}

@Override
public synchronized void rewind() {
super.rewind();
this.commitLogOffset = this.startCommitLogOffset;
if (this.codaBuffer != null) {
this.codaBuffer.rewind();
}
}

@Override
public ByteBuffer getCodaBuffer() {
return this.codaBuffer;
Expand All @@ -64,17 +76,17 @@ public int read() {
return -1;
}
readPosition++;
if (curReadBufferIndex >= uploadBufferList.size()) {
if (curReadBufferIndex >= bufferList.size()) {
return readCoda();
}
int res;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
if (curReadBufferIndex >= uploadBufferList.size()) {
if (curReadBufferIndex >= bufferList.size()) {
readPosInCurBuffer = 0;
return readCoda();
}
curBuffer = uploadBufferList.get(curReadBufferIndex);
curBuffer = bufferList.get(curReadBufferIndex);
commitLogOffset += readPosInCurBuffer;
readPosInCurBuffer = 0;
}
Expand All @@ -95,14 +107,6 @@ private int readCoda() {
return codaBuffer.get(readPosInCurBuffer++) & 0xff;
}

@Override
public void rewind() {
super.rewind();
if (codaBuffer != null) {
codaBuffer.rewind();
}
}

@Override
public int read(byte[] b, int off, int len) {
if (b == null) {
Expand All @@ -127,9 +131,9 @@ public int read(byte[] b, int off, int len) {
int posInCurBuffer = readPosInCurBuffer;
long curCommitLogOffset = commitLogOffset;
ByteBuffer curBuf = curBuffer;
while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
while (needRead > 0 && bufIndex <= bufferList.size()) {
int readLen, remaining, realReadLen = 0;
if (bufIndex == uploadBufferList.size()) {
if (bufIndex == bufferList.size()) {
// read from coda buffer
remaining = codaBuffer.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
Expand All @@ -145,7 +149,7 @@ public int read(byte[] b, int off, int len) {
}
remaining = curBuf.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
curBuf = uploadBufferList.get(bufIndex);
curBuf = bufferList.get(bufIndex);
if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
// read from commitLog buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;

public class FileSegmentInputStream extends InputStream {
Expand All @@ -33,7 +34,7 @@ public class FileSegmentInputStream extends InputStream {
/**
* hold bytebuffer
*/
protected final List<ByteBuffer> uploadBufferList;
protected final List<ByteBuffer> bufferList;

/**
* total remaining of bytebuffer list
Expand Down Expand Up @@ -66,12 +67,12 @@ public class FileSegmentInputStream extends InputStream {
private int markReadPosInCurBuffer = -1;

public FileSegmentInputStream(
FileSegmentType fileType, List<ByteBuffer> uploadBufferList, int contentLength) {
FileSegmentType fileType, List<ByteBuffer> bufferList, int contentLength) {
this.fileType = fileType;
this.contentLength = contentLength;
this.uploadBufferList = uploadBufferList;
if (uploadBufferList != null && uploadBufferList.size() > 0) {
this.curBuffer = uploadBufferList.get(curReadBufferIndex);
this.bufferList = bufferList;
if (bufferList != null && bufferList.size() > 0) {
this.curBuffer = bufferList.get(curReadBufferIndex);
}
}

Expand All @@ -95,8 +96,20 @@ public synchronized void reset() throws IOException {
this.readPosition = markReadPosition;
this.curReadBufferIndex = markCurReadBufferIndex;
this.readPosInCurBuffer = markReadPosInCurBuffer;
if (this.curReadBufferIndex < uploadBufferList.size()) {
this.curBuffer = uploadBufferList.get(curReadBufferIndex);
if (this.curReadBufferIndex < bufferList.size()) {
this.curBuffer = bufferList.get(curReadBufferIndex);
}
}

public synchronized void rewind() {
this.readPosition = 0;
this.curReadBufferIndex = 0;
this.readPosInCurBuffer = 0;
if (CollectionUtils.isNotEmpty(bufferList)) {
this.curBuffer = bufferList.get(0);
for (ByteBuffer buffer : bufferList) {
buffer.rewind();
}
}
}

Expand All @@ -109,16 +122,8 @@ public int available() {
return contentLength - readPosition;
}

public void rewind() {
if (uploadBufferList != null) {
for (ByteBuffer buffer : uploadBufferList) {
buffer.rewind();
}
}
}

public List<ByteBuffer> getUploadBufferList() {
return uploadBufferList;
public List<ByteBuffer> getBufferList() {
return bufferList;
}

public ByteBuffer getCodaBuffer() {
Expand All @@ -133,10 +138,10 @@ public int read() {
readPosition++;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
if (curReadBufferIndex >= uploadBufferList.size()) {
if (curReadBufferIndex >= bufferList.size()) {
return -1;
}
curBuffer = uploadBufferList.get(curReadBufferIndex);
curBuffer = bufferList.get(curReadBufferIndex);
readPosInCurBuffer = 0;
}
return curBuffer.get(readPosInCurBuffer++) & 0xff;
Expand Down Expand Up @@ -165,8 +170,8 @@ public int read(byte[] b, int off, int len) {
int bufIndex = curReadBufferIndex;
int posInCurBuffer = readPosInCurBuffer;
ByteBuffer curBuf = curBuffer;
while (needRead > 0 && bufIndex < uploadBufferList.size()) {
curBuf = uploadBufferList.get(bufIndex);
while (needRead > 0 && bufIndex < bufferList.size()) {
curBuf = bufferList.get(bufIndex);
int remaining = curBuf.remaining() - posInCurBuffer;
int readLen = Math.min(remaining, needRead);
// read from curBuf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static FileSegmentInputStream build(
}
return new FileSegmentInputStream(fileType, bufferList, length);
default:
throw new IllegalArgumentException("fileType is not supported");
throw new IllegalArgumentException("file type is not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
Expand Down Expand Up @@ -55,6 +56,7 @@ public void setUp() throws ClassNotFoundException, NoSuchMethodException {
public void tearDown() throws IOException {
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
TieredStoreExecutor.shutdown();
}

private List<FileSegmentMetadata> getSegmentMetadataList(TieredMetadataStore metadataStore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,7 @@ public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoS

indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
Assert.assertEquals(1, indexList.size());

indexFile.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public int read() {
}

@Override
public List<ByteBuffer> getUploadBufferList() {
public List<ByteBuffer> getBufferList() {
return null;
}

Expand Down
Loading

0 comments on commit d128fe0

Please sign in to comment.