Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/httpjson: remove concurrency from response processing #36493

Merged
merged 16 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091]
- Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158]
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]

==== Deprecated

Expand Down
183 changes: 112 additions & 71 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
p := newPublisher(trCtx, publisher, true, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
}

Expand Down Expand Up @@ -118,8 +119,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return err
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
p := newPublisher(trCtx, publisher, false, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
if len(ids) == 0 {
n = 0
Expand Down Expand Up @@ -187,13 +189,13 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
resps = intermediateResps
}

var events <-chan maybeMsg
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
}
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
n += p.eventCount()
}
}

Expand Down Expand Up @@ -541,49 +543,71 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
// processRemainingChainEvents, processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)
p := newChainProcessor(r, trCtx, publisher, chainIndex)
r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p)
return p.eventCount()
}

var n int
var eventCount int
for maybeMsg := range events {
if maybeMsg.failed() {
r.log.Errorf("error processing response: %v", maybeMsg)
continue
}
// chainProcessor implements the sendStream callbacks (event/fail) for chain
// request processing: it replays each pagination event after the first through
// the requester's chain steps and counts the events that result.
type chainProcessor struct {
// req is the requester whose chain steps are re-run for each pagination event.
req *requester
// ctx is the transform context whose last_event/cursor state is updated per event.
ctx *transformContext
// pub receives the events produced by the replayed chain steps.
pub inputcursor.Publisher
// idx is the index of the chain block to process.
idx int
// tail records that the first event has been seen; the first event is
// skipped because it has already been processed by the caller.
tail bool
// n is the number of events processed so far.
n int
}

if n >= 1 { // skip 1st event as it has already ben processed before
var response http.Response
response.StatusCode = 200
body := new(bytes.Buffer)
// we construct a new response here from each of the pagination events
err := json.NewEncoder(body).Encode(maybeMsg.msg)
if err != nil {
r.log.Errorf("error processing chain event: %w", err)
continue
}
response.Body = io.NopCloser(body)
func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
return &chainProcessor{
req: req,
ctx: trCtx,
pub: pub,
idx: idx,
}
}

// updates the cursor for pagination last_event & last_response when chaining is present
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()
func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) {
if !p.tail {
// Skip first event as it has already been processed.
p.tail = true
return
}

// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
r.log.Errorf("error processing chain event: %w", err)
continue
}
eventCount += count
var response http.Response
response.StatusCode = 200
body := new(bytes.Buffer)
// we construct a new response here from each of the pagination events
err := json.NewEncoder(body).Encode(msg)
if err != nil {
p.req.log.Errorf("error processing chain event: %w", err)
return
}
response.Body = io.NopCloser(body)

err = response.Body.Close()
if err != nil {
r.log.Errorf("error closing http response body: %w", err)
}
}
// updates the cursor for pagination last_event & last_response when chaining is present
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()

n++
// for each pagination response, we repeat all the chain steps / blocks
n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
if err != nil {
p.req.log.Errorf("error processing chain event: %w", err)
return
}
return eventCount
p.n += n

err = response.Body.Close()
if err != nil {
p.req.log.Errorf("error closing http response body: %w", err)
}
}

// fail logs err. It implements the error path of the sendStream interface;
// processing errors are reported here by startProcessing.
func (p *chainProcessor) fail(err error) {
p.req.log.Errorf("error processing response: %v", err)
}

// eventCount returns the number of chain pagination events processed so far.
func (p *chainProcessor) eventCount() int {
return p.n
}

// processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
Expand Down Expand Up @@ -675,8 +699,9 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
n += p.eventCount()
}

defer func() {
Expand All @@ -697,36 +722,52 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
return *newUrl, nil
}

// processAndPublishEvents process and publish events based on event type
func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
var n int
for maybeMsg := range events {
if maybeMsg.failed() {
log.Errorf("error processing response: %v", maybeMsg)
continue
}
type publisher struct {
ctx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
}

if publish {
event, err := makeEvent(maybeMsg.msg)
if err != nil {
log.Errorf("error creating event: %v", maybeMsg)
continue
}
func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
return &publisher{
ctx: trCtx,
pub: pub,
log: log,
}
}

if err := publisher.Publish(event, trCtx.cursorMap()); err != nil {
log.Errorf("error publishing event: %v", err)
continue
}
}
if len(*trCtx.firstEventClone()) == 0 {
trCtx.updateFirstEvent(maybeMsg.msg)
func (p *publisher) event(_ context.Context, msg mapstr.M) {
if p.pub != nil {
event, err := makeEvent(msg)
if err != nil {
p.log.Errorf("error creating event: %v: %v", msg, err)
return
}
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()

n++
if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
p.log.Errorf("error publishing event: %v", err)
return
}
}
if len(*p.ctx.firstEventClone()) == 0 {
p.ctx.updateFirstEvent(msg)
}
return n
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()

p.n++
}

// fail logs err. It implements the error path of the sendStream interface;
// processing errors are reported here by startProcessing.
func (p *publisher) fail(err error) {
p.log.Errorf("error processing response: %v", err)
}

// eventCount returns the number of events handled so far.
func (p *publisher) eventCount() int {
return p.n
}

const (
Expand Down
120 changes: 59 additions & 61 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,83 +180,81 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
type sendStream interface {
event(context.Context, mapstr.M)
fail(error)
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

ch := make(chan maybeMsg)
go func() {
defer close(ch)
var npages int64

for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
if err != nil {
ch <- maybeMsg{err: err}
return
}
var npages int64
for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
if err != nil {
ch.fail(err)
return
}

if !hasNext {
if i+1 != len(resps) {
break
}
return
if !hasNext {
if i+1 != len(resps) {
break
}
return
}

respTrs := page.asTransformables(rp.log)
respTrs := page.asTransformables(rp.log)

if len(respTrs) == 0 {
return
}
if len(respTrs) == 0 {
return
}

// last_response context object is updated here organically
trCtx.updateLastResponse(*page)
npages = page.page
// last_response context object is updated here organically
trCtx.updateLastResponse(*page)
npages = page.page

rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
rp.log.Debugf("last received page: %#v", trCtx.lastResponse)

for _, tr := range respTrs {
for _, t := range rp.transforms {
tr, err = t.run(trCtx, tr)
if err != nil {
ch <- maybeMsg{err: err}
return
}
for _, tr := range respTrs {
for _, t := range rp.transforms {
tr, err = t.run(trCtx, tr)
if err != nil {
ch.fail(err)
return
}
}

if rp.split == nil {
ch <- maybeMsg{msg: tr.body()}
rp.log.Debug("no split found: continuing")
continue
}
if rp.split == nil {
ch.event(stdCtx, tr.body())
rp.log.Debug("no split found: continuing")
continue
}

if err := rp.split.run(trCtx, tr, ch); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
rp.log.Debug("split operation finished")
case errEmptyRootField:
// root field not found, most likely the response is empty
rp.log.Debug(err)
default:
rp.log.Debug("split operation failed")
ch <- maybeMsg{err: err}
return
}
if err := rp.split.run(trCtx, tr, ch); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
rp.log.Debug("split operation finished")
case errEmptyRootField:
// root field not found, most likely the response is empty
rp.log.Debug(err)
default:
rp.log.Debug("split operation failed")
ch.fail(err)
return
}
}
}

rp.metrics.updatePageExecutionTime(pageStartTime)
rp.metrics.updatePageExecutionTime(pageStartTime)

if !paginate {
break
}
if !paginate {
break
}
}
rp.metrics.updatePagesPerInterval(npages)
}()

return ch
}
rp.metrics.updatePagesPerInterval(npages)
}
Loading
Loading