diff --git a/pkg/exporter/probe/flow/flow.go b/pkg/exporter/probe/flow/flow.go index 71b19d46..497bef4d 100644 --- a/pkg/exporter/probe/flow/flow.go +++ b/pkg/exporter/probe/flow/flow.go @@ -285,10 +285,6 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error { } tuple := toProbeTuple(&key) - if !p.enablePort { - tuple.Dport = 0 - tuple.Sport = 0 - } labels := probe.BuildTupleMetricsLabels(tuple) diff --git a/pkg/exporter/probe/tracepacketloss/packetloss.go b/pkg/exporter/probe/tracepacketloss/packetloss.go index 9d37c8f7..9ff14f85 100644 --- a/pkg/exporter/probe/tracepacketloss/packetloss.go +++ b/pkg/exporter/probe/tracepacketloss/packetloss.go @@ -214,10 +214,6 @@ func (p *packetLossProbe) stop(probeType probe.Type) error { p.probeConfig[probeType] = nil - if probeType == probe.ProbeTypeEvent { - p.closePerfReader() - } - if p.probeCount() == 0 { p.cleanup() } @@ -225,14 +221,13 @@ func (p *packetLossProbe) stop(probeType probe.Type) error { return nil } -func (p *packetLossProbe) closePerfReader() { +func (p *packetLossProbe) cleanup() { + if p.perfReader != nil { p.perfReader.Close() p.perfReader = nil } -} -func (p *packetLossProbe) cleanup() { for _, link := range p.links { link.Close() } @@ -257,42 +252,29 @@ func (p *packetLossProbe) start(probeType probe.Type, cfg *probeConfig) error { return fmt.Errorf("%s failed install ebpf: %w", probeName, err) } - var err error - - if probeType == probe.ProbeTypeEvent { - p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{}))) - if err != nil { - log.Errorf("%s error create perf reader, err: %v", probeName, err) - return err - } - - go p.perfLoop() - } - return nil } -func (p *packetLossProbe) reinstallBPFLocked() error { - p.closePerfReader() +func (p *packetLossProbe) reinstallBPFLocked() (err error) { p.cleanup() - if err := p.loadAndAttachBPF(); err != nil { - log.Errorf("%s failed load and attach bpf, err: %v", probeName, err) - p.cleanup() - return err - } - - if p.probeConfig[probe.ProbeTypeEvent] != nil { - var err error - p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{}))) + defer func() { if err != nil { - log.Errorf("%s error create perf reader, err: %v", probeName, err) - return err + p.cleanup() } + }() - go p.perfLoop() + if err = p.loadAndAttachBPF(); err != nil { + return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err) } + p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{}))) + if err != nil { + return fmt.Errorf("%s error create perf reader, err: %w", probeName, err) + } + + go p.perfLoop() + return nil } diff --git a/pkg/exporter/probe/tracetcpretrans/tcpretrans.go b/pkg/exporter/probe/tracetcpretrans/tcpretrans.go index b1da5d44..7d3637c8 100644 --- a/pkg/exporter/probe/tracetcpretrans/tcpretrans.go +++ b/pkg/exporter/probe/tracetcpretrans/tcpretrans.go @@ -63,7 +63,7 @@ func metricsProbeCreator() (probe.MetricsProbe, error) { VariableLabels: probe.TupleMetricsLabels, SingleMetricsOpts: []probe.SingleMetricsOpts{ {Name: retransTotal, ValueType: prometheus.CounterValue}, - {Name: retransFast, ValueType: prometheus.CounterValue}, + //{Name: retransFast, ValueType: prometheus.CounterValue}, }, } batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce) @@ -77,15 +77,18 @@ func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (prob return probe.NewEventProbe(probeName, p), nil } +type probeConfig struct { +} + type metricsProbe struct { } -func (p *metricsProbe) Start(ctx context.Context) error { - return _tcpRetransProbe.start(ctx, probe.ProbeTypeMetrics) +func (p *metricsProbe) Start(_ context.Context) error { + return _tcpRetransProbe.start(probe.ProbeTypeMetrics, &probeConfig{}) } -func (p *metricsProbe) Stop(ctx context.Context) error { - return _tcpRetransProbe.stop(ctx, probe.ProbeTypeMetrics) +func (p *metricsProbe) Stop(_ context.Context) error { + return _tcpRetransProbe.stop(probe.ProbeTypeMetrics) } func (p *metricsProbe) collectOnce(emit probe.Emit) error { @@ -98,7 +101,7 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error { labels := probe.BuildTupleMetricsLabels(&tuple) emit(retransTotal, labels, float64(counter.Total)) - emit(retransFast, labels, float64(counter.Fast)) + //emit(retransFast, labels, float64(counter.Fast)) } return nil } @@ -107,8 +110,8 @@ type eventProbe struct { sink chan<- *probe.Event } -func (e *eventProbe) Start(ctx context.Context) error { - err := _tcpRetransProbe.start(ctx, probe.ProbeTypeEvent) +func (e *eventProbe) Start(_ context.Context) error { + err := _tcpRetransProbe.start(probe.ProbeTypeEvent, &probeConfig{}) if err != nil { return err } @@ -117,8 +120,8 @@ func (e *eventProbe) Start(ctx context.Context) error { return nil } -func (e *eventProbe) Stop(ctx context.Context) error { - return _tcpRetransProbe.stop(ctx, probe.ProbeTypeEvent) +func (e *eventProbe) Stop(_ context.Context) error { + return _tcpRetransProbe.stop(probe.ProbeTypeEvent) } type Counter struct { @@ -127,38 +130,47 @@ type Counter struct { } type tcpRetransProbe struct { - objs bpfObjects - links []link.Link - sink chan<- *probe.Event - refcnt [probe.ProbeTypeCount]int - lock sync.Mutex - perfReader *perf.Reader + objs bpfObjects + links []link.Link + sink chan<- *probe.Event + probeConfig [probe.ProbeTypeCount]*probeConfig + lock sync.Mutex + perfReader *perf.Reader cache *lru.Cache[probe.Tuple, *Counter] } -func (p *tcpRetransProbe) stop(_ context.Context, probeType probe.Type) error { +func (p *tcpRetransProbe) probeCount() int { + var ret int + for _, cfg := range p.probeConfig { + if cfg != nil { + ret++ + } + } + return ret +} +func (p *tcpRetransProbe) stop(probeType probe.Type) error { p.lock.Lock() defer p.lock.Unlock() - if p.refcnt[probeType] == 0 { + if p.probeConfig[probeType] == nil { return fmt.Errorf("probe %s never start", probeType) } - p.refcnt[probeType]-- + p.probeConfig[probeType] = nil - if p.refcnt[probe.ProbeTypeEvent] == 0 { - if p.perfReader != nil { - p.perfReader.Close() - } + if p.probeCount() == 0 { + p.cleanup() } - if p.totalReferenceCountLocked() == 0 { - return p.cleanup() - } return nil } -func (p *tcpRetransProbe) cleanup() error { +func (p *tcpRetransProbe) cleanup() { + if p.perfReader != nil { + p.perfReader.Close() + p.perfReader = nil + } + for _, link := range p.links { link.Close() } @@ -166,45 +178,45 @@ func (p *tcpRetransProbe) cleanup() error { p.links = nil p.objs.Close() - - return nil } -func (p *tcpRetransProbe) totalReferenceCountLocked() int { - var c int - for _, n := range p.refcnt { - c += n - } - return c -} - -func (p *tcpRetransProbe) start(ctx context.Context, probeType probe.Type) (err error) { +func (p *tcpRetransProbe) start(probeType probe.Type, cfg *probeConfig) (err error) { p.lock.Lock() defer p.lock.Unlock() - if p.refcnt[probeType] != 0 { + if p.probeConfig[probeType] != nil { return fmt.Errorf("%s(%s) has already started", probeName, probeType) } - p.refcnt[probeType]++ - if p.totalReferenceCountLocked() == 1 { - if err = p.loadAndAttachBPF(); err != nil { - log.Errorf("%s failed load and attach bpf, err: %v", probeName, err) - _ = p.cleanup() - return - } + p.probeConfig[probeType] = cfg + + if err := p.reinstallBPFLocked(); err != nil { + return fmt.Errorf("%s failed install ebpf: %w", probeName, err) } - if p.refcnt[probe.ProbeTypeEvent] == 1 { - p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpRetransEvent, int(unsafe.Sizeof(bpfInspTcpretransEventT{}))) + return nil +} + +func (p *tcpRetransProbe) reinstallBPFLocked() (err error) { + p.cleanup() + + defer func() { if err != nil { - log.Errorf("%s error create perf reader, err: %v", probeName, err) - _ = p.stop(ctx, probeType) - return + p.cleanup() } + }() + + if err = p.loadAndAttachBPF(); err != nil { + return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err) + } - go p.perfLoop() + p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpRetransEvent, int(unsafe.Sizeof(bpfInspTcpretransEventT{}))) + if err != nil { + return fmt.Errorf("%s error create perf reader, err: %w", probeName, err) } + + go p.perfLoop() + return nil }