Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conduit: pipeline run loop implementation #1183

Merged
merged 6 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conduit/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ func (f PluginLogFormatter) Format(entry *log.Entry) ([]byte, error) {
entry.Data["_name"] = f.Name
return f.Formatter.Format(entry)
}

// makePluginLogFormatter builds a PluginLogFormatter that emits JSON log
// entries (with HTML escaping disabled) and tags each entry with the given
// plugin type and plugin name.
func makePluginLogFormatter(pluginType string, pluginName string) PluginLogFormatter {
	jsonFormatter := &log.JSONFormatter{DisableHTMLEscape: true}
	return PluginLogFormatter{
		Formatter: jsonFormatter,
		Type:      pluginType,
		Name:      pluginName,
	}
}
121 changes: 70 additions & 51 deletions conduit/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,84 +131,65 @@ type pipelineImpl struct {
importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
round basics.Round
}

func (p *pipelineImpl) Start() error {
p.logger.Infof("Starting Pipeline Initialization")

// TODO Need to change interfaces to accept config of map[string]interface{}

// Initialize Exporter first since the pipeline derives its round from the Exporter
exporterLogger := log.New()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this further, I think we might have a concurrency issue here. Created this ticket for us to follow up later: #1187

exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name()))

jsonEncode := string(json.Encode(p.cfg.Exporter.Config))
err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you put a comment somewhere that this is initialized first so that we can fetch the round?
Would we ever want multiple exporters, which round do we choose in that scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update w/ comment.

I think it's hard to enumerate all of the possible combinations of multiple exporters. I'd prefer to make it really easy to run a pipeline so that people can just spin up multiple pipelines if they need to run multiple exporters.

exporterName := (*p.exporter).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err)
}
p.logger.Infof("Initialized Exporter: %s", exporterName)

// Initialize Importer
importerLogger := log.New()
importerLogger.SetFormatter(
PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
},
Type: "Importer",
Name: (*p.importer).Metadata().Name(),
},
)
importerLogger.SetFormatter(makePluginLogFormatter(plugins.Importer, (*p.importer).Metadata().Name()))

// TODO modify/fix ?
jsonEncode := string(json.Encode(p.cfg.Importer.Config))
jsonEncode = string(json.Encode(p.cfg.Importer.Config))
genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(jsonEncode), importerLogger)

currentRound := basics.Round(0)

var initProvider data.InitProvider = &PipelineInitProvider{
currentRound: &currentRound,
genesis: genesis,
}

p.initProvider = &initProvider

importerName := (*p.importer).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err)
}
p.round = basics.Round((*p.exporter).Round())
err = (*p.exporter).HandleGenesis(*genesis)
if err != nil {
return fmt.Errorf("Pipeline.Start(): exporter could not handle genesis (%s): %w", exporterName, err)
}
p.logger.Infof("Initialized Importer: %s", importerName)

for _, processor := range p.processors {
// Initialize Processors
var initProvider data.InitProvider = &PipelineInitProvider{
currentRound: &p.round,
genesis: genesis,
}
p.initProvider = &initProvider

for idx, processor := range p.processors {
processorLogger := log.New()
processorLogger.SetFormatter(
PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
},
Type: "Processor",
Name: (*processor).Metadata().Name(),
},
)

err := (*processor).Init(p.ctx, *p.initProvider, "")
processorLogger.SetFormatter(makePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name()))
jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config))
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger)
processorName := (*processor).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err)
}
p.logger.Infof("Initialized Processor: %s", processorName)

}

exporterLogger := log.New()
exporterLogger.SetFormatter(
PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
},
Type: "Exporter",
Name: (*p.exporter).Metadata().Name(),
},
)

err = (*p.exporter).Init("", p.logger)
ExporterName := (*p.exporter).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", ExporterName, err)
}
p.logger.Infof("Initialized Exporter: %s", ExporterName)

return nil
return p.RunPipeline()
}

