From 95cff1e58c11c7a9f7be343937c93bd7a79f7593 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Fri, 17 Feb 2023 02:08:14 +0000 Subject: [PATCH] Fix waitGroup race condition bug Initialize one waitGroup for each sync now to avoid race condition due to multiple syncs all work on the same waitGroup. --- pkg/neg/syncers/transaction.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 0277096d6c..fbc1078927 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -107,8 +107,6 @@ type transactionSyncer struct { // syncCollector collect sync related metrics syncCollector metrics.SyncerMetricsCollector - - wg sync.WaitGroup } func NewTransactionSyncer( @@ -453,6 +451,8 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n // syncNetworkEndpoints spins off go routines to execute NEG operations func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { + var wg sync.WaitGroup + syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { for zone, endpointSet := range endpointMap { if endpointSet.Len() == 0 { @@ -476,10 +476,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } if operation == attachOp { - s.attachNetworkEndpoints(zone, batch) + s.attachNetworkEndpoints(zone, batch, &wg) } if operation == detachOp { - s.detachNetworkEndpoints(zone, batch) + s.detachNetworkEndpoints(zone, batch, &wg) } } return nil @@ -492,34 +492,34 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m if err := syncFunc(removeEndpoints, detachOp); err != nil { return err } - go s.collectSyncResult() + go s.collectSyncResult(&wg) return nil } // collectSyncResult collects the result of the sync and emits the metrics for sync result -func (s *transactionSyncer) collectSyncResult() { - s.wg.Wait() +func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) { + wg.Wait() } // attachNetworkEndpoints creates go routine to run operations for attaching network endpoints -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { +func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - s.wg.Add(1) - go s.operationInternal(attachOp, zone, networkEndpointMap) + wg.Add(1) + go s.operationInternal(attachOp, zone, networkEndpointMap, wg) } // detachNetworkEndpoints creates go routine to run operations for detaching network endpoints -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - s.wg.Add(1) - go s.operationInternal(detachOp, zone, networkEndpointMap) + wg.Add(1) + go s.operationInternal(detachOp, zone, networkEndpointMap, wg) } // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { - defer s.wg.Done() +func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) { + defer wg.Done() var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{}