Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix waitGroup race condition bug #1959

Merged
merged 1 commit into from
Feb 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ type transactionSyncer struct {

// syncCollector collects sync-related metrics
syncCollector metrics.SyncerMetricsCollector

wg sync.WaitGroup
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -453,6 +451,8 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n

// syncNetworkEndpoints spins off goroutines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
if endpointSet.Len() == 0 {
Expand All @@ -476,10 +476,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
s.attachNetworkEndpoints(zone, batch)
s.attachNetworkEndpoints(zone, batch, &wg)
}
if operation == detachOp {
s.detachNetworkEndpoints(zone, batch)
s.detachNetworkEndpoints(zone, batch, &wg)
}
}
return nil
Expand All @@ -492,34 +492,34 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult()
go s.collectSyncResult(&wg)
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
func (s *transactionSyncer) collectSyncResult() {
s.wg.Wait()
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
wg.Wait()
}

// attachNetworkEndpoints creates a goroutine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap)
wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap, wg)
}

// detachNetworkEndpoints creates a goroutine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap)
wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap, wg)
}

// operationInternal executes the NEG API call and commits the transactions.
// It records events when operations are completed.
// If an error occurs or any transaction entry requires reconciliation, it triggers a resync.
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
defer s.wg.Done()
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
defer wg.Done()
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand Down