From 288faa72e1d4f089f3fd0beab71402573ef20a94 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 14 Apr 2023 12:13:16 -0400 Subject: [PATCH 1/5] algod importer: Fix archival mode and update tests. --- .../plugins/importers/algod/algod_importer.go | 25 +++++--- .../importers/algod/algod_importer_test.go | 62 ++++++++++++------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index e8ada7c1..9aa25b34 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -34,7 +34,7 @@ const ( ) const ( - archivalMode = iota + archivalMode = iota + 1 followerMode ) @@ -63,11 +63,12 @@ var algodImporterMetadata = plugins.Metadata{ } func (algodImp *algodImporter) OnComplete(input data.BlockData) error { - if algodImp.mode != followerMode { - return nil + if algodImp.mode == followerMode { + _, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx) + return err } - _, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx) - return err + + return nil } func (algodImp *algodImporter) Metadata() plugins.Metadata { @@ -154,11 +155,14 @@ func (algodImp *algodImporter) monitorCatchpointCatchup() error { } func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error { - // Set the sync round to the round provided by initProvider - _, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) - if err != nil { - return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err) + if algodImp.mode == followerMode { + // Set the sync round to the round provided by initProvider + _, err := algodImp.aclient.SetSyncRound(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) + if err != nil { + return fmt.Errorf("received unexpected error setting sync round (%d): %w", initProvider.NextDBRound(), err) + } } + // Run Catchpoint Catchup if algodImp.cfg.CatchupConfig.Catchpoint != "" { cpRound, err := parseCatchpointRound(algodImp.cfg.CatchupConfig.Catchpoint) @@ -187,7 +191,8 @@ func (algodImp *algodImporter) catchupNode(initProvider data.InitProvider) error return err } } - _, err = algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) + + _, err := algodImp.aclient.StatusAfterBlock(uint64(initProvider.NextDBRound())).Do(algodImp.ctx) if err != nil { err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err) } diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index 2db94cc2..48b154ba 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -289,30 +289,47 @@ netaddr: %s func TestGetBlockSuccess(t *testing.T) { tests := []struct { name string + mode string algodServer *httptest.Server }{ - {"", NewAlgodServer(GenesisResponder, - BlockResponder, - BlockAfterResponder, - MakeSyncRoundResponder(http.StatusOK))}, - {"archival", NewAlgodServer(GenesisResponder, - BlockResponder, - BlockAfterResponder, - MakeSyncRoundResponder(http.StatusOK))}, - {"follower", NewAlgodServer(GenesisResponder, - BlockResponder, - BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))}, + { + name: "default", + mode: "", + algodServer: NewAlgodServer(GenesisResponder, + BlockResponder, + BlockAfterResponder, + MakeSyncRoundResponder(http.StatusOK))}, + { + name: "archival", + mode: "archival", + algodServer: NewAlgodServer(GenesisResponder, + BlockResponder, + BlockAfterResponder, + MakeSyncRoundResponder(http.StatusNotFound))}, + { + name: "follower", + mode: "follower", + algodServer: NewAlgodServer(GenesisResponder, + BlockResponder, + BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK)), + }, } - for _, ttest := range tests { - t.Run(ttest.name, func(t *testing.T) { + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + cfg := Config{ + Mode: tc.mode, + NetAddr: tc.algodServer.URL, + } + cfgStr, err := yaml.Marshal(cfg) + require.NoError(t, err) + ctx, cancel = context.WithCancel(context.Background()) - testImporter := New() + defer cancel() + testImporter := &algodImporter{} - cfgStr := fmt.Sprintf(`--- -mode: %s -netaddr: %s -`, ttest.name, ttest.algodServer.URL) - _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) + _, err = testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(string(cfgStr)), logger) assert.NoError(t, err) assert.NotEqual(t, testImporter, nil) @@ -326,13 +343,12 @@ netaddr: %s assert.NoError(t, err) assert.Equal(t, downloadedBlk.Round(), uint64(10)) assert.True(t, downloadedBlk.Empty()) - if ttest.name == followerModeStr { - // We're not setting the delta yet, but in the future we will - // assert.NotNil(t, downloadedBlk.Delta) + // Delta only set in follower mode + if tc.name == followerModeStr { + assert.NotNil(t, downloadedBlk.Delta) } else { assert.Nil(t, downloadedBlk.Delta) } - cancel() }) } } From c969dffeccf76e2720defb5711e2acda48a4de50 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 14 Apr 2023 13:00:53 -0400 Subject: [PATCH 2/5] Update more tests. --- .../importers/algod/algod_importer_test.go | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index 48b154ba..f00a9baa 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -9,13 +9,13 @@ import ( "strings" "testing" - "github.com/algorand/go-algorand-sdk/v2/client/v2/common/models" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "github.com/algorand/go-algorand-sdk/v2/client/v2/common/models" sdk "github.com/algorand/go-algorand-sdk/v2/types" "github.com/algorand/conduit/conduit" @@ -65,19 +65,27 @@ netaddr: %s func TestInitSuccess(t *testing.T) { tests := []struct { - name string + name string + responder func(string, http.ResponseWriter) bool }{ - {"archival"}, - {"follower"}, + { + name: "archival", + responder: MakeSyncRoundResponder(http.StatusNotFound), + }, + { + name: "follower", + responder: MakeSyncRoundResponder(http.StatusOK), + }, } - for _, ttest := range tests { - t.Run(ttest.name, func(t *testing.T) { - ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder) + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ts := NewAlgodServer(GenesisResponder, tc.responder, BlockAfterResponder) testImporter := New() cfgStr := fmt.Sprintf(`--- mode: %s netaddr: %s -`, ttest.name, ts.URL) +`, tc.name, ts.URL) _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) assert.NoError(t, err) assert.NotEqual(t, testImporter, nil) @@ -172,7 +180,9 @@ func TestInitCatchup(t *testing.T) { []string{}}, } for _, ttest := range tests { + ttest := ttest t.Run(ttest.name, func(t *testing.T) { + t.Parallel() testLogger, hook := test.NewNullLogger() testImporter := New() cfgStr := fmt.Sprintf(`--- @@ -270,7 +280,7 @@ func TestConfigDefault(t *testing.T) { } func TestWaitForBlockBlockFailure(t *testing.T) { - ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusOK), BlockAfterResponder) + ts := NewAlgodServer(GenesisResponder, MakeSyncRoundResponder(http.StatusNotFound), BlockAfterResponder) testImporter := New() cfgStr := fmt.Sprintf(`--- mode: %s @@ -298,7 +308,7 @@ func TestGetBlockSuccess(t *testing.T) { algodServer: NewAlgodServer(GenesisResponder, BlockResponder, BlockAfterResponder, - MakeSyncRoundResponder(http.StatusOK))}, + MakeSyncRoundResponder(http.StatusNotFound))}, { name: "archival", mode: "archival", @@ -361,7 +371,7 @@ func TestGetBlockContextCancelled(t *testing.T) { {"archival", NewAlgodServer(GenesisResponder, BlockResponder, BlockAfterResponder, - MakeSyncRoundResponder(http.StatusOK))}, + MakeSyncRoundResponder(http.StatusNotFound))}, {"follower", NewAlgodServer(GenesisResponder, BlockResponder, BlockAfterResponder, LedgerStateDeltaResponder, @@ -393,7 +403,7 @@ func TestGetBlockFailure(t *testing.T) { algodServer *httptest.Server }{ {"archival", NewAlgodServer(GenesisResponder, - BlockAfterResponder, MakeSyncRoundResponder(http.StatusOK))}, + BlockAfterResponder, MakeSyncRoundResponder(http.StatusNotFound))}, {"follower", NewAlgodServer(GenesisResponder, BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))}, } From 016825c817e9cac5fe725643cf6c2fcab385058f Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 14 Apr 2023 13:35:44 -0400 Subject: [PATCH 3/5] Test improvements. --- .../importers/algod/algod_importer_test.go | 45 +++++++------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index f00a9baa..0ae8977d 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -80,6 +80,7 @@ func TestInitSuccess(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.name, func(t *testing.T) { + t.Parallel() ts := NewAlgodServer(GenesisResponder, tc.responder, BlockAfterResponder) testImporter := New() cfgStr := fmt.Sprintf(`--- @@ -211,42 +212,26 @@ catchup-config: } func TestInitParseUrlFailure(t *testing.T) { - tests := []struct { - url string - }{ - {".0.0.0.0.0.0.0:1234"}, - } - for _, ttest := range tests { - t.Run(ttest.url, func(t *testing.T) { - testImporter := New() - cfgStr := fmt.Sprintf(`--- + url := ".0.0.0.0.0.0.0:1234" + testImporter := New() + cfgStr := fmt.Sprintf(`--- mode: %s netaddr: %s -`, "follower", ttest.url) - _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) - assert.ErrorContains(t, err, "parse") - }) - } +`, "follower", url) + _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) + assert.ErrorContains(t, err, "parse") } func TestInitModeFailure(t *testing.T) { - tests := []struct { - name string - }{ - {"foobar"}, - } - for _, ttest := range tests { - t.Run(ttest.name, func(t *testing.T) { - ts := NewAlgodServer(GenesisResponder) - testImporter := New() - cfgStr := fmt.Sprintf(`--- + name := "foobar" + ts := NewAlgodServer(GenesisResponder) + testImporter := New() + cfgStr := fmt.Sprintf(`--- mode: %s netaddr: %s -`, ttest.name, ts.URL) - _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) - assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", ttest.name)) - }) - } +`, name, ts.URL) + _, err := testImporter.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil), plugins.MakePluginConfig(cfgStr), logger) + assert.EqualError(t, err, fmt.Sprintf("algod importer was set to a mode (%s) that wasn't supported", name)) } func TestInitGenesisFailure(t *testing.T) { @@ -380,6 +365,7 @@ func TestGetBlockContextCancelled(t *testing.T) { for _, ttest := range tests { t.Run(ttest.name, func(t *testing.T) { + t.Parallel() ctx, cancel = context.WithCancel(context.Background()) testImporter := New() cfgStr := fmt.Sprintf(`--- @@ -409,6 +395,7 @@ func TestGetBlockFailure(t *testing.T) { } for _, ttest := range tests { t.Run(ttest.name, func(t *testing.T) { + t.Parallel() ctx, cancel = context.WithCancel(context.Background()) testImporter := New() From 1eba0118ba77a6584695ac30e9d5e562359a3dd1 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 14 Apr 2023 16:18:29 -0400 Subject: [PATCH 4/5] Fix unit test. --- conduit/plugins/importers/algod/algod_importer_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go index 0ae8977d..5531cab1 100644 --- a/conduit/plugins/importers/algod/algod_importer_test.go +++ b/conduit/plugins/importers/algod/algod_importer_test.go @@ -364,8 +364,10 @@ func TestGetBlockContextCancelled(t *testing.T) { } for _, ttest := range tests { + ttest := ttest t.Run(ttest.name, func(t *testing.T) { - t.Parallel() + // this didn't work... + //t.Parallel() ctx, cancel = context.WithCancel(context.Background()) testImporter := New() cfgStr := fmt.Sprintf(`--- @@ -394,6 +396,7 @@ func TestGetBlockFailure(t *testing.T) { BlockAfterResponder, LedgerStateDeltaResponder, MakeSyncRoundResponder(http.StatusOK))}, } for _, ttest := range tests { + ttest := ttest t.Run(ttest.name, func(t *testing.T) { t.Parallel() ctx, cancel = context.WithCancel(context.Background()) From d2e9a3ed4424de08a032eed2951dd59f1b761f56 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Tue, 18 Apr 2023 08:08:12 -0400 Subject: [PATCH 5/5] Remove unrelated iota + 1 change. --- conduit/plugins/importers/algod/algod_importer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index 9aa25b34..2935f5e7 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -34,7 +34,7 @@ const ( ) const ( - archivalMode = iota + 1 + archivalMode = iota followerMode )