From 12fff37dd4f7bd80a925f99fdc5c7e52312a2156 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 27 Mar 2023 10:15:38 +0700 Subject: [PATCH] e3: recon deadlock fix --- eth/stagedsync/exec3.go | 176 +++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 84 deletions(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index c328b971ef5..66ff50e240e 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -1024,78 +1024,86 @@ func reconstituteStep(last bool, return h } - var err error // avoid declare global mutable variable - for bn := startBlockNum; bn <= endBlockNum; bn++ { - t = time.Now() - b, err = blockWithSenders(chainDb, nil, blockReader, bn) - if err != nil { - return err - } - if b == nil { - return fmt.Errorf("could not find block %d\n", bn) - } - txs := b.Transactions() - header := b.HeaderNoCopy() - skipAnalysis := core.SkipAnalysis(chainConfig, bn) - signer := *types.MakeSigner(chainConfig, bn) - - f := core.GetHashFn(header, getHeaderFunc) - getHashFnMute := &sync.Mutex{} - getHashFn := func(n uint64) common.Hash { - getHashFnMute.Lock() - defer getHashFnMute.Unlock() - return f(n) - } - blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */) - rules := chainConfig.Rules(bn, b.Time()) + if err := func() (err error) { + defer func() { + close(workCh) + reconDone <- struct{}{} // Complete logging and committing go-routine + _ = g.Wait() + }() - for txIndex := -1; txIndex <= len(txs); txIndex++ { - if bitmap.Contains(inputTxNum) { - binary.BigEndian.PutUint64(txKey[:], inputTxNum) - txTask := &exec22.TxTask{ - BlockNum: bn, - Header: header, - Coinbase: b.Coinbase(), - Uncles: b.Uncles(), - Rules: rules, - TxNum: inputTxNum, - Txs: txs, - TxIndex: txIndex, - BlockHash: b.Hash(), - SkipAnalysis: skipAnalysis, - Final: txIndex == len(txs), - GetHashFn: getHashFn, - EvmBlockContext: blockContext, - Withdrawals: b.Withdrawals(), - } - if txIndex >= 0 && txIndex < len(txs) { - txTask.Tx = txs[txIndex] - txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules) - if err != nil { - return err + for bn := startBlockNum; bn <= endBlockNum; bn++ { + t = time.Now() + b, err = blockWithSenders(chainDb, nil, blockReader, bn) + if err != nil { + return err + } + if b == nil { + return fmt.Errorf("could not find block %d\n", bn) + } + txs := b.Transactions() + header := b.HeaderNoCopy() + skipAnalysis := core.SkipAnalysis(chainConfig, bn) + signer := *types.MakeSigner(chainConfig, bn) + + f := core.GetHashFn(header, getHeaderFunc) + getHashFnMute := &sync.Mutex{} + getHashFn := func(n uint64) common.Hash { + getHashFnMute.Lock() + defer getHashFnMute.Unlock() + return f(n) + } + blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */) + rules := chainConfig.Rules(bn, b.Time()) + + for txIndex := -1; txIndex <= len(txs); txIndex++ { + if bitmap.Contains(inputTxNum) { + binary.BigEndian.PutUint64(txKey[:], inputTxNum) + txTask := &exec22.TxTask{ + BlockNum: bn, + Header: header, + Coinbase: b.Coinbase(), + Uncles: b.Uncles(), + Rules: rules, + TxNum: inputTxNum, + Txs: txs, + TxIndex: txIndex, + BlockHash: b.Hash(), + SkipAnalysis: skipAnalysis, + Final: txIndex == len(txs), + GetHashFn: getHashFn, + EvmBlockContext: blockContext, + Withdrawals: b.Withdrawals(), + } + if txIndex >= 0 && txIndex < len(txs) { + txTask.Tx = txs[txIndex] + txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules) + if err != nil { + return err + } + if sender, ok := txs[txIndex].GetSender(); ok { + txTask.Sender = &sender + } + } else { + txTask.Txs = txs } - if sender, ok := txs[txIndex].GetSender(); ok { - txTask.Sender = &sender + + select { + case workCh <- txTask: + case <-ctx.Done(): + return ctx.Err() } - } else { - txTask.Txs = txs } - workCh <- txTask + inputTxNum++ } - inputTxNum++ - } - core.BlockExecutionTimer.UpdateDuration(t) - syncMetrics[stages.Execution].Set(bn) - select { - case <-ctx.Done(): - return ctx.Err() - default: + core.BlockExecutionTimer.UpdateDuration(t) + syncMetrics[stages.Execution].Set(bn) } - } - close(workCh) - reconDone <- struct{}{} // Complete logging and committing go-routine - if err := g.Wait(); err != nil { + if err := g.Wait(); err != nil { + return err + } + return nil + }(); err != nil { return err } @@ -1103,7 +1111,7 @@ func reconstituteStep(last bool, roTxs[i].Rollback() } if err := db.Update(ctx, func(tx kv.RwTx) error { - if err = rs.Flush(tx); err != nil { + if err := rs.Flush(tx); err != nil { return err } return nil @@ -1119,10 +1127,10 @@ func reconstituteStep(last bool, defer plainContractCollector.Close() var transposedKey []byte - if err = db.View(ctx, func(roTx kv.Tx) error { + if err := db.View(ctx, func(roTx kv.Tx) error { clear := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateR, nil, math.MaxUint32) defer clear() - if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], k[8:]...) transposedKey = append(transposedKey, k[:8]...) return plainStateCollector.Collect(transposedKey, v) @@ -1131,7 +1139,7 @@ func reconstituteStep(last bool, } clear2 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateD, nil, math.MaxUint32) defer clear2() - if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], v...) transposedKey = append(transposedKey, k...) return plainStateCollector.Collect(transposedKey, nil) @@ -1140,7 +1148,7 @@ func reconstituteStep(last bool, } clear3 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeR, nil, math.MaxUint32) defer clear3() - if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], k[8:]...) transposedKey = append(transposedKey, k[:8]...) return codeCollector.Collect(transposedKey, v) @@ -1149,7 +1157,7 @@ func reconstituteStep(last bool, } clear4 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeD, nil, math.MaxUint32) defer clear4() - if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], v...) transposedKey = append(transposedKey, k...) return codeCollector.Collect(transposedKey, nil) @@ -1158,7 +1166,7 @@ func reconstituteStep(last bool, } clear5 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractR, nil, math.MaxUint32) defer clear5() - if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], k[8:]...) transposedKey = append(transposedKey, k[:8]...) return plainContractCollector.Collect(transposedKey, v) @@ -1167,7 +1175,7 @@ func reconstituteStep(last bool, } clear6 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractD, nil, math.MaxUint32) defer clear6() - if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error { + if err := roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error { transposedKey = append(transposedKey[:0], v...) transposedKey = append(transposedKey, k...) return plainContractCollector.Collect(transposedKey, nil) @@ -1178,33 +1186,33 @@ func reconstituteStep(last bool, }); err != nil { return err } - if err = db.Update(ctx, func(tx kv.RwTx) error { - if err = tx.ClearBucket(kv.PlainStateR); err != nil { + if err := db.Update(ctx, func(tx kv.RwTx) error { + if err := tx.ClearBucket(kv.PlainStateR); err != nil { return err } - if err = tx.ClearBucket(kv.PlainStateD); err != nil { + if err := tx.ClearBucket(kv.PlainStateD); err != nil { return err } - if err = tx.ClearBucket(kv.CodeR); err != nil { + if err := tx.ClearBucket(kv.CodeR); err != nil { return err } - if err = tx.ClearBucket(kv.CodeD); err != nil { + if err := tx.ClearBucket(kv.CodeD); err != nil { return err } - if err = tx.ClearBucket(kv.PlainContractR); err != nil { + if err := tx.ClearBucket(kv.PlainContractR); err != nil { return err } - if err = tx.ClearBucket(kv.PlainContractD); err != nil { + if err := tx.ClearBucket(kv.PlainContractD); err != nil { return err } return nil }); err != nil { return err } - if err = chainDb.Update(ctx, func(tx kv.RwTx) error { + if err := chainDb.Update(ctx, func(tx kv.RwTx) error { var lastKey []byte var lastVal []byte - if err = plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + if err := plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if !bytes.Equal(k[:len(k)-8], lastKey) { if lastKey != nil { if e := next(lastKey, lastKey, lastVal); e != nil { @@ -1236,7 +1244,7 @@ func reconstituteStep(last bool, } lastKey = nil lastVal = nil - if err = codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + if err := codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if !bytes.Equal(k[:len(k)-8], lastKey) { if lastKey != nil { if e := next(lastKey, lastKey, lastVal); e != nil { @@ -1268,7 +1276,7 @@ func reconstituteStep(last bool, } lastKey = nil lastVal = nil - if err = plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + if err := plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if !bytes.Equal(k[:len(k)-8], lastKey) { if lastKey != nil { if e := next(lastKey, lastKey, lastVal); e != nil {