Skip to content

Commit

Permalink
[caplin] topic strings (#9000)
Browse files Browse the repository at this point in the history
  • Loading branch information
elee1766 authored Dec 30, 2023
1 parent 510d62a commit 78bb3cd
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 338 deletions.
45 changes: 0 additions & 45 deletions cl/beacon/handler/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,51 +68,6 @@ func (r *beaconResponse) withVersion(version clparams.StateVersion) (out *beacon
return out
}

//// In case of it being a json we need to also expose finalization, version, etc...
//type beaconHandlerFn func(r *http.Request) *beaconResponse
//
//func beaconHandlerWrapper(fn beaconHandlerFn, supportSSZ bool) func(w http.ResponseWriter, r *http.Request) {
// return func(w http.ResponseWriter, r *http.Request) {
// accept := r.Header.Get("Accept")
// isSSZ := !strings.Contains(accept, "application/json") && strings.Contains(accept, "application/stream-octect")
// start := time.Now()
// defer func() {
// log.Debug("[Beacon API] finished", "method", r.Method, "path", r.URL.Path, "duration", time.Since(start))
// }()
//
// resp := fn(r)
// if resp.internalError != nil {
// http.Error(w, resp.internalError.Error(), http.StatusInternalServerError)
// log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.internalError.Error(), "ssz", isSSZ)
// return
// }
//
// if resp.apiError != nil {
// http.Error(w, resp.apiError.err.Error(), resp.apiError.code)
// log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.apiError.err.Error(), "ssz", isSSZ)
// return
// }
//
// if isSSZ && supportSSZ {
// data := resp.Data
// // SSZ encoding
// encoded, err := data.(ssz.Marshaler).EncodeSSZ(nil)
// if err != nil {
// http.Error(w, err.Error(), http.StatusInternalServerError)
// log.Debug("[Beacon API] failed", "method", r.Method, "err", err, "accepted", accept)
// return
// }
// w.Header().Set("Content-Type", "application/octet-stream")
// w.Write(encoded)
// return
// }
// w.Header().Set("Content-Type", "application/json")
// if err := json.NewEncoder(w).Encode(resp); err != nil {
// log.Warn("[Beacon API] failed", "method", r.Method, "err", err, "ssz", isSSZ)
// }
// }
//}

type chainTag int

var (
Expand Down
25 changes: 25 additions & 0 deletions cl/gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package gossip

import (
"strconv"
"strings"
)

const (
TopicNameBeaconBlock = "beacon_block"
TopicNameBeaconAggregateAndProof = "beacon_aggregate_and_proof"
TopicNameVoluntaryExit = "voluntary_exit"
TopicNameProposerSlashing = "proposer_slashing"
TopicNameAttesterSlashing = "attester_slashing"
TopicNameBlsToExecutionChange = "bls_to_execution_change"

TopicNamePrefixBlobSidecar = "blob_sidecar_"
)

func TopicNameBlobSidecar(d int) string {
return TopicNamePrefixBlobSidecar + strconv.Itoa(d)
}

func IsTopicBlobSidecar(d string) bool {
return strings.Contains(d, TopicNamePrefixBlobSidecar)
}
16 changes: 9 additions & 7 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package network
import (
"context"
"fmt"
"github.com/ledgerwatch/erigon-lib/common"
"sync"

"github.com/ledgerwatch/erigon-lib/common"

"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/sentinel/peers"

Expand Down Expand Up @@ -96,8 +98,8 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
// If the deserialization fails, an error is logged and the loop returns to the next iteration.
// If the deserialization is successful, the object is set to the deserialized value and the loop returns to the next iteration.
var object ssz.Unmarshaler
switch data.Type {
case sentinel.GossipType_BeaconBlockGossipType:
switch data.Name {
case gossip.TopicNameBeaconBlock:
object = cltypes.NewSignedBeaconBlock(g.beaconConfig)
if err := object.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
g.sentinel.BanPeer(ctx, data.Peer)
Expand Down Expand Up @@ -142,19 +144,19 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
}
g.mu.RUnlock()

case sentinel.GossipType_VoluntaryExitGossipType:
case gossip.TopicNameVoluntaryExit:
if err := operationsContract[*cltypes.SignedVoluntaryExit](ctx, g, l, data, int(version), "voluntary exit", g.forkChoice.OnVoluntaryExit); err != nil {
return err
}
case sentinel.GossipType_ProposerSlashingGossipType:
case gossip.TopicNameProposerSlashing:
if err := operationsContract[*cltypes.ProposerSlashing](ctx, g, l, data, int(version), "proposer slashing", g.forkChoice.OnProposerSlashing); err != nil {
return err
}
case sentinel.GossipType_AttesterSlashingGossipType:
case gossip.TopicNameAttesterSlashing:
if err := operationsContract[*cltypes.AttesterSlashing](ctx, g, l, data, int(version), "attester slashing", g.forkChoice.OnAttesterSlashing); err != nil {
return err
}
case sentinel.GossipType_BlsToExecutionChangeGossipType:
case gossip.TopicNameBlsToExecutionChange:
if err := operationsContract[*cltypes.SignedBLSToExecutionChange](ctx, g, l, data, int(version), "bls to execution change", g.forkChoice.OnBlsToExecutionChange); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cl/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *BeaconRpcP2P) PropagateBlock(block *cltypes.SignedBeaconBlock) error {
}
_, err = b.sentinel.PublishGossip(b.ctx, &sentinel.GossipData{
Data: encoded,
Type: sentinel.GossipType_BeaconBlockGossipType,
Name: "beacon_block",
})
return err
}
Expand Down
49 changes: 19 additions & 30 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/fork"
"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/log/v3"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -36,50 +37,38 @@ var (

const SSZSnappyCodec = "ssz_snappy"

type TopicName string

const (
BeaconBlockTopic TopicName = "beacon_block"
BeaconAggregateAndProofTopic TopicName = "beacon_aggregate_and_proof"
VoluntaryExitTopic TopicName = "voluntary_exit"
ProposerSlashingTopic TopicName = "proposer_slashing"
AttesterSlashingTopic TopicName = "attester_slashing"
BlsToExecutionChangeTopic TopicName = "bls_to_execution_change"
BlobSidecarTopic TopicName = "blob_sidecar_%d" // This topic needs an index
)

type GossipTopic struct {
Name TopicName
Name string
CodecStr string
}

var BeaconBlockSsz = GossipTopic{
Name: BeaconBlockTopic,
Name: gossip.TopicNameBeaconBlock,
CodecStr: SSZSnappyCodec,
}

var BeaconAggregateAndProofSsz = GossipTopic{
Name: BeaconAggregateAndProofTopic,
Name: gossip.TopicNameBeaconAggregateAndProof,
CodecStr: SSZSnappyCodec,
}

var VoluntaryExitSsz = GossipTopic{
Name: VoluntaryExitTopic,
Name: gossip.TopicNameVoluntaryExit,
CodecStr: SSZSnappyCodec,
}

var ProposerSlashingSsz = GossipTopic{
Name: ProposerSlashingTopic,
Name: gossip.TopicNameProposerSlashing,
CodecStr: SSZSnappyCodec,
}

var AttesterSlashingSsz = GossipTopic{
Name: AttesterSlashingTopic,
Name: gossip.TopicNameAttesterSlashing,
CodecStr: SSZSnappyCodec,
}

var BlsToExecutionChangeSsz = GossipTopic{
Name: BlsToExecutionChangeTopic,
Name: gossip.TopicNameBlsToExecutionChange,
CodecStr: SSZSnappyCodec,
}

Expand All @@ -105,7 +94,7 @@ func NewGossipManager(
func GossipSidecarTopics(maxBlobs uint64) (ret []GossipTopic) {
for i := uint64(0); i < maxBlobs; i++ {
ret = append(ret, GossipTopic{
Name: TopicName(fmt.Sprintf(string(BlobSidecarTopic), i)),
Name: gossip.TopicNameBlobSidecar(int(i)),
CodecStr: SSZSnappyCodec,
})
}
Expand Down Expand Up @@ -204,7 +193,7 @@ func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (
if err != nil {
return nil, fmt.Errorf("failed to join topic %s, err=%w", path, err)
}
topicScoreParams := s.topicScoreParams(string(topic.Name))
topicScoreParams := s.topicScoreParams(topic.Name)
if topicScoreParams != nil {
sub.topic.SetScoreParams(topicScoreParams)
}
Expand All @@ -225,7 +214,7 @@ func (s *Sentinel) Unsubscribe(topic GossipTopic, opts ...pubsub.TopicOpt) (err

func (s *Sentinel) topicScoreParams(topic string) *pubsub.TopicScoreParams {
switch {
case strings.Contains(topic, string(BeaconBlockTopic)):
case strings.Contains(topic, gossip.TopicNameBeaconBlock):
return s.defaultBlockTopicParams()
/*case strings.Contains(topic, GossipAggregateAndProofMessage):
return defaultAggregateTopicParams(activeValidators), nil
Expand Down Expand Up @@ -334,14 +323,14 @@ func (s *GossipSubscription) Close() {
}

type GossipMessage struct {
From peer.ID
Topic TopicName
Data []byte
From peer.ID
TopicName string
Data []byte
}

// this is a helper to begin running the gossip subscription.
// function should not be used outside of the constructor for gossip subscription
func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription, topic string) {
func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription, topicName string) {
defer func() {
if r := recover(); r != nil {
log.Error("[Sentinel Gossip] Message Handler Crashed", "err", r)
Expand All @@ -359,16 +348,16 @@ func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription,
if errors.Is(err, context.Canceled) {
return
}
log.Warn("[Sentinel] fail to decode gossip packet", "err", err, "topic", topic)
log.Warn("[Sentinel] fail to decode gossip packet", "err", err, "topicName", topicName)
return
}
if msg.GetFrom() == s.host {
continue
}
s.ch <- &GossipMessage{
From: msg.GetFrom(),
Topic: TopicName(topic),
Data: common.Copy(msg.Data),
From: msg.GetFrom(),
TopicName: topicName,
Data: common.Copy(msg.Data),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cl/sentinel/sentinel_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestSentinelGossipOnHardFork(t *testing.T) {

ans := <-ch
require.Equal(t, ans.Data, msg)
previousTopic = string(ans.Topic)
previousTopic = ans.TopicName

bcfg.AltairForkEpoch = clparams.MainnetBeaconConfig.AltairForkEpoch
bcfg.InitializeForkSchedule()
Expand All @@ -94,12 +94,12 @@ func TestSentinelGossipOnHardFork(t *testing.T) {
msg = []byte("hello1")
go func() {
// delay to make sure that the connection is established
sub1 = sentinel1.subManager.GetMatchingSubscription(string(BeaconBlockSsz.Name))
sub1 = sentinel1.subManager.GetMatchingSubscription(BeaconBlockSsz.Name)
sub1.Publish(msg)
}()

ans = <-ch
require.Equal(t, ans.Data, msg)
require.NotEqual(t, previousTopic, ans.Topic)
require.NotEqual(t, previousTopic, ans.TopicName)

}
22 changes: 9 additions & 13 deletions cl/sentinel/service/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ import (
"fmt"
"sync"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon/cl/gossip"
)

const (
maxSubscribers = 100 // only 100 clients per sentinel
)

type gossipObject struct {
data []byte // gossip data
t sentinel.GossipType // determine which gossip message we are notifying of
pid string // pid is the peer id of the sender
blobIndex *uint32 // index of the blob
data []byte // gossip data
t string // determine which gossip message we are notifying of
pid string // pid is the peer id of the sender
}

type gossipNotifier struct {
Expand All @@ -30,7 +29,7 @@ func newGossipNotifier() *gossipNotifier {
}
}

func (g *gossipNotifier) notify(t sentinel.GossipType, data []byte, pid string) {
func (g *gossipNotifier) notify(t string, data []byte, pid string) {
g.mu.Lock()
defer g.mu.Unlock()

Expand All @@ -43,18 +42,15 @@ func (g *gossipNotifier) notify(t sentinel.GossipType, data []byte, pid string)
}
}

func (g *gossipNotifier) notifyBlob(t sentinel.GossipType, data []byte, pid string, blobIndex int) {
func (g *gossipNotifier) notifyBlob(data []byte, pid string, blobIndex int) {
g.mu.Lock()
defer g.mu.Unlock()

index := new(uint32)
*index = uint32(blobIndex)
for _, ch := range g.notifiers {
ch <- gossipObject{
data: data,
t: t,
pid: pid,
blobIndex: index,
data: data,
t: gossip.TopicNameBlobSidecar(blobIndex),
pid: pid,
}
}
}
Expand Down
Loading

0 comments on commit 78bb3cd

Please sign in to comment.