Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local Ledger Deployment #1013

Merged
merged 38 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a96fa70
integrate block processor
shiqizng May 12, 2022
ed32cb1
update ledger path
shiqizng May 12, 2022
8568fa2
add ledger close
shiqizng May 12, 2022
fda48cf
refactor AddBlock
shiqizng May 13, 2022
075f265
api handler test fails
shiqizng May 16, 2022
ac04503
Merge branch 'develop' into shiqi/AddBlock
shiqizng May 16, 2022
d7b8012
fixed handler e2e
shiqizng May 17, 2022
4dbda7d
updating postgres integration tests
shiqizng May 18, 2022
c0726b9
more test updates
shiqizng May 19, 2022
aee8e16
refactor processor to use EvalForIndexer
shiqizng May 19, 2022
a12262c
more test refactoring
shiqizng May 20, 2022
4c15d63
make integration failing
shiqizng May 20, 2022
7a8c05d
adding ledger for eval tests
shiqizng May 23, 2022
8de6a18
more tests
shiqizng May 24, 2022
088434b
make integration working
shiqizng May 25, 2022
dfd2e83
fix linting errors
shiqizng May 25, 2022
a40789f
remove debugging lines
shiqizng May 25, 2022
1abe9f9
error fix
shiqizng May 25, 2022
d0fe6a4
fix test failure
shiqizng May 25, 2022
39dda1b
refactoring
shiqizng May 25, 2022
1e7c0a6
more refactoring
shiqizng May 25, 2022
2ca22f3
fix failed test
shiqizng May 25, 2022
c4ab181
fix
shiqizng May 25, 2022
4d38249
fix failing test
shiqizng May 25, 2022
6ea76de
ready for review
shiqizng May 26, 2022
cae1cf7
adding ledger migration
shiqizng Jun 1, 2022
5989eb3
update logging messages and check migration round
shiqizng Jun 1, 2022
f9b22ef
remove unused file
shiqizng Jun 1, 2022
3fa3682
fix linting errors
shiqizng Jun 1, 2022
1f8b802
trying httpmock
shiqizng Jun 2, 2022
b505031
rm testing files
shiqizng Jun 2, 2022
55bcf52
update migration test
shiqizng Jun 2, 2022
adaabb2
update test
shiqizng Jun 2, 2022
df6bcb6
formatting
shiqizng Jun 2, 2022
80fc318
update test
shiqizng Jun 7, 2022
c29f416
merge localledger/integration
shiqizng Jun 8, 2022
9a3e93e
update make processor call
shiqizng Jun 8, 2022
7dd1d65
renaming vars
shiqizng Jun 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ Settings can be provided from the command line, a configuration file, or an envi
| default-balances-limit | | default-balances-limit | INDEXER_DEFAULT_BALANCES_LIMIT |
| max-applications-limit | | max-applications-limit | INDEXER_MAX_APPLICATIONS_LIMIT |
| default-applications-limit | | default-applications-limit | INDEXER_DEFAULT_APPLICATIONS_LIMIT |

| data-dir | i | data-dir | INDEXER_DATA_DIR |
## Command line

The command line arguments always take priority over the config file and environment variables.
Expand Down
74 changes: 59 additions & 15 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"time"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"

"github.com/algorand/indexer/api/generated/v2"
Expand Down Expand Up @@ -70,9 +72,6 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin
err = db.LoadGenesis(genesis)
require.NoError(t, err)

err = db.AddBlock(&genesisBlock)
require.NoError(t, err)

return db, newShutdownFunc
}

Expand Down Expand Up @@ -112,7 +111,12 @@ func TestApplicationHandlers(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn, &optInTxnA, &optInTxnB)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

//////////
Expand Down Expand Up @@ -238,7 +242,12 @@ func TestAccountExcludeParameters(t *testing.T) {
&appOptInTxnA, &appOptInTxnB, &assetOptInTxnA, &assetOptInTxnB)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

//////////
Expand Down Expand Up @@ -443,7 +452,12 @@ func TestAccountMaxResultsLimit(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, ptxns...)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

