Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: Ignore invalid tags #4517

Merged
merged 24 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0346b11
don't queue messages with unrecognized tags
cce Aug 31, 2022
0b0f37c
Merge remote-tracking branch 'upstream/master' into ignore-invalid-tags
cce Sep 8, 2022
0e2fe4c
update and add debug logging for bad tags
cce Sep 8, 2022
1b5595b
switch to setting telemetry field
cce Sep 8, 2022
9df1d0c
skip dropping message
cce Sep 8, 2022
06f5c56
add allowCustomTags
cce Sep 8, 2022
bfb3ca1
Merge remote-tracking branch 'upstream/master' into ignore-invalid-tags
cce Nov 9, 2022
ce2af33
turn OutOfProtocol into a counter
cce Nov 9, 2022
5e04cb8
Merge remote-tracking branch 'upstream/master' into ignore-invalid-tags
cce Jan 10, 2023
b7719f7
merge count test
cce Jan 10, 2023
0e702e4
update metric name
cce Jan 10, 2023
18ace9d
update OutOfProtocol to Unknown
cce Jan 10, 2023
ccd5f37
remove drop check behind dedupSafeTag
cce Jan 26, 2023
0c6f169
add test for wsPeer.readLoop to make sure the switch statement checks…
cce Jan 26, 2023
936e568
fix lint
cce Jan 26, 2023
5f0cd15
add protocol.TagList completeness check test
cce Jan 27, 2023
c91c1a4
remove UniCatchupReqTag
cce Jan 27, 2023
ceba110
add license to tags_test.go
cce Jan 27, 2023
12dbc02
add partitiontest for linter
cce Jan 27, 2023
a7dd7a7
add strconv.Unquote to TestTagList
cce Jan 27, 2023
0346817
add TestWebsocketNetworkBasicInvalidTags
cce Jan 27, 2023
e390c9a
update TestTagList
cce Jan 27, 2023
4fbc0f6
add TestHashIDPrefix and a few more comments
cce Jan 28, 2023
4a780fd
Update network/wsNetwork_test.go
algorandskiy Feb 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions agreement/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ type multicastParams struct {
exclude nodeID
}

// UnknownMsgTag ensures the testingNetwork implementation below will drop a message.
const UnknownMsgTag protocol.Tag = "??"

func (n *testingNetwork) multicast(tag protocol.Tag, data []byte, source nodeID, exclude nodeID) {
// fmt.Println("mc", source, "x", exclude)
n.mu.Lock()
Expand Down Expand Up @@ -262,7 +265,7 @@ func (n *testingNetwork) multicast(tag protocol.Tag, data []byte, source nodeID,
msgChans = n.bundleMessages
case protocol.ProposalPayloadTag:
msgChans = n.payloadMessages
case protocol.UnknownMsgTag:
case UnknownMsgTag:
// We use this intentionally - just drop it
return
default:
Expand Down Expand Up @@ -1681,7 +1684,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
// intercept all proposals for the next period; replace with unexpected
baseNetwork.intercept(func(params multicastParams) multicastParams {
if params.tag == protocol.ProposalPayloadTag {
params.tag = protocol.UnknownMsgTag
params.tag = UnknownMsgTag
}
return params
})
Expand Down Expand Up @@ -2280,7 +2283,7 @@ func TestAgreementCertificateDoesNotStallSingleRelay(t *testing.T) {
return params
}
}
params.tag = protocol.UnknownMsgTag
params.tag = UnknownMsgTag
}

