Skip to content

Commit

Permalink
Added tests to sentinel (FINALLY!) (#9066)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Dec 24, 2023
1 parent a4d7b6d commit be4036e
Show file tree
Hide file tree
Showing 15 changed files with 612 additions and 83 deletions.
8 changes: 1 addition & 7 deletions cl/antiquary/state_antiquary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package antiquary
import (
"context"
_ "embed"
"fmt"
"testing"

"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand All @@ -20,7 +19,7 @@ import (

func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) {
db := memdb.NewTestDB(t)
reader := tests.LoadChain(blocks, db)
reader, _ := tests.LoadChain(blocks, db, t)

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
Expand All @@ -31,21 +30,16 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt
}

func TestStateAntiquaryCapella(t *testing.T) {
t.Skip()
t.Skip()
blocks, preState, postState := tests.GetCapellaRandom()
runTest(t, blocks, preState, postState)
}

func TestStateAntiquaryBellatrix(t *testing.T) {
t.Skip()
blocks, preState, postState := tests.GetBellatrixRandom()
fmt.Println(len(blocks))
runTest(t, blocks, preState, postState)
}

func TestStateAntiquaryPhase0(t *testing.T) {
t.Skip()
blocks, preState, postState := tests.GetPhase0Random()
runTest(t, blocks, preState, postState)
}
30 changes: 15 additions & 15 deletions cl/antiquary/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import (
"embed"
_ "embed"
"strconv"
"testing"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
)

//go:embed test_data/capella/blocks_0.ssz_snappy
Expand Down Expand Up @@ -67,28 +71,24 @@ func (m *MockBlockReader) FrozenSlots() uint64 {
panic("implement me")
}

func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB) *MockBlockReader {
func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB, t *testing.T) (*MockBlockReader, afero.Fs) {
tx, err := db.BeginRw(context.Background())
if err != nil {
panic(err)
}
require.NoError(t, err)
defer tx.Rollback()
fs := afero.NewMemMapFs()
bs := persistence.NewAferoRawBlockSaver(fs, &clparams.MainnetBeaconConfig)
source := persistence.NewBeaconChainDatabaseFilesystem(bs, nil, &clparams.MainnetBeaconConfig)

m := NewMockBlockReader()
for _, block := range blocks {
m.u[block.Block.Slot] = block
h := block.SignedBeaconBlockHeader()
if err := beacon_indicies.WriteBeaconBlockHeaderAndIndicies(context.Background(), tx, h, true); err != nil {
panic(err)
}
if err := beacon_indicies.WriteHighestFinalized(tx, block.Block.Slot+64); err != nil {
panic(err)
}
}
if err := tx.Commit(); err != nil {
panic(err)

require.NoError(t, source.WriteBlock(context.Background(), tx, block, true))
require.NoError(t, beacon_indicies.WriteHighestFinalized(tx, block.Block.Slot+64))
}
return m

require.NoError(t, tx.Commit())
return m, fs
}

