Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

algod importer: Fix archival mode and update tests. #57

Merged
merged 5 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ var algodImporterMetadata = plugins.Metadata{
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode != followerMode {
return nil
if algodImp.mode == followerMode {
algochoi marked this conversation as resolved.
Show resolved Hide resolved
_, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx)
return err
}
_, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx)
return err

return nil
}

func (algodImp *algodImporter) Metadata() plugins.Metadata {
Expand Down Expand Up @@ -154,11 +155,14 @@ func (algodImp *algodImporter) monitorCatchpointCatchup() error {
}

func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err)
if algodImp.mode == followerMode {
// Set the sync round to the round provided by initProvider
_, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err)
}
}

// Run Catchpoint Catchup
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
cpRound, err := parseCatchpointRound(algodImp.cfg.CatchupConfig.Catchpoint)
Expand Down Expand Up @@ -187,7 +191,8 @@ func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error
return err
}
}
_, err = algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)

_, err := algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx)
if err != nil {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err)
}
Expand Down
142 changes: 79 additions & 63 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"strings"
"testing"

"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit"
Expand Down Expand Up @@ -65,19 +65,28 @@ netaddr: %s

func TestInitSuccess(t *testing.T) {
tests := []struct {
name string
name string
responder func(string, http.ResponseWriter) bool
}{
{"archival"},
{"follower"},
{
name: "archival",
responder: MakeSyncRoundResponder(http.StatusNotFound),
},
{
name: "follower",
responder: MakeSyncRoundResponder(http.StatusOK),
},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder)
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ts := NewAlgodServer(GenesisResponder, tc.responder, BlockAfterResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ts.URL)
`, tc.name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)
Expand Down Expand Up @@ -172,7 +181,9 @@ func TestInitCatchup(t *testing.T) {
[]string{}},
}
for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
t.Parallel()
testLogger, hook := test.NewNullLogger()
testImporter := New()
cfgStr := fmt.Sprintf(`---
Expand Down Expand Up @@ -201,42 +212,26 @@ catchup-config:
}

func TestInitParseUrlFailure(t *testing.T) {
tests := []struct {
url string
}{
{".0.0.0.0.0.0.0:1234"},
}
for _, ttest := range tests {
t.Run(ttest.url, func(t *testing.T) {
testImporter := New()
cfgStr := fmt.Sprintf(`---
url := ".0.0.0.0.0.0.0:1234"
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, "follower", ttest.url)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.ErrorContains(t, err, "parse")
})
}
`, "follower", url)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.ErrorContains(t, err, "parse")
}

func TestInitModeFailure(t *testing.T) {
tests := []struct {
name string
}{
{"foobar"},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
ts := NewAlgodServer(GenesisResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
name := "foobar"
ts := NewAlgodServer(GenesisResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", ttest.name))
})
}
`, name, ts.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", name))
}

func TestInitGenesisFailure(t *testing.T) {
Expand Down Expand Up @@ -270,7 +265,7 @@ func TestConfigDefault(t *testing.T) {
}

func TestWaitForBlockBlockFailure(t *testing.T) {
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder)
ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusNotFound), BlockAfterResponder)
testImporter := New()
cfgStr := fmt.Sprintf(`---
mode: %s
Expand All @@ -289,30 +284,47 @@ netaddr: %s
func TestGetBlockSuccess(t *testing.T) {
tests := []struct {
name string
mode string
algodServer *httptest.Server
}{
{"", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
{"archival", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
{"follower", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))},
{
name: "default",
mode: "",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusNotFound))},
{
name: "archival",
mode: "archival",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusNotFound))},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these http.StatusOK to http.StatusNotFound changes led to tests that fail without the bugfix.

{
name: "follower",
mode: "follower",
algodServer: NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK)),
},
}
for _, ttest := range tests {
t.Run(ttest.name, func(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
cfg := Config{
Mode: tc.mode,
NetAddr: tc.algodServer.URL,
}
cfgStr, err := yaml.Marshal(cfg)
require.NoError(t, err)

ctx, cancel = context.WithCancel(context.Background())
testImporter := New()
defer cancel()
testImporter := &algodImporter{}

cfgStr := fmt.Sprintf(`---
mode: %s
netaddr: %s
`, ttest.name, ttest.algodServer.URL)
_, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger)
_, err = testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(string(cfgStr)), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

Expand All @@ -326,13 +338,12 @@ netaddr: %s
assert.NoError(t, err)
assert.Equal(t, downloadedBlk.Round(), uint64(10))
assert.True(t, downloadedBlk.Empty())
if ttest.name == followerModeStr {
// We're not setting the delta yet, but in the future we will
// assert.NotNil(t, downloadedBlk.Delta)
// Delta only set in follower mode
if tc.name == followerModeStr {
assert.NotNil(t, downloadedBlk.Delta)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
} else {
assert.Nil(t, downloadedBlk.Delta)
}
cancel()
})
}
}
Expand All @@ -345,15 +356,18 @@ func TestGetBlockContextCancelled(t *testing.T) {
{"archival", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder,
MakeSyncRoundResponder(http.StatusOK))},
MakeSyncRoundResponder(http.StatusNotFound))},
{"follower", NewAlgodServer(GenesisResponder,
BlockResponder,
BlockAfterResponder, LedgerStateDeltaResponder,
MakeSyncRoundResponder(http.StatusOK))},
}

for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
// this didn't work...
algochoi marked this conversation as resolved.
Show resolved Hide resolved
//t.Parallel()
ctx, cancel = context.WithCancel(context.Background())
testImporter := New()
cfgStr := fmt.Sprintf(`---
Expand All @@ -377,12 +391,14 @@ func TestGetBlockFailure(t *testing.T) {
algodServer *httptest.Server
}{
{"archival", NewAlgodServer(GenesisResponder,
BlockAfterResponder, MakeSyncRoundResponder(http.StatusOK))},
BlockAfterResponder, MakeSyncRoundResponder(http.StatusNotFound))},
{"follower", NewAlgodServer(GenesisResponder,
BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))},
}
for _, ttest := range tests {
ttest := ttest
t.Run(ttest.name, func(t *testing.T) {
t.Parallel()
ctx, cancel = context.WithCancel(context.Background())
testImporter := New()

Expand Down