return params
Expand Down
2 changes: 1 addition & 1 deletion logging/telemetryspec/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ type PeerConnectionDetails struct {
// DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before.
DuplicateFilterCount uint64
// These message counters count received messages from this peer.
TXCount, MICount, AVCount, PPCount uint64
TXCount, MICount, AVCount, PPCount, UNKCount uint64
// TCPInfo provides connection measurements from TCP.
TCP util.TCPInfo `json:",omitempty"`
}
Expand Down
1 change: 1 addition & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,7 @@ func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, pee
MICount: atomic.LoadUint64(&peer.miMessageCount),
AVCount: atomic.LoadUint64(&peer.avMessageCount),
PPCount: atomic.LoadUint64(&peer.ppMessageCount),
UNKCount: atomic.LoadUint64(&peer.unkMessageCount),
}
// unwrap websocket.Conn, requestTrackedConnection, rejectingLimitListenerConn
var uconn net.Conn = peer.conn.UnderlyingConn()
Expand Down
39 changes: 39 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ import (

const sendBufferLength = 1000

func init() {
// this allows test code to use out-of-protocol message tags and have them go through
allowCustomTags = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we confident that go will never load this test package in production?
Why not keep this false, and whichever test that needs this feature, can set it to true then false, and not run in parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test code can't be compiled into non-test packages, so it is safe.. but it is true that to parallelize the network tests we'd need to track down which ones needed this flag set. When I did this a while ago I think there were some tests that failed and so I didn't want to track down exactly which ones.

}

func TestMain(m *testing.M) {
logging.Base().SetLevel(logging.Debug)
os.Exit(m.Run())
Expand Down Expand Up @@ -326,6 +331,40 @@ func TestWebsocketNetworkBasic(t *testing.T) {
}
}

// Set up two nodes, test that B drops invalid tags when A ends them.
func TestWebsocketNetworkBasicInvalidTags(t *testing.T) { // nolint:paralleltest // changes global variable allowCustomTags
partitiontest.PartitionTest(t)
// disallow custom tags for this test
allowCustomTags = false
defaultSendMessageTags["XX"] = true
defer func() {
allowCustomTags = true
delete(defaultSendMessageTags, "XX")
}()

netA, netB, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()
counterDone := counter.done
// register a handler that should never get called, because the message will
// be dropped before it gets to the handlers if allowCustomTags = false
netB.RegisterHandlers([]TaggedMessageHandler{
{Tag: "XX", MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
require.Fail(t, "MessageHandler for out-of-protocol tag should not be called")
return OutgoingMessage{}
})}})
// send 2 valid and 2 invalid tags
netA.Broadcast(context.Background(), "TX", []byte("foo"), false, nil)
netA.Broadcast(context.Background(), "XX", []byte("foo"), false, nil)
netA.Broadcast(context.Background(), "TX", []byte("bar"), false, nil)
netA.Broadcast(context.Background(), "XX", []byte("bar"), false, nil)

select {
case <-counterDone:
case <-time.After(2 * time.Second):
t.Errorf("timeout, count=%d, wanted 2", counter.count)
}
}

// Set up two nodes, send proposal
func TestWebsocketProposalPayloadCompression(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down
16 changes: 14 additions & 2 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const msgsInReadBufferPerPeer = 10

var tagStringList []string

// allowCustomTags is set by tests to allow non-protocol-defined message tags. It is false in non-test code.
var allowCustomTags bool

func init() {
tagStringList = make([]string, len(protocol.TagList))
for i, t := range protocol.TagList {
Expand Down Expand Up @@ -96,6 +99,7 @@ var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.Dupl
var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)
var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal)

// defaultSendMessageTags is the default list of messages which a peer would
// allow to be sent without receiving any explicit request.
Expand All @@ -109,7 +113,6 @@ var defaultSendMessageTags = map[protocol.Tag]bool{
protocol.TopicMsgRespTag: true,
protocol.MsgOfInterestTag: true,
protocol.TxnTag: true,
protocol.UniCatchupReqTag: true,
protocol.UniEnsBlockReqTag: true,
protocol.VoteBundleTag: true,
}
Expand Down Expand Up @@ -191,7 +194,7 @@ type wsPeer struct {
duplicateFilterCount uint64

// These message counters need to be 64-bit aligned as well.
txMessageCount, miMessageCount, ppMessageCount, avMessageCount uint64
txMessageCount, miMessageCount, ppMessageCount, avMessageCount, unkMessageCount uint64

wsPeerCore

Expand Down Expand Up @@ -543,6 +546,15 @@ func (wp *wsPeer) readLoop() {
atomic.AddUint64(&wp.avMessageCount, 1)
case protocol.ProposalPayloadTag:
atomic.AddUint64(&wp.ppMessageCount, 1)
// the remaining valid tags: no special handling here
case protocol.NetPrioResponseTag, protocol.PingTag, protocol.PingReplyTag,
protocol.StateProofSigTag, protocol.UniEnsBlockReqTag, protocol.VoteBundleTag:
default: // unrecognized tag
unknownProtocolTagMessagesTotal.Inc(nil)
atomic.AddUint64(&wp.unkMessageCount, 1)
if !allowCustomTags {
continue // drop message, skip adding it to queue
}
cce marked this conversation as resolved.
Show resolved Hide resolved
}
if len(msg.Data) > 0 && wp.incomingMsgFilter != nil && dedupSafeTag(msg.Tag) {
if wp.incomingMsgFilter.CheckIncomingMessage(msg.Tag, msg.Data, true, true) {
Expand Down
99 changes: 99 additions & 0 deletions network/wsPeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ package network
import (
"encoding/binary"
"fmt"
"go/ast"
"go/parser"
"go/token"
"path/filepath"
"sort"
"strings"
"testing"
"time"
"unsafe"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/metrics"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -104,6 +110,11 @@ func TestAtomicVariablesAlignment(t *testing.T) {
require.True(t, (unsafe.Offsetof(p.lastPacketTime)%8) == 0)
require.True(t, (unsafe.Offsetof(p.intermittentOutgoingMessageEnqueueTime)%8) == 0)
require.True(t, (unsafe.Offsetof(p.duplicateFilterCount)%8) == 0)
require.True(t, (unsafe.Offsetof(p.txMessageCount)%8) == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great and why don't we have this for everything we sync/atomic access?

#5001

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like can be done as a custom linter similar to part test, by using fieldalignment as an example

Copy link
Contributor Author

@cce cce Jan 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried the fieldalignment linter and it didn't work ... it requires being run on a 32-bit platform to work (!!) and I even tried it using a 32-bit docker image inside qemu and it didn't catch everything. However in Go 1.19 we can switch to the new automatically aligned atomic types and not need to worry about this anymore.

require.True(t, (unsafe.Offsetof(p.miMessageCount)%8) == 0)
require.True(t, (unsafe.Offsetof(p.ppMessageCount)%8) == 0)
require.True(t, (unsafe.Offsetof(p.avMessageCount)%8) == 0)
require.True(t, (unsafe.Offsetof(p.unkMessageCount)%8) == 0)
}

func TestTagCounterFiltering(t *testing.T) {
Expand Down Expand Up @@ -180,3 +191,91 @@ func TestVersionToFeature(t *testing.T) {
})
}
}

func TestPeerReadLoopSwitchAllTags(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

allTags := getProtocolTags(t)
foundTags := []string{}

fset := token.NewFileSet()
f, err := parser.ParseFile(fset, "wsPeer.go", nil, 0)
require.NoError(t, err)

getCases := func(n ast.Node) (ret bool) {
switch x := n.(type) {
case *ast.SwitchStmt:
// look for "switch msg.Tag"
if tagSel, ok := x.Tag.(*ast.SelectorExpr); ok {
if tagSel.Sel.Name != "Tag" {
return false
}
if id, ok := tagSel.X.(*ast.Ident); ok && id.Name != "msg" {
return false
}
}
// found switch msg.Tag, go through case statements
for _, s := range x.Body.List {
cl, ok := s.(*ast.CaseClause)
if !ok {
continue
}
for i := range cl.List {
if selExpr, ok := cl.List[i].(*ast.SelectorExpr); ok {
xid, ok := selExpr.X.(*ast.Ident)
require.True(t, ok)
require.Equal(t, "protocol", xid.Name)
foundTags = append(foundTags, selExpr.Sel.Name)
}
}
}
}
return true
}

readLoopFound := false
ast.Inspect(f, func(n ast.Node) bool {
// look for "readLoop" function
fn, ok := n.(*ast.FuncDecl)
if ok && fn.Name.Name == "readLoop" {
readLoopFound = true
ast.Inspect(fn, getCases)
return false
}
return true
})
require.True(t, readLoopFound)
require.NotEmpty(t, foundTags)
sort.Strings(allTags)
sort.Strings(foundTags)
require.Equal(t, allTags, foundTags)
}

func getProtocolTags(t *testing.T) []string {
file := filepath.Join("../protocol", "tags.go")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does is work for both go test ./network and (cd ./network && go test ./) ?

Copy link
Contributor Author

@cce cce Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read in this blog post that go test sets the working directory to the package where the test is running even in these cases, so you can load up test data from local files/directories that are also in the package directory... it seems to be working from CI which is running from the base go-algorand dir

fset := token.NewFileSet()
f, _ := parser.ParseFile(fset, file, nil, parser.ParseComments)

// look for const declarations in protocol/tags.go
var declaredTags []string
// Iterate through the declarations in the file
for _, d := range f.Decls {
genDecl, ok := d.(*ast.GenDecl)
// Check if the declaration is a constant and if not, continue
if !ok || genDecl.Tok != token.CONST {
continue
}
// Iterate through the specs (specifications) in the declaration
for _, spec := range genDecl.Specs {
if valueSpec, ok := spec.(*ast.ValueSpec); ok {
for _, n := range valueSpec.Names {
declaredTags = append(declaredTags, n.Name)
}
}
}
}
// assert these AST-discovered tags are complete (match the size of protocol.TagList)
require.Len(t, declaredTags, len(protocol.TagList))
return declaredTags
}
41 changes: 41 additions & 0 deletions protocol/hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package protocol

import (
"strings"
"testing"

"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/assert"
)

// TestHashIDPrefix checks if any HashID const declared in hash.go is a prefix of another.
func TestHashIDPrefix(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

values := getConstValues(t, "hash.go", "HashID")
for i, v1 := range values {
for j, v2 := range values {
if i == j {
continue
}
assert.False(t, strings.HasPrefix(v1, v2), "HashID %s is a prefix of %s", v2, v1)
}
}
}
8 changes: 2 additions & 6 deletions protocol/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Tag string
// are encoded using a comma separator (see network/msgOfInterest.go).
// The tags must be 2 bytes long.
const (
UnknownMsgTag Tag = "??"
AgreementVoteTag Tag = "AV"
MsgOfInterestTag Tag = "MI"
MsgDigestSkipTag Tag = "MS"
Expand All @@ -36,17 +35,15 @@ const (
StateProofSigTag Tag = "SP"
TopicMsgRespTag Tag = "TS"
TxnTag Tag = "TX"
UniCatchupReqTag Tag = "UC" //Replaced by UniEnsBlockReqTag. Only for backward compatibility.
UniEnsBlockReqTag Tag = "UE"
//UniCatchupReqTag Tag = "UC" was replaced by UniEnsBlockReqTag
UniEnsBlockReqTag Tag = "UE"
//UniEnsBlockResTag Tag = "US" was used for wsfetcherservice
//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
VoteBundleTag Tag = "VB"
)

// TagList is a list of all currently used protocol tags.
// TODO: generate this and/or have a test that it is complete.
var TagList = []Tag{
UnknownMsgTag,
AgreementVoteTag,
MsgOfInterestTag,
MsgDigestSkipTag,
Expand All @@ -57,7 +54,6 @@ var TagList = []Tag{
StateProofSigTag,
TopicMsgRespTag,
TxnTag,
UniCatchupReqTag,
UniEnsBlockReqTag,
VoteBundleTag,
}
Loading