From 771337b3a2638edb0f640f5bc5e3340f07271a6a Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 09:33:35 +0930 Subject: [PATCH 01/16] x-pack/filebeat/input/httpjson: hide sends behind methods --- x-pack/filebeat/input/httpjson/request.go | 20 +++++++++++-------- x-pack/filebeat/input/httpjson/response.go | 23 ++++++++++++++-------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index f92d2944c70..edb32ba1ad3 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -82,7 +82,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) - events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true) + events := make(stream) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events) n = processAndPublishEvents(trCtx, events, publisher, true, r.log) continue } @@ -118,7 +119,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values - events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false) + events := make(stream) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events) n = processAndPublishEvents(trCtx, events, publisher, false, r.log) } else { if len(ids) == 0 { @@ -187,11 +189,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p resps = intermediateResps } - var events <-chan maybeMsg + events := make(stream) if rf.isChain { - events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) } else { - events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events) } n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } @@ -541,7 +543,8 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl // processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it - events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true) + events := make(stream) + r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events) var n int var eventCount int @@ -675,7 +678,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * } resps = intermediateResps } - events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true) + events := make(stream) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } @@ -698,7 +702,7 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { } // processAndPublishEvents process and publish events based on event type -func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int { +func processAndPublishEvents(trCtx *transformContext, events stream, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int { var n int for maybeMsg := range events { if maybeMsg.failed() { diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 7adfd956fa2..37e539195c0 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -180,10 +180,19 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe return rp } -func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg { +type stream chan maybeMsg + +func (s stream) event(e mapstr.M) { + s <- maybeMsg{msg: e} +} + +func (s stream) fail(err error) { + s <- maybeMsg{err: err} +} + +func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) { trCtx.clearIntervalData() - ch := make(chan maybeMsg) go func() { defer close(ch) var npages int64 @@ -194,7 +203,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran pageStartTime := time.Now() page, hasNext, err := iter.next() if err != nil { - ch <- maybeMsg{err: err} + ch.fail(err) return } @@ -221,13 +230,13 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran for _, t := range rp.transforms { tr, err = t.run(trCtx, tr) if err != nil { - ch <- maybeMsg{err: err} + ch.fail(err) return } } if rp.split == nil { - ch <- maybeMsg{msg: tr.body()} + ch.event(tr.body()) rp.log.Debug("no split found: continuing") continue } @@ -242,7 +251,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran rp.log.Debug(err) default: rp.log.Debug("split operation failed") - ch <- maybeMsg{err: err} + ch.fail(err) return } } @@ -257,6 +266,4 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran } rp.metrics.updatePagesPerInterval(npages) }() - - return ch } From 04c1ff4d036ae136e705db0541e0398fd71e6a7a Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 10:06:46 +0930 Subject: [PATCH 02/16] x-pack/filebeat/input/httpjson: separate publish context, from publish action and control flow --- x-pack/filebeat/input/httpjson/request.go | 78 +++++++++++++++-------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index edb32ba1ad3..70a1e42183e 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -82,9 +82,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) + p := newPublisher(trCtx, publisher, true, r.log) events := make(stream) r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events) - n = processAndPublishEvents(trCtx, events, publisher, true, r.log) + n = p.processAndPublishEvents(events) continue } @@ -119,9 +120,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values + p := newPublisher(trCtx, publisher, false, r.log) events := make(stream) r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events) - n = processAndPublishEvents(trCtx, events, publisher, false, r.log) + n = p.processAndPublishEvents(events) } else { if len(ids) == 0 { n = 0 @@ -189,13 +191,14 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p resps = intermediateResps } + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) events := make(stream) if rf.isChain { rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) } else { r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events) } - n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) + n += p.processAndPublishEvents(events) } } @@ -678,9 +681,10 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * } resps = intermediateResps } + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) events := make(stream) rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) - n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) + n += p.processAndPublishEvents(events) } defer func() { @@ -701,36 +705,58 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { return *newUrl, nil } +type publisher struct { + ctx *transformContext + pub inputcursor.Publisher + log *logp.Logger +} + +func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) publisher { + if !publish { + pub = nil + } + return publisher{ + ctx: trCtx, + pub: pub, + log: log, + } +} + // processAndPublishEvents process and publish events based on event type -func processAndPublishEvents(trCtx *transformContext, events stream, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int { +func (p publisher) processAndPublishEvents(events stream) int { var n int for maybeMsg := range events { - if maybeMsg.failed() { - log.Errorf("error processing response: %v", maybeMsg) - continue - } + n += p.processAndPublishEvent(maybeMsg) + } + return n +} - if publish { - event, err := makeEvent(maybeMsg.msg) - if err != nil { - log.Errorf("error creating event: %v", maybeMsg) - continue - } +// processAndPublishEvent processes and publishes one events based on event type +func (p publisher) processAndPublishEvent(evt maybeMsg) int { + if evt.failed() { + p.log.Errorf("error processing response: %v", evt) + return 0 + } - if err := publisher.Publish(event, trCtx.cursorMap()); err != nil { - log.Errorf("error publishing event: %v", err) - continue - } - } - if len(*trCtx.firstEventClone()) == 0 { - trCtx.updateFirstEvent(maybeMsg.msg) + if p.pub != nil { + event, err := makeEvent(evt.msg) + if err != nil { + p.log.Errorf("error creating event: %v", evt) + return 0 } - trCtx.updateLastEvent(maybeMsg.msg) - trCtx.updateCursor() - n++ + if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil { + p.log.Errorf("error publishing event: %v", err) + return 0 + } } - return n + if len(*p.ctx.firstEventClone()) == 0 { + p.ctx.updateFirstEvent(evt.msg) + } + p.ctx.updateLastEvent(evt.msg) + p.ctx.updateCursor() + + return 1 } const ( From 8643c178936c4ef9ad4fcefcadfcdde9c1f0574e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 10:20:27 +0930 Subject: [PATCH 03/16] x-pack/filebeat/input/httpjson: clean up missed split use of channels --- x-pack/filebeat/input/httpjson/request.go | 14 +++--- x-pack/filebeat/input/httpjson/response.go | 18 +++++-- x-pack/filebeat/input/httpjson/split.go | 50 ++++++++++---------- x-pack/filebeat/input/httpjson/split_test.go | 10 ++-- 4 files changed, 51 insertions(+), 41 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 70a1e42183e..44b68f9f77e 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) p := newPublisher(trCtx, publisher, true, r.log) - events := make(stream) + events := newStream() r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events) n = p.processAndPublishEvents(events) continue @@ -121,7 +121,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values p := newPublisher(trCtx, publisher, false, r.log) - events := make(stream) + events := newStream() r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events) n = p.processAndPublishEvents(events) } else { @@ -192,7 +192,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - events := make(stream) + events := newStream() if rf.isChain { rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) } else { @@ -546,12 +546,12 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl // processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it - events := make(stream) + events := newStream() r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events) var n int var eventCount int - for maybeMsg := range events { + for maybeMsg := range events.ch { if maybeMsg.failed() { r.log.Errorf("error processing response: %v", maybeMsg) continue @@ -682,7 +682,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - events := make(stream) + events := newStream() rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) n += p.processAndPublishEvents(events) } @@ -725,7 +725,7 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo // processAndPublishEvents process and publish events based on event type func (p publisher) processAndPublishEvents(events stream) int { var n int - for maybeMsg := range events { + for maybeMsg := range events.ch { n += p.processAndPublishEvent(maybeMsg) } return n diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 37e539195c0..eb914002314 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -180,21 +180,31 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe return rp } -type stream chan maybeMsg +type stream struct { + ch chan maybeMsg +} + +func newStream() stream { + return stream{make(chan maybeMsg)} +} func (s stream) event(e mapstr.M) { - s <- maybeMsg{msg: e} + s.ch <- maybeMsg{msg: e} } func (s stream) fail(err error) { - s <- maybeMsg{err: err} + s.ch <- maybeMsg{err: err} +} + +func (s stream) close() { + close(s.ch) } func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) { trCtx.clearIntervalData() go func() { - defer close(ch) + defer ch.close() var npages int64 for i, httpResp := range resps { diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go index 8fa892a6ce0..355870111b6 100644 --- a/x-pack/filebeat/input/httpjson/split.go +++ b/x-pack/filebeat/input/httpjson/split.go @@ -95,13 +95,13 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { // run runs the split operation on the contents of resp, sending successive // split results on ch. ctx is passed to transforms that are called during // the split. -func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error { +func (s *split) run(ctx *transformContext, resp transformable, events stream) error { root := resp.body() - return s.split(ctx, root, ch) + return s.split(ctx, root, events) } // split recursively executes the split processor chain. -func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) error { +func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error { v, err := root.GetValue(s.targetInfo.Name) if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue. return err @@ -110,21 +110,21 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) if v == nil { if s.ignoreEmptyValue { if s.child != nil { - return s.child.split(ctx, root, ch) + return s.child.split(ctx, root, events) } if s.keepParent { - ch <- maybeMsg{msg: root} + events.event(root) } return nil } if s.isRoot { if s.keepParent { - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyField } return errEmptyRootField } - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyField } @@ -138,23 +138,23 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) if len(varr) == 0 { if s.ignoreEmptyValue { if s.child != nil { - return s.child.split(ctx, root, ch) + return s.child.split(ctx, root, events) } if s.keepParent { - ch <- maybeMsg{msg: root} + events.event(root) } return nil } if s.isRoot { - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyRootField } - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyField } for _, e := range varr { - err := s.sendMessage(ctx, root, s.targetInfo.Name, e, ch) + err := s.sendMessage(ctx, root, s.targetInfo.Name, e, events) if err != nil { s.log.Debug(err) } @@ -170,22 +170,22 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) if len(vmap) == 0 { if s.ignoreEmptyValue { if s.child != nil { - return s.child.split(ctx, root, ch) + return s.child.split(ctx, root, events) } if s.keepParent { - ch <- maybeMsg{msg: root} + events.event(root) } return nil } if s.isRoot { return errEmptyRootField } - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyField } for k, e := range vmap { - if err := s.sendMessage(ctx, root, k, e, ch); err != nil { + if err := s.sendMessage(ctx, root, k, e, events); err != nil { s.log.Debug(err) } } @@ -200,18 +200,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) if len(vstr) == 0 { if s.ignoreEmptyValue { if s.child != nil { - return s.child.split(ctx, root, ch) + return s.child.split(ctx, root, events) } return nil } if s.isRoot { return errEmptyRootField } - ch <- maybeMsg{msg: root} + events.event(root) return errEmptyField } for _, substr := range strings.Split(vstr, s.delimiter) { - if err := s.sendMessageSplitString(ctx, root, substr, ch); err != nil { + if err := s.sendMessageSplitString(ctx, root, substr, events); err != nil { s.log.Debug(err) } } @@ -224,7 +224,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) // sendMessage sends an array or map split result value, v, on ch after performing // any necessary transformations. If key is "", the value is an element of an array. -func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, ch chan<- maybeMsg) error { +func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events stream) error { obj, ok := toMapStr(v, s.targetInfo.Name) if !ok { return errExpectedSplitObj @@ -252,10 +252,10 @@ func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v } if s.child != nil { - return s.child.split(ctx, clone, ch) + return s.child.split(ctx, clone, events) } - ch <- maybeMsg{msg: clone} + events.event(clone) return nil } @@ -277,7 +277,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) { // sendMessage sends a string split result value, v, on ch after performing any // necessary transformations. If key is "", the value is an element of an array. -func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, ch chan<- maybeMsg) error { +func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events stream) error { clone := root.Clone() _, _ = clone.Put(s.targetInfo.Name, v) @@ -293,10 +293,10 @@ func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v s } if s.child != nil { - return s.child.split(ctx, clone, ch) + return s.child.split(ctx, clone, events) } - ch <- maybeMsg{msg: clone} + events.event(clone) return nil } diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go index 2c4553e9df6..fa390061c9a 100644 --- a/x-pack/filebeat/input/httpjson/split_test.go +++ b/x-pack/filebeat/input/httpjson/split_test.go @@ -704,19 +704,19 @@ func TestSplit(t *testing.T) { for _, tc := range cases { tc := tc t.Run(tc.name, func(t *testing.T) { - ch := make(chan maybeMsg, len(tc.expectedMessages)) + events := stream{make(chan maybeMsg, len(tc.expectedMessages))} split, err := newSplitResponse(tc.config, logp.NewLogger("")) assert.NoError(t, err) - err = split.run(tc.ctx, tc.resp, ch) + err = split.run(tc.ctx, tc.resp, events) if tc.expectedErr == nil { assert.NoError(t, err) } else { assert.EqualError(t, err, tc.expectedErr.Error()) } - close(ch) - assert.Equal(t, len(tc.expectedMessages), len(ch)) + events.close() + assert.Equal(t, len(tc.expectedMessages), len(events.ch)) for _, msg := range tc.expectedMessages { - e := <-ch + e := <-events.ch assert.NoError(t, e.err) assert.Equal(t, msg.Flatten(), e.msg.Flatten()) } From 0ba484fa67f3974f8d7fd300f5a2a3ba90636822 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 10:55:43 +0930 Subject: [PATCH 04/16] x-pack/filebeat/input/httpjson: protect direction --- x-pack/filebeat/input/httpjson/response.go | 8 +++++++- x-pack/filebeat/input/httpjson/split.go | 8 ++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index eb914002314..28035cde610 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -180,6 +180,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe return rp } +type sendStream interface { + event(mapstr.M) + fail(error) + close() +} + type stream struct { ch chan maybeMsg } @@ -200,7 +206,7 @@ func (s stream) close() { close(s.ch) } -func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) { +func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { trCtx.clearIntervalData() go func() { diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go index 355870111b6..0cfb7d3776f 100644 --- a/x-pack/filebeat/input/httpjson/split.go +++ b/x-pack/filebeat/input/httpjson/split.go @@ -95,13 +95,13 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { // run runs the split operation on the contents of resp, sending successive // split results on ch. ctx is passed to transforms that are called during // the split. -func (s *split) run(ctx *transformContext, resp transformable, events stream) error { +func (s *split) run(ctx *transformContext, resp transformable, events sendStream) error { root := resp.body() return s.split(ctx, root, events) } // split recursively executes the split processor chain. -func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error { +func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) error { v, err := root.GetValue(s.targetInfo.Name) if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue. return err @@ -224,7 +224,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error // sendMessage sends an array or map split result value, v, on ch after performing // any necessary transformations. If key is "", the value is an element of an array. -func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events stream) error { +func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events sendStream) error { obj, ok := toMapStr(v, s.targetInfo.Name) if !ok { return errExpectedSplitObj @@ -277,7 +277,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) { // sendMessage sends a string split result value, v, on ch after performing any // necessary transformations. If key is "", the value is an element of an array. -func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events stream) error { +func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events sendStream) error { clone := root.Clone() _, _ = clone.Put(s.targetInfo.Name, v) From 514917e94d4a46aa2d71ddec4f65dbcc9f675d09 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 11:44:34 +0930 Subject: [PATCH 05/16] x-pack/filebeat/input/httpjson: split out count handling and make publisher satisfy sendStream --- x-pack/filebeat/input/httpjson/request.go | 56 +++++++++++++++-------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 44b68f9f77e..fa6becb42fa 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -85,7 +85,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p p := newPublisher(trCtx, publisher, true, r.log) events := newStream() r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events) - n = p.processAndPublishEvents(events) + p.processAndPublishEvents(events) + n = p.eventCount() continue } @@ -123,7 +124,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p p := newPublisher(trCtx, publisher, false, r.log) events := newStream() r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events) - n = p.processAndPublishEvents(events) + p.processAndPublishEvents(events) + n = p.eventCount() } else { if len(ids) == 0 { n = 0 @@ -198,7 +200,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } else { r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events) } - n += p.processAndPublishEvents(events) + p.processAndPublishEvents(events) + n += p.eventCount() } } @@ -684,7 +687,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) events := newStream() rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) - n += p.processAndPublishEvents(events) + p.processAndPublishEvents(events) + n += p.eventCount() } defer func() { @@ -708,14 +712,15 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { type publisher struct { ctx *transformContext pub inputcursor.Publisher + n int log *logp.Logger } -func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) publisher { +func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher { if !publish { pub = nil } - return publisher{ + return &publisher{ ctx: trCtx, pub: pub, log: log, @@ -723,42 +728,53 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo } // processAndPublishEvents process and publish events based on event type -func (p publisher) processAndPublishEvents(events stream) int { - var n int +func (p *publisher) processAndPublishEvents(events stream) { for maybeMsg := range events.ch { - n += p.processAndPublishEvent(maybeMsg) + p.processAndPublishEvent(maybeMsg) } - return n } // processAndPublishEvent processes and publishes one events based on event type -func (p publisher) processAndPublishEvent(evt maybeMsg) int { +func (p *publisher) processAndPublishEvent(evt maybeMsg) { if evt.failed() { - p.log.Errorf("error processing response: %v", evt) - return 0 + p.fail(evt.err) + return } + p.event(evt.msg) +} +func (p *publisher) event(msg mapstr.M) { if p.pub != nil { - event, err := makeEvent(evt.msg) + event, err := makeEvent(msg) if err != nil { - p.log.Errorf("error creating event: %v", evt) - return 0 + p.log.Errorf("error creating event: %v: %v", msg, err) + return } if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil { p.log.Errorf("error publishing event: %v", err) - return 0 + return } } if len(*p.ctx.firstEventClone()) == 0 { - p.ctx.updateFirstEvent(evt.msg) + p.ctx.updateFirstEvent(msg) } - p.ctx.updateLastEvent(evt.msg) + p.ctx.updateLastEvent(msg) p.ctx.updateCursor() - return 1 + p.n++ +} + +func (p *publisher) fail(err error) { + p.log.Errorf("error processing response: %v", err) } +func (p *publisher) eventCount() int { + return p.n +} + +func (p *publisher) close() {} + const ( // This is generally updated with chain responses, if present, as they continue to occur // Otherwise this is always the last response of the root request w.r.t pagination From 419225a39d9869fa412287f54ae38f72d1bc89b2 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:31:20 +0930 Subject: [PATCH 06/16] x-pack/filebeat/input/httpjson: replay publisher refactors for a chainProcessor type --- x-pack/filebeat/input/httpjson/request.go | 102 ++++++++++++++------- x-pack/filebeat/input/httpjson/response.go | 6 +- x-pack/filebeat/input/httpjson/split.go | 25 ++--- 3 files changed, 86 insertions(+), 47 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index fa6becb42fa..7e398689531 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -549,52 +549,88 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl // processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it + p := newChainProcessor(r, trCtx, publisher, chainIndex) events := newStream() r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events) + p.processRemainingChainEvents(stdCtx, events) + return p.eventCount() +} + +type chainProcessor struct { + req *requester + ctx *transformContext + pub inputcursor.Publisher + idx int + tail bool + n int +} + +func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor { + return &chainProcessor{ + req: req, + ctx: trCtx, + pub: pub, + idx: idx, + } +} - var n int - var eventCount int +func (p *chainProcessor) processRemainingChainEvents(ctx context.Context, events stream) { for maybeMsg := range events.ch { if maybeMsg.failed() { - r.log.Errorf("error processing response: %v", maybeMsg) + p.fail(maybeMsg.err) continue } - if n >= 1 { // skip 1st event as it has already ben processed before - var response http.Response - response.StatusCode = 200 - body := new(bytes.Buffer) - // we construct a new response here from each of the pagination events - err := json.NewEncoder(body).Encode(maybeMsg.msg) - if err != nil { - r.log.Errorf("error processing chain event: %w", err) - continue - } - response.Body = io.NopCloser(body) + p.event(ctx, maybeMsg.msg) + } +} - // updates the cursor for pagination last_event & last_response when chaining is present - trCtx.updateLastEvent(maybeMsg.msg) - trCtx.updateCursor() +func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) { + if !p.tail { + // Skip first event as it has already been processed. + p.tail = true + return + } - // for each pagination response, we repeat all the chain steps / blocks - count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log) - if err != nil { - r.log.Errorf("error processing chain event: %w", err) - continue - } - eventCount += count + var response http.Response + response.StatusCode = 200 + body := new(bytes.Buffer) + // we construct a new response here from each of the pagination events + err := json.NewEncoder(body).Encode(msg) + if err != nil { + p.req.log.Errorf("error processing chain event: %w", err) + return + } + response.Body = io.NopCloser(body) - err = response.Body.Close() - if err != nil { - r.log.Errorf("error closing http response body: %w", err) - } - } + // updates the cursor for pagination last_event & last_response when chaining is present + p.ctx.updateLastEvent(msg) + p.ctx.updateCursor() + + // for each pagination response, we repeat all the chain steps / blocks + n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log) + if err != nil { + p.req.log.Errorf("error processing chain event: %w", err) + return + } + p.n += n - n++ + err = response.Body.Close() + if err != nil { + p.req.log.Errorf("error closing http response body: %w", err) } - return eventCount } +func (p *chainProcessor) fail(err error) { + p.req.log.Errorf("error processing response: %v", err) +} + +func (p *chainProcessor) eventCount() int { + return p.n +} + +func (*chainProcessor) close() {} + // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input // //nolint:bodyclose // response body is closed through drainBody method @@ -740,10 +776,10 @@ func (p *publisher) processAndPublishEvent(evt maybeMsg) { p.fail(evt.err) return } - p.event(evt.msg) + p.event(nil, evt.msg) } -func (p *publisher) event(msg mapstr.M) { +func (p *publisher) event(_ context.Context, msg mapstr.M) { if p.pub != nil { event, err := makeEvent(msg) if err != nil { diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 28035cde610..dc7de5c3b97 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -181,7 +181,7 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe } type sendStream interface { - event(mapstr.M) + event(context.Context, mapstr.M) fail(error) close() } @@ -194,7 +194,7 @@ func newStream() stream { return stream{make(chan maybeMsg)} } -func (s stream) event(e mapstr.M) { +func (s stream) event(_ context.Context, e mapstr.M) { s.ch <- maybeMsg{msg: e} } @@ -252,7 +252,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran } if rp.split == nil { - ch.event(tr.body()) + ch.event(stdCtx, tr.body()) rp.log.Debug("no split found: continuing") continue } diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go index 0cfb7d3776f..dc77fbb0fde 100644 --- a/x-pack/filebeat/input/httpjson/split.go +++ b/x-pack/filebeat/input/httpjson/split.go @@ -5,6 +5,7 @@ package httpjson import ( + "context" "errors" "fmt" "strings" @@ -102,6 +103,8 @@ func (s *split) run(ctx *transformContext, resp transformable, events sendStream // split recursively executes the split processor chain. func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) error { + todo := context.TODO() + v, err := root.GetValue(s.targetInfo.Name) if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue. return err @@ -113,18 +116,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e return s.child.split(ctx, root, events) } if s.keepParent { - events.event(root) + events.event(todo, root) } return nil } if s.isRoot { if s.keepParent { - events.event(root) + events.event(todo, root) return errEmptyField } return errEmptyRootField } - events.event(root) + events.event(todo, root) return errEmptyField } @@ -141,15 +144,15 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e return s.child.split(ctx, root, events) } if s.keepParent { - events.event(root) + events.event(todo, root) } return nil } if s.isRoot { - events.event(root) + events.event(todo, root) return errEmptyRootField } - events.event(root) + events.event(todo, root) return errEmptyField } @@ -173,14 +176,14 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e return s.child.split(ctx, root, events) } if s.keepParent { - events.event(root) + events.event(todo, root) } return nil } if s.isRoot { return errEmptyRootField } - events.event(root) + events.event(todo, root) return errEmptyField } @@ -207,7 +210,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e if s.isRoot { return errEmptyRootField } - events.event(root) + events.event(todo, root) return errEmptyField } for _, substr := range strings.Split(vstr, s.delimiter) { @@ -255,7 +258,7 @@ func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v return s.child.split(ctx, clone, events) } - events.event(clone) + events.event(context.TODO(), clone) return nil } @@ -296,7 +299,7 @@ func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v s return s.child.split(ctx, clone, events) } - events.event(clone) + events.event(context.TODO(), clone) return nil } From e6bd82f25d6fff9cd6c441043996883bf6786b2f Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:35:35 +0930 Subject: [PATCH 07/16] x-pack/filebeat/input/httpjson: add sequential version of *responseProcessor.startProcessing --- x-pack/filebeat/input/httpjson/response.go | 76 ++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index dc7de5c3b97..e744069d272 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -283,3 +283,79 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran rp.metrics.updatePagesPerInterval(npages) }() } + +func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { + trCtx.clearIntervalData() + + defer ch.close() + var npages int64 + + for i, httpResp := range resps { + iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails) + for { + pageStartTime := time.Now() + page, hasNext, err := iter.next() + if err != nil { + ch.fail(err) + return + } + + if !hasNext { + if i+1 != len(resps) { + break + } + return + } + + respTrs := page.asTransformables(rp.log) + + if len(respTrs) == 0 { + return + } + + // last_response context object is updated here organically + trCtx.updateLastResponse(*page) + npages = page.page + + rp.log.Debugf("last received page: %#v", trCtx.lastResponse) + + for _, tr := range respTrs { + for _, t := range rp.transforms { + tr, err = t.run(trCtx, tr) + if err != nil { + ch.fail(err) + return + } + } + + if rp.split == nil { + ch.event(stdCtx, tr.body()) + rp.log.Debug("no split found: continuing") + continue + } + + if err := rp.split.run(trCtx, tr, ch); err != nil { + switch err { //nolint:errorlint // run never returns a wrapped error. + case errEmptyField: + // nothing else to send for this page + rp.log.Debug("split operation finished") + case errEmptyRootField: + // root field not found, most likely the response is empty + rp.log.Debug(err) + default: + rp.log.Debug("split operation failed") + ch.fail(err) + return + } + } + } + + rp.metrics.updatePageExecutionTime(pageStartTime) + + if !paginate { + break + } + } + } + rp.metrics.updatePagesPerInterval(npages) +} From 51c124775cf961a2d88748be7f094fcd9a89724c Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:36:57 +0930 Subject: [PATCH 08/16] x-pack/filebeat/input/httpjson: do chain processing sequentially --- x-pack/filebeat/input/httpjson/request.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 7e398689531..cc5cf74d2c7 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -550,9 +550,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it p := newChainProcessor(r, trCtx, publisher, chainIndex) - events := newStream() - r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events) - p.processRemainingChainEvents(stdCtx, events) + r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p) return p.eventCount() } @@ -574,17 +572,6 @@ func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor. } } -func (p *chainProcessor) processRemainingChainEvents(ctx context.Context, events stream) { - for maybeMsg := range events.ch { - if maybeMsg.failed() { - p.fail(maybeMsg.err) - continue - } - - p.event(ctx, maybeMsg.msg) - } -} - func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) { if !p.tail { // Skip first event as it has already been processed. From f39c130e5fe0edc118ed15684458d01977e5abb9 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:38:58 +0930 Subject: [PATCH 09/16] x-pack/filebeat/input/httpjson: do first call sequentially --- x-pack/filebeat/input/httpjson/request.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index cc5cf74d2c7..502ba6ac865 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -83,9 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) p := newPublisher(trCtx, publisher, true, r.log) - events := newStream() - r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events) - p.processAndPublishEvents(events) + r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p) n = p.eventCount() continue } From 7e0ca8b33d3b90abaa55b5dbc84087b8efdf3f6d Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:45:07 +0930 Subject: [PATCH 10/16] x-pack/filebeat/input/httpjson: do second call sequentially --- x-pack/filebeat/input/httpjson/request.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 502ba6ac865..e83fc23cb60 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -120,9 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values p := newPublisher(trCtx, publisher, false, r.log) - events := newStream() - r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events) - p.processAndPublishEvents(events) + r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p) n = p.eventCount() } else { if len(ids) == 0 { From 8d6031186f7a2bfac1392e254505a473fd509300 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:45:29 +0930 Subject: [PATCH 11/16] x-pack/filebeat/input/httpjson: do third call sequentially --- x-pack/filebeat/input/httpjson/request.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index e83fc23cb60..07d68f05cc9 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -190,13 +190,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - events := newStream() if rf.isChain { - rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) + rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) } else { - r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events) + r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p) } - p.processAndPublishEvents(events) n += p.eventCount() } } From 2b2f9427e7a9924a4ab87c156be97fccda358d97 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:45:43 +0930 Subject: [PATCH 12/16] x-pack/filebeat/input/httpjson: do last call sequentially --- x-pack/filebeat/input/httpjson/request.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 07d68f05cc9..1556b609767 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -702,9 +702,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - events := newStream() - rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events) - p.processAndPublishEvents(events) + rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) n += p.eventCount() } From 8e4f06149bb0c3bf506ae5f3dee9e18422977487 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:48:17 +0930 Subject: [PATCH 13/16] x-pack/filebeat/input/httpjson: remove concurrent processing --- x-pack/filebeat/input/httpjson/request.go | 12 ++-- x-pack/filebeat/input/httpjson/response.go | 78 ---------------------- 2 files changed, 6 insertions(+), 84 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 1556b609767..d8952896a99 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) p := newPublisher(trCtx, publisher, true, r.log) - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p) n = p.eventCount() continue } @@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values p := newPublisher(trCtx, publisher, false, r.log) - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p) n = p.eventCount() } else { if len(ids) == 0 { @@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) if rf.isChain { - rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p) } else { - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p) } n += p.eventCount() } @@ -544,7 +544,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it p := newChainProcessor(r, trCtx, publisher, chainIndex) - r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p) + r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p) return p.eventCount() } @@ -702,7 +702,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p) n += p.eventCount() } diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index e744069d272..56ea94e4461 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -209,84 +209,6 @@ func (s stream) close() { func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { trCtx.clearIntervalData() - go func() { - defer ch.close() - var npages int64 - - for i, httpResp := range resps { - iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails) - for { - pageStartTime := time.Now() - page, hasNext, err := iter.next() - if err != nil { - ch.fail(err) - return - } - - if !hasNext { - if i+1 != len(resps) { - break - } - return - } - - respTrs := page.asTransformables(rp.log) - - if len(respTrs) == 0 { - return - } - - // last_response context object is updated here organically - trCtx.updateLastResponse(*page) - npages = page.page - - rp.log.Debugf("last received page: %#v", trCtx.lastResponse) - - for _, tr := range respTrs { - for _, t := range rp.transforms { - tr, err = t.run(trCtx, tr) - if err != nil { - ch.fail(err) - return - } - } - - if rp.split == nil { - ch.event(stdCtx, tr.body()) - rp.log.Debug("no split found: continuing") - continue - } - - if err := rp.split.run(trCtx, tr, ch); err != nil { - switch err { //nolint:errorlint // run never returns a wrapped error. - case errEmptyField: - // nothing else to send for this page - rp.log.Debug("split operation finished") - case errEmptyRootField: - // root field not found, most likely the response is empty - rp.log.Debug(err) - default: - rp.log.Debug("split operation failed") - ch.fail(err) - return - } - } - } - - rp.metrics.updatePageExecutionTime(pageStartTime) - - if !paginate { - break - } - } - } - rp.metrics.updatePagesPerInterval(npages) - }() -} - -func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { - trCtx.clearIntervalData() - defer ch.close() var npages int64 From 9bd1ea9e2b72cc0d3e92285b1ff941412da3f0f8 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 14:51:06 +0930 Subject: [PATCH 14/16] x-pack/filebeat/input/httpjson: remove now redundant locks --- x-pack/filebeat/input/httpjson/transform.go | 29 --------------------- 1 file changed, 29 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/transform.go b/x-pack/filebeat/input/httpjson/transform.go index d4055889bf0..be9e938756e 100644 --- a/x-pack/filebeat/input/httpjson/transform.go +++ b/x-pack/filebeat/input/httpjson/transform.go @@ -10,7 +10,6 @@ import ( "net/http" "net/url" "strconv" - "sync" "github.com/elastic/beats/v7/libbeat/common" conf "github.com/elastic/elastic-agent-libs/config" @@ -25,7 +24,6 @@ type transformsConfig []*conf.C type transforms []transform type transformContext struct { - lock sync.RWMutex cursor *cursor parentTrCtx *transformContext firstEvent *mapstr.M @@ -45,41 +43,28 @@ func emptyTransformContext() *transformContext { } func (ctx *transformContext) cursorMap() mapstr.M { - ctx.lock.RLock() - defer ctx.lock.RUnlock() return ctx.cursor.clone() } func (ctx *transformContext) lastEventClone() *mapstr.M { - ctx.lock.RLock() - defer ctx.lock.RUnlock() clone := ctx.lastEvent.Clone() return &clone } func (ctx *transformContext) firstEventClone() *mapstr.M { - ctx.lock.RLock() - defer ctx.lock.RUnlock() clone := ctx.firstEvent.Clone() return &clone } func (ctx *transformContext) firstResponseClone() *response { - ctx.lock.RLock() - defer ctx.lock.RUnlock() return ctx.firstResponse.clone() } func (ctx *transformContext) lastResponseClone() *response { - ctx.lock.RLock() - defer ctx.lock.RUnlock() return ctx.lastResponse.clone() } func (ctx *transformContext) updateCursor() { - ctx.lock.Lock() - defer ctx.lock.Unlock() - // we do not want to pass the cursor data to itself newCtx := emptyTransformContext() newCtx.lastEvent = ctx.lastEvent @@ -91,8 +76,6 @@ func (ctx *transformContext) updateCursor() { } func (ctx *transformContext) clone() *transformContext { - ctx.lock.Lock() - newCtx := emptyTransformContext() newCtx.lastEvent = ctx.lastEvent newCtx.firstEvent = ctx.firstEvent @@ -100,41 +83,29 @@ func (ctx *transformContext) clone() *transformContext { newCtx.firstResponse = ctx.firstResponse newCtx.cursor = ctx.cursor newCtx.parentTrCtx = ctx - - ctx.lock.Unlock() return newCtx } func (ctx *transformContext) updateLastEvent(e mapstr.M) { - ctx.lock.Lock() - defer ctx.lock.Unlock() *ctx.lastEvent = e } func (ctx *transformContext) updateFirstEvent(e mapstr.M) { - ctx.lock.Lock() - defer ctx.lock.Unlock() *ctx.firstEvent = e } func (ctx *transformContext) updateLastResponse(r response) { - ctx.lock.Lock() - defer ctx.lock.Unlock() *ctx.lastResponse = r } func (ctx *transformContext) updateFirstResponse(r response) { - ctx.lock.Lock() *ctx.firstResponse = r - ctx.lock.Unlock() } func (ctx *transformContext) clearIntervalData() { - ctx.lock.Lock() ctx.lastEvent = &mapstr.M{} ctx.firstEvent = &mapstr.M{} ctx.lastResponse = &response{} - ctx.lock.Unlock() } type transformable mapstr.M From 334b4ee7e293ac1e9e23d0c7e1c16a6f9e162483 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 15:00:19 +0930 Subject: [PATCH 15/16] x-pack/filebeat/input/httpjson: clean up scaffolding --- x-pack/filebeat/input/httpjson/request.go | 20 --------------- x-pack/filebeat/input/httpjson/response.go | 23 ----------------- x-pack/filebeat/input/httpjson/split_test.go | 26 ++++++++++++++------ 3 files changed, 18 insertions(+), 51 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index d8952896a99..6a1d926ab40 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -610,8 +610,6 @@ func (p *chainProcessor) eventCount() int { return p.n } -func (*chainProcessor) close() {} - // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input // //nolint:bodyclose // response body is closed through drainBody method @@ -742,22 +740,6 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo } } -// processAndPublishEvents process and publish events based on event type -func (p *publisher) processAndPublishEvents(events stream) { - for maybeMsg := range events.ch { - p.processAndPublishEvent(maybeMsg) - } -} - -// processAndPublishEvent processes and publishes one events based on event type -func (p *publisher) processAndPublishEvent(evt maybeMsg) { - if evt.failed() { - p.fail(evt.err) - return - } - p.event(nil, evt.msg) -} - func (p *publisher) event(_ context.Context, msg mapstr.M) { if p.pub != nil { event, err := makeEvent(msg) @@ -788,8 +770,6 @@ func (p *publisher) eventCount() int { return p.n } -func (p *publisher) close() {} - const ( // This is generally updated with chain responses, if present, as they continue to occur // Otherwise this is always the last response of the root request w.r.t pagination diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 56ea94e4461..2b8c5de5f4e 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -183,35 +183,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe type sendStream interface { event(context.Context, mapstr.M) fail(error) - close() -} - -type stream struct { - ch chan maybeMsg -} - -func newStream() stream { - return stream{make(chan maybeMsg)} -} - -func (s stream) event(_ context.Context, e mapstr.M) { - s.ch <- maybeMsg{msg: e} -} - -func (s stream) fail(err error) { - s.ch <- maybeMsg{err: err} -} - -func (s stream) close() { - close(s.ch) } func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { trCtx.clearIntervalData() - defer ch.close() var npages int64 - for i, httpResp := range resps { iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails) for { diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go index fa390061c9a..10f7a40567d 100644 --- a/x-pack/filebeat/input/httpjson/split_test.go +++ b/x-pack/filebeat/input/httpjson/split_test.go @@ -5,6 +5,7 @@ package httpjson import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -702,9 +703,8 @@ func TestSplit(t *testing.T) { } for _, tc := range cases { - tc := tc t.Run(tc.name, func(t *testing.T) { - events := stream{make(chan maybeMsg, len(tc.expectedMessages))} + events := &stream{t: t} split, err := newSplitResponse(tc.config, logp.NewLogger("")) assert.NoError(t, err) err = split.run(tc.ctx, tc.resp, events) @@ -713,13 +713,23 @@ func TestSplit(t *testing.T) { } else { assert.EqualError(t, err, tc.expectedErr.Error()) } - events.close() - assert.Equal(t, len(tc.expectedMessages), len(events.ch)) - for _, msg := range tc.expectedMessages { - e := <-events.ch - assert.NoError(t, e.err) - assert.Equal(t, msg.Flatten(), e.msg.Flatten()) + assert.Equal(t, len(tc.expectedMessages), len(events.collected)) + for i, msg := range tc.expectedMessages { + assert.Equal(t, msg.Flatten(), events.collected[i].Flatten()) } }) } } + +type stream struct { + collected []mapstr.M + t *testing.T +} + +func (s *stream) event(_ context.Context, msg mapstr.M) { + s.collected = append(s.collected, msg) +} + +func (s *stream) fail(err error) { + s.t.Errorf("fail: %v", err) +} From f0bc64b31627934dbabec35f47e461f0bad55c0e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 17:34:42 +0930 Subject: [PATCH 16/16] add changelog entry --- CHANGELOG-developer.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 3b087d19a9d..2e918d757dc 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091] - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158] - Add device handling to Okta API package for entity analytics. {pull}35980[35980] +- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] ==== Deprecated