Skip to content

Commit

Permalink
ledger: turn deferredCommitContext.newBase into a function (#5093)
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Feb 1, 2023
1 parent b326672 commit 7f7939d
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 28 deletions.
6 changes: 3 additions & 3 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,15 +380,15 @@ func (ao *onlineAccounts) prepareCommit(dcc *deferredCommitContext) error {
if err != nil {
return err
}
end, err := ao.roundParamsOffset(dcc.newBase)
end, err := ao.roundParamsOffset(dcc.newBase())
if err != nil {
return err
}
// write for rounds oldbase+1 up to and including newbase
dcc.onlineRoundParams = ao.onlineRoundParamsData[start+1 : end+1]

maxOnlineLookback := basics.Round(ao.maxBalLookback())
dcc.onlineAccountsForgetBefore = (dcc.newBase + 1).SubSaturate(maxOnlineLookback)
dcc.onlineAccountsForgetBefore = (dcc.newBase() + 1).SubSaturate(maxOnlineLookback)
if dcc.lowestRound > 0 && dcc.lowestRound < dcc.onlineAccountsForgetBefore {
// extend history as needed
dcc.onlineAccountsForgetBefore = dcc.lowestRound
Expand Down Expand Up @@ -440,7 +440,7 @@ func (ao *onlineAccounts) commitRound(ctx context.Context, tx *sql.Tx, dcc *defe

func (ao *onlineAccounts) postCommit(ctx context.Context, dcc *deferredCommitContext) {
offset := dcc.offset
newBase := dcc.newBase
newBase := dcc.newBase()

ao.accountsMu.Lock()
// Drop reference counts to modified accounts, and evict them
Expand Down
7 changes: 2 additions & 5 deletions ledger/acctonline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func commitSync(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracker, rnd
ml.trackers.accountsWriting.Add(1)

// do not take any locks since all operations are synchronous
newBase := basics.Round(dcc.offset) + dcc.oldBase
dcc.newBase = newBase
err := ml.trackers.commitRound(dcc)
require.NoError(t, err)
}()
Expand All @@ -73,8 +71,7 @@ func commitSyncPartial(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracke
ml.trackers.accountsWriting.Add(1)

// do not take any locks since all operations are synchronous
newBase := basics.Round(dcc.offset) + dcc.oldBase
dcc.newBase = newBase
newBase := dcc.newBase()
dcc.flushTime = time.Now()

for _, lt := range ml.trackers.trackers {
Expand Down Expand Up @@ -102,7 +99,7 @@ func commitSyncPartial(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracke
func commitSyncPartialComplete(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracker, dcc *deferredCommitContext) {
defer ml.trackers.accountsWriting.Done()

ml.trackers.dbRound = dcc.newBase
ml.trackers.dbRound = dcc.newBase()
for _, lt := range ml.trackers.trackers {
lt.postCommit(ml.trackers.ctx, dcc)
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon

offset := dcc.offset
dbRound := dcc.oldBase
newBase := dcc.newBase
newBase := dcc.newBase()

dcc.updatingBalancesDuration = time.Since(dcc.flushTime)

Expand Down
6 changes: 2 additions & 4 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2183,8 +2183,7 @@ func TestAcctUpdatesResources(t *testing.T) {
defer ml.trackers.accountsWriting.Done()

// do not take any locks since all operations are synchronous
newBase := basics.Round(dcc.offset) + dcc.oldBase
dcc.newBase = newBase
newBase := dcc.newBase()

err := au.prepareCommit(dcc)
require.NoError(t, err)
Expand Down Expand Up @@ -2467,8 +2466,7 @@ func auCommitSync(t *testing.T, rnd basics.Round, au *accountUpdates, ml *mockLe
defer ml.trackers.accountsWriting.Done()

// do not take any locks since all operations are synchronous
newBase := basics.Round(dcc.offset) + dcc.oldBase
dcc.newBase = newBase
newBase := dcc.newBase()

err := au.prepareCommit(dcc)
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx *sql.Tx, dcc *d
dcc.stats.MerkleTrieUpdateDuration = time.Duration(time.Now().UnixNano())
}

err = ct.accountsUpdateBalances(dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.oldBase, dcc.newBase)
err = ct.accountsUpdateBalances(dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.oldBase, dcc.newBase())
if err != nil {
return err
}
Expand Down Expand Up @@ -867,11 +867,11 @@ func (ct *catchpointTracker) pruneFirstStageRecordsData(ctx context.Context, max

func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.catchpointFirstStage {
err := ct.finishFirstStage(ctx, dcc.newBase, dcc.updatingBalancesDuration)
err := ct.finishFirstStage(ctx, dcc.newBase(), dcc.updatingBalancesDuration)
if err != nil {
ct.log.Warnf(
"error finishing catchpoint's first stage dcc.newBase: %d err: %v",
dcc.newBase, err)
dcc.newBase(), err)
}
}

Expand All @@ -885,13 +885,13 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr
}

// Prune first stage catchpoint records from the database.
if uint64(dcc.newBase) >= dcc.catchpointLookback {
if uint64(dcc.newBase()) >= dcc.catchpointLookback {
err := ct.pruneFirstStageRecordsData(
ctx, dcc.newBase-basics.Round(dcc.catchpointLookback))
ctx, dcc.newBase()-basics.Round(dcc.catchpointLookback))
if err != nil {
ct.log.Warnf(
"error pruning first stage records and data dcc.newBase: %d err: %v",
dcc.newBase, err)
dcc.newBase(), err)
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ type deferredCommitRange struct {
type deferredCommitContext struct {
deferredCommitRange

newBase basics.Round
flushTime time.Time

genesisProto config.ConsensusParams
Expand Down Expand Up @@ -273,6 +272,10 @@ type deferredCommitContext struct {
updateStats bool
}

func (dcc deferredCommitContext) newBase() basics.Round {
return dcc.oldBase + basics.Round(dcc.offset)
}

var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")

func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
Expand Down Expand Up @@ -495,7 +498,6 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {

dcc.offset = offset
dcc.oldBase = dbRound
dcc.newBase = newBase
dcc.flushTime = time.Now()

for _, lt := range tr.trackers {
Expand Down
8 changes: 4 additions & 4 deletions ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,10 @@ func (t *txTail) prepareCommit(dcc *deferredCommitContext) (err error) {
dcc.txTailDeltas = append(dcc.txTailDeltas, t.roundTailSerializedDeltas[i])
}
lowest := t.lowestBlockHeaderRound
proto, ok := config.Consensus[t.blockHeaderData[dcc.newBase].CurrentProtocol]
proto, ok := config.Consensus[t.blockHeaderData[dcc.newBase()].CurrentProtocol]
t.tailMu.RUnlock()
if !ok {
return fmt.Errorf("round %d not found in blockHeaderData: lowest=%d, base=%d", dcc.newBase, lowest, dcc.oldBase)
return fmt.Errorf("round %d not found in blockHeaderData: lowest=%d, base=%d", dcc.newBase(), lowest, dcc.oldBase)
}
// get the MaxTxnLife from the consensus params of the latest round in this commit range
// preserve data for MaxTxnLife + DeeperBlockHeaderHistory
Expand All @@ -274,7 +274,7 @@ func (t *txTail) commitRound(ctx context.Context, tx *sql.Tx, dcc *deferredCommi

// determine the round to remove data
// the formula is similar to the committedUpTo: rnd + 1 - retain size
forgetBeforeRound := (dcc.newBase + 1).SubSaturate(basics.Round(dcc.txTailRetainSize))
forgetBeforeRound := (dcc.newBase() + 1).SubSaturate(basics.Round(dcc.txTailRetainSize))
baseRound := dcc.oldBase + 1
if err := arw.TxtailNewRound(ctx, baseRound, dcc.txTailDeltas, forgetBeforeRound); err != nil {
return fmt.Errorf("txTail: unable to persist new round %d : %w", baseRound, err)
Expand All @@ -290,7 +290,7 @@ func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {

// get the MaxTxnLife from the consensus params of the latest round in this commit range
// preserve data for MaxTxnLife + DeeperBlockHeaderHistory rounds
newLowestRound := (dcc.newBase + 1).SubSaturate(basics.Round(dcc.txTailRetainSize))
newLowestRound := (dcc.newBase() + 1).SubSaturate(basics.Round(dcc.txTailRetainSize))
for t.lowestBlockHeaderRound < newLowestRound {
delete(t.blockHeaderData, t.lowestBlockHeaderRound)
t.lowestBlockHeaderRound++
Expand Down
4 changes: 1 addition & 3 deletions ledger/txtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ func TestTxTailDeltaTracking(t *testing.T) {
offset: 1,
catchpointFirstStage: true,
},
newBase: basics.Round(i),
}
err = txtail.prepareCommit(dcc)
require.NoError(t, err)
Expand Down Expand Up @@ -363,12 +362,11 @@ func BenchmarkTxTailBlockHeaderCache(b *testing.B) {
oldBase: dbRound,
lookback: lookback,
},
newBase: dbRound + basics.Round(offset),
}
err := tail.prepareCommit(dcc)
require.NoError(b, err)
tail.postCommit(context.Background(), dcc)
dbRound = dcc.newBase
dbRound = dcc.newBase()
require.Less(b, len(tail.blockHeaderData), 1001+10)
}
}
Expand Down

0 comments on commit 7f7939d

Please sign in to comment.