From 856844299c4c58f50825dd8ae4dcda7f6c77fd98 Mon Sep 17 00:00:00 2001 From: Tsachi Herman Date: Mon, 11 Oct 2021 14:08:35 -0400 Subject: [PATCH] txnsync: fix potential race during TestBasicCatchpointCatchup (#3033) ## Summary During fast catchup, we restart the transaction sync service very quickly. This can cause a network message being sent, and the response would be returned to the "restarted" txnsync. Since we don't want to disconnect the network connection itself ( which could have some messages enqueued ), the transaction sync would need to store the "returned" channel before sending the message. That would avoid the data race ( and safely ignore the incoming message ). ## Test Plan Use existing testing, and confirm against that. --- txnsync/outgoing.go | 9 +++++++-- txnsync/outgoing_test.go | 7 ++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/txnsync/outgoing.go b/txnsync/outgoing.go index 98697f3117..ac3be91b2f 100644 --- a/txnsync/outgoing.go +++ b/txnsync/outgoing.go @@ -55,6 +55,11 @@ type messageAsyncEncoder struct { messageData sentMessageMetadata roundClock timers.WallClock peerDataExchangeRate uint64 + // sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of + // the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel + // without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback + // from the network library, but that could be susceptible to undesired network disconnections. + sentMessagesCh chan sentMessageMetadata } // asyncMessageSent called via the network package to inform the txsync that a message was enqueued, and the associated sequence number. @@ -67,7 +72,7 @@ func (encoder *messageAsyncEncoder) asyncMessageSent(enqueued bool, sequenceNumb encoder.messageData.sequenceNumber = sequenceNumber select { - case encoder.state.outgoingMessagesCallbackCh <- encoder.messageData: + case encoder.sentMessagesCh <- encoder.messageData: return nil default: // if we can't place it on the channel, return an error so that the node could disconnect from this peer. @@ -136,7 +141,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D pendingTransactions.pendingTransactionsGroups, pendingTransactions.latestLocallyOriginatedGroupCounter = s.node.GetPendingTransactionGroups() profGetTxnsGroups.end() for _, peer := range peers { - msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate} + msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh} profAssembleMessage.start() msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions) profAssembleMessage.end() diff --git a/txnsync/outgoing_test.go b/txnsync/outgoing_test.go index 151c716b2c..94e3f91255 100644 --- a/txnsync/outgoing_test.go +++ b/txnsync/outgoing_test.go @@ -72,7 +72,8 @@ func TestAsyncMessageSent(t *testing.T) { }, peer: &Peer{}, }, - roundClock: timers.MakeMonotonicClock(time.Now()), + roundClock: timers.MakeMonotonicClock(time.Now()), + sentMessagesCh: s.outgoingMessagesCallbackCh, } oldTimestamp := asyncEncoder.messageData.sentTimestamp @@ -83,11 +84,11 @@ func TestAsyncMessageSent(t *testing.T) { a.Equal(asyncEncoder.messageData.sequenceNumber, uint64(1337)) // Make this buffered for now so we catch the select statement - asyncEncoder.state.outgoingMessagesCallbackCh = make(chan sentMessageMetadata, 1) + asyncEncoder.sentMessagesCh = make(chan sentMessageMetadata, 1) err = asyncEncoder.asyncMessageSent(true, 1337) a.Nil(err) - a.Equal(1, len(asyncEncoder.state.outgoingMessagesCallbackCh)) + a.Equal(1, len(asyncEncoder.sentMessagesCh)) } type mockAsyncNodeConnector struct {