//////////
Expand Down Expand Up @@ -845,7 +859,12 @@ func TestInnerTxn(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

for _, tc := range testcases {
Expand Down Expand Up @@ -897,7 +916,12 @@ func TestPagingRootTxnDeduplication(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

testcases := []struct {
Expand Down Expand Up @@ -1044,7 +1068,12 @@ func TestKeyregTransactionWithStateProofKeys(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

e := echo.New()
Expand Down Expand Up @@ -1147,7 +1176,12 @@ func TestAccountClearsNonUTF8(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &createAsset)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

verify := func(params generated.AssetParams) {
Expand Down Expand Up @@ -1268,7 +1302,12 @@ func TestLookupInnerLogs(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

for _, tc := range testcases {
Expand Down Expand Up @@ -1366,7 +1405,12 @@ func TestLookupMultiInnerLogs(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = db.AddBlock(&block)
l := test.MakeTestLedger("ledger")
defer l.Close()
proc, err := blockprocessor.MakeProcessor(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
blockCert := rpcs.EncodedBlockCert{Block: block}
err = proc.Process(&blockCert)
require.NoError(t, err, "failed to commit")

for _, tc := range testcases {
Expand Down
93 changes: 79 additions & 14 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@ package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
algodConfig "github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/api"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/config"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/metrics"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
Expand Down Expand Up @@ -118,11 +130,19 @@ var daemonCmd = &cobra.Command{
}

opts.MaxConn = maxConn
opts.IndexerDatadir = indexerDataDir
opts.AlgodDataDir = algodDataDir
opts.AlgodToken = algodToken
opts.AlgodAddr = algodAddr

db, availableCh := indexerDbFromFlags(opts)
defer db.Close()
var wg sync.WaitGroup
if bot != nil {
if indexerDataDir == "" {
fmt.Fprint(os.Stderr, "missing indexer data directory")
os.Exit(1)
}
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -135,13 +155,27 @@ var daemonCmd = &cobra.Command{
_, err := importer.EnsureInitialImport(db, genesisReader, logger)
maybeFail(err, "importer.EnsureInitialImport() error")
logger.Info("Initializing block import handler.")

nextRound, err := db.GetNextRoundToAccount()
maybeFail(err, "failed to get next round, %v", err)
bot.SetNextRound(nextRound)

imp := importer.NewImporter(db)
handler := blockHandler(imp, 1*time.Second)

genesisReader = importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
genesis, err := readGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")
genesisBlock, err := getGenesisBlock(bot.Algod())
maybeFail(err, "Error getting genesis block")
initState, err := util.CreateInitState(&genesis, &genesisBlock)
maybeFail(err, "Error creating init state")

logger.Info("Initializing local ledger.")
localLedger, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(path.Dir(indexerDataDir), "ledger"), false, initState, algodConfig.GetDefaultLocal())
maybeFail(err, "failed to open ledger %v", err)
defer localLedger.Close()
bot.SetNextRound(uint64(localLedger.Latest()) + 1)

proc, err := blockprocessor.MakeProcessor(localLedger, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
}
handler := blockHandler(proc, 1*time.Second)
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
Expand Down Expand Up @@ -265,10 +299,10 @@ func makeOptions() (options api.ExtraOptions) {

// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
func blockHandler(proc processor.Processor, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
for {
err := handleBlock(block, imp)
err := handleBlock(block, proc)
if err == nil {
// return on success.
return nil
Expand All @@ -285,12 +319,12 @@ func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.
}
}

func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error {
func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error {
start := time.Now()
err := imp.ImportBlock(block)
err := proc.Process(block)
if err != nil {
logger.WithError(err).Errorf(
"adding block %d to database failed", block.Block.Round())
"block %d import failed", block.Block.Round())
return fmt.Errorf("handleBlock() err: %w", err)
}
dt := time.Since(start)
Expand All @@ -313,3 +347,34 @@ func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error {

return nil
}

func readGenesis(reader io.Reader) (bookkeeping.Genesis, error) {
var genesis bookkeeping.Genesis
if reader == nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: reader is nil")
}
gbytes, err := ioutil.ReadAll(reader)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
err = protocol.DecodeJSON(gbytes, &genesis)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
return genesis, nil
}

func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) {
data, err := client.BlockRaw(0).Do(context.Background())
if err != nil {
return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() client err: %w", err)
}

var block rpcs.EncodedBlockCert
err = protocol.Decode(data, &block)
if err != nil {
return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() decode err: %w", err)
}

return block.Block, nil
}
Loading