Skip to content

Commit

Permalink
feat: add sent bytes and received bytes metrics to peerset plus updat…
Browse files Browse the repository at this point in the history
…e grpc (#606)
  • Loading branch information
amirvalhalla authored Jul 28, 2023
1 parent dcf2242 commit 6c36120
Show file tree
Hide file tree
Showing 52 changed files with 1,135 additions and 272 deletions.
8 changes: 4 additions & 4 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (td *testData) shouldPublishBlockAnnounce(t *testing.T, cons *consensus, ha
case msg := <-cons.broadcastCh:
logger.Info("shouldPublishBlockAnnounce", "message", msg)

if msg.Type() == message.MessageTypeBlockAnnounce {
if msg.Type() == message.TypeBlockAnnounce {
m := msg.(*message.BlockAnnounceMessage)
assert.Equal(t, m.Block.Hash(), hash)
return
Expand All @@ -175,7 +175,7 @@ func shouldPublishProposal(t *testing.T, cons *consensus,
case msg := <-cons.broadcastCh:
logger.Info("shouldPublishProposal", "message", msg)

if msg.Type() == message.MessageTypeProposal {
if msg.Type() == message.TypeProposal {
m := msg.(*message.ProposalMessage)
require.Equal(t, m.Proposal.Height(), height)
require.Equal(t, m.Proposal.Round(), round)
Expand All @@ -196,7 +196,7 @@ func (td *testData) shouldPublishQueryProposal(t *testing.T, cons *consensus, he
case msg := <-cons.broadcastCh:
logger.Info("shouldPublishQueryProposal", "message", msg)

if msg.Type() == message.MessageTypeQueryProposal {
if msg.Type() == message.TypeQueryProposal {
m := msg.(*message.QueryProposalMessage)
assert.Equal(t, m.Height, height)
assert.Equal(t, m.Round, round)
Expand All @@ -216,7 +216,7 @@ func (td *testData) shouldPublishVote(t *testing.T, cons *consensus, voteType vo
case msg := <-cons.broadcastCh:
logger.Info("shouldPublishVote", "message", msg)

if msg.Type() == message.MessageTypeVote {
if msg.Type() == message.TypeVote {
m := msg.(*message.VoteMessage)
if m.Vote.Type() == voteType &&
m.Vote.BlockHash().EqualsTo(hash) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *BlockAnnounceMessage) SanityCheck() error {
}

func (m *BlockAnnounceMessage) Type() Type {
return MessageTypeBlockAnnounce
return TypeBlockAnnounce
}

func (m *BlockAnnounceMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestBlockAnnounceType(t *testing.T) {
m := &BlockAnnounceMessage{}
assert.Equal(t, m.Type(), MessageTypeBlockAnnounce)
assert.Equal(t, m.Type(), TypeBlockAnnounce)
}

func TestBlockAnnounceMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m *BlocksRequestMessage) SanityCheck() error {
}

func (m *BlocksRequestMessage) Type() Type {
return MessageTypeBlocksRequest
return TypeBlocksRequest
}

func (m *BlocksRequestMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/blocks_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestLatestBlocksRequestType(t *testing.T) {
m := &BlocksRequestMessage{}
assert.Equal(t, m.Type(), MessageTypeBlocksRequest)
assert.Equal(t, m.Type(), TypeBlocksRequest)
}

func TestBlocksRequestMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (m *BlocksResponseMessage) SanityCheck() error {
}

func (m *BlocksResponseMessage) Type() Type {
return MessageTypeBlocksResponse
return TypeBlocksResponse
}

func (m *BlocksResponseMessage) Count() uint32 {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestLatestBlocksResponseType(t *testing.T) {
m := &BlocksResponseMessage{}
assert.Equal(t, m.Type(), MessageTypeBlocksResponse)
assert.Equal(t, m.Type(), TypeBlocksResponse)
}

func TestBlocksResponseMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/heart_beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (m *HeartBeatMessage) SanityCheck() error {
}

func (m *HeartBeatMessage) Type() Type {
return MessageTypeHeartBeat
return TypeHeartBeat
}

func (m *HeartBeatMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/heart_beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestHeartBeatType(t *testing.T) {
m := &HeartBeatMessage{}
assert.Equal(t, m.Type(), MessageTypeHeartBeat)
assert.Equal(t, m.Type(), TypeHeartBeat)
}

func TestHeartBeatMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *HelloMessage) SetPublicKey(pub crypto.PublicKey) {
}

func (m *HelloMessage) Type() Type {
return MessageTypeHello
return TypeHello
}

func (m *HelloMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/hello_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestHelloType(t *testing.T) {
m := &HelloMessage{}
assert.Equal(t, m.Type(), MessageTypeHello)
assert.Equal(t, m.Type(), TypeHello)
}

func TestHelloMessage(t *testing.T) {
Expand Down
79 changes: 40 additions & 39 deletions sync/bundle/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,34 @@ func (c ResponseCode) String() string {
return fmt.Sprintf("%d", c)
}

type Type int
type Type int32

const (
MessageTypeHello = Type(1)
MessageTypeHeartBeat = Type(2)
MessageTypeTransactions = Type(3)
MessageTypeQueryProposal = Type(4)
MessageTypeProposal = Type(5)
MessageTypeQueryVotes = Type(6)
MessageTypeVote = Type(7)
MessageTypeBlockAnnounce = Type(8)
MessageTypeBlocksRequest = Type(9)
MessageTypeBlocksResponse = Type(10)
TypeUnspecified = Type(0)
TypeHello = Type(1)
TypeHeartBeat = Type(2)
TypeTransactions = Type(3)
TypeQueryProposal = Type(4)
TypeProposal = Type(5)
TypeQueryVotes = Type(6)
TypeVote = Type(7)
TypeBlockAnnounce = Type(8)
TypeBlocksRequest = Type(9)
TypeBlocksResponse = Type(10)
)

func (t Type) TopicID() network.TopicID {
switch t {
case MessageTypeHello,
MessageTypeHeartBeat,
MessageTypeTransactions,
MessageTypeBlockAnnounce:
case TypeHello,
TypeHeartBeat,
TypeTransactions,
TypeBlockAnnounce:
return network.TopicIDGeneral

case MessageTypeQueryProposal,
MessageTypeProposal,
MessageTypeQueryVotes,
MessageTypeVote:
case TypeQueryProposal,
TypeProposal,
TypeQueryVotes,
TypeVote:
return network.TopicIDConsensus

default:
Expand All @@ -72,51 +73,51 @@ func (t Type) TopicID() network.TopicID {

func (t Type) String() string {
switch t {
case MessageTypeHello:
case TypeHello:
return "hello"
case MessageTypeHeartBeat:
case TypeHeartBeat:
return "heart-beat"
case MessageTypeTransactions:
case TypeTransactions:
return "txs"
case MessageTypeQueryProposal:
case TypeQueryProposal:
return "query-proposal"
case MessageTypeProposal:
case TypeProposal:
return "proposal"
case MessageTypeQueryVotes:
case TypeQueryVotes:
return "query-votes"
case MessageTypeVote:
case TypeVote:
return "vote"
case MessageTypeBlockAnnounce:
case TypeBlockAnnounce:
return "block-announce"
case MessageTypeBlocksRequest:
case TypeBlocksRequest:
return "blocks-req"
case MessageTypeBlocksResponse:
case TypeBlocksResponse:
return "blocks-res"
}
return fmt.Sprintf("%d", t)
}

func MakeMessage(t Type) Message {
switch t {
case MessageTypeHello:
case TypeHello:
return &HelloMessage{}
case MessageTypeHeartBeat:
case TypeHeartBeat:
return &HeartBeatMessage{}
case MessageTypeTransactions:
case TypeTransactions:
return &TransactionsMessage{}
case MessageTypeQueryProposal:
case TypeQueryProposal:
return &QueryProposalMessage{}
case MessageTypeProposal:
case TypeProposal:
return &ProposalMessage{}
case MessageTypeQueryVotes:
case TypeQueryVotes:
return &QueryVotesMessage{}
case MessageTypeVote:
case TypeVote:
return &VoteMessage{}
case MessageTypeBlockAnnounce:
case TypeBlockAnnounce:
return &BlockAnnounceMessage{}
case MessageTypeBlocksRequest:
case TypeBlocksRequest:
return &BlocksRequestMessage{}
case MessageTypeBlocksResponse:
case TypeBlocksResponse:
return &BlocksResponseMessage{}
}

Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (m *ProposalMessage) SanityCheck() error {
}

func (m *ProposalMessage) Type() Type {
return MessageTypeProposal
return TypeProposal
}

func (m *ProposalMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestProposalType(t *testing.T) {
m := &ProposalMessage{}
assert.Equal(t, m.Type(), MessageTypeProposal)
assert.Equal(t, m.Type(), TypeProposal)
}

func TestProposalMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/query_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (m *QueryProposalMessage) SanityCheck() error {
}

func (m *QueryProposalMessage) Type() Type {
return MessageTypeQueryProposal
return TypeQueryProposal
}

func (m *QueryProposalMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/query_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestQueryProposalType(t *testing.T) {
m := &QueryProposalMessage{}
assert.Equal(t, m.Type(), MessageTypeQueryProposal)
assert.Equal(t, m.Type(), TypeQueryProposal)
}

func TestQueryProposalMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/query_votes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m *QueryVotesMessage) SanityCheck() error {
}

func (m *QueryVotesMessage) Type() Type {
return MessageTypeQueryVotes
return TypeQueryVotes
}

func (m *QueryVotesMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/query_votes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestQueryVotesType(t *testing.T) {
m := &QueryVotesMessage{}
assert.Equal(t, m.Type(), MessageTypeQueryVotes)
assert.Equal(t, m.Type(), TypeQueryVotes)
}

func TestQueryVotesMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *TransactionsMessage) SanityCheck() error {
}

func (m *TransactionsMessage) Type() Type {
return MessageTypeTransactions
return TypeTransactions
}

func (m *TransactionsMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestTransactionsType(t *testing.T) {
m := &TransactionsMessage{}
assert.Equal(t, m.Type(), MessageTypeTransactions)
assert.Equal(t, m.Type(), TypeTransactions)
}

func TestTransactionsMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (m *VoteMessage) SanityCheck() error {
}

func (m *VoteMessage) Type() Type {
return MessageTypeVote
return TypeVote
}

func (m *VoteMessage) Fingerprint() string {
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/message/vote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestVoteType(t *testing.T) {
m := &VoteMessage{}
assert.Equal(t, m.Type(), MessageTypeVote)
assert.Equal(t, m.Type(), TypeVote)
}

func TestVoteMessage(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion sync/firewall/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pactus-project/pactus/network"
"github.com/pactus-project/pactus/state"
"github.com/pactus-project/pactus/sync/bundle"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset"
"github.com/pactus-project/pactus/util/errors"
"github.com/pactus-project/pactus/util/logger"
Expand Down Expand Up @@ -98,10 +99,11 @@ func (f *Firewall) openBundle(r io.Reader, source peer.ID) (*bundle.Bundle, erro
func (f *Firewall) decodeBundle(r io.Reader, pid peer.ID) (*bundle.Bundle, error) {
bdl := new(bundle.Bundle)
bytesRead, err := bdl.Decode(r)
f.peerSet.IncreaseReceivedBytesCounter(pid, bytesRead)
if err != nil {
f.peerSet.IncreaseReceivedBytesCounter(pid, message.TypeUnspecified, bytesRead)
return nil, errors.Errorf(errors.ErrInvalidMessage, err.Error())
}
f.peerSet.IncreaseReceivedBytesCounter(pid, bdl.Message.Type(), bytesRead)

return bdl, nil
}
Expand Down
6 changes: 3 additions & 3 deletions sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestParsingBlockAnnounceMessages(t *testing.T) {
t.Run("Receiving new block announce message, without committing previous block", func(t *testing.T) {
assert.NoError(t, td.receivingNewMessage(td.sync, msg2, pid))

msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.MessageTypeBlocksRequest)
msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.TypeBlocksRequest)
assert.Equal(t, msg1.Message.(*message.BlocksRequestMessage).From, lastBlockHeight+1)

peer := td.sync.peerSet.GetPeer(pid)
Expand Down Expand Up @@ -54,15 +54,15 @@ func TestBroadcastingBlockAnnounceMessages(t *testing.T) {
t.Run("Not in the committee, should not broadcast block announce message", func(t *testing.T) {
td.sync.broadcast(msg)

td.shouldNotPublishMessageWithThisType(t, td.network, message.MessageTypeBlockAnnounce)
td.shouldNotPublishMessageWithThisType(t, td.network, message.TypeBlockAnnounce)
})

td.addPeerToCommittee(t, td.sync.SelfID(), td.sync.signers[0].PublicKey())

t.Run("In the committee, should broadcast block announce message", func(t *testing.T) {
td.sync.broadcast(msg)

msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.MessageTypeBlockAnnounce)
msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.TypeBlockAnnounce)
assert.Equal(t, msg1.Message.(*message.BlockAnnounceMessage).Height, msg.Height)
})
}
Loading

0 comments on commit 6c36120

Please sign in to comment.