Skip to content

Commit

Permalink
Add recursive split to httpjson (elastic#21214)
Browse files Browse the repository at this point in the history
(cherry picked from commit 43ca900)
  • Loading branch information
marc-gr committed Sep 23, 2020
1 parent 4976e31 commit e69a81c
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 10 deletions.
31 changes: 21 additions & 10 deletions x-pack/filebeat/input/httpjson/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -214,7 +215,7 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in
for _, t := range events {
switch v := t.(type) {
case map[string]interface{}:
for _, e := range r.splitEvent(v) {
for _, e := range splitEvent(r.splitEventsBy, v) {
last = e
d, err := json.Marshal(e)
if err != nil {
Expand All @@ -229,15 +230,23 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in
return last, nil
}

func (r *requester) splitEvent(event map[string]interface{}) []map[string]interface{} {
func splitEvent(splitKey string, event map[string]interface{}) []map[string]interface{} {
m := common.MapStr(event)

hasSplitKey, _ := m.HasKey(r.splitEventsBy)
if r.splitEventsBy == "" || !hasSplitKey {
// NOTE: this notation is only used internally, not meant to be documented
// and will be removed in the next release
keys := strings.SplitN(splitKey, "..", 2)
if len(keys) < 2 {
// we append an empty key to force the recursive call
keys = append(keys, "")
}

hasSplitKey, _ := m.HasKey(keys[0])
if keys[0] == "" || !hasSplitKey {
return []map[string]interface{}{event}
}

splitOnIfc, _ := m.GetValue(r.splitEventsBy)
splitOnIfc, _ := m.GetValue(keys[0])
splitOn, ok := splitOnIfc.([]interface{})
// if not an array or is empty, we do nothing
if !ok || len(splitOn) == 0 {
Expand All @@ -252,12 +261,14 @@ func (r *requester) splitEvent(event map[string]interface{}) []map[string]interf
return []map[string]interface{}{event}
}

mm := m.Clone()
if _, err := mm.Put(r.splitEventsBy, s); err != nil {
return []map[string]interface{}{event}
// call splitEvent recursively for each part
for _, nestedSplit := range splitEvent(keys[1], s) {
mm := m.Clone()
if _, err := mm.Put(keys[0], nestedSplit); err != nil {
return []map[string]interface{}{event}
}
events = append(events, mm)
}

events = append(events, mm)
}

return events
Expand Down
86 changes: 86 additions & 0 deletions x-pack/filebeat/input/httpjson/requester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSplitEventsBy(t *testing.T) {
event := map[string]interface{}{
"this": "is kept",
"alerts": []interface{}{
map[string]interface{}{
"this_is": "also kept",
"entities": []interface{}{
map[string]interface{}{
"something": "something",
},
map[string]interface{}{
"else": "else",
},
},
},
map[string]interface{}{
"this_is": "also kept 2",
"entities": []interface{}{
map[string]interface{}{
"something": "something 2",
},
map[string]interface{}{
"else": "else 2",
},
},
},
},
}

expectedEvents := []map[string]interface{}{
{
"this": "is kept",
"alerts": map[string]interface{}{
"this_is": "also kept",
"entities": map[string]interface{}{
"something": "something",
},
},
},
{
"this": "is kept",
"alerts": map[string]interface{}{
"this_is": "also kept",
"entities": map[string]interface{}{
"else": "else",
},
},
},
{
"this": "is kept",
"alerts": map[string]interface{}{
"this_is": "also kept 2",
"entities": map[string]interface{}{
"something": "something 2",
},
},
},
{
"this": "is kept",
"alerts": map[string]interface{}{
"this_is": "also kept 2",
"entities": map[string]interface{}{
"else": "else 2",
},
},
},
}

const key = "alerts..entities"

got := splitEvent(key, event)

assert.Equal(t, expectedEvents, got)
}

0 comments on commit e69a81c

Please sign in to comment.