func (p *pipelineImpl) Stop() error {
Expand All @@ -231,6 +212,44 @@ func (p *pipelineImpl) Stop() error {
return nil
}

// RunPipeline pushes block data through the pipeline until the pipeline's
// context is cancelled or a plugin returns an error. Each iteration fetches
// one block from the importer, runs it through every processor in order,
// delivers it to the exporter, then invokes each processor's OnComplete
// callback before advancing to the next round.
func (p *pipelineImpl) RunPipeline() error {
	for {
		// Exit cleanly when the pipeline context is cancelled
		// (presumably done by Stop() — confirm against that implementation).
		// Without this check the loop could never be terminated gracefully.
		select {
		case <-p.ctx.Done():
			return p.ctx.Err()
		default:
		}

		// TODO Retries?
		p.logger.Infof("Pipeline round: %v", p.round)

		// Fetch the block for the current round.
		blkData, err := (*p.importer).GetBlock(uint64(p.round))
		if err != nil {
			// logrus appends a newline itself, so none is needed in the format.
			p.logger.Errorf("%v", err)
			return err
		}

		// Run the block through each processor in order; a processor may
		// transform the block data before it reaches the exporter.
		for _, proc := range p.processors {
			blkData, err = (*proc).Process(blkData)
			if err != nil {
				p.logger.Errorf("%v", err)
				return err
			}
		}

		// Hand the (possibly transformed) block to the exporter.
		if err = (*p.exporter).Receive(blkData); err != nil {
			p.logger.Errorf("%v", err)
			return err
		}

		// Notify processors that the round was fully exported so they can
		// commit any per-round state.
		for _, proc := range p.processors {
			if err = (*proc).OnComplete(blkData); err != nil {
				p.logger.Errorf("%v", err)
				return err
			}
		}

		// Advance to the next round.
		p.round++
	}
}

// MakePipeline creates a Pipeline
func MakePipeline(cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) {

Expand Down
9 changes: 7 additions & 2 deletions exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package postgresql

import (
"fmt"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"

"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
"github.com/algorand/indexer/idb"
// Necessary to ensure the postgres implementation has been registered in the idb factory
_ "github.com/algorand/indexer/idb/postgres"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)

const exporterName = "postgresql"
Expand Down
3 changes: 1 addition & 2 deletions importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package algodimporter

import (
"context"
"encoding/json"
"fmt"
"github.com/algorand/go-algorand/data/bookkeeping"
"net/url"
Expand Down Expand Up @@ -84,7 +83,7 @@ func (algodImp *algodImporter) Init(ctx context.Context, cfg plugins.PluginConfi

genesis := bookkeeping.Genesis{}

err = json.Unmarshal([]byte(genesisResponse), &genesis)
err = protocol.DecodeJSON([]byte(genesisResponse), &genesis)
if err != nil {
return nil, err
}
Expand Down
49 changes: 18 additions & 31 deletions processors/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,17 @@ func (proc *blockProcessor) Config() plugins.PluginConfig {
return proc.cfg
}

func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig) error {
func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *log.Logger) error {
proc.ctx = ctx
proc.logger = logger

// First get the configuration from the string
var pCfg processors.BlockProcessorConfig
err := yaml.Unmarshal([]byte(cfg), &pCfg)

if err != nil {
return fmt.Errorf("blockprocessor init error: %w", err)
}
proc.cfg = cfg

genesis := initProvider.Genesis()
round := uint64(initProvider.NextDBRound())
Expand All @@ -94,51 +96,35 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv
if l == nil {
return fmt.Errorf("ledger was created with nil pointer")
}
proc.ledger = l

if uint64(l.Latest()) > round {
return fmt.Errorf("the ledger cache is ahead of the required round (%d > %d) and must be re-initialized", l.Latest(), round)
}

if l.Latest() == 0 {
blk, cert, err := l.BlockCert(0)
if err != nil {
return fmt.Errorf("error getting block and certificate: %w", err)
}
vb := ledgercore.MakeValidatedBlock(blk, ledgercore.StateDelta{})

blockData := data.MakeBlockDataFromValidatedBlock(vb)
blockData.Certificate = &cert

proc.saveLastValidatedInformation(vb, blk.Round(), cert)
err = proc.OnComplete(data.BlockData{})

if err != nil {
return fmt.Errorf("error adding gensis block: %w", err)
}
}

proc.ledger = l
proc.ctx = ctx
proc.cfg = cfg

return nil
}

func (proc *blockProcessor) extractValidatedBlockAndPayset(blockCert *rpcs.EncodedBlockCert) (ledgercore.ValidatedBlock, transactions.Payset, error) {

var vb ledgercore.ValidatedBlock
var payset transactions.Payset
if blockCert == nil {
return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("cannot process a nil block")
return vb, payset, fmt.Errorf("cannot process a nil block")
}
if blockCert.Block.Round() == 0 && proc.ledger.Latest() == 0 {
vb = ledgercore.MakeValidatedBlock(blockCert.Block, ledgercore.StateDelta{})
return vb, blockCert.Block.Payset, nil
}
if blockCert.Block.Round() != (proc.ledger.Latest() + 1) {
return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("invalid round blockCert.Block.Round(): %d nextRoundToProcess: %d", blockCert.Block.Round(), uint64(proc.ledger.Latest())+1)
return vb, payset, fmt.Errorf("invalid round blockCert.Block.Round(): %d nextRoundToProcess: %d", blockCert.Block.Round(), uint64(proc.ledger.Latest())+1)
}

// Make sure "AssetCloseAmount" is enabled. If it isn't, override the
// protocol and update the blocks to include transactions with modified
// apply data.
proto, ok := config.Consensus[blockCert.Block.BlockHeader.CurrentProtocol]
if !ok {
return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf(
return vb, payset, fmt.Errorf(
"cannot find proto version %s", blockCert.Block.BlockHeader.CurrentProtocol)
}
protoChanged := !proto.EnableAssetCloseAmount
Expand All @@ -152,13 +138,11 @@ func (proc *blockProcessor) extractValidatedBlockAndPayset(blockCert *rpcs.Encod
}

delta, payset, err := ledger.EvalForIndexer(ledgerForEval, &blockCert.Block, proto, resources)

if err != nil {
return ledgercore.ValidatedBlock{}, transactions.Payset{}, fmt.Errorf("eval err: %w", err)
return vb, transactions.Payset{}, fmt.Errorf("eval err: %w", err)
}

// validated block
var vb ledgercore.ValidatedBlock
if protoChanged {
block := bookkeeping.Block{
BlockHeader: blockCert.Block.BlockHeader,
Expand Down Expand Up @@ -255,6 +239,9 @@ func (proc *blockProcessor) Process(input data.BlockData) (data.BlockData, error

func (proc *blockProcessor) OnComplete(_ data.BlockData) error {

if proc.lastValidatedBlockRound == basics.Round(0) {
return nil
}
// write to ledger
err := proc.ledger.AddValidatedBlock(proc.lastValidatedBlock, proc.lastValidatedBlockCertificate)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion processors/blockprocessor/block_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestFailedProcess(t *testing.T) {
proc := blockprocessor.MakeBlockProcessorHandlerAdapter(&pr, nil)
assert.Nil(t, err)
err = proc(nil)
assert.Contains(t, err.Error(), "invalid round")
assert.Nil(t, err)

genesisBlock, err := l.Block(basics.Round(0))
assert.Nil(t, err)
Expand Down
5 changes: 4 additions & 1 deletion processors/noop/noop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package noop

import (
"context"

"github.com/sirupsen/logrus"

"github.com/algorand/indexer/data"
"github.com/algorand/indexer/plugins"
"github.com/algorand/indexer/processors"
Expand Down Expand Up @@ -36,7 +39,7 @@ func (p *Processor) Config() plugins.PluginConfig {
}

// Init is a no-op: the noop processor requires no setup, so the context,
// init provider, plugin config, and logger are all ignored and nil error
// is always returned.
func (p *Processor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *logrus.Logger) error {
return nil
}
}

Expand Down
5 changes: 4 additions & 1 deletion processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package processors

import (
"context"

"github.com/sirupsen/logrus"

"github.com/algorand/indexer/data"
"github.com/algorand/indexer/plugins"
)
Expand All @@ -18,7 +21,7 @@ type Processor interface {
// Typically, used for things like initializing network connections.
// The Context passed to Init() will be used for deadlines, cancel signals and other early terminations
// The Config passed to Init() will contain the unmarshalled config file specific to this plugin.
Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig) error
Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error

// Close will be called during termination of the Indexer process.
Close() error
Expand Down
2 changes: 1 addition & 1 deletion processors/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func ProcessorByName(ctx context.Context, name string, dataDir string, initProvi
}
processor := constructor.New()
cfg := plugins.LoadConfig(logger, dataDir, processor.Metadata())
if err := processor.Init(ctx, initProvider, cfg); err != nil {
if err := processor.Init(ctx, initProvider, cfg, logger); err != nil {
return nil, err
}
return processor, nil
Expand Down
Loading