Skip to content

Commit

Permalink
txnsync: fix potential race during TestBasicCatchpointCatchup (algora…
Browse files Browse the repository at this point in the history
…nd#3033)

## Summary

During fast catchup, we restart the transaction sync service very quickly.
This can cause a network message being sent, and the response would be returned to the "restarted" txnsync.

Since we don't want to disconnect the network connection itself ( which could have some messages enqueued ), the transaction sync would need to store the "returned" channel before sending the message. That would avoid the data race ( and safely ignore the incoming message ).

## Test Plan

Use existing testing, and confirm against that.
  • Loading branch information
tsachiherman authored Oct 11, 2021
1 parent e490f05 commit 8568442
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
9 changes: 7 additions & 2 deletions txnsync/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type messageAsyncEncoder struct {
messageData sentMessageMetadata
roundClock timers.WallClock
peerDataExchangeRate uint64
// sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of
// the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel
// without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback
// from the network library, but that could be susceptible to undesired network disconnections.
sentMessagesCh chan sentMessageMetadata
}

// asyncMessageSent called via the network package to inform the txsync that a message was enqueued, and the associated sequence number.
Expand All @@ -67,7 +72,7 @@ func (encoder *messageAsyncEncoder) asyncMessageSent(enqueued bool, sequenceNumb
encoder.messageData.sequenceNumber = sequenceNumber

select {
case encoder.state.outgoingMessagesCallbackCh <- encoder.messageData:
case encoder.sentMessagesCh <- encoder.messageData:
return nil
default:
// if we can't place it on the channel, return an error so that the node could disconnect from this peer.
Expand Down Expand Up @@ -136,7 +141,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
pendingTransactions.pendingTransactionsGroups, pendingTransactions.latestLocallyOriginatedGroupCounter = s.node.GetPendingTransactionGroups()
profGetTxnsGroups.end()
for _, peer := range peers {
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate}
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh}
profAssembleMessage.start()
msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions)
profAssembleMessage.end()
Expand Down
7 changes: 4 additions & 3 deletions txnsync/outgoing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestAsyncMessageSent(t *testing.T) {
},
peer: &Peer{},
},
roundClock: timers.MakeMonotonicClock(time.Now()),
roundClock: timers.MakeMonotonicClock(time.Now()),
sentMessagesCh: s.outgoingMessagesCallbackCh,
}

oldTimestamp := asyncEncoder.messageData.sentTimestamp
Expand All @@ -83,11 +84,11 @@ func TestAsyncMessageSent(t *testing.T) {
a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337))

// Make this buffered for now so we catch the select statement
asyncEncoder.state.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1)
asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1)

err = asyncEncoder.asyncMessageSent(true, 1337)
a.Nil(err)
a.Equal(1, len(asyncEncoder.state.outgoingMessagesCallbackCh))
a.Equal(1, len(asyncEncoder.sentMessagesCh))
}

type mockAsyncNodeConnector struct {
Expand Down

0 comments on commit 8568442

Please sign in to comment.