Skip to content

Commit

Permalink
etl: distinct empty values from nil (#6934)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Mar 7, 2023
1 parent 77065ee commit c5acfd0
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 28 deletions.
9 changes: 5 additions & 4 deletions cmd/devnet/devnetutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"os/exec"
"strconv"
"strings"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cmd/devnet/models"
"github.com/ledgerwatch/erigon/cmd/rpctest/rpctest"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/crypto"
"math/big"
"os/exec"
"strconv"
"strings"
)

// ClearDevDB cleans up the dev folder used for the operations
Expand Down
8 changes: 4 additions & 4 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,10 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {

must(sync.SetCurrentStage(stages.Senders))

if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
}

tx, err := db.BeginRw(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -672,10 +676,6 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
return nil
}

if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
}

s := stage(sync, tx, nil, stages.Senders)
log.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

Expand Down
2 changes: 0 additions & 2 deletions cmd/release/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sort"
"strings"
"time"

"github.com/hashicorp/go-version"
)

type Binary struct {
Expand Down
12 changes: 8 additions & 4 deletions eth/integrity/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
defer clear()

trieAcc2, err := tx.Cursor(kv.TrieOfAccounts)
if err != nil {
Expand All @@ -56,7 +57,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer accC.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
if errc != nil {
Expand Down Expand Up @@ -157,7 +159,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
defer clear()

trieStorage, err := tx.Cursor(kv.TrieOfStorage)
if err != nil {
Expand All @@ -170,7 +173,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer storageC.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
if errc != nil {
Expand Down
21 changes: 14 additions & 7 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
libstate "github.com/ledgerwatch/erigon-lib/state"
state2 "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
atomic2 "go.uber.org/atomic"

"github.com/ledgerwatch/erigon/cmd/state/exec22"
"github.com/ledgerwatch/erigon/cmd/state/exec3"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
Expand Down Expand Up @@ -1109,48 +1109,55 @@ func reconstituteStep(last bool,
plainContractCollector := etl.NewCollector(fmt.Sprintf("%s recon plainContract", s.LogPrefix()), dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainContractCollector.Close()
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
clear := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainStateCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
clear2 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainStateCollector.Collect(transposedKey, nil)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
clear3 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return codeCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
clear4 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return codeCollector.Collect(transposedKey, nil)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
clear5 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainContractCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
clear6 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
Expand Down
6 changes: 4 additions & 2 deletions eth/stagedsync/stage_interhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func RegenerateIntermediateHashes(logPrefix string, db kv.RwTx, cfg TrieCfg, exp
defer log.Info(fmt.Sprintf("[%s] Regeneration ended", logPrefix))
_ = db.ClearBucket(kv.TrieOfAccounts)
_ = db.ClearBucket(kv.TrieOfStorage)
kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxUint32)
kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxUint32)
clean := kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxUint32)
defer clean()
clean2 := kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxUint32)
defer clean2()

accTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18

require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce
github.com/ledgerwatch/erigon-lib v0.0.0-20230307023045-f4a02864a931
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3
github.com/ledgerwatch/log/v3 v3.7.0
github.com/ledgerwatch/secp256k1 v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce h1:KsoGX2RLGvqTc97Et7wAwaRNk1/RDAUbe20B9TM7/70=
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce/go.mod h1:/xzmS14QeWZFjRXDiTdeqcSW4IrrUSYkCXZRfsZ5XbI=
github.com/ledgerwatch/erigon-lib v0.0.0-20230307023045-f4a02864a931 h1:BSh4RT/llAZX3gA+7D9kT5yxTmkHcLeLqOw6Tf8ax6E=
github.com/ledgerwatch/erigon-lib v0.0.0-20230307023045-f4a02864a931/go.mod h1:nyJqfX9uPm1P/poZB1211DFe5DnAKOhYqvkEPyW7dXM=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3 h1:tfzawK1gIIgRjVZeANXOr0Ziu+kqCIBuKMe0TXfl5Aw=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og=
Expand Down
6 changes: 4 additions & 2 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,10 +1395,12 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
return true, nil
}
if doWarmup && !warmupSenders.Load() && blockNum%1_000 == 0 {
kv.ReadAhead(warmupCtx, db, warmupSenders, kv.Senders, hexutility.EncodeTs(blockNum), 10_000)
clean := kv.ReadAhead(warmupCtx, db, warmupSenders, kv.Senders, hexutility.EncodeTs(blockNum), 10_000)
defer clean()
}
if doWarmup && !warmupTxs.Load() && blockNum%1_000 == 0 {
kv.ReadAhead(warmupCtx, db, warmupTxs, kv.EthTx, hexutility.EncodeTs(body.BaseTxId), 100*10_000)
clean := kv.ReadAhead(warmupCtx, db, warmupTxs, kv.EthTx, hexutility.EncodeTs(body.BaseTxId), 100*10_000)
defer clean()
}
senders, err := rawdb.ReadSenders(tx, h, blockNum)
if err != nil {
Expand Down

0 comments on commit c5acfd0

Please sign in to comment.