Skip to content

Commit

Permalink
[receiver/splunkhec] fix memory leak (#34911)
Browse files Browse the repository at this point in the history
**Description:** 
Fix memory leak when the receiver is used for both metrics and logs at
the same time

**Link to tracking Issue:** <Issue number if applicable>
Fixes #34886
  • Loading branch information
atoulme committed Aug 30, 2024
1 parent 0216163 commit 90e75d3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_memleak_obsreport.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak when the receiver is used for both metrics and logs at the same time

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34886]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
48 changes: 32 additions & 16 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,21 @@ func (r *splunkReceiver) Shutdown(context.Context) error {
return err
}

func (r *splunkReceiver) processSuccessResponseWithAck(ctx context.Context, resp http.ResponseWriter, eventCount int, channelID string) {
func (r *splunkReceiver) processSuccessResponseWithAck(resp http.ResponseWriter, channelID string) error {
if r.ackExt == nil {
panic("writing response with ack when ack extension is not configured")
}

ackID := r.ackExt.ProcessEvent(channelID)
r.ackExt.Ack(channelID, ackID)
r.processSuccessResponse(ctx, resp, eventCount, []byte(fmt.Sprintf(responseOKWithAckID, ackID)))
return r.processSuccessResponse(resp, []byte(fmt.Sprintf(responseOKWithAckID, ackID)))
}

func (r *splunkReceiver) processSuccessResponse(ctx context.Context, resp http.ResponseWriter, eventCount int, bodyContent []byte) {
func (r *splunkReceiver) processSuccessResponse(resp http.ResponseWriter, bodyContent []byte) error {
resp.Header().Set(httpContentTypeHeader, httpJSONTypeHeader)
resp.WriteHeader(http.StatusOK)
if _, err := resp.Write(bodyContent); err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, eventCount, err)
}
_, err := resp.Write(bodyContent)
return err
}

func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -308,7 +307,9 @@ func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request)

queriedAcks := r.ackExt.QueryAcks(channelID, ackRequest.Acks)
ackString, _ := json.Marshal(queriedAcks)
r.processSuccessResponse(ctx, resp, 0, []byte(fmt.Sprintf(ackResponse, ackString)))
if err := r.processSuccessResponse(resp, []byte(fmt.Sprintf(ackResponse, ackString))); err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, 0, err)
}
}

func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -383,12 +384,17 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
if consumerErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, consumerErr)
} else {
var ackErr error
if len(channelID) > 0 && r.ackExt != nil {
r.processSuccessResponseWithAck(ctx, resp, ld.LogRecordCount(), channelID)
ackErr = r.processSuccessResponseWithAck(resp, channelID)
} else {
r.processSuccessResponse(ctx, resp, ld.LogRecordCount(), okRespBody)
ackErr = r.processSuccessResponse(resp, okRespBody)
}
if ackErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, ld.LogRecordCount(), err)
} else {
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil)
}
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil)
}
}

Expand Down Expand Up @@ -521,7 +527,6 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
return
}
decodeErr := r.logsConsumer.ConsumeLogs(ctx, ld)
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), decodeErr)
if decodeErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events), decodeErr)
return
Expand All @@ -531,17 +536,27 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config)

decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), decodeErr)
if decodeErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(metricEvents), decodeErr)
return
}
}

var ackErr error
if len(channelID) > 0 && r.ackExt != nil {
r.processSuccessResponseWithAck(ctx, resp, len(events)+len(metricEvents), channelID)
ackErr = r.processSuccessResponseWithAck(resp, channelID)
} else {
r.processSuccessResponse(ctx, resp, len(events)+len(metricEvents), okRespBody)
ackErr = r.processSuccessResponse(resp, okRespBody)
}
if ackErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events)+len(metricEvents), ackErr)
} else {
if r.logsConsumer != nil {
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil)
}
if r.metricsConsumer != nil {
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil)
}
}
}

Expand Down Expand Up @@ -576,9 +591,10 @@ func (r *splunkReceiver) failRequest(
}
}

if r.metricsConsumer == nil {
if r.logsConsumer != nil {
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), numRecordsReceived, err)
} else {
}
if r.metricsConsumer != nil {
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), numRecordsReceived, err)
}

Expand Down

0 comments on commit 90e75d3

Please sign in to comment.