Skip to content

Commit

Permalink
ledger: preload resources argument in EvalForIndexer (#3019)
Browse files Browse the repository at this point in the history
## Summary

rebase Tolik's PR on master; this PR add the ability for the indexer to preload account data so that it would be used by the evaluator.

## Test Plan

Existing unit tests updated.
  • Loading branch information
tsachiherman authored Oct 8, 2021
1 parent c59326f commit ff8139a
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 69 deletions.
8 changes: 1 addition & 7 deletions ledger/appcow.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,7 @@ func (cb *roundCowState) DelKey(addr basics.Address, aidx basics.AppIndex, globa

// MakeDebugBalances creates a ledger suitable for dryrun and debugger
func MakeDebugBalances(l ledgerForCowBase, round basics.Round, proto protocol.ConsensusVersion, prevTimestamp int64) apply.Balances {
base := &roundCowBase{
l: l,
rnd: round - 1,
proto: config.Consensus[proto],
accounts: make(map[basics.Address]basics.AccountData),
creators: make(map[creatable]FoundAddress),
}
base := makeRoundCowBase(l, round-1, 0, basics.Round(0), config.Consensus[proto])

hdr := bookkeeping.BlockHeader{
Round: round,
Expand Down
40 changes: 18 additions & 22 deletions ledger/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ type roundCowBase struct {
creators map[creatable]FoundAddress
}

func makeRoundCowBase(l ledgerForCowBase, rnd basics.Round, txnCount uint64, compactCertNextRnd basics.Round, proto config.ConsensusParams) *roundCowBase {
return &roundCowBase{
l: l,
rnd: rnd,
txnCount: txnCount,
compactCertNextRnd: compactCertNextRnd,
proto: proto,
accounts: make(map[basics.Address]basics.AccountData),
creators: make(map[creatable]FoundAddress),
}
}

func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) {
creatable := creatable{cindex: cidx, ctype: ctype}

Expand Down Expand Up @@ -438,18 +450,12 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, proto con
return nil, protocol.Error(prevHeader.CurrentProtocol)
}

base := &roundCowBase{
l: l,
// round that lookups come from is previous block. We validate
// the block at this round below, so underflow will be caught.
// If we are not validating, we must have previously checked
// an agreement.Certificate attesting that hdr is valid.
rnd: hdr.Round - 1,
txnCount: prevHeader.TxnCounter,
proto: proto,
accounts: make(map[basics.Address]basics.AccountData),
creators: make(map[creatable]FoundAddress),
}
// Round that lookups come from is previous block. We validate
// the block at this round below, so underflow will be caught.
// If we are not validating, we must have previously checked
// an agreement.Certificate attesting that hdr is valid.
base := makeRoundCowBase(
l, hdr.Round-1, prevHeader.TxnCounter, basics.Round(0), proto)

eval := &BlockEvaluator{
validate: validate,
Expand Down Expand Up @@ -1465,16 +1471,6 @@ func maxAddressesInTxn(proto *config.ConsensusParams) int {
return 7 + proto.MaxAppTxnAccounts
}

// Write the list of addresses referenced in `txn` to `out`. Addresses might repeat.
func getTxnAddresses(txn *transactions.Transaction, out *[]basics.Address) {
*out = (*out)[:0]

*out = append(
*out, txn.Sender, txn.Receiver, txn.CloseRemainderTo, txn.AssetSender,
txn.AssetReceiver, txn.AssetCloseTo, txn.FreezeAccount)
*out = append(*out, txn.ApplicationCallTxnFields.Accounts...)
}

// loadAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group.
// The order of the transaction groups returned by the channel is identical to the one in the input array.
func loadAccounts(ctx context.Context, l ledgerForEvaluator, rnd basics.Round, groups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) chan loadedTransactionGroup {
Expand Down
48 changes: 18 additions & 30 deletions ledger/evalIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ type indexerLedgerForEval interface {
LatestTotals() (ledgercore.AccountTotals, error)
}

// EvalForIndexerResources contains resources preloaded from the Indexer database.
// Indexer is able to do the preloading more efficiently than the evaluator loading
// resources one by one.
type EvalForIndexerResources struct {
// The map value is nil iff the account does not exist. The account data is owned here.
accounts map[basics.Address]*basics.AccountData
creators map[creatable]FoundAddress
}

// Converter between indexerLedgerForEval and ledgerForEvaluator interfaces.
type indexerLedgerConnector struct {
il indexerLedgerForEval
Expand Down Expand Up @@ -132,30 +141,24 @@ func makeIndexerLedgerConnector(il indexerLedgerForEval, genesisHash crypto.Dige
}
}

// Returns all addresses referenced in `block`.
func getBlockAddresses(block *bookkeeping.Block) map[basics.Address]struct{} {
// Reserve a reasonable memory size for the map.
res := make(map[basics.Address]struct{}, len(block.Payset)+2)
res[block.FeeSink] = struct{}{}
res[block.RewardsPool] = struct{}{}

var refAddresses []basics.Address
for _, stib := range block.Payset {
getTxnAddresses(&stib.Txn, &refAddresses)
for _, address := range refAddresses {
res[address] = struct{}{}
func saveResourcesInCowBase(resources EvalForIndexerResources, base *roundCowBase) {
for address, accountData := range resources.accounts {
if accountData == nil {
base.accounts[address] = basics.AccountData{}
} else {
base.accounts[address] = *accountData
}
}

return res
base.creators = resources.creators
}

// EvalForIndexer evaluates a block without validation using the given `proto`.
// Return the state delta and transactions with modified apply data according to `proto`.
// This function is used by Indexer which modifies `proto` to retrieve the asset
// close amount for each transaction even when the real consensus parameters do not
// support it.
func EvalForIndexer(il indexerLedgerForEval, block *bookkeeping.Block, proto config.ConsensusParams) (ledgercore.StateDelta, []transactions.SignedTxnInBlock, error) {
func EvalForIndexer(il indexerLedgerForEval, block *bookkeeping.Block, proto config.ConsensusParams, resources EvalForIndexerResources) (ledgercore.StateDelta, []transactions.SignedTxnInBlock, error) {
ilc := makeIndexerLedgerConnector(il, block.GenesisHash(), block.Round()-1)

eval, err := startEvaluator(
Expand All @@ -165,22 +168,7 @@ func EvalForIndexer(il indexerLedgerForEval, block *bookkeeping.Block, proto con
fmt.Errorf("EvalForIndexer() err: %w", err)
}

// Preload most needed accounts.
{
accountDataMap, err := il.LookupWithoutRewards(getBlockAddresses(block))
if err != nil {
return ledgercore.StateDelta{}, []transactions.SignedTxnInBlock{},
fmt.Errorf("EvalForIndexer() err: %w", err)
}
base := eval.state.lookupParent.(*roundCowBase)
for address, accountData := range accountDataMap {
if accountData == nil {
base.accounts[address] = basics.AccountData{}
} else {
base.accounts[address] = *accountData
}
}
}
saveResourcesInCowBase(resources, eval.state.lookupParent.(*roundCowBase))

paysetgroups, err := block.DecodePaysetGroups()
if err != nil {
Expand Down
59 changes: 52 additions & 7 deletions ledger/evalIndexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ledger
import (
"errors"
"fmt"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -176,13 +177,58 @@ func TestEvalForIndexerCustomProtocolParams(t *testing.T) {
latestRound: 0,
}
proto.EnableAssetCloseAmount = true
_, modifiedTxns, err := EvalForIndexer(il, &block, proto)
_, modifiedTxns, err := EvalForIndexer(il, &block, proto, EvalForIndexerResources{})
require.NoError(t, err)

require.Equal(t, 4, len(modifiedTxns))
assert.Equal(t, uint64(70), modifiedTxns[3].AssetClosingAmount)
}

// Test that preloading data in cow base works as expected.
func TestSaveResourcesInCowBase(t *testing.T) {
partitiontest.PartitionTest(t)

var address basics.Address
_, err := rand.Read(address[:])
require.NoError(t, err)

base := makeRoundCowBase(
nil, basics.Round(0), 0, basics.Round(0), config.ConsensusParams{})

resources := EvalForIndexerResources{
accounts: map[basics.Address]*basics.AccountData{
address: {
MicroAlgos: basics.MicroAlgos{Raw: 5},
},
},
creators: map[creatable]FoundAddress{
{cindex: basics.CreatableIndex(6), ctype: basics.AssetCreatable}: {Address: address, Exists: true},
{cindex: basics.CreatableIndex(6), ctype: basics.AppCreatable}: {Address: address, Exists: false},
},
}

saveResourcesInCowBase(resources, base)

{
accountData, err := base.lookup(address)
require.NoError(t, err)
assert.Equal(t, basics.AccountData{MicroAlgos: basics.MicroAlgos{Raw: 5}}, accountData)
}
{
address, found, err :=
base.getCreator(basics.CreatableIndex(6), basics.AssetCreatable)
require.NoError(t, err)
require.True(t, found)
assert.Equal(t, address, address)
}
{
_, found, err :=
base.getCreator(basics.CreatableIndex(6), basics.AppCreatable)
require.NoError(t, err)
require.False(t, found)
}
}

// TestEvalForIndexerForExpiredAccounts tests that the EvalForIndexer function will correctly mark accounts offline
func TestEvalForIndexerForExpiredAccounts(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down Expand Up @@ -214,19 +260,19 @@ func TestEvalForIndexerForExpiredAccounts(t *testing.T) {
latestRound: 0,
}

_, _, err = EvalForIndexer(il, &block, proto)
_, _, err = EvalForIndexer(il, &block, proto, EvalForIndexerResources{})
require.NoError(t, err)

badBlock := block
// First validate that bad block is fine if we dont touch it...
_, _, err = EvalForIndexer(il, &badBlock, proto)
_, _, err = EvalForIndexer(il, &badBlock, proto, EvalForIndexerResources{})
require.NoError(t, err)

// Introduce an unknown address, but this time the Eval function is called with parameters that
// don't necessarily mean that this will cause an error. Just that an empty address will be added
badBlock.ExpiredParticipationAccounts = append(badBlock.ExpiredParticipationAccounts, basics.Address{123})

_, _, err = EvalForIndexer(il, &badBlock, proto)
_, _, err = EvalForIndexer(il, &badBlock, proto, EvalForIndexerResources{})
require.NoError(t, err)

badBlock = block
Expand All @@ -238,14 +284,13 @@ func TestEvalForIndexerForExpiredAccounts(t *testing.T) {
badBlock.ExpiredParticipationAccounts = append(badBlock.ExpiredParticipationAccounts, addressToCopy)
}

_, _, err = EvalForIndexer(il, &badBlock, proto)
_, _, err = EvalForIndexer(il, &badBlock, proto, EvalForIndexerResources{})
require.Error(t, err)

// Sanity Check

badBlock = block

_, _, err = EvalForIndexer(il, &badBlock, proto)
_, _, err = EvalForIndexer(il, &badBlock, proto, EvalForIndexerResources{})
require.NoError(t, err)

}
6 changes: 3 additions & 3 deletions ledger/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,7 @@ func TestEvalFunctionForExpiredAccounts(t *testing.T) {

newBlock := bookkeeping.MakeBlock(genesisInitState.Block.BlockHeader)

blkEval, err := l.StartEvaluator(newBlock.BlockHeader, 0)
blkEval, err := l.StartEvaluator(newBlock.BlockHeader, 0, 0)

// Advance the evaluator a couple rounds...
for i := uint64(0); i < uint64(targetRound); i++ {
Expand Down Expand Up @@ -2105,7 +2105,7 @@ func TestExpiredAccountGenerationWithDiskFailure(t *testing.T) {

newBlock := bookkeeping.MakeBlock(genesisInitState.Block.BlockHeader)

eval, err := l.StartEvaluator(newBlock.BlockHeader, 0)
eval, err := l.StartEvaluator(newBlock.BlockHeader, 0, 0)

// Advance the evaluator a couple rounds...
for i := uint64(0); i < uint64(targetRound); i++ {
Expand Down Expand Up @@ -2195,7 +2195,7 @@ func TestExpiredAccountGeneration(t *testing.T) {

newBlock := bookkeeping.MakeBlock(genesisInitState.Block.BlockHeader)

eval, err := l.StartEvaluator(newBlock.BlockHeader, 0)
eval, err := l.StartEvaluator(newBlock.BlockHeader, 0, 0)

// Advance the evaluator a couple rounds...
for i := uint64(0); i < uint64(targetRound); i++ {
Expand Down

0 comments on commit ff8139a

Please sign in to comment.