Skip to content

Commit

Permalink
[fix] [broker] rename to changeMaxReadPositionCount (apache#22656)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5ab0512)
(cherry picked from commit 1cf6a72)
  • Loading branch information
thetumbled authored and srinath-ctds committed May 16, 2024
1 parent 1a02229 commit 7042b52
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
*/
private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();

// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
// when change max read position, the count will +1. Take snapshot will reset the count.
private final AtomicLong changeMaxReadPositionCount = new AtomicLong();

private final LongAdder txnCommittedCounter = new LongAdder();

Expand Down Expand Up @@ -429,15 +429,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
}

private void takeSnapshotByChangeTimes() {
if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
this.changeMaxReadPositionAndAddAbortTimes.set(0);
if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) {
this.changeMaxReadPositionCount.set(0);
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
}

private void takeSnapshotByTimeout() {
if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
this.changeMaxReadPositionAndAddAbortTimes.set(0);
if (changeMaxReadPositionCount.get() > 0) {
this.changeMaxReadPositionCount.set(0);
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
this.timer.newTimeout(TopicTransactionBuffer.this,
Expand All @@ -454,7 +454,7 @@ void updateMaxReadPosition(TxnID txnID) {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
}
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
this.changeMaxReadPositionCount.getAndIncrement();
}
}

Expand Down Expand Up @@ -489,7 +489,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
changeMaxReadPositionCount.incrementAndGet();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,20 +1052,20 @@ public void testCancelTxnTimeout() throws Exception{
}

@Test
public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception {
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService()
.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true)
.get().get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
processorField.setAccessible(true);

AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer);
Field changeTimeField = TopicTransactionBuffer
.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
.class.getDeclaredField("changeMaxReadPositionCount");
changeTimeField.setAccessible(true);
AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer);
AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer);

Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
field1.setAccessible(true);
Expand All @@ -1074,10 +1074,10 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot()
TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
});
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

}

Expand Down

0 comments on commit 7042b52

Please sign in to comment.