Skip to content

Commit

Permalink
[ISSUE apache#6728] Compute the confirmOffset without considering new…
Browse files Browse the repository at this point in the history
… connections (apache#6729)

* 1. When compute the confirmOffset, dismiss the ackOffset of new connections. 2. When compute the confirmOffset, use getConfirmOffsetDirectly() to avoid the endless calling.

* use the calculated slaveAckOffset

* optimize the logic.
  • Loading branch information
GenerousMan authored May 11, 2023
1 parent 6b6fb17 commit 5dc2e20
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
22 changes: 21 additions & 1 deletion store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -553,15 +553,35 @@ private void setBatchSizeIfNeeded(Map<String, String> propertiesMap, DispatchReq
}
}

// Fetch and compute the newest confirmOffset.
// Even if it is just inited.
public long getConfirmOffset() {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
return this.defaultMessageStore.getMaxPhyOffset();
}
// First time compute confirmOffset.
// First time it will compute the confirmOffset.
if (this.confirmOffset <= 0) {
setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
log.info("Init the confirmOffset to {}.", this.confirmOffset);
}
}
return this.confirmOffset;
} else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
return this.confirmOffset;
} else {
return getMaxOffset();
}
}

// Fetch the original confirmOffset's value.
// Without checking and re-computing.
public long getConfirmOffsetDirectly() {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
return this.defaultMessageStore.getMaxPhyOffset();
}
}
return this.confirmOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1589,11 +1589,19 @@ public boolean resetWriteOffset(long phyOffset) {
}
}

// Fetch and compute the newest confirmOffset.
// Even if it is just inited.
@Override
public long getConfirmOffset() {
return this.commitLog.getConfirmOffset();
}

// Fetch the original confirmOffset's value.
// Without checking and re-computing.
public long getConfirmOffsetDirectly() {
return this.commitLog.getConfirmOffsetDirectly();
}

@Override
public void setConfirmOffset(long phyOffset) {
this.commitLog.setConfirmOffset(phyOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,14 @@ public long computeConfirmOffset() {
for (Long syncId : currentSyncStateSet) {
if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
return this.defaultMessageStore.getConfirmOffset();
// Without check and re-compute, return the confirmOffset's value directly.
return this.defaultMessageStore.getConfirmOffsetDirectly();
}
}

for (HAConnection connection : this.connectionList) {
final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId();
if (currentSyncStateSet.contains(slaveId)) {
if (currentSyncStateSet.contains(slaveId) && connection.getSlaveAckOffset() > 0) {
newConfirmOffset = Math.min(newConfirmOffset, connection.getSlaveAckOffset());
}
}
Expand Down

0 comments on commit 5dc2e20

Please sign in to comment.