Skip to content

Commit

Permalink
ledger: refactor the ledger Totals (#2846)
Browse files Browse the repository at this point in the history
## Summary

The plan to change the tracker database to maintain the latest 320 rounds for the online accounts only have some other required modification; one of them is the semantics of the `Totals` method:
- At this time, it supports retrieving `AccountTotals` for any of the recent 320 rounds.
- We need the method to support the `AccountTotals` for the latest round only, as well as provide the circulating supply for latest-320 rounds ( i.e. the circulating supply is a subset of the `AccountTotals` ).

Once the database implementation is complete, the database would contain the `AccountTotals` information for the latest round only, plus the online circulation for the past 320 rounds.

In order to support that, I've broken up the interface into:
- `LatestTotals`, which would return the latest totals as well as the latest round associated with these totals.
- `OnlineTotals`, which receive a round number and return the online totals associated with that round.

This change is focused around the Ledger interface. Further changes would be required in the accounts update before the high-level goals could be achieved.

## Test Plan

Unit tests updated.
  • Loading branch information
tsachiherman authored Sep 29, 2021
1 parent 412aef5 commit 1202d32
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 48 deletions.
3 changes: 1 addition & 2 deletions daemon/algod/api/server/v1/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,8 +1637,7 @@ func GetSupply(ctx lib.ReqContext, context echo.Context) {

w := context.Response().Writer

latest := ctx.Node.Ledger().Latest()
totals, err := ctx.Node.Ledger().Totals(latest)
latest, totals, err := ctx.Node.Ledger().LatestTotals()
if err != nil {
err = fmt.Errorf("GetSupply(): round %d failed: %v", latest, err)
lib.ErrorResponse(w, http.StatusInternalServerError, err, errInternalFailure, ctx.Log)
Expand Down
3 changes: 1 addition & 2 deletions daemon/algod/api/server/v2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ func (v2 *Handlers) GetProof(ctx echo.Context, round uint64, txid string, params
// GetSupply gets the current supply reported by the ledger.
// (GET /v2/ledger/supply)
func (v2 *Handlers) GetSupply(ctx echo.Context) error {
latest := v2.Node.Ledger().Latest()
totals, err := v2.Node.Ledger().Totals(latest)
latest, totals, err := v2.Node.Ledger().LatestTotals()
if err != nil {
err = fmt.Errorf("GetSupply(): round %d, failed: %v", latest, err)
return internalError(ctx, err, errInternalFailure, v2.Log)
Expand Down
3 changes: 2 additions & 1 deletion daemon/algod/api/server/v2/test/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ func TestGetBlockJsonEncoding(t *testing.T) {
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
defer backlogPool.Shutdown()

totals, err := l.Totals(l.Latest())
totalsRound, totals, err := l.LatestTotals()
require.NoError(t, err)
require.Equal(t, l.Latest(), totalsRound)
totalRewardUnits := totals.RewardUnits()
poolBal, err := l.Lookup(l.Latest(), poolAddr)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions data/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (l *Ledger) Circulation(r basics.Round) (basics.MicroAlgos, error) {
}
}

totals, err := l.Totals(r)
totals, err := l.OnlineTotals(r) //nolint:typecheck
if err != nil {
return basics.MicroAlgos{}, err
}
Expand All @@ -196,12 +196,12 @@ func (l *Ledger) Circulation(r basics.Round) (basics.MicroAlgos, error) {
circulation.elements[1],
{
round: r,
onlineMoney: totals.Online.Money},
onlineMoney: totals},
},
})
}

return totals.Online.Money, nil
return totals, nil
}

// Seed gives the VRF seed that was agreed on in a given round,
Expand Down
32 changes: 15 additions & 17 deletions data/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func TestLedgerCirculation(t *testing.T) {
baseDestValue := data.MicroAlgos.Raw

blk := genesisInitState.Block
totals, _ := realLedger.Totals(basics.Round(0))
totalsRound, totals, err := realLedger.LatestTotals()
require.NoError(t, err)
require.Equal(t, basics.Round(0), totalsRound)
baseCirculation := totals.Online.Money.Raw

srcAccountKey := keys[sourceAccount]
Expand Down Expand Up @@ -192,15 +194,13 @@ func TestLedgerCirculation(t *testing.T) {
require.NoError(t, err)
require.Equal(t, baseDestValue+uint64(rnd), data.MicroAlgos.Raw)

totals, err = realLedger.Totals(rnd)
roundCirculation, err := realLedger.OnlineTotals(rnd)
require.NoError(t, err)
roundCirculation := totals.Online.Money.Raw
require.Equal(t, baseCirculation-uint64(rnd)*(10001), roundCirculation)
require.Equal(t, baseCirculation-uint64(rnd)*(10001), roundCirculation.Raw)

totals, err = l.Totals(rnd)
roundCirculation, err = l.OnlineTotals(rnd)
require.NoError(t, err)
roundCirculation = totals.Online.Money.Raw
require.Equal(t, baseCirculation-uint64(rnd)*(10001), roundCirculation)
require.Equal(t, baseCirculation-uint64(rnd)*(10001), roundCirculation.Raw)
} else if rnd < basics.Round(510) {
// test one round ago
data, err = realLedger.Lookup(rnd-1, destAccount)
Expand All @@ -210,15 +210,13 @@ func TestLedgerCirculation(t *testing.T) {
require.NoError(t, err)
require.Equal(t, baseDestValue+uint64(rnd)-1, data.MicroAlgos.Raw)

totals, err = realLedger.Totals(rnd - 1)
roundCirculation, err := realLedger.OnlineTotals(rnd - 1)
require.NoError(t, err)
roundCirculation := totals.Online.Money.Raw
require.Equal(t, baseCirculation-uint64(rnd-1)*(10001), roundCirculation)
require.Equal(t, baseCirculation-uint64(rnd-1)*(10001), roundCirculation.Raw)

totals, err = l.Totals(rnd - 1)
roundCirculation, err = l.OnlineTotals(rnd - 1)
require.NoError(t, err)
roundCirculation = totals.Online.Money.Raw
require.Equal(t, baseCirculation-uint64(rnd-1)*(10001), roundCirculation)
require.Equal(t, baseCirculation-uint64(rnd-1)*(10001), roundCirculation.Raw)
} else if rnd < basics.Round(520) {
// test one round in the future ( expected error )
data, err = realLedger.Lookup(rnd+1, destAccount)
Expand All @@ -228,17 +226,17 @@ func TestLedgerCirculation(t *testing.T) {
require.Error(t, err)
require.Equal(t, uint64(0), data.MicroAlgos.Raw)

_, err = realLedger.Totals(rnd + 1)
_, err = realLedger.OnlineTotals(rnd + 1)
require.Error(t, err)

_, err = l.Totals(rnd + 1)
_, err = l.OnlineTotals(rnd + 1)
require.Error(t, err)
} else if rnd < basics.Round(520) {
// test expired round ( expected error )
_, err = realLedger.Totals(rnd - 500)
_, err = realLedger.OnlineTotals(rnd - 500)
require.Error(t, err)

_, err = l.Totals(rnd - 500)
_, err = l.OnlineTotals(rnd - 500)
require.Error(t, err)
}
}
Expand Down
20 changes: 18 additions & 2 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package pools

import (
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -667,6 +668,13 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact
}
pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, hint)
if err != nil {
var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval
if errors.As(err, &nonSeqBlockEval) {
if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound {
pool.log.Infof("TransactionPool.recomputeBlockEvaluator: skipped creating block evaluator for round %d since ledger already caught up with that round", nonSeqBlockEval.EvaluatorRound)
return
}
}
pool.log.Warnf("TransactionPool.recomputeBlockEvaluator: cannot start evaluator: %v", err)
return
}
Expand Down Expand Up @@ -853,7 +861,7 @@ func (pool *TransactionPool) AssembleBlock(round basics.Round, deadline time.Tim
pool.log.Warnf("AssembleBlock: ran out of time for round %d", round)
stats.StopReason = telemetryspec.AssembleBlockTimeout
if emptyBlockErr != nil {
emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %v", emptyBlockErr)
emptyBlockErr = fmt.Errorf("AssembleBlock: failed to construct empty block : %w", emptyBlockErr)
}
return emptyBlock, emptyBlockErr
}
Expand Down Expand Up @@ -894,7 +902,15 @@ func (pool *TransactionPool) assembleEmptyBlock(round basics.Round) (assembled *
next := bookkeeping.MakeBlock(prev)
blockEval, err := pool.ledger.StartEvaluator(next.BlockHeader, 0)
if err != nil {
err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %v", round, err)
var nonSeqBlockEval ledgercore.ErrNonSequentialBlockEval
if errors.As(err, &nonSeqBlockEval) {
if nonSeqBlockEval.EvaluatorRound <= nonSeqBlockEval.LatestRound {
// in the case that the ledger have already moved beyond that round, just let the agreement know that
// we don't generate a block and it's perfectly fine.
return nil, ErrStaleBlockAssemblyRequest
}
}
err = fmt.Errorf("TransactionPool.assembleEmptyBlock: cannot start evaluator for %d: %w", round, err)
return nil, err
}
return blockEval.GenerateBlock()
Expand Down
20 changes: 17 additions & 3 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,13 @@ func (au *accountUpdates) Totals(rnd basics.Round) (totals ledgercore.AccountTot
return au.totalsImpl(rnd)
}

// LatestTotals returns the totals of all accounts for the most recent round, as well as the round number
func (au *accountUpdates) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) {
au.accountsMu.RLock()
defer au.accountsMu.RUnlock()
return au.latestTotalsImpl()
}

// ReadCloseSizer interface implements the standard io.Reader and io.Closer as well
// as supporting the Size() function that let the caller know what the size of the stream would be (in bytes).
type ReadCloseSizer interface {
Expand Down Expand Up @@ -909,9 +916,9 @@ func (aul *accountUpdatesLedgerEvaluator) BlockHdr(r basics.Round) (bookkeeping.
return bookkeeping.BlockHeader{}, ledgercore.ErrNoEntry{}
}

// Totals returns the totals for a given round
func (aul *accountUpdatesLedgerEvaluator) Totals(rnd basics.Round) (ledgercore.AccountTotals, error) {
return aul.au.totalsImpl(rnd)
// LatestTotals returns the totals of all accounts for the most recent round, as well as the round number
func (aul *accountUpdatesLedgerEvaluator) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) {
return aul.au.latestTotalsImpl()
}

// CheckDup test to see if the given transaction id/lease already exists. It's not needed by the accountUpdatesLedgerEvaluator and implemented as a stub.
Expand Down Expand Up @@ -941,6 +948,13 @@ func (au *accountUpdates) totalsImpl(rnd basics.Round) (totals ledgercore.Accoun
return
}

// latestTotalsImpl returns the totals of all accounts for the most recent round, as well as the round number
func (au *accountUpdates) latestTotalsImpl() (basics.Round, ledgercore.AccountTotals, error) {
offset := len(au.deltas)
rnd := au.dbRound + basics.Round(len(au.deltas))
return rnd, au.roundTotals[offset], nil
}

// initializeCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
// the method also support balances recovery in cases where the difference between the lastBalancesRound and the lastestBlockRound
// is far greater than 320; in these cases, it would flush to disk periodically in order to avoid high memory consumption.
Expand Down
7 changes: 5 additions & 2 deletions ledger/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ type BlockEvaluator struct {
type ledgerForEvaluator interface {
ledgerForCowBase
GenesisHash() crypto.Digest
Totals(basics.Round) (ledgercore.AccountTotals, error)
LatestTotals() (basics.Round, ledgercore.AccountTotals, error)
CompactCertVoters(basics.Round) (*VotersForRound, error)
}

Expand Down Expand Up @@ -479,10 +479,13 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, proto con
base.compactCertNextRnd = votersRound + basics.Round(proto.CompactCertRounds)
}

prevTotals, err := l.Totals(eval.prevHeader.Round)
latestRound, prevTotals, err := l.LatestTotals()
if err != nil {
return nil, err
}
if latestRound != eval.prevHeader.Round {
return nil, ledgercore.ErrNonSequentialBlockEval{EvaluatorRound: hdr.Round, LatestRound: latestRound}
}

poolAddr := eval.prevHeader.RewardsPool
// get the reward pool account data without any rewards
Expand Down
14 changes: 5 additions & 9 deletions ledger/evalIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type indexerLedgerForEval interface {
LookupWithoutRewards(map[basics.Address]struct{}) (map[basics.Address]*basics.AccountData, error)
GetAssetCreator(map[basics.AssetIndex]struct{}) (map[basics.AssetIndex]FoundAddress, error)
GetAppCreator(map[basics.AppIndex]struct{}) (map[basics.AppIndex]FoundAddress, error)
Totals() (ledgercore.AccountTotals, error)
LatestTotals() (ledgercore.AccountTotals, error)
}

// Converter between indexerLedgerForEval and ledgerForEvaluator interfaces.
Expand Down Expand Up @@ -112,14 +112,10 @@ func (l indexerLedgerConnector) GenesisHash() crypto.Digest {
}

// Totals is part of ledgerForEvaluator interface.
func (l indexerLedgerConnector) Totals(round basics.Round) (ledgercore.AccountTotals, error) {
if round != l.latestRound {
return ledgercore.AccountTotals{}, fmt.Errorf(
"Totals() evaluator called this function for the wrong round %d, "+
"latest round is %d",
round, l.latestRound)
}
return l.il.Totals()
func (l indexerLedgerConnector) LatestTotals() (rnd basics.Round, totals ledgercore.AccountTotals, err error) {
totals, err = l.il.LatestTotals()
rnd = l.latestRound
return
}

// CompactCertVoters is part of ledgerForEvaluator interface.
Expand Down
5 changes: 3 additions & 2 deletions ledger/evalIndexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func (il indexerLedgerForEvalImpl) GetAppCreator(map[basics.AppIndex]struct{}) (
return nil, errors.New("GetAppCreator() not implemented")
}

func (il indexerLedgerForEvalImpl) Totals() (ledgercore.AccountTotals, error) {
return il.l.Totals(il.latestRound)
func (il indexerLedgerForEvalImpl) LatestTotals() (totals ledgercore.AccountTotals, err error) {
_, totals, err = il.l.LatestTotals()
return
}

// Test that overriding the consensus parameters effects the generated apply data.
Expand Down
17 changes: 14 additions & 3 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,22 @@ func (l *Ledger) LookupWithoutRewards(rnd basics.Round, addr basics.Address) (ba
return data, validThrough, nil
}

// Totals returns the totals of all accounts at the end of round rnd.
func (l *Ledger) Totals(rnd basics.Round) (ledgercore.AccountTotals, error) {
// LatestTotals returns the totals of all accounts for the most recent round, as well as the round number.
func (l *Ledger) LatestTotals() (basics.Round, ledgercore.AccountTotals, error) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
return l.accts.Totals(rnd)
return l.accts.LatestTotals()
}

// OnlineTotals returns the online totals of all accounts at the end of round rnd.
func (l *Ledger) OnlineTotals(rnd basics.Round) (basics.MicroAlgos, error) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
totals, err := l.accts.Totals(rnd)
if err != nil {
return basics.MicroAlgos{}, err
}
return totals.Online.Money, nil
}

// CheckDup return whether a transaction is a duplicate one.
Expand Down
3 changes: 2 additions & 1 deletion ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func makeNewEmptyBlock(t *testing.T, l *Ledger, GenesisID string, initAccounts m
}
}
} else {
totals, err := l.Totals(l.Latest())
latestRound, totals, err := l.LatestTotals()
require.NoError(t, err)
require.Equal(t, l.Latest(), latestRound)
totalRewardUnits = totals.RewardUnits()
}
poolBal, err := l.Lookup(l.Latest(), poolAddr)
Expand Down
12 changes: 12 additions & 0 deletions ledger/ledgercore/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,15 @@ type LogicEvalError struct {
func (err LogicEvalError) Error() string {
return fmt.Sprintf("logic eval error: %v", err.Err)
}

// ErrNonSequentialBlockEval provides feedback when the evaluator cannot be created for
// stale/future rounds.
type ErrNonSequentialBlockEval struct {
EvaluatorRound basics.Round // EvaluatorRound is the round the evaluator was created for
LatestRound basics.Round // LatestRound is the latest round available on disk
}

// Error satisfies builtin interface `error`
func (err ErrNonSequentialBlockEval) Error() string {
return fmt.Sprintf("block evaluation for round %d requires sequential evaluation while the latest round is %d", err.EvaluatorRound, err.LatestRound)
}
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package node

import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -1086,7 +1087,7 @@ func (vb validatedBlock) Block() bookkeeping.Block {
func (node *AlgorandFullNode) AssembleBlock(round basics.Round, deadline time.Time) (agreement.ValidatedBlock, error) {
lvb, err := node.transactionPool.AssembleBlock(round, deadline)
if err != nil {
if err == pools.ErrStaleBlockAssemblyRequest {
if errors.Is(err, pools.ErrStaleBlockAssemblyRequest) {
// convert specific error to one that would have special handling in the agreement code.
err = agreement.ErrAssembleBlockRoundStale

Expand Down

0 comments on commit 1202d32

Please sign in to comment.