Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix outstanding known header==nil errors + reduce bor heimdall logging #8878

Merged
merged 9 commits into from
Dec 1, 2023
10 changes: 8 additions & 2 deletions consensus/bor/finality/whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
cancel()

if err != nil {
config.logger.Warn(fmt.Sprintf("[bor] unable to start the %s service - first run", fnName), "err", err)
if !errors.Is(err, errMissingBlocks) {
config.logger.Warn(fmt.Sprintf("[bor] unable to start the %s service - first run", fnName), "err", err)
}
}

ticker := time.NewTicker(tickerDuration)
Expand All @@ -142,7 +144,11 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
cancel()

if err != nil {
config.logger.Warn(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
if errors.Is(err, errMissingBlocks) {
config.logger.Debug(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
} else {
config.logger.Warn(fmt.Sprintf("[bor] unable to handle %s", fnName), "err", err)
}
}
case <-config.closeCh:
return
Expand Down
14 changes: 11 additions & 3 deletions consensus/bor/finality/whitelist_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,25 @@ func fetchWhitelistCheckpoint(ctx context.Context, heimdallClient heimdall.IHeim
return blockNum, blockHash, errCheckpoint
}

config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())

// Verify if the checkpoint fetched can be added to the local whitelist entry or not
// If verified, it returns the hash of the end block of the checkpoint. If not,
// it will return appropriate error.
hash, err := verifier.verify(ctx, config, checkpoint.StartBlock.Uint64(), checkpoint.EndBlock.Uint64(), checkpoint.RootHash.String()[2:], true)

if err != nil {
config.logger.Warn("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
if errors.Is(err, errMissingBlocks) {
config.logger.Debug("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
config.logger.Debug("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
} else {
config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())
config.logger.Warn("[bor.heimdall] Failed to whitelist checkpoint", "err", err)
}

return blockNum, blockHash, err
}

config.logger.Info("[bor.heimdall] Got new checkpoint", "start", checkpoint.StartBlock.Uint64(), "end", checkpoint.EndBlock.Uint64(), "rootHash", checkpoint.RootHash.String())

blockNum = checkpoint.EndBlock.Uint64()
blockHash = common.HexToHash(hash)

Expand Down
183 changes: 123 additions & 60 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func BorHeimdallForward(
var eventRecords int
var lastSpanId uint64

logTimer := time.NewTicker(30 * time.Second)
logTimer := time.NewTicker(logInterval)
defer logTimer.Stop()

if endSpanID >= nextSpanId {
Expand Down Expand Up @@ -288,14 +288,30 @@ func BorHeimdallForward(
fetchTime += callTime
}

if err = PersistValidatorSets(u, ctx, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
return fmt.Errorf("persistValidatorSets: %w", err)
}
if !mine && header != nil {
sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.chainConfig.Bor); err != nil {
return err
var snap *bor.Snapshot

if header != nil {
snap = loadSnapshot(blockNum, header.Hash(), cfg.chainConfig.Bor, recents, signatures, cfg.snapDb, logger)

if snap == nil && blockNum <= chain.FrozenBlocks() {
snap, err = initValidatorSets(ctx, snap, tx, cfg.blockReader, cfg.chainConfig.Bor,
chain, blockNum, recents, signatures, cfg.snapDb, logger, s.LogPrefix())

if err != nil {
return fmt.Errorf("can't initialise validator sets: %w", err)
}
}

if err = persistValidatorSets(ctx, snap, u, tx, cfg.blockReader, cfg.chainConfig.Bor, chain, blockNum, header.Hash(), recents, signatures, cfg.snapDb, logger, s.LogPrefix()); err != nil {
return fmt.Errorf("can't persist validator sets: %w", err)
}

if !mine {
sprintLength := cfg.chainConfig.Bor.CalculateSprint(blockNum)
if blockNum > zerothSpanEnd && ((blockNum+1)%sprintLength == 0) {
if err = checkHeaderExtraData(u, ctx, chain, blockNum, header, cfg.chainConfig.Bor); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -378,6 +394,10 @@ func fetchAndWriteBorEvents(
to time.Time
)

if header == nil {
return 0, 0, 0, fmt.Errorf("can't fetch events for nil header")
}

blockNum := header.Number.Uint64()

if config.IsIndore(blockNum) {
Expand Down Expand Up @@ -485,10 +505,29 @@ func fetchAndWriteSpans(
return spanId, nil
}

// Not used currently
func PersistValidatorSets(
u Unwinder,
// loadSnapshot returns a previously built proposer snapshot for the block
// with the given hash, or nil if none can be found cheaply.
//
// Lookup order:
//  1. the in-memory LRU cache of recent snapshots, keyed by block hash;
//  2. the on-disk snapshot database — attempted only when blockNum falls on
//     a snapshotPersistInterval boundary, since snapshots are only persisted
//     at those intervals.
//
// A nil result is not an error: it signals the caller that the snapshot must
// be built another way (e.g. via initValidatorSets).
func loadSnapshot(blockNum uint64, hash libcommon.Hash, config *chain.BorConfig, recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
snapDb kv.RwDB,
logger log.Logger) *bor.Snapshot {

// Fast path: snapshot already in the in-memory cache.
if s, ok := recents.Get(hash); ok {
return s
}

// Disk lookup is only worthwhile on persistence-interval boundaries,
// because snapshots are stored to snapDb only at those block numbers.
// A load error is deliberately treated as "not found" (best effort).
if blockNum%snapshotPersistInterval == 0 {
if s, err := bor.LoadSnapshot(config, signatures, snapDb, hash); err == nil {
logger.Trace("Loaded snapshot from disk", "number", blockNum, "hash", hash)
return s
}
}

return nil
}

func persistValidatorSets(
ctx context.Context,
snap *bor.Snapshot,
u Unwinder,
tx kv.Tx,
blockReader services.FullBlockReader,
config *chain.BorConfig,
Expand All @@ -504,7 +543,6 @@ func PersistValidatorSets(
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
// Search for a snapshot in memory or on disk for checkpoints
var snap *bor.Snapshot

headers := make([]*types.Header, 0, 16)
var parent *types.Header
Expand Down Expand Up @@ -565,27 +603,91 @@ func PersistValidatorSets(
default:
}
}
if snap == nil && chain != nil && blockNum <= chain.FrozenBlocks() {

// check if snapshot is nil
if snap == nil {
return fmt.Errorf("unknown error while retrieving snapshot at block number %v", blockNum)
}

// Previous snapshot found, apply any pending headers on top of it
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

if len(headers) > 0 {
var err error
if snap, err = snap.Apply(parent, headers, logger); err != nil {
if snap != nil {
var badHash common.Hash
for _, header := range headers {
if header.Number.Uint64() == snap.Number+1 {
badHash = header.Hash()
break
}
}
u.UnwindTo(snap.Number, BadBlock(badHash, err))
} else {
return fmt.Errorf("snap.Apply %d, headers %d-%d: %w", blockNum, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64(), err)
}
}
}

recents.Add(snap.Hash, snap)

// If we've generated a new persistent snapshot, save to disk
if snap.Number%snapshotPersistInterval == 0 && len(headers) > 0 {
if err := snap.Store(snapDb); err != nil {
return fmt.Errorf("snap.Store: %w", err)
}

logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", snap.Number, "hash", snap.Hash)
}

return nil
}

func initValidatorSets(
ctx context.Context,
snap *bor.Snapshot,
tx kv.Tx,
blockReader services.FullBlockReader,
config *chain.BorConfig,
chain consensus.ChainHeaderReader,
blockNum uint64,
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
snapDb kv.RwDB,
logger log.Logger,
logPrefix string) (*bor.Snapshot, error) {

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

if snap == nil {
// Special handling of the headers in the snapshot
zeroHeader := chain.GetHeaderByNumber(0)
if zeroHeader != nil {
// get checkpoint data
hash := zeroHeader.Hash()

if zeroSnap := loadSnapshot(0, hash, config, recents, signatures, snapDb, logger); zeroSnap != nil {
return nil, nil
}

// get validators and current span
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
if err != nil {
return err
return nil, err
}
var zeroSpan span.HeimdallSpan
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
return err
return nil, err
}

// new snap shot
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
if err := snap.Store(snapDb); err != nil {
return fmt.Errorf("snap.Store (0): %w", err)
return nil, fmt.Errorf("snap.Store (0): %w", err)
}
logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
g := errgroup.Group{}
Expand All @@ -610,12 +712,12 @@ func PersistValidatorSets(
})
}
if header == nil {
log.Debug(fmt.Sprintf("[%s] PersistValidatorSets nil header", logPrefix), "blockNum", i)
return nil, fmt.Errorf("missing header persisting validator sets: (inside loop at %d)", i)
}
initialHeaders = append(initialHeaders, header)
if len(initialHeaders) == cap(initialHeaders) {
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return fmt.Errorf("snap.Apply (inside loop): %w", err)
return nil, fmt.Errorf("snap.Apply (inside loop): %w", err)
}
parentHeader = initialHeaders[len(initialHeaders)-1]
initialHeaders = initialHeaders[:0]
Expand All @@ -627,51 +729,12 @@ func PersistValidatorSets(
}
}
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return fmt.Errorf("snap.Apply (outside loop): %w", err)
}
}
}

// check if snapshot is nil
if snap == nil {
return fmt.Errorf("unknown error while retrieving snapshot at block number %v", blockNum)
}

// Previous snapshot found, apply any pending headers on top of it
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

if len(headers) > 0 {
var err error
if snap, err = snap.Apply(parent, headers, logger); err != nil {
if snap != nil {
var badHash common.Hash
for _, header := range headers {
if header.Number.Uint64() == snap.Number+1 {
badHash = header.Hash()
break
}
}
u.UnwindTo(snap.Number, BadBlock(badHash, err))
} else {
return fmt.Errorf("snap.Apply %d, headers %d-%d: %w", blockNum, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64(), err)
return nil, fmt.Errorf("snap.Apply (outside loop): %w", err)
}
}
}

recents.Add(snap.Hash, snap)

// If we've generated a new persistent snapshot, save to disk
if snap.Number%snapshotPersistInterval == 0 && len(headers) > 0 {
if err := snap.Store(snapDb); err != nil {
return fmt.Errorf("snap.Store: %w", err)
}

logger.Info(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", snap.Number, "hash", snap.Hash)
}

return nil
return snap, nil
}

func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
)

const (
logInterval = 20 * time.Second
logInterval = 30 * time.Second

// stateStreamLimit - don't accumulate state changes if jump is bigger than this amount of blocks
stateStreamLimit uint64 = 1_000
Expand Down
Loading