LVH support to memory overlay (#4555)
* fixed fcu

* fixed leak

* maybe now?

* wrote forkchoice
Giulio2002 authored Jun 28, 2022
1 parent dc5d3ff commit aa79853
Showing 2 changed files with 55 additions and 24 deletions.
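
The change threads a latest valid hash (LVH) through the memory overlay's payload validation, so the engine-API status can report the last block known to be valid instead of always echoing the current head hash. A minimal sketch of that rule, assuming simplified stand-in types rather than Erigon's remote.EngineStatus and privateapi.PayloadStatus:

package main

import "fmt"

// Simplified stand-ins for the types used in the diff; the real code uses
// remote.EngineStatus and privateapi.PayloadStatus from Erigon.
type EngineStatus int

const (
	StatusValid EngineStatus = iota
	StatusInvalid
	StatusAccepted
)

type Hash [32]byte

type PayloadStatus struct {
	Status          EngineStatus
	LatestValidHash Hash
	ValidationError error
}

// validate mirrors the rule introduced here: on success the latest valid hash
// is the new header's own hash; on a validation failure it falls back to the
// parent hash, the last block known to be valid.
func validate(headerHash, parentHash Hash, exec func() error) PayloadStatus {
	if err := exec(); err != nil {
		return PayloadStatus{Status: StatusInvalid, LatestValidHash: parentHash, ValidationError: err}
	}
	return PayloadStatus{Status: StatusValid, LatestValidHash: headerHash}
}

func main() {
	var head, parent Hash
	head[0], parent[0] = 0xaa, 0x01
	ok := validate(head, parent, func() error { return nil })
	bad := validate(head, parent, func() error { return fmt.Errorf("bad state root") })
	fmt.Println(ok.Status, ok.LatestValidHash[0])   // 0 (valid): LVH = header hash (0xaa)
	fmt.Println(bad.Status, bad.LatestValidHash[0]) // 1 (invalid): LVH = parent hash (0x01)
}
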
39 changes: 26 additions & 13 deletions eth/stagedsync/stage_headers.go
@@ -349,6 +349,28 @@ func startHandlingForkChoice(
}
}

if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() {
log.Info("Flushing in-memory state")
if err := cfg.hd.FlushNextForkState(tx); err != nil {
return nil, err
}
cfg.hd.BeaconRequestList.Remove(requestId)
rawdb.WriteForkchoiceHead(tx, forkChoice.HeadBlockHash)
canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg)
if err != nil {
log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err)
return nil, err
}
if canonical {
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
} else {
return &privateapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
}

cfg.hd.UpdateTopSeenHeightPoS(headerNumber)
forkingPoint := uint64(0)
if headerNumber > 0 {
@@ -362,15 +384,6 @@ func startHandlingForkChoice(
}
}

if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() {
log.Info("Flushing in-memory state")
if err := cfg.hd.FlushNextForkState(tx); err != nil {
return nil, err
}
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
}

log.Info(fmt.Sprintf("[%s] Fork choice re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint)

if requestStatus == engineapi.New {
@@ -571,14 +584,14 @@ func verifyAndSaveNewPoSHeader(
// TODO(yperbasis): considered non-canonical because some missing headers were downloaded but not canonized
// Or it's not a problem because forkChoice is updated frequently?
if cfg.memoryOverlay {
status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload)
status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload)
if criticalError != nil {
return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError
}
success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED
return &privateapi.PayloadStatus{
Status: status,
LatestValidHash: currentHeadHash,
LatestValidHash: latestValidHash,
ValidationError: validationError,
}, success, nil
}
@@ -587,14 +600,14 @@ func verifyAndSaveNewPoSHeader(
}

if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) {
status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload)
status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload)
if criticalError != nil {
return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError
}
success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED
return &privateapi.PayloadStatus{
Status: status,
LatestValidHash: currentHeadHash,
LatestValidHash: latestValidHash,
ValidationError: validationError,
}, success, nil
}
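The hunks above switch verifyAndSaveNewPoSHeader from reporting currentHeadHash to reporting the latestValidHash that ValidatePayload now returns. For context, roughly the wire shape such a status takes in an engine-API response; the payloadStatusV1 type below is an illustrative sketch, not Erigon's RPC code:

package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative wire shape of an engine-API payload status, to show where a
// latest valid hash typically ends up; not Erigon's actual RPC code.
type payloadStatusV1 struct {
	Status          string  `json:"status"`
	LatestValidHash *string `json:"latestValidHash,omitempty"`
	ValidationError *string `json:"validationError,omitempty"`
}

func main() {
	lvh := "0x0000000000000000000000000000000000000000000000000000000000000001"
	msg := "bad state root"
	out, err := json.MarshalIndent(payloadStatusV1{
		Status:          "INVALID",
		LatestValidHash: &lvh, // parent hash of the rejected payload
		ValidationError: &msg,
	}, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
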
40 changes: 29 additions & 11 deletions turbo/stages/headerdownload/header_algos.go
@@ -1093,10 +1093,16 @@ func abs64(n int64) uint64 {
return uint64(n)
}

func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, validationError error, criticalError error) {
func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) {
hd.lock.Lock()
defer hd.lock.Unlock()
maxDepth := uint64(16)

currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
return
}
if store {
// If it is a continuation of the canonical chain we can stack it up.
if hd.nextForkState == nil {
@@ -1105,26 +1111,24 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body
hd.nextForkState.UpdateTxn(tx)
}
hd.nextForkHash = header.Hash()
status = remote.EngineStatus_VALID
// Let's assemble the side fork chain if we have others building.
validationError = execPayload(hd.nextForkState, header, body, 0, nil, nil)
if validationError != nil {
status = remote.EngineStatus_INVALID
latestValidHash = header.ParentHash
return
}
return
}
currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
status = remote.EngineStatus_VALID
latestValidHash = header.Hash()
hd.sideForksBlock[latestValidHash] = sideForkBlock{header, body}
hd.cleanupOutdateSideForks(*currentHeight, maxDepth)
return
}
// if the block is not in range of MAX_DEPTH from head then we do not validate it.
if abs64(int64(*currentHeight)-header.Number.Int64()) > maxDepth {
status = remote.EngineStatus_ACCEPTED
return
}
// if it is not canonical we validate it as a side fork.
batch := memdb.NewMemoryBatch(tx)
// Let's assemble the side fork backwards
var foundCanonical bool
currentHash := header.ParentHash
@@ -1155,17 +1159,26 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body
}
hd.sideForksBlock[header.Hash()] = sideForkBlock{header, body}
status = remote.EngineStatus_VALID
// if it is not canonical we validate it as a side fork.
batch := memdb.NewMemoryBatch(tx)
defer batch.Close()
validationError = execPayload(batch, header, body, unwindPoint, headersChain, bodiesChain)
latestValidHash = header.Hash()
if validationError != nil {
latestValidHash = header.ParentHash
status = remote.EngineStatus_INVALID
}
// After we finish executing, we clean up old forks
hd.cleanupOutdateSideForks(*currentHeight, maxDepth)
return
}

func (hd *HeaderDownload) cleanupOutdateSideForks(currentHeight uint64, maxDepth uint64) {
for hash, sb := range hd.sideForksBlock {
if abs64(int64(*currentHeight)-sb.header.Number.Int64()) > maxDepth {
if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxDepth {
delete(hd.sideForksBlock, hash)
}
}
return
}

func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error {
@@ -1174,6 +1187,11 @@ func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error {
if err := hd.nextForkState.Flush(tx); err != nil {
return err
}
// If the side fork hash is now becoming canonical we can clean up.
if _, ok := hd.sideForksBlock[hd.nextForkHash]; ok {
delete(hd.sideForksBlock, hd.nextForkHash)
}
hd.nextForkState.Close()
hd.nextForkHash = common.Hash{}
hd.nextForkState = nil
return nil
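Taken together, the header_algos.go changes keep a canonical-extension overlay (nextForkState) plus a small cache of recent side-fork blocks, and flush the overlay only when a fork-choice update confirms its head. A rough sketch of that lifecycle, with a plain map standing in for the memdb-backed batch; Overlay, newPayload and forkChoiceUpdated are illustrative names, not Erigon APIs:

package main

import "fmt"

// Overlay sketches the lifecycle implemented in header_algos.go: a pending,
// uncommitted extension of the canonical head plus a cache of side forks.
type Overlay struct {
	nextForkHash string            // hash of the head of the in-memory extension
	pending      map[string]string // uncommitted writes from executing that block
	sideForks    map[string]bool   // recent non-canonical blocks kept for later FCUs
}

// newPayload executes a block either on top of the canonical head (stacking it
// in the overlay) or as a side fork in a scratch batch that is discarded.
func (o *Overlay) newPayload(hash, parent, canonicalHead string, exec func(map[string]string) error) string {
	if parent == canonicalHead || parent == o.nextForkHash {
		if o.pending == nil {
			o.pending = map[string]string{}
		}
		if err := exec(o.pending); err != nil {
			return "INVALID"
		}
		o.nextForkHash = hash
		return "VALID"
	}
	// Side fork: validate in a throwaway batch, remember the block for later.
	scratch := map[string]string{}
	if err := exec(scratch); err != nil {
		return "INVALID"
	}
	o.sideForks[hash] = true
	return "VALID"
}

// forkChoiceUpdated persists the overlay when the chosen head is the block we
// already executed in memory, mirroring FlushNextForkState.
func (o *Overlay) forkChoiceUpdated(head string, db map[string]string) bool {
	if head != o.nextForkHash {
		return false
	}
	for k, v := range o.pending {
		db[k] = v
	}
	delete(o.sideForks, o.nextForkHash)
	o.pending, o.nextForkHash = nil, ""
	return true
}

func main() {
	db := map[string]string{"head": "0xparent"}
	o := &Overlay{sideForks: map[string]bool{}}
	fmt.Println(o.newPayload("0xabc", "0xparent", "0xparent", func(w map[string]string) error {
		w["head"] = "0xabc"
		return nil
	}))
	fmt.Println(o.forkChoiceUpdated("0xabc", db), db["head"])
}

Here forkChoiceUpdated plays the role of FlushNextForkState plus the side-fork cleanup added at the end of the diff.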
