Skip to content

Commit

Permalink
algod importer: Fix archival mode and update tests. (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Apr 18, 2023
1 parent 595e84f commit fefc6fd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 72 deletions.
23 changes: 14 additions & 9 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ var algodImporterMetadata = plugins.Metadata{
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode != followerMode {
return nil
if algodImp.mode == followerMode {
_, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx)
return err
}
_, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx)
return err

return nil
}

func (algodImp *algodImporter) Metadata() plugins.Metadata {
Expand Down Expand Up @@ -154,11 +155,14 @@ func (algodImp *algodImporter) monitorCatchpointCatchup() error {
}

func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err)
if algodImp.mode == followerMode {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err)
}
}

// Run Catchpoint Catchup
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
cpRound, err := parseCatchpointRound(algodImp.cfg.CatchupConfig.Catchpoint)
Expand Down Expand Up @@ -187,7 +191,8 @@ func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error
return err
}
}
_, err = algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)

_, err := algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err)
}
Expand Down
142 changes: 79 additions & 63 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"strings"
"testing"

"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit"
Expand Down Expand Up @@ -65,19 +65,28 @@ netaddr: %s

func TestInitSuccess(t *testing.T) {
tests := []struct {
name string
name string
responder func(string, http.ResponseWriter) bool
}{
{"archival"},
{"follower"},
{
name: "archival",
responder: MakeSyncRoundResponder(http.StatusNotFound),
},
{
name: "follower",
responder: MakeSyncRoundResponder(http.StatusOK),
},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder)
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ts := NewAlgodServer(GenesisResponder, tc.responder, BlockAfterResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ts.URL)
`, tc.name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)
Expand Down Expand Up @@ -172,7 +181,9 @@ func TestInitCatchup(t *testing.T) {
[]string{}},
}
for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
t.Parallel()
testLogger, hook := test.NewNullLogger()
testImporter := New()
cfgStr := fmt.Sprintf(`---
Expand Down Expand Up @@ -201,42 +212,26 @@ catchup-config:
}

func TestInitParseUrlFailure(t *testing.T) {
tests := []struct {
url string
}{
{".0.0.0.0.0.0.0:1234"},
}
for _, ttest := range tests {
t.Run(ttest.url, func(t *testing.T) {
testImporter := New()
cfgStr := fmt.Sprintf(`---
url := ".0.0.0.0.0.0.0:1234"
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, "follower", ttest.url)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.ErrorContains(t, err, "parse")
})
}
`, "follower", url)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.ErrorContains(t, err, "parse")
}

func TestInitModeFailure(t *testing.T) {
tests := []struct {
name string
}{
{"foobar"},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
ts := NewAlgodServer(GenesisResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
name := "foobar"
ts := NewAlgodServer(GenesisResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", ttest.name))
})
}
`, name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", name))
}

func TestInitGenesisFailure(t *testing.T) {
Expand Down Expand Up @@ -270,7 +265,7 @@ func TestConfigDefault(t *testing.T) {
}

func TestWaitForBlockBlockFailure(t *testing.T) {
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder)
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusNotFound), BlockAfterResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
Expand All @@ -289,30 +284,47 @@ netaddr: %s
func TestGetBlockSuccess(t *testing.T) {
tests := []struct {
name string
mode string
algodServer *httptest.Server
}{
{"", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
{"archival", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
{"follower", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))},
{
name: "default",
mode: "",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusNotFound))},
{
name: "archival",
mode: "archival",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusNotFound))},
{
name: "follower",
mode: "follower",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK)),
},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
cfg := Config{
Mode: tc.mode,
NetAddr: tc.algodServer.URL,
}
cfgStr, err := yaml.Marshal(cfg)
require.NoError(t, err)

ctx, cancel = context.WithCancel(context.Background())
testImporter := New()
defer cancel()
testImporter := &algodImporter{}

cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ttest.algodServer.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
_, err = testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(string(cfgStr)), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

Expand All @@ -326,13 +338,12 @@ netaddr: %s
assert.NoError(t, err)
assert.Equal(t, downloadedBlk.Round(), uint64(10))
assert.True(t, downloadedBlk.Empty())
if ttest.name == followerModeStr {
// We're not setting the delta yet, but in the future we will
// assert.NotNil(t, downloadedBlk.Delta)
// Delta only set in follower mode
if tc.name == followerModeStr {
assert.NotNil(t, downloadedBlk.Delta)
} else {
assert.Nil(t, downloadedBlk.Delta)
}
cancel()
})
}
}
Expand All @@ -345,15 +356,18 @@ func TestGetBlockContextCancelled(t *testing.T) {
{"archival", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
MakeSyncRoundResponder(http.StatusNotFound))},
{"follower", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder,
MakeSyncRoundResponder(http.StatusOK))},
}

for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
// this didn't work...
//t.Parallel()
ctx, cancel = context.WithCancel(context.Background())
testImporter := New()
cfgStr := fmt.Sprintf(`---
Expand All @@ -377,12 +391,14 @@ func TestGetBlockFailure(t *testing.T) {
algodServer *httptest.Server
}{
{"archival", NewAlgodServer(GenesisResponder,
BlockAfterResponder, MakeSyncRoundResponder(http.StatusOK))},
BlockAfterResponder, MakeSyncRoundResponder(http.StatusNotFound))},
{"follower", NewAlgodServer(GenesisResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))},
}
for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
t.Parallel()
ctx, cancel = context.WithCancel(context.Background())
testImporter := New()

Expand Down

0 comments on commit fefc6fd

Please sign in to comment.