From 0ded2395b155675a208df739eee7cb4be36ac3c8 Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Mon, 15 Aug 2022 15:56:56 -0700 Subject: [PATCH 1/6] conduit pipeline run loop implementation --- conduit/pipeline.go | 105 +++++++++++++------ exporters/postgresql/postgresql_exporter.go | 8 +- importers/algod/algod_importer.go | 3 +- processors/blockprocessor/block_processor.go | 49 ++++----- processors/noop/noop_processor.go | 5 +- processors/processor.go | 5 +- processors/processor_factory.go | 2 +- processors/processor_factory_test.go | 14 +-- 8 files changed, 116 insertions(+), 75 deletions(-) diff --git a/conduit/pipeline.go b/conduit/pipeline.go index cb426398a..a547ce2e2 100644 --- a/conduit/pipeline.go +++ b/conduit/pipeline.go @@ -131,6 +131,7 @@ type pipelineImpl struct { importer *importers.Importer processors []*processors.Processor exporter *exporters.Exporter + round basics.Round } func (p *pipelineImpl) Start() error { @@ -138,6 +139,25 @@ func (p *pipelineImpl) Start() error { // TODO Need to change interfaces to accept config of map[string]interface{} + exporterLogger := log.New() + exporterLogger.SetFormatter( + PluginLogFormatter{ + Formatter: &log.JSONFormatter{ + DisableHTMLEscape: true, + }, + Type: "Exporter", + Name: (*p.exporter).Metadata().Name(), + }, + ) + + jsonEncode := string(json.Encode(p.cfg.Exporter.Config)) + err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger) + exporterName := (*p.exporter).Metadata().Name() + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err) + } + p.logger.Infof("Initialized Exporter: %s", exporterName) + importerLogger := log.New() importerLogger.SetFormatter( PluginLogFormatter{ @@ -150,25 +170,28 @@ func (p *pipelineImpl) Start() error { ) // TODO modify/fix ? - jsonEncode := string(json.Encode(p.cfg.Importer.Config)) + jsonEncode = string(json.Encode(p.cfg.Importer.Config)) genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(jsonEncode), importerLogger) - currentRound := basics.Round(0) + importerName := (*p.importer).Metadata().Name() + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err) + } + p.round = basics.Round((*p.exporter).Round()) + err = (*p.exporter).HandleGenesis(*genesis) + if err != nil { + return fmt.Errorf("Pipeline.Start(): exporter could not handle genesis (%s): %w", exporterName, err) + } + p.logger.Infof("Initialized Importer: %s", importerName) var initProvider data.InitProvider = &PipelineInitProvider{ - currentRound: ¤tRound, + currentRound: &p.round, genesis: genesis, } p.initProvider = &initProvider - importerName := (*p.importer).Metadata().Name() - if err != nil { - return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err) - } - p.logger.Infof("Initialized Importer: %s", importerName) - - for _, processor := range p.processors { + for idx, processor := range p.processors { processorLogger := log.New() processorLogger.SetFormatter( @@ -180,8 +203,8 @@ func (p *pipelineImpl) Start() error { Name: (*processor).Metadata().Name(), }, ) - - err := (*processor).Init(p.ctx, *p.initProvider, "") + jsonEncode = string(json.Encode(p.cfg.Processors[idx])) + err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger) processorName := (*processor).Metadata().Name() if err != nil { return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err) @@ -190,25 +213,7 @@ func (p *pipelineImpl) Start() error { } - exporterLogger := log.New() - exporterLogger.SetFormatter( - PluginLogFormatter{ - Formatter: &log.JSONFormatter{ - DisableHTMLEscape: true, - }, - Type: "Exporter", - Name: (*p.exporter).Metadata().Name(), - }, - ) - - err = (*p.exporter).Init("", p.logger) - ExporterName := (*p.exporter).Metadata().Name() - if err != nil { - return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", ExporterName, err) - } - p.logger.Infof("Initialized Exporter: %s", ExporterName) - - return nil + return p.RunPipeline() } func (p *pipelineImpl) Stop() error { @@ -231,6 +236,44 @@ func (p *pipelineImpl) Stop() error { return nil } +// RunPipeline pushes block data through the pipeline +func (p *pipelineImpl) RunPipeline() error { + for { + // TODO Retries? + p.logger.Infof("Pipeline round: %v", p.round) + // fetch block + blkData, err := (*p.importer).GetBlock(uint64(p.round)) + if err != nil { + p.logger.Errorf("%v\n", err) + return err + } + // run through processors + for _, proc := range p.processors { + blkData, err = (*proc).Process(blkData) + if err != nil { + p.logger.Errorf("%v\n", err) + return err + } + } + // run through exporter + err = (*p.exporter).Receive(blkData) + if err != nil { + p.logger.Errorf("%v\n", err) + return err + } + // Callback Processors + for _, proc := range p.processors { + err = (*proc).OnComplete(blkData) + if err != nil { + p.logger.Errorf("%v\n", err) + return err + } + } + // Increment Round + p.round++ + } +} + // MakePipeline creates a Pipeline func MakePipeline(cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) { diff --git a/exporters/postgresql/postgresql_exporter.go b/exporters/postgresql/postgresql_exporter.go index acf43d583..ab9cdfeef 100644 --- a/exporters/postgresql/postgresql_exporter.go +++ b/exporters/postgresql/postgresql_exporter.go @@ -2,15 +2,19 @@ package postgresql import ( "fmt" + + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" "github.com/algorand/indexer/idb" + _ "github.com/algorand/indexer/idb/postgres" "github.com/algorand/indexer/importer" "github.com/algorand/indexer/plugins" - "github.com/sirupsen/logrus" - "gopkg.in/yaml.v3" ) const exporterName = "postgresql" diff --git a/importers/algod/algod_importer.go b/importers/algod/algod_importer.go index 7496c4895..be1782655 100644 --- a/importers/algod/algod_importer.go +++ b/importers/algod/algod_importer.go @@ -2,7 +2,6 @@ package algodimporter import ( "context" - "encoding/json" "fmt" "github.com/algorand/go-algorand/data/bookkeeping" "net/url" @@ -84,7 +83,7 @@ func (algodImp *algodImporter) Init(ctx context.Context, cfg plugins.PluginConfi genesis := bookkeeping.Genesis{} - err = json.Unmarshal([]byte(genesisResponse), &genesis) + err = protocol.DecodeJSON([]byte(genesisResponse), &genesis) if err != nil { return nil, err } diff --git a/processors/blockprocessor/block_processor.go b/processors/blockprocessor/block_processor.go index 176a8d296..010432e53 100644 --- a/processors/blockprocessor/block_processor.go +++ b/processors/blockprocessor/block_processor.go @@ -69,15 +69,17 @@ func (proc *blockProcessor) Config() plugins.PluginConfig { return proc.cfg } -func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig) error { +func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *log.Logger) error { + proc.ctx = ctx + proc.logger = logger // First get the configuration from the string var pCfg processors.BlockProcessorConfig err := yaml.Unmarshal([]byte(cfg), &pCfg) - if err != nil { return fmt.Errorf("blockprocessor init error: %w", err) } + proc.cfg = cfg genesis := initProvider.Genesis() round := uint64(initProvider.NextDBRound()) @@ -94,43 +96,27 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv if l == nil { return fmt.Errorf("ledger was created with nil pointer") } + proc.ledger = l if uint64(l.Latest()) > round { return fmt.Errorf("the ledger cache is ahead of the required round (%d > %d) and must be re-initialized", l.Latest(), round) } - if l.Latest() == 0 { - blk, cert, err := l.BlockCert(0) - if err != nil { - return fmt.Errorf("error getting block and certificate: %w", err) - } - vb := ledgercore.MakeValidatedBlock(blk, ledgercore.StateDelta{}) - - blockData := data.MakeBlockDataFromValidatedBlock(vb) - blockData.Certificate = &cert - - proc.saveLastValidatedInformation(vb, blk.Round(), cert) - err = proc.OnComplete(data.BlockData{}) - - if err != nil { - return fmt.Errorf("error adding gensis block: %w", err) - } - } - - proc.ledger = l - proc.ctx = ctx - proc.cfg = cfg - return nil } func (proc *blockProcessor) extractValidatedBlockAndPayset(blockCert *rpcs.EncodedBlockCert) (ledgercore.ValidatedBlock, transactions.Payset, error) { - + var vb ledgercore.ValidatedBlock + var payset transactions.Payset if blockCert == nil { - return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("cannot process a nil block") + return vb, payset, fmt.Errorf("cannot process a nil block") + } + if blockCert.Block.Round() == 0 && proc.ledger.Latest() == 0 { + vb = ledgercore.MakeValidatedBlock(blockCert.Block, ledgercore.StateDelta{}) + return vb, blockCert.Block.Payset, nil } if blockCert.Block.Round() != (proc.ledger.Latest() + 1) { - return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("invalid round blockCert.Block.Round(): %d nextRoundToProcess: %d", blockCert.Block.Round(), uint64(proc.ledger.Latest())+1) + return vb, payset, fmt.Errorf("invalid round blockCert.Block.Round(): %d nextRoundToProcess: %d", blockCert.Block.Round(), uint64(proc.ledger.Latest())+1) } // Make sure "AssetCloseAmount" is enabled. If it isn't, override the @@ -138,7 +124,7 @@ func (proc *blockProcessor) extractValidatedBlockAndPayset(blockCert *rpcs.Encod // apply data. proto, ok := config.Consensus[blockCert.Block.BlockHeader.CurrentProtocol] if !ok { - return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf( + return vb, payset, fmt.Errorf( "cannot find proto version %s", blockCert.Block.BlockHeader.CurrentProtocol) } protoChanged := !proto.EnableAssetCloseAmount @@ -152,13 +138,11 @@ func (proc *blockProcessor) extractValidatedBlockAndPayset(blockCert *rpcs.Encod } delta, payset, err := ledger.EvalForIndexer(ledgerForEval, &blockCert.Block, proto, resources) - if err != nil { - return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("eval err: %w", err) + return vb, transactions.Payset{}, fmt.Errorf("eval err: %w", err) } // validated block - var vb ledgercore.ValidatedBlock if protoChanged { block := bookkeeping.Block{ BlockHeader: blockCert.Block.BlockHeader, @@ -255,6 +239,9 @@ func (proc *blockProcessor) Process(input data.BlockData) (data.BlockData, error func (proc *blockProcessor) OnComplete(_ data.BlockData) error { + if proc.lastValidatedBlockRound == basics.Round(0) { + return nil + } // write to ledger err := proc.ledger.AddValidatedBlock(proc.lastValidatedBlock, proc.lastValidatedBlockCertificate) if err != nil { diff --git a/processors/noop/noop_processor.go b/processors/noop/noop_processor.go index ddcf9c2c4..4fa1a2145 100644 --- a/processors/noop/noop_processor.go +++ b/processors/noop/noop_processor.go @@ -2,6 +2,9 @@ package noop import ( "context" + + "github.com/sirupsen/logrus" + "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/algorand/indexer/processors" @@ -36,7 +39,7 @@ func (p *Processor) Config() plugins.PluginConfig { } // Init noop -func (p *Processor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig) error { +func (p *Processor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *logrus.Logger) error { return nil } diff --git a/processors/processor.go b/processors/processor.go index 8c2efe7e1..cf99c5b82 100644 --- a/processors/processor.go +++ b/processors/processor.go @@ -2,6 +2,9 @@ package processors import ( "context" + + "github.com/sirupsen/logrus" + "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" ) @@ -18,7 +21,7 @@ type Processor interface { // Typically, used for things like initializing network connections. // The Context passed to Init() will be used for deadlines, cancel signals and other early terminations // The Config passed to Init() will contain the unmarshalled config file specific to this plugin. - Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig) error + Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error // Close will be called during termination of the Indexer process. Close() error diff --git a/processors/processor_factory.go b/processors/processor_factory.go index e0c12727a..58969bfd2 100644 --- a/processors/processor_factory.go +++ b/processors/processor_factory.go @@ -42,7 +42,7 @@ func ProcessorByName(ctx context.Context, name string, dataDir string, initProvi } processor := constructor.New() cfg := plugins.LoadConfig(logger, dataDir, processor.Metadata()) - if err := processor.Init(ctx, initProvider, cfg); err != nil { + if err := processor.Init(ctx, initProvider, cfg, logger); err != nil { return nil, err } return processor, nil diff --git a/processors/processor_factory_test.go b/processors/processor_factory_test.go index 0b40b1698..a7598bc67 100644 --- a/processors/processor_factory_test.go +++ b/processors/processor_factory_test.go @@ -3,16 +3,18 @@ package processors import ( "context" "fmt" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/indexer/data" "testing" - "github.com/algorand/indexer/plugins" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + + "github.com/algorand/indexer/data" + "github.com/algorand/indexer/plugins" ) var logger *logrus.Logger @@ -40,8 +42,8 @@ type mockProcessor struct { Processor } -func (m *mockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig) error { - args := m.Called(ctx, initProvider, cfg) +func (m *mockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error { + args := m.Called(ctx, initProvider, cfg, logger) return args.Error(0) } From 7f3b97affef2cb74ff46ee6c6067e11007c3502e Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Mon, 15 Aug 2022 16:21:18 -0700 Subject: [PATCH 2/6] Fix lint error --- exporters/postgresql/postgresql_exporter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporters/postgresql/postgresql_exporter.go b/exporters/postgresql/postgresql_exporter.go index ab9cdfeef..1c403bfc6 100644 --- a/exporters/postgresql/postgresql_exporter.go +++ b/exporters/postgresql/postgresql_exporter.go @@ -12,6 +12,7 @@ import ( "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" "github.com/algorand/indexer/idb" + // Necessary to ensure the postgres implementation has been registered in the idb factory _ "github.com/algorand/indexer/idb/postgres" "github.com/algorand/indexer/importer" "github.com/algorand/indexer/plugins" From 4947a88306d68d7a9c205e4fe960ca503f9df302 Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Mon, 15 Aug 2022 16:37:35 -0700 Subject: [PATCH 3/6] Fix failing test --- processors/blockprocessor/block_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processors/blockprocessor/block_processor_test.go b/processors/blockprocessor/block_processor_test.go index 07e8145be..e880244b3 100644 --- a/processors/blockprocessor/block_processor_test.go +++ b/processors/blockprocessor/block_processor_test.go @@ -68,7 +68,7 @@ func TestFailedProcess(t *testing.T) { proc := blockprocessor.MakeBlockProcessorHandlerAdapter(&pr, nil) assert.Nil(t, err) err = proc(nil) - assert.Contains(t, err.Error(), "invalid round") + assert.Nil(t, err) genesisBlock, err := l.Block(basics.Round(0)) assert.Nil(t, err) From 818507ef3978bd87daf7f1a88bc621f9428ff6dd Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Mon, 15 Aug 2022 17:03:46 -0700 Subject: [PATCH 4/6] Fix processor factory test --- processors/processor_factory_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processors/processor_factory_test.go b/processors/processor_factory_test.go index a7598bc67..2beb74202 100644 --- a/processors/processor_factory_test.go +++ b/processors/processor_factory_test.go @@ -61,7 +61,7 @@ func (c *mockProcessorConstructor) New() Processor { func TestProcessorByNameSuccess(t *testing.T) { me := mockProcessor{} - me.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + me.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) RegisterProcessor("foobar", &mockProcessorConstructor{&me}) exp, err := ProcessorByName(context.Background(), "foobar", "", mockProvider{}, logger) @@ -78,7 +78,7 @@ func TestProcessorByNameNotFound(t *testing.T) { func TestProcessorByNameConnectFailure(t *testing.T) { me := mockProcessor{} expectedErr := fmt.Errorf("connect failure") - me.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(expectedErr) + me.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedErr) RegisterProcessor("baz", &mockProcessorConstructor{&me}) _, err := ProcessorByName(context.Background(), "baz", "", mockProvider{}, logger) assert.EqualError(t, err, expectedErr.Error()) From 1fa6e6805e0321294371199487fd2494f6121791 Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Tue, 16 Aug 2022 08:59:07 -0700 Subject: [PATCH 5/6] Fix processor config serde --- conduit/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/pipeline.go b/conduit/pipeline.go index a547ce2e2..c49ba24bf 100644 --- a/conduit/pipeline.go +++ b/conduit/pipeline.go @@ -203,7 +203,7 @@ func (p *pipelineImpl) Start() error { Name: (*processor).Metadata().Name(), }, ) - jsonEncode = string(json.Encode(p.cfg.Processors[idx])) + jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config)) err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger) processorName := (*processor).Metadata().Name() if err != nil { From 982f458b0203ef2ab86afe83d19d34b56174c88b Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Tue, 16 Aug 2022 13:39:44 -0700 Subject: [PATCH 6/6] Add logging helper, add comments --- conduit/logging.go | 10 ++++++++++ conduit/pipeline.go | 36 ++++++------------------------------ 2 files changed, 16 insertions(+), 30 deletions(-) diff --git a/conduit/logging.go b/conduit/logging.go index d3b969f50..45dffb17d 100644 --- a/conduit/logging.go +++ b/conduit/logging.go @@ -18,3 +18,13 @@ func (f PluginLogFormatter) Format(entry *log.Entry) ([]byte, error) { entry.Data["_name"] = f.Name return f.Formatter.Format(entry) } + +func makePluginLogFormatter(pluginType string, pluginName string) PluginLogFormatter { + return PluginLogFormatter{ + Formatter: &log.JSONFormatter{ + DisableHTMLEscape: true, + }, + Type: pluginType, + Name: pluginName, + } +} diff --git a/conduit/pipeline.go b/conduit/pipeline.go index c49ba24bf..7a4bc1bfa 100644 --- a/conduit/pipeline.go +++ b/conduit/pipeline.go @@ -139,16 +139,9 @@ func (p *pipelineImpl) Start() error { // TODO Need to change interfaces to accept config of map[string]interface{} + // Initialize Exporter first since the pipeline derives its round from the Exporter exporterLogger := log.New() - exporterLogger.SetFormatter( - PluginLogFormatter{ - Formatter: &log.JSONFormatter{ - DisableHTMLEscape: true, - }, - Type: "Exporter", - Name: (*p.exporter).Metadata().Name(), - }, - ) + exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name())) jsonEncode := string(json.Encode(p.cfg.Exporter.Config)) err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger) @@ -158,16 +151,9 @@ func (p *pipelineImpl) Start() error { } p.logger.Infof("Initialized Exporter: %s", exporterName) + // Initialize Importer importerLogger := log.New() - importerLogger.SetFormatter( - PluginLogFormatter{ - Formatter: &log.JSONFormatter{ - DisableHTMLEscape: true, - }, - Type: "Importer", - Name: (*p.importer).Metadata().Name(), - }, - ) + importerLogger.SetFormatter(makePluginLogFormatter(plugins.Importer, (*p.importer).Metadata().Name())) // TODO modify/fix ? jsonEncode = string(json.Encode(p.cfg.Importer.Config)) @@ -184,25 +170,16 @@ func (p *pipelineImpl) Start() error { } p.logger.Infof("Initialized Importer: %s", importerName) + // Initialize Processors var initProvider data.InitProvider = &PipelineInitProvider{ currentRound: &p.round, genesis: genesis, } - p.initProvider = &initProvider for idx, processor := range p.processors { - processorLogger := log.New() - processorLogger.SetFormatter( - PluginLogFormatter{ - Formatter: &log.JSONFormatter{ - DisableHTMLEscape: true, - }, - Type: "Processor", - Name: (*processor).Metadata().Name(), - }, - ) + processorLogger.SetFormatter(makePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name())) jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config)) err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger) processorName := (*processor).Metadata().Name() @@ -210,7 +187,6 @@ func (p *pipelineImpl) Start() error { return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err) } p.logger.Infof("Initialized Processor: %s", processorName) - } return p.RunPipeline()