Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e3: recon deadlock fix #7186

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 92 additions & 84 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,86 +1024,94 @@ func reconstituteStep(last bool,
return h
}

var err error // avoid declare global mutable variable
for bn := startBlockNum; bn <= endBlockNum; bn++ {
t = time.Now()
b, err = blockWithSenders(chainDb, nil, blockReader, bn)
if err != nil {
return err
}
if b == nil {
return fmt.Errorf("could not find block %d\n", bn)
}
txs := b.Transactions()
header := b.HeaderNoCopy()
skipAnalysis := core.SkipAnalysis(chainConfig, bn)
signer := *types.MakeSigner(chainConfig, bn)

f := core.GetHashFn(header, getHeaderFunc)
getHashFnMute := &sync.Mutex{}
getHashFn := func(n uint64) common.Hash {
getHashFnMute.Lock()
defer getHashFnMute.Unlock()
return f(n)
}
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */)
rules := chainConfig.Rules(bn, b.Time())
if err := func() (err error) {
defer func() {
close(workCh)
reconDone <- struct{}{} // Complete logging and committing go-routine
_ = g.Wait()
}()

for txIndex := -1; txIndex <= len(txs); txIndex++ {
if bitmap.Contains(inputTxNum) {
binary.BigEndian.PutUint64(txKey[:], inputTxNum)
txTask := &exec22.TxTask{
BlockNum: bn,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
TxNum: inputTxNum,
Txs: txs,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
}
if txIndex >= 0 && txIndex < len(txs) {
txTask.Tx = txs[txIndex]
txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules)
if err != nil {
return err
for bn := startBlockNum; bn <= endBlockNum; bn++ {
t = time.Now()
b, err = blockWithSenders(chainDb, nil, blockReader, bn)
if err != nil {
return err
}
if b == nil {
return fmt.Errorf("could not find block %d\n", bn)
}
txs := b.Transactions()
header := b.HeaderNoCopy()
skipAnalysis := core.SkipAnalysis(chainConfig, bn)
signer := *types.MakeSigner(chainConfig, bn)

f := core.GetHashFn(header, getHeaderFunc)
getHashFnMute := &sync.Mutex{}
getHashFn := func(n uint64) common.Hash {
getHashFnMute.Lock()
defer getHashFnMute.Unlock()
return f(n)
}
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */)
rules := chainConfig.Rules(bn, b.Time())

for txIndex := -1; txIndex <= len(txs); txIndex++ {
if bitmap.Contains(inputTxNum) {
binary.BigEndian.PutUint64(txKey[:], inputTxNum)
txTask := &exec22.TxTask{
BlockNum: bn,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
TxNum: inputTxNum,
Txs: txs,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
}
if txIndex >= 0 && txIndex < len(txs) {
txTask.Tx = txs[txIndex]
txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules)
if err != nil {
return err
}
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
}
} else {
txTask.Txs = txs
}
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender

select {
case workCh <- txTask:
case <-ctx.Done():
return ctx.Err()
}
} else {
txTask.Txs = txs
}
workCh <- txTask
inputTxNum++
}
inputTxNum++
}

core.BlockExecutionTimer.UpdateDuration(t)
syncMetrics[stages.Execution].Set(bn)
select {
case <-ctx.Done():
return ctx.Err()
default:
core.BlockExecutionTimer.UpdateDuration(t)
syncMetrics[stages.Execution].Set(bn)
}
}
close(workCh)
reconDone <- struct{}{} // Complete logging and committing go-routine
if err := g.Wait(); err != nil {
if err := g.Wait(); err != nil {
return err
}
return nil
}(); err != nil {
return err
}

for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err = rs.Flush(tx); err != nil {
if err := rs.Flush(tx); err != nil {
return err
}
return nil
Expand All @@ -1119,10 +1127,10 @@ func reconstituteStep(last bool,
defer plainContractCollector.Close()
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
if err := db.View(ctx, func(roTx kv.Tx) error {
clear := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainStateCollector.Collect(transposedKey, v)
Expand All @@ -1131,7 +1139,7 @@ func reconstituteStep(last bool,
}
clear2 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainStateCollector.Collect(transposedKey, nil)
Expand All @@ -1140,7 +1148,7 @@ func reconstituteStep(last bool,
}
clear3 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return codeCollector.Collect(transposedKey, v)
Expand All @@ -1149,7 +1157,7 @@ func reconstituteStep(last bool,
}
clear4 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return codeCollector.Collect(transposedKey, nil)
Expand All @@ -1158,7 +1166,7 @@ func reconstituteStep(last bool,
}
clear5 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainContractCollector.Collect(transposedKey, v)
Expand All @@ -1167,7 +1175,7 @@ func reconstituteStep(last bool,
}
clear6 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainContractCollector.Collect(transposedKey, nil)
Expand All @@ -1178,33 +1186,33 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
if err = db.Update(ctx, func(tx kv.RwTx) error {
if err = tx.ClearBucket(kv.PlainStateR); err != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.PlainStateR); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainStateD); err != nil {
if err := tx.ClearBucket(kv.PlainStateD); err != nil {
return err
}
if err = tx.ClearBucket(kv.CodeR); err != nil {
if err := tx.ClearBucket(kv.CodeR); err != nil {
return err
}
if err = tx.ClearBucket(kv.CodeD); err != nil {
if err := tx.ClearBucket(kv.CodeD); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainContractR); err != nil {
if err := tx.ClearBucket(kv.PlainContractR); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainContractD); err != nil {
if err := tx.ClearBucket(kv.PlainContractD); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err = chainDb.Update(ctx, func(tx kv.RwTx) error {
if err := chainDb.Update(ctx, func(tx kv.RwTx) error {
var lastKey []byte
var lastVal []byte
if err = plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down Expand Up @@ -1236,7 +1244,7 @@ func reconstituteStep(last bool,
}
lastKey = nil
lastVal = nil
if err = codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down Expand Up @@ -1268,7 +1276,7 @@ func reconstituteStep(last bool,
}
lastKey = nil
lastVal = nil
if err = plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down