func GetCapellaRandom() ([]*cltypes.SignedBeaconBlock, *state.CachingBeaconState, *state.CachingBeaconState) {
Expand Down
15 changes: 10 additions & 5 deletions cl/persistence/beacon_indicies/indicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,16 @@ func PruneBlockRoots(ctx context.Context, tx kv.RwTx, fromSlot, toSlot uint64) e
func ReadBeaconBlockRootsInSlotRange(ctx context.Context, tx kv.Tx, fromSlot, count uint64) ([]libcommon.Hash, []uint64, error) {
blockRoots := make([]libcommon.Hash, 0, count)
slots := make([]uint64, 0, count)
err := RangeBlockRoots(ctx, tx, fromSlot, fromSlot+count, func(slot uint64, beaconBlockRoot libcommon.Hash) bool {
blockRoots = append(blockRoots, beaconBlockRoot)
slots = append(slots, slot)
return true
})
cursor, err := tx.Cursor(kv.CanonicalBlockRoots)
if err != nil {
return nil, nil, err
}
currentCount := uint64(0)
for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && currentCount != count; k, v, err = cursor.Next() {
currentCount++
blockRoots = append(blockRoots, libcommon.BytesToHash(v))
slots = append(slots, base_encoding.Decode64FromBytes4(k))
}
return blockRoots, slots, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ func (r *ExecutionSnapshotReader) Withdrawals(number uint64, hash libcommon.Hash
}
ret := solid.NewStaticListSSZ[*cltypes.Withdrawal](int(r.beaconCfg.MaxWithdrawalsPerPayload), 44)
for _, w := range body.Withdrawals {
if w.Index == 0 {
return nil, fmt.Errorf("withdrawal index is zero")
}
ret.Append(&cltypes.Withdrawal{
Index: w.Index,
Validator: w.Validator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) {
db := memdb.NewTestDB(t)
reader := tests.LoadChain(blocks, db)
reader, _ := tests.LoadChain(blocks, db, t)

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
Expand Down
2 changes: 2 additions & 0 deletions cl/sentinel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type SentinelConfig struct {
NoDiscovery bool
TmpDir string
LocalDiscovery bool

EnableBlocks bool
}

func convertToCryptoPrivkey(privkey *ecdsa.PrivateKey) (crypto.PrivKey, error) {
Expand Down
46 changes: 45 additions & 1 deletion cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,54 @@ func (s *GossipManager) unsubscribe(topic string) {
if _, ok := s.subscriptions[topic]; !ok {
return
}
s.subscriptions[topic].Close()
sub := s.subscriptions[topic]
go func() {
timer := time.NewTimer(time.Hour)
ctx := sub.ctx
select {
case <-ctx.Done():
sub.Close()
case <-timer.C:
sub.Close()
}
}()
delete(s.subscriptions, topic)
}

func (s *Sentinel) forkWatcher() {
prevDigest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
return
}
iterationInterval := time.NewTicker(30 * time.Millisecond)
for {
select {
case <-s.ctx.Done():
return
case <-iterationInterval.C:
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
return
}
if prevDigest != digest {
subs := s.subManager.subscriptions
for path, sub := range subs {
s.subManager.unsubscribe(path)
newSub, err := s.SubscribeGossip(sub.gossip_topic)
if err != nil {
log.Error("[Gossip] Failed to resubscribe to topic", "err", err)
return
}
newSub.Listen()
}
prevDigest = digest
}
}
}
}

func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
Expand Down
70 changes: 28 additions & 42 deletions cl/sentinel/handlers/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package handlers

import (
"errors"
"io"

libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -42,50 +41,44 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
return err
}

if req.Step != 1 {
return errors.New("step must be 1")
}

tx, err := c.indiciesDB.BeginRo(c.ctx)
if err != nil {
return err
}
defer tx.Rollback()

// Limit the number of blocks to the count specified in the request.
if int(req.Count) > MAX_REQUEST_BLOCKS {
req.Count = MAX_REQUEST_BLOCKS
}

beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count-1)
beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count)
if err != nil {
return err
}

if len(beaconBlockRooots) == 0 || len(slots) == 0 {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

resourceAvaiable := false
for i, slot := range slots {
r, err := c.beaconDB.BlockReader(c.ctx, slot, beaconBlockRooots[i])
if err != nil {
return err
}
defer r.Close()

if !resourceAvaiable {
if _, err := s.Write([]byte{1}); err != nil {
return err
}
resourceAvaiable = true
version := c.beaconConfig.GetCurrentStateVersion(slot / c.beaconConfig.SlotsPerEpoch)
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

if _, err := s.Write([]byte{0}); err != nil {
return err
}

if _, err := s.Write(forkDigest[:]); err != nil {
Expand All @@ -96,9 +89,7 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
return err
}
}
if !resourceAvaiable {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}

return nil
}

Expand Down Expand Up @@ -132,16 +123,6 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
}
defer tx.Rollback()

// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

resourceAvaiable := false
for i, blockRoot := range blockRoots {
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot)
if slot == nil {
Expand All @@ -157,11 +138,18 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
}
defer r.Close()

if !resourceAvaiable {
if _, err := s.Write([]byte{1}); err != nil {
return err
}
resourceAvaiable = true
if _, err := s.Write([]byte{0}); err != nil {
return err
}

version := c.beaconConfig.GetCurrentStateVersion(*slot / c.beaconConfig.SlotsPerEpoch)
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

if _, err := s.Write(forkDigest[:]); err != nil {
Expand All @@ -178,9 +166,7 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
return err
}
}
if !resourceAvaiable {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions cl/sentinel/handlers/blocks_by_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestBlocksByRootHandler(t *testing.T) {
}

reqData := libcommon.CopyBytes(reqBuf.Bytes())
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV1))
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV2))
require.NoError(t, err)

_, err = stream.Write(reqData)
Expand All @@ -86,7 +86,7 @@ func TestBlocksByRootHandler(t *testing.T) {
firstByte := make([]byte, 1)
_, err = stream.Read(firstByte)
require.NoError(t, err)
require.Equal(t, firstByte[0], byte(1))
require.Equal(t, firstByte[0], byte(0))

for i := 0; i < int(count); i++ {
forkDigest := make([]byte, 4)
Expand Down Expand Up @@ -133,6 +133,7 @@ func TestBlocksByRootHandler(t *testing.T) {
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
stream.Read(make([]byte, 1))
}

_, err = stream.Read(make([]byte, 1))
Expand Down
5 changes: 3 additions & 2 deletions cl/sentinel/handlers/blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
}

reqData := libcommon.CopyBytes(reqBuf.Bytes())
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV1))
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV2))
require.NoError(t, err)

_, err = stream.Write(reqData)
Expand All @@ -89,7 +89,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
firstByte := make([]byte, 1)
_, err = stream.Read(firstByte)
require.NoError(t, err)
require.Equal(t, firstByte[0], byte(1))
require.Equal(t, firstByte[0], byte(0))

for i := 0; i < len(blockRoots); i++ {
forkDigest := make([]byte, 4)
Expand Down Expand Up @@ -132,6 +132,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
stream.Read(make([]byte, 1))
}

_, err = stream.Read(make([]byte, 1))
Expand Down
Loading

0 comments on commit be4036e

Please sign in to comment.