Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnsync: fix potential race during TestBasicCatchpointCatchup #3033

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions txnsync/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type messageAsyncEncoder struct {
messageData sentMessageMetadata
roundClock timers.WallClock
peerDataExchangeRate uint64
// sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of
// the channel so that in case of a txnsync restart (i.e. fast catchup), we can still generate a new channel
// without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback
// from the network library, but that could be susceptible to undesired network disconnections.
sentMessagesCh chan sentMessageMetadata
}

// asyncMessageSent is called via the network package to inform the txnsync that a message was enqueued, and the associated sequence number.
Expand All @@ -67,7 +72,7 @@ func (encoder *messageAsyncEncoder) asyncMessageSent(enqueued bool, sequenceNumb
encoder.messageData.sequenceNumber = sequenceNumber

select {
case encoder.state.outgoingMessagesCallbackCh <- encoder.messageData:
case encoder.sentMessagesCh <- encoder.messageData:
return nil
default:
// if we can't place it on the channel, return an error so that the node could disconnect from this peer.
Expand Down Expand Up @@ -136,7 +141,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
pendingTransactions.pendingTransactionsGroups, pendingTransactions.latestLocallyOriginatedGroupCounter = s.node.GetPendingTransactionGroups()
profGetTxnsGroups.end()
for _, peer := range peers {
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate}
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh}
profAssembleMessage.start()
msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions)
profAssembleMessage.end()
Expand Down
7 changes: 4 additions & 3 deletions txnsync/outgoing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestAsyncMessageSent(t *testing.T) {
},
peer: &Peer{},
},
roundClock: timers.MakeMonotonicClock(time.Now()),
roundClock: timers.MakeMonotonicClock(time.Now()),
sentMessagesCh: s.outgoingMessagesCallbackCh,
}

oldTimestamp := asyncEncoder.messageData.sentTimestamp
Expand All @@ -83,11 +84,11 @@ func TestAsyncMessageSent(t *testing.T) {
a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337))

// Make this buffered for now so we catch the select statement
asyncEncoder.state.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1)
asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1)

err = asyncEncoder.asyncMessageSent(true, 1337)
a.Nil(err)
a.Equal(1, len(asyncEncoder.state.outgoingMessagesCallbackCh))
a.Equal(1, len(asyncEncoder.sentMessagesCh))
}

type mockAsyncNodeConnector struct {
Expand Down