Skip to content

Commit

Permalink
Fix the deadlock between exporter and conntrack polling go routines
Browse files Browse the repository at this point in the history
Deadlock is due to the access of connection map to update from exporter
go routine to update the "DoneExport" flag. This was caught in scale testing.
Resolved this through a temporary fix by adding same flag in record
data struct. The connection and record deletion logic will be re-evaluated
through PR antrea-io#2360 as it refactors the related code.
  • Loading branch information
srikartati committed Jul 16, 2021
1 parent e4f11f1 commit a2181ae
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 22 deletions.
13 changes: 0 additions & 13 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,3 @@ func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.C
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

// SetExportDone sets DoneExport field of conntrack connection to true given the connection key.
func (cs *ConntrackConnectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error {
cs.mutex.Lock()
defer cs.mutex.Unlock()

if conn, found := cs.connections[connKey]; !found {
return fmt.Errorf("connection with key %v does not exist in connection map", connKey)
} else {
conn.DoneExport = true
return nil
}
}
9 changes: 2 additions & 7 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,9 @@ func (exp *flowExporter) sendFlowRecords() error {

if flowexporter.IsConnectionDying(&record.Conn) {
// If the connection is in dying state or connection is not in conntrack table,
// we will delete the flow records from records map.
// we set the DoneExport flag to do the deletion later.
klog.V(2).Infof("Deleting the inactive flow records with key: %v from record map", key)
if err := exp.flowRecords.DeleteFlowRecordWithoutLock(key); err != nil {
return err
}
if err := exp.conntrackConnStore.SetExportDone(key); err != nil {
return err
}
record.DoneExport = true
} else {
exp.flowRecords.ValidateAndUpdateStats(key, record)
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewFlowRecords() *FlowRecords {
// It makes a copy of the connection object to record, to avoid race conditions between the
// connection store and the flow exporter.
func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
// If the connection is in dying state and the corresponding flow records are already
// exported, then there is no need to add or update the record.
// If the connection is in dying state and is already exported, then there is
// no need to add or update the record.
if flowexporter.IsConnectionDying(conn) && conn.DoneExport {
return nil
}
Expand All @@ -63,8 +63,18 @@ func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, con
IsIPv6: isIPv6,
LastExportTime: conn.StartTime,
IsActive: true,
DoneExport: false,
}
} else {
// If the connection is in dying state and the corresponding flow records are already
// exported, then update the DoneExport flag on the connection.
if flowexporter.IsConnectionDying(conn) && record.DoneExport {
// It is safe to update the connection as we hold the connection map
// lock when calling this function.
conn.DoneExport = true
delete(fr.recordsMap, key)
return nil
}
// set IsActive flag to true when there are changes either in stats or TCP state
if (conn.OriginalPackets > record.PrevPackets) || (conn.ReversePackets > record.PrevReversePackets) || record.Conn.TCPState != conn.TCPState {
record.IsActive = true
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ type FlowRecord struct {
PrevReverseBytes uint64
IsIPv6 bool
LastExportTime time.Time
DoneExport bool
IsActive bool
}

0 comments on commit a2181ae

Please sign in to comment.