Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

algod importer: Combines algod and algod_follower #1452

Merged
merged 9 commits into from
Feb 16, 2023
85 changes: 70 additions & 15 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,31 @@ import (
"github.com/algorand/go-algorand/rpcs"
)

const importerName = "algod"
const (
importerName = "algod"
archivalModeStr = "archival"
followerModeStr = "follower"
)

const (
archivalMode = iota
followerMode
)

// Retry w/ exponential backoff
const (
initialWait = time.Millisecond * 200
waitMultiplier = 1.5
retries = 5
)

type algodImporter struct {
aclient *algod.Client
logger *logrus.Logger
cfg Config
ctx context.Context
cancel context.CancelFunc
mode int
}

//go:embed sample.yaml
Expand All @@ -49,6 +66,14 @@ func New() importers.Importer {
return &algodImporter{}
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode != followerMode {
return nil
}
_, err := algodImp.aclient.SetSyncRound(input.Round() + 1).Do(algodImp.ctx)
return err
}

func (algodImp *algodImporter) Metadata() conduit.Metadata {
return algodImporterMetadata
}
Expand All @@ -67,6 +92,23 @@ func (algodImp *algodImporter) Init(ctx context.Context, cfg plugins.PluginConfi
if err != nil {
return nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}

// To support backwards compatibility with the daemon we default to archival mode
if algodImp.cfg.Mode == "" {
algodImp.cfg.Mode = archivalModeStr
}

switch algodImp.cfg.Mode {
case archivalModeStr:
algodImp.mode = archivalMode
break
case followerModeStr:
algodImp.mode = followerMode
break
default:
return nil, fmt.Errorf("algod importer was set to a mode (%s) that wasn't supported", algodImp.cfg.Mode)
}

var client *algod.Client
u, err := url.Parse(algodImp.cfg.NetAddr)
if err != nil {
Expand Down Expand Up @@ -119,27 +161,40 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var err error
var blk data.BlockData

for retries := 0; retries < 3; retries++ {
// nextRound - 1 because the endpoint waits until the subsequent block is committed to return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this retry loop is necessary. there's already retries at the pipeline level. in this loop there's no wait before the next try, could this loop always finish before the next block is available?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just added retires w/ backoff to the most recent version.

for r := 0; r < retries; r++ {
		time.Sleep(time.Duration(waitMultiplier*float64(r)) * initialWait)
...

We could just rely on the pipeline-level retires like you said, but retries with backoff make more sense in the case of the follower node since the catchup service gets started/stopped so frequently.

_, err = algodImp.aclient.StatusAfterBlock(rnd - 1).Do(algodImp.ctx)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
algodImp.logger.Errorf(
"r=%d error getting status %d", retries, rnd)
continue
for r := 0; r < retries; r++ {
time.Sleep(time.Duration(waitMultiplier*float64(r)) * initialWait)
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
return blk, err
algodImp.logger.Errorf(
"r=%d error getting block %d", r, rnd)
continue
}
tmpBlk := new(rpcs.EncodedBlockCert)
err = protocol.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

if algodImp.mode == followerMode {
// We aren't going to do anything with the new delta until we get everything
// else converted over
// Round 0 has no delta associated with it
if rnd != 0 {
_, err = algodImp.aclient.GetLedgerStateDelta(rnd).Do(algodImp.ctx)
if err != nil {
algodImp.logger.Errorf(
"r=%d error getting delta %d", r, rnd)
continue
}
}
}

blk = data.BlockData{
BlockHeader: tmpBlk.Block.BlockHeader,
Expand All @@ -148,8 +203,8 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
}
return blk, err
}
algodImp.logger.Error("GetBlock finished retries without fetching a block.")
return blk, fmt.Errorf("finished retries without fetching a block")
algodImp.logger.Error("GetBlock finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
return blk, fmt.Errorf("finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
}

func (algodImp *algodImporter) ProvideMetrics(subsystem string) []prometheus.Collector {
Expand Down
2 changes: 2 additions & 0 deletions conduit/plugins/importers/algod/algod_importer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package algodimporter

// Config specific to the algod importer
type Config struct {
// <code>mode</code> is the mode of operation of the algod importer. It must be either <code>archival</code> or <code>follower</code>.
Mode string `yaml:"mode"`
// <code>netaddr</code> is the Algod network address. It must be either an <code>http</code> or <code>https</code> URL.
NetAddr string `yaml:"netaddr"`
// <code>token</code> is the Algod API endpoint token.
Expand Down